diff --git a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h index 5fa16e748a..93cefb61cf 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h +++ b/lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h @@ -104,7 +104,8 @@ namespace Org { !EvaluatorRequestorClr2Java(); virtual void OnError(String^ message); virtual void Submit(IEvaluatorRequest^ request); - virtual array^ GetDefinedRuntimes(); + virtual void Remove(String^ requestId); + virtual array^ GetDefinedRuntimes(); }; public ref class TaskMessageClr2Java : public ITaskMessageClr2Java { diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp index 4dc9bd1b3d..a2fc36aa09 100644 --- a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp @@ -53,20 +53,23 @@ namespace Org { ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor); - jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;Ljava/lang/String;)V"); + jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(Ljava/lang/String;IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;Ljava/lang/String;)V"); + ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit2"); if (jmidSubmit == NULL) { fprintf(stdout, " jmidSubmit is NULL\n"); fflush(stdout); return; } - env->CallObjectMethod( + ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit3:RequestId:" + request->RequestId); + env->CallObjectMethod( _jobjectEvaluatorRequestor, jmidSubmit, + JavaStringFromManagedString(env, request->RequestId), request->Number, request->MemoryMegaBytes, request->VirtualCore, - request->RelaxLocality, + request->RelaxLocality, JavaStringFromManagedString(env, request->Rack), JavaStringFromManagedString(env, request->RuntimeName), JavaArrayListFromManagedList(env, request->NodeNames), @@ -74,6 +77,24 @@ namespace Org { ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit"); } + void EvaluatorRequestorClr2Java::Remove(String^ requestId) { + ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Remove"); + JNIEnv *env = RetrieveEnv(_jvm); + jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor); + jmethodID jmidRemove = env->GetMethodID(jclassEvaluatorRequestor, "remove", "(Ljava/lang/String;)V"); + + if (jmidRemove == NULL) { + fprintf(stdout, " jmidRemove is NULL\n"); + fflush(stdout); + return; + } + env->CallObjectMethod( + _jobjectEvaluatorRequestor, + jmidRemove, + JavaStringFromManagedString(env, requestId)); + ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Remove"); + } + array^ EvaluatorRequestorClr2Java::GetDefinedRuntimes() { ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::GetDefinedRuntimes"); JNIEnv *env = RetrieveEnv(_jvm); diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs index d19079a234..1f237b6fa4 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Clr2java/IEvaluatorRequestorClr2Java.cs @@ -25,6 +25,8 @@ public interface IEvaluatorRequestorClr2Java : IClr2Java { void Submit(IEvaluatorRequest evaluatorRequest); + void Remove(string requestId); + byte[] GetDefinedRuntimes(); } } diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs index 504ae1751d..373e3204a1 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs @@ -65,8 +65,8 @@ internal static IDictionary Evaluators public void Submit(IEvaluatorRequest request) { - LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}.", - request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality); + LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}, requestId {7}.", + request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality, request.RequestId); lock (Evaluators) { for (var i = 0; i < request.Number; i++) @@ -96,6 +96,11 @@ public void Submit(IEvaluatorRequest request) Clr2Java.Submit(request); } + public void Remove(string requestId) + { + Clr2Java.Remove(requestId); + } + public EvaluatorRequestBuilder NewBuilder() { return new EvaluatorRequestBuilder(); diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs index c0c19527c7..4aced22968 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs @@ -19,6 +19,8 @@ using System.Collections.Generic; using System.Linq; using System.Runtime.Serialization; +using Org.Apache.REEF.Driver.Bridge.Events; +using Org.Apache.REEF.Utilities.Logging; namespace Org.Apache.REEF.Driver.Evaluator { @@ -28,11 +30,16 @@ namespace Org.Apache.REEF.Driver.Evaluator [DataContract] internal class EvaluatorRequest : IEvaluatorRequest { - internal EvaluatorRequest() : this(number: 0, megaBytes: 0) - { - } + internal EvaluatorRequest() + : this(number: 0, megaBytes: 0) - internal EvaluatorRequest(int number, int megaBytes) : this(number: number, megaBytes: megaBytes, core: 1) + private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRequest)); + + internal EvaluatorRequest(int number, int megaBytes) + : this( + number: number, + megaBytes: megaBytes, + core: 1) { } @@ -92,19 +99,21 @@ internal EvaluatorRequest( ICollection nodeNames, bool relaxLocality) : this( - number: number, - megaBytes: megaBytes, - core: core, - rack: rack, - evaluatorBatchId: evaluatorBatchId, - runtimeName: string.Empty, - nodeNames: nodeNames, - relaxLocality: relaxLocality, - nodeLabelExpression: string.Empty) + requestId: string.Empty, + number: number, + megaBytes: megaBytes, + core: core, + rack: rack, + evaluatorBatchId: evaluatorBatchId, + runtimeName: string.Empty, + nodeNames: nodeNames, + relaxLocality: relaxLocality, + nodeLabelExpression: string.Empty) { } internal EvaluatorRequest( + string requestId, int number, int megaBytes, int core, @@ -115,9 +124,12 @@ internal EvaluatorRequest( bool relaxLocality, string nodeLabelExpression) { + Logger.Log(Level.Verbose, "EvaluatorRequest constructor: RequestId {0}, Number: {1}, Priority: {2}.", requestId, number, priority); + RequestId = requestId; Number = number; MemoryMegaBytes = megaBytes; VirtualCore = core; + Priority = priority; Rack = rack; EvaluatorBatchId = evaluatorBatchId; RuntimeName = runtimeName; @@ -126,6 +138,9 @@ internal EvaluatorRequest( NodeLabelExpression = nodeLabelExpression; } + [DataMember] + public string RequestId { get; private set; } + [DataMember] public int MemoryMegaBytes { get; private set; } diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs index 3554bb610e..fc4d689f1b 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs @@ -33,6 +33,7 @@ public sealed class EvaluatorRequestBuilder internal EvaluatorRequestBuilder(IEvaluatorRequest request) { + RequestId = request.RequestId; Number = request.Number; MegaBytes = request.MemoryMegaBytes; VirtualCore = request.VirtualCore; @@ -46,6 +47,7 @@ internal EvaluatorRequestBuilder(IEvaluatorRequest request) internal EvaluatorRequestBuilder() { + RequestId = "RequestId-" + Guid.NewGuid().ToString("N"); Number = 1; VirtualCore = 1; MegaBytes = 64; @@ -57,10 +59,23 @@ internal EvaluatorRequestBuilder() _nodeLabelExpression = string.Empty; } + public string RequestId { get; private set; } + public int Number { get; private set; } public int MegaBytes { get; private set; } public int VirtualCore { get; private set; } + /// + /// Set the request id for the evaluator request. + /// + /// + /// + public EvaluatorRequestBuilder SetRequestId(string requestId) + { + RequestId = requestId; + return this; + } + /// /// Set the number of evaluators to request. /// @@ -177,7 +192,9 @@ public EvaluatorRequestBuilder SetNodeLabelExpression(string nodeLabelExpression /// public IEvaluatorRequest Build() { - return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality, nodeLabelExpression: _nodeLabelExpression); + return new EvaluatorRequest(RequestId, Number, MegaBytes, VirtualCore, rack: _rackName, + evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, + relaxLocality: _relaxLocality, nodeLabelExpression: _nodeLabelExpression); } } } \ No newline at end of file diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs index 0ed0545ef3..2faea562b2 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs @@ -24,6 +24,11 @@ namespace Org.Apache.REEF.Driver.Evaluator /// public interface IEvaluatorRequest { + /// + /// Evaluator request id. + /// + string RequestId { get; } + /// /// Memory for the Evaluator in megabytes. /// diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequestor.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequestor.cs index 41c908f6ea..c278dab821 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequestor.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequestor.cs @@ -35,6 +35,12 @@ public interface IEvaluatorRequestor /// void Submit(IEvaluatorRequest request); + /// + /// Remove an evaluator request for specified request id. + /// + /// Request Id to be removed. + void Remove(string requestId); + /// /// Returns a builder for new Evaluator requests. /// diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs index 921c54bbb3..7b4974d437 100644 --- a/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs +++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs @@ -253,6 +253,11 @@ public void Submit(IEvaluatorRequest request) // but can't throw exception here as Driver calls this method before cancellation flow can be initiated. } + public void Remove(string requestId) + { + // for test we don't really remove evaluator request, + } + public EvaluatorRequestBuilder NewBuilder() { var builder = Activator.CreateInstance( diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs index 2609857cdb..92c8c6fb0c 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloDriver.cs @@ -39,6 +39,8 @@ public sealed class TestHelloDriver : IObserver, private static readonly Logger Logger = Logger.GetLogger(typeof(TestHelloDriver)); private readonly IEvaluatorRequestor _evaluatorRequestor; + private const string RequestIdPrefix = "RequestId-"; + /// /// Specify if the desired node names is relaxed /// @@ -90,7 +92,9 @@ public void OnNext(IDriverStarted driverStarted) { Logger.Log(Level.Info, "Received IDriverStarted, numberOfContainers: {0}", _numberOfContainers); + var requestId = RequestIdPrefix + Guid.NewGuid(); _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder() + .SetRequestId(requestId) .SetMegabytes(64) .SetNumber(_numberOfContainers) .SetRelaxLocality(_relaxLocality) diff --git a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs index babdb2d4f4..b845e3f5d7 100644 --- a/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs +++ b/lang/cs/Org.Apache.REEF.Tests/Performance/TestHelloREEF/TestHelloREEFClient.cs @@ -63,6 +63,7 @@ public void TestHelloREEFOnLocal() int driverMemory = 1024; string testFolder = DefaultRuntimeFolder + TestId; TestRun(GetRuntimeConfigurationForLocal(numberOfContainers, testFolder), driverMemory); + ValidateSuccessForLocalRuntime(numberOfContainers, testFolder: testFolder); CleanUp(testFolder); } diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java index 23278462c8..cbda71ad24 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java @@ -71,6 +71,17 @@ public void submit(final int evaluatorsNumber, final String runtimeName, final ArrayList nodeNames, final String nodeLabelExpression) { + submit("", evaluatorsNumber, memory, virtualCore, relaxLocality, rack, runtimeName, nodeNames, nodeLabelExpression); + } + public void submit(final String requestId, + final int evaluatorsNumber, + final int memory, + final int virtualCore, + final boolean relaxLocality, + final String rack, + final String runtimeName, + final ArrayList nodeNames, + final String nodeLabelExpression) { if (this.isBlocked) { throw new RuntimeException("Cannot request additional Evaluator, this is probably because " + "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433."); @@ -84,6 +95,7 @@ public void submit(final int evaluatorsNumber, clrEvaluatorsNumber += evaluatorsNumber; final EvaluatorRequest request = EvaluatorRequest.newBuilder() + .setRequestId(requestId) .setNumber(evaluatorsNumber) .setMemory(memory) .setNumberOfCores(virtualCore) @@ -93,11 +105,17 @@ public void submit(final int evaluatorsNumber, .setNodeLabelExpression(nodeLabelExpression) .build(); - LOG.log(Level.FINE, "submitting evaluator request {0}", request); + LOG.log(Level.FINE, "EvaluatorRequestorBridge.submit(), requestId {0}, evaluatorsNumber:{1}", + new Object[] {requestId, evaluatorsNumber}); jevaluatorRequestor.submit(request); } } + public void remove(final String evaluatorRequestId) { + LOG.log(Level.INFO, "Received remove request for id {0}.", evaluatorRequestId); + jevaluatorRequestor.remove(evaluatorRequestId); + } + public int getEvaluatorNumber() { return clrEvaluatorsNumber; } diff --git a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java index 75208f5e97..83e7a41234 100644 --- a/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java +++ b/lang/java/reef-bridge-proto-java/src/main/java/org/apache/reef/bridge/driver/client/DriverClientEvaluatorRequestor.java @@ -22,6 +22,7 @@ import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.evaluator.EvaluatorRequest; import org.apache.reef.driver.evaluator.EvaluatorRequestor; +import org.apache.reef.runtime.common.driver.EvaluatorRequestorImpl; import javax.inject.Inject; @@ -49,6 +50,16 @@ public void submit(final EvaluatorRequest req) { this.driverServiceClient.onEvaluatorRequest(req); } + @Override + public void remove(final String requestId) { + // The driver service will need to properly handle the remove flag. + EvaluatorRequest request = EvaluatorRequest.newBuilder() + .setNumber(EvaluatorRequestorImpl.REMOVE_FLAG) + .setRequestId(requestId) + .build(); + this.driverServiceClient.onEvaluatorRequest(request); + } + @Override public EvaluatorRequest.Builder newRequest() { return new DriverClientEvaluatorRequestor.Builder(); diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java index b539a9bde4..7a4559b4e0 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java @@ -34,6 +34,7 @@ @Provided public final class EvaluatorRequest { + private final String requestId; private final int megaBytes; private final int number; private final int cores; @@ -57,11 +58,10 @@ public final class EvaluatorRequest { final List nodeNames, final List rackNames, final String runtimeName) { - this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true, null); + this("", number, megaBytes, cores, nodeNames, rackNames, runtimeName, true, null); } - - - EvaluatorRequest(final int number, + EvaluatorRequest(final String requestId, + final int number, final int megaBytes, final int cores, final List nodeNames, @@ -69,6 +69,7 @@ public final class EvaluatorRequest { final String runtimeName, final boolean relaxLocality, final String nodeLabelExpression) { + this.requestId = requestId; this.number = number; this.megaBytes = megaBytes; this.cores = cores; @@ -98,6 +99,15 @@ public static Builder newBuilder(final EvaluatorRequest request) { return new Builder(request); } + /** + * Access the request id of Evaluators requested. + * + * @return The request id. + */ + public String getRequestId() { + return this.requestId; + } + /** * Access the number of Evaluators requested. * @@ -175,6 +185,7 @@ public String getNodeLabelExpression() { public static class Builder implements org.apache.reef.util.Builder { private int n = 1; + private String requestId = ""; private int megaBytes = -1; private int cores = 1; //if not set, default to 1 private final List nodeNames = new ArrayList<>(); @@ -194,6 +205,7 @@ public Builder() { * @return this Builder */ private Builder(final EvaluatorRequest request) { + setRequestId(request.getRequestId()); setNumber(request.getNumber()); setMemory(request.getMegaBytes()); setNumberOfCores(request.getNumberOfCores()); @@ -208,6 +220,18 @@ private Builder(final EvaluatorRequest request) { setNodeLabelExpression(request.getNodeLabelExpression()); } + /** + * Set the request id. + * + * @param requestId for the Evaluator request. + * @return this builder + */ + @SuppressWarnings("checkstyle:hiddenfield") + public T setRequestId(final String requestId) { + this.requestId = requestId; + return (T) this; + } + /** * Set the amount of memory. * @@ -326,9 +350,8 @@ public T setNodeLabelExpression(final String nodeLabelExpr) { */ @Override public EvaluatorRequest build() { - return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames, - this.rackNames, this.runtimeName, this.relaxLocality, - this.nodeLabelExpression); + return new EvaluatorRequest(this.requestId, this.n, this.megaBytes, this.cores, this.nodeNames, + this.rackNames, this.runtimeName, this.relaxLocality, this.nodeLabelExpression); } /** diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequestor.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequestor.java index 684553ee45..6f8361060c 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequestor.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequestor.java @@ -36,6 +36,12 @@ public interface EvaluatorRequestor { */ void submit(final EvaluatorRequest req); + /** + * Remove the submitted EvaluatorRequest. + * @param requestId to be removed. + */ + void remove(final String requestId); + /** * Get a new Builder for the evaluator with fluid interface. * @return Builder for the evaluator diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java index a007805f1e..2c5bd42c36 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java @@ -44,6 +44,7 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor { private final ResourceCatalog resourceCatalog; private final ResourceRequestHandler resourceRequestHandler; private final LoggingScopeFactory loggingScopeFactory; + public static final int REMOVE_FLAG = -100; /** * @param resourceCatalog @@ -62,8 +63,8 @@ public EvaluatorRequestorImpl(final ResourceCatalog resourceCatalog, @Override public synchronized void submit(final EvaluatorRequest req) { if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.", - new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()}); + LOG.log(Level.FINEST, "Got an EvaluatorRequest:number:{0}, memory:{1}, cores:{2}, requestId:{3}.", + new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores(), req.getRequestId()}); LOG.log(Level.FINEST, "Node names: " + Arrays.toString(req.getNodeNames().toArray())); } @@ -102,6 +103,7 @@ public synchronized void submit(final EvaluatorRequest req) { try (LoggingScope ls = this.loggingScopeFactory.evaluatorSubmit(req.getNumber())) { final ResourceRequestEvent request = ResourceRequestEventImpl .newBuilder() + .setRequestId(req.getRequestId()) .setResourceCount(req.getNumber()) .setVirtualCores(req.getNumberOfCores()) .setMemorySize(req.getMegaBytes()) @@ -115,6 +117,20 @@ public synchronized void submit(final EvaluatorRequest req) { } } + @Override + public synchronized void remove(final String requestId) { + LOG.log(Level.FINE, "EvaluatorRequestorImpl.remove request for id {0}.", requestId); + if (requestId == null || requestId.equals("")) { + throw new IllegalArgumentException("Given null or empty request id for removing a request."); + } + final ResourceRequestEvent request = ResourceRequestEventImpl + .newBuilder() + .setRequestId(requestId) + .setResourceCount(REMOVE_FLAG) + .build(); + this.resourceRequestHandler.onNext(request); + } + /** * Get a new builder. * diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java index 735662ff5f..77bacc9734 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java @@ -34,6 +34,11 @@ @DefaultImplementation(ResourceRequestEventImpl.class) public interface ResourceRequestEvent { + /** + * @return The request id. + */ + String getRequestId(); + /** * @return The number of resources requested */ diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java index 4b4326557d..44f503f0b8 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java @@ -29,6 +29,7 @@ * Use newBuilder to construct an instance. */ public final class ResourceRequestEventImpl implements ResourceRequestEvent { + private final String requestId; private final int resourceCount; private final List nodeNameList; private final List rackNameList; @@ -40,6 +41,7 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { private final String runtimeName; private ResourceRequestEventImpl(final Builder builder) { + this.requestId = builder.requestId == null ? "" : builder.requestId; this.resourceCount = BuilderUtils.notNull(builder.resourceCount); this.nodeNameList = BuilderUtils.notNull(builder.nodeNameList); this.rackNameList = BuilderUtils.notNull(builder.rackNameList); @@ -51,6 +53,11 @@ private ResourceRequestEventImpl(final Builder builder) { this.runtimeName = builder.runtimeName == null ? "" : builder.runtimeName; } + @Override + public String getRequestId() { + return requestId; + } + @Override public int getResourceCount() { return resourceCount; @@ -104,6 +111,7 @@ public static Builder newBuilder() { * Builder used to create ResourceRequestEvent instances. */ public static final class Builder implements org.apache.reef.util.Builder { + private String requestId; private Integer resourceCount; private List nodeNameList = new ArrayList<>(); private List rackNameList = new ArrayList<>(); @@ -118,6 +126,7 @@ public static final class Builder implements org.apache.reef.util.Builder requestsBeforeSentToRM = new ConcurrentLinkedQueue<>(); - private final Queue requestsAfterSentToRM = new ConcurrentLinkedQueue<>(); private final Map nodeIdToRackName = new ConcurrentHashMap<>(); + /** + * The map that contains original container requests with allocationRequestId as a key. + */ + private final ConcurrentHashMap allocationRequestIdToContainerRequest + = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap> requestIdToAllocationRequestIds + = new ConcurrentHashMap<>(); + private final YarnConfiguration yarnConf; private final AMRMClientAsync resourceManager; private final YarnProxyUser yarnProxyUser; @@ -480,85 +486,120 @@ private void onContainerStatus(final ContainerStatus value) { } void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests) { + onContainerRequest("", containerRequests); + } + + void onContainerRequest(final String requestId, final AMRMClient.ContainerRequest... containerRequests) { + + LOG.log(Level.FINEST, "YarnContainerManager:onContainerRequest:numberOfContainerRequests {0} with requestId: {1}.", + new Object[] {containerRequests.length, requestId}); + this.containerRequestCounter.incrementBy(containerRequests.length); + + final Vector allocationRequestIds = new Vector<>(); + + for (final AMRMClient.ContainerRequest containerRequest : containerRequests) { + this.resourceManager.addContainerRequest(containerRequest); + LOG.log(Level.FINEST, "YarnContainerManager:addContainerRequest:allocationRequestId {0} with requestId: {1}.", + new Object[] {containerRequest.getAllocationRequestId(), requestId}); + + final AMRMClient.ContainerRequest previousRequest = + allocationRequestIdToContainerRequest.putIfAbsent(containerRequest.getAllocationRequestId(), + containerRequest); + if (previousRequest != null) { + LOG.log(Level.SEVERE, "Duplicated allocation request id: {0} is passed in ContainerRequest.", + containerRequest.getAllocationRequestId()); + this.onError(new Exception("Duplicated allocation request id is passed.")); + } + allocationRequestIds.add(containerRequest.getAllocationRequestId()); + } - synchronized (this) { - this.containerRequestCounter.incrementBy(containerRequests.length); - this.requestsBeforeSentToRM.addAll(Arrays.asList(containerRequests)); - this.doHomogeneousRequests(); + if (requestId != null && !requestId.equals("")) { + final Vector previousAllocatedRequests = + this.requestIdToAllocationRequestIds.putIfAbsent(requestId, allocationRequestIds); + if (previousAllocatedRequests != null) { + updateMap(requestId, allocationRequestIds); + } } this.updateRuntimeStatus(); } + private synchronized void updateMap(final String requestId, final VectorallocationRequestIds) { + Vector existing = requestIdToAllocationRequestIds.get(requestId); + existing.addAll(allocationRequestIds); + } + + /*** + * Remove container requests associated with the specified requestId if the container has not been allocated yet. + * @param requestId + */ + void onContainerRequestRemove(final String requestId) { + final Vector allocationRequestIds = requestIdToAllocationRequestIds.get(requestId); + + if (allocationRequestIds != null && allocationRequestIds.size() > 0) { + for (Long allocationRequestId : allocationRequestIds) { + final AMRMClient.ContainerRequest containerRequest = + allocationRequestIdToContainerRequest.get(allocationRequestId); + if (containerRequest != null + && allocationRequestIdToContainerRequest.remove(allocationRequestId, containerRequest)) { + LOG.log(Level.INFO, "onContainerRequestRemove: request Id {0} and allocationRequestId: {1}, count: {2}.", + new Object[]{requestId, allocationRequestId, containerRequestCounter.get()}); + containerRequestCounter.decrement(); + resourceManager.removeContainerRequest(containerRequest); + } + } + this.updateRuntimeStatus(); + } + } + /** * Handles new container allocations. Calls come from YARN. * @param container newly allocated YARN container. */ private void handleNewContainer(final Container container) { - LOG.log(Level.FINE, "allocated container: id[ {0} ]", container.getId()); + LOG.log(Level.FINE, "Allocated container: id[ {0} ], allocationRequestId: {1}, nodeId: {2}.", new Object[] { + container.getId(), container.getAllocationRequestId(), container.getNodeId()}); - synchronized (this) { + final AMRMClient.ContainerRequest containerRequest = + allocationRequestIdToContainerRequest.get(container.getAllocationRequestId()); - if (!matchContainerWithPendingRequest(container)) { - LOG.log(Level.WARNING, "Got an extra container {0} that doesn't match, releasing...", container.getId()); - this.resourceManager.releaseAssignedContainer(container.getId()); - return; - } + if (containerRequest == null || + !allocationRequestIdToContainerRequest.remove(container.getAllocationRequestId(), containerRequest)) { + releaseContainer(container); + return; + } - final AMRMClient.ContainerRequest matchedRequest = this.requestsAfterSentToRM.peek(); - - this.containerRequestCounter.decrement(); - this.containers.add(container); - - LOG.log(Level.FINEST, "{0} matched with {1}", new Object[] {container, matchedRequest}); - - // Due to the bug YARN-314 and the workings of AMRMCClientAsync, when x-priority m-capacity zero-container - // request and x-priority n-capacity nonzero-container request are sent together, where m > n, RM ignores - // the latter. - // Therefore it is necessary avoid sending zero-container request, even if it means getting extra containers. - // It is okay to send nonzero m-capacity and n-capacity request together since bigger containers - // can be matched. - // TODO[JIRA REEF-42, REEF-942]: revisit this when implementing locality-strictness. - // (i.e. a specific rack request can be ignored) - if (this.requestsAfterSentToRM.size() > 1) { - try { - this.resourceManager.removeContainerRequest(matchedRequest); - } catch (final Exception e) { - LOG.log(Level.WARNING, "Error removing request from Async AMRM client queue: " + matchedRequest, e); - } - } + if (!matchContainer(container, containerRequest)) { + LOG.log(Level.SEVERE, "Container with memory {0} doesn't match the original request's memory {1}.", + new Object[] {container.getResource().getMemory(), containerRequest.getCapability().getMemory()}); + handleContainerError(container.getId(), + new Exception("CContainer returned doesn't match the original requests.")); + } - this.requestsAfterSentToRM.remove(); - this.doHomogeneousRequests(); + this.containerRequestCounter.decrement(); + this.containers.add(container); - LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", - new Object[] {container.getResource().getMemory(), container.getResource().getVirtualCores()}); + LOG.log(Level.FINE, "Matched container requestId: {0} with node: {1}, memory = {2}, core number = {3}", + new Object[] {container.getAllocationRequestId(), container.getNodeId(), + container.getResource().getMemory(), container.getResource().getVirtualCores()}); - this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder() - .setIdentifier(container.getId().toString()) - .setNodeId(container.getNodeId().toString()) - .setResourceMemory(container.getResource().getMemory()) - .setVirtualCores(container.getResource().getVirtualCores()) - .setRackName(rackNameFormatter.getRackName(container)) - .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME) - .build()); + this.reefEventHandlers.onResourceAllocation(ResourceEventImpl.newAllocationBuilder() + .setIdentifier(container.getId().toString()) + .setNodeId(container.getNodeId().toString()) + .setResourceMemory(container.getResource().getMemory()) + .setVirtualCores(container.getResource().getVirtualCores()) + .setRackName(rackNameFormatter.getRackName(container)) + .setRuntimeName(RuntimeIdentifier.RUNTIME_NAME) + .build()); - this.updateRuntimeStatus(); - } + this.updateRuntimeStatus(); } - private synchronized void doHomogeneousRequests() { - if (this.requestsAfterSentToRM.isEmpty()) { - final AMRMClient.ContainerRequest firstRequest = this.requestsBeforeSentToRM.peek(); - - while (!this.requestsBeforeSentToRM.isEmpty() && - isSameKindOfRequest(firstRequest, this.requestsBeforeSentToRM.peek())) { - final AMRMClient.ContainerRequest homogeneousRequest = this.requestsBeforeSentToRM.remove(); - this.resourceManager.addContainerRequest(homogeneousRequest); - this.requestsAfterSentToRM.add(homogeneousRequest); - } - } + private void releaseContainer(final Container container) { + LOG.log(Level.INFO, "Cannot find the container allocated request Id {0} from original requests map, releasing.", + container.getAllocationRequestId()); + this.resourceManager.releaseAssignedContainer(container.getId()); } private boolean isSameKindOfRequest(final AMRMClient.ContainerRequest r1, final AMRMClient.ContainerRequest r2) { @@ -573,21 +614,14 @@ private boolean isSameKindOfRequest(final AMRMClient.ContainerRequest r1, final } /** - * Match to see whether the container satisfies the request. - * We take into consideration that RM has some freedom in rounding - * up the allocation and in placing containers on other machines. + * Match between allocated container and original request. + * @param container + * @param request + * @return */ - private boolean matchContainerWithPendingRequest(final Container container) { - - if (this.requestsAfterSentToRM.isEmpty()) { - return false; - } - - final AMRMClient.ContainerRequest request = this.requestsAfterSentToRM.peek(); - + private boolean matchContainer(final Container container, final AMRMClient.ContainerRequest request) { final boolean resourceCondition = container.getResource().getMemory() >= request.getCapability().getMemory(); - // TODO[JIRA REEF-35]: check vcores once YARN-2380 is resolved final boolean nodeCondition = request.getNodes() == null || request.getNodes().contains(container.getNodeId().getHost()); diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java index 359a505b2a..8a3ba8ecd4 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandler.java @@ -31,5 +31,14 @@ public interface YarnContainerRequestHandler { * * @param containerRequests set of container requests */ + void onContainerRequest(final String requestId, final AMRMClient.ContainerRequest... containerRequests); + + /** + * Container requests without request id. Will be deprecated in 0.18. + * @param containerRequests + */ + @Deprecated void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests); + + void onContainerRequestRemove(String requestId); } diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java index 2adcbfedf8..7f359f90e7 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerRequestHandlerImpl.java @@ -39,9 +39,26 @@ public final class YarnContainerRequestHandlerImpl implements YarnContainerReque LOG.log(Level.FINEST, "Instantiated 'YarnContainerRequestHandler'"); } + /** + * Container requests without request id. Will be deprecated in 0.18. + * @param containerRequests + */ + @Deprecated @Override public void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests) { LOG.log(Level.FINEST, "Sending container requests to YarnContainerManager."); this.containerManager.onContainerRequest(containerRequests); } + + @Override + public void onContainerRequest(final String requestId, final AMRMClient.ContainerRequest... containerRequests) { + LOG.log(Level.FINEST, "Sending container requests to YarnContainerManager."); + this.containerManager.onContainerRequest(requestId, containerRequests); + } + + @Override + public void onContainerRequestRemove(final String requestId) { + LOG.log(Level.FINEST, "Sending container request remove to YarnContainerManager."); + this.containerManager.onContainerRequestRemove(requestId); + } } diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java index 8c6f367b31..0516c0c2e4 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java @@ -25,10 +25,12 @@ import org.apache.hadoop.yarn.util.Records; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; +import org.apache.reef.runtime.common.driver.EvaluatorRequestorImpl; import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent; import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler; import javax.inject.Inject; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; @@ -43,6 +45,7 @@ public final class YarnResourceRequestHandler implements ResourceRequestHandler private static final Logger LOG = Logger.getLogger(YarnResourceRequestHandler.class.getName()); private final YarnContainerRequestHandler yarnContainerRequestHandler; private final ApplicationMasterRegistration registration; + private AtomicLong allocationRequestId = new AtomicLong(); @Inject YarnResourceRequestHandler(final YarnContainerRequestHandler yarnContainerRequestHandler, @@ -52,9 +55,21 @@ public final class YarnResourceRequestHandler implements ResourceRequestHandler } @Override - public synchronized void onNext(final ResourceRequestEvent resourceRequestEvent) { - LOG.log(Level.FINEST, "Got ResourceRequestEvent in YarnResourceRequestHandler: memory = {0}, cores = {1}.", - new Object[]{resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores()}); + public void onNext(final ResourceRequestEvent resourceRequestEvent) { + LOG.log(Level.FINEST, "YarnResourceRequestHandler.onNext request for id {0} with resource count {1}.", + new Object[] {resourceRequestEvent.getRequestId(), resourceRequestEvent.getResourceCount()}); + + if (resourceRequestEvent.getResourceCount() == EvaluatorRequestorImpl.REMOVE_FLAG) { + removeRequest(resourceRequestEvent.getRequestId()); + return; + } + if (LOG.isLoggable(Level.FINE)) { + LOG.log(Level.FINE, "Got ResourceRequestEvent in YarnResourceRequestHandler: memory = {0}, cores = {1}," + + "nodes: {2}, racks: {3}, resourceCount: {4}, requestId: {5}.", + new Object[]{resourceRequestEvent.getMemorySize(), resourceRequestEvent.getVirtualCores(), + resourceRequestEvent.getNodeNameList().size(), resourceRequestEvent.getRackNameList().size(), + resourceRequestEvent.getResourceCount(), resourceRequestEvent.getRequestId()}); + } final String[] nodes = resourceRequestEvent.getNodeNameList().size() == 0 ? null : resourceRequestEvent.getNodeNameList().toArray(new String[resourceRequestEvent.getNodeNameList().size()]); @@ -67,34 +82,47 @@ public synchronized void onNext(final ResourceRequestEvent resourceRequestEvent) final boolean relaxLocality = resourceRequestEvent.getRelaxLocality().orElse(true); final String nodeLabelExpression = resourceRequestEvent.getNodeLabelExpression().orElse(""); - final AMRMClient.ContainerRequest[] containerRequests = - new AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()]; + final int count = resourceRequestEvent.getResourceCount(); + final AMRMClient.ContainerRequest[] containerRequests = new AMRMClient.ContainerRequest[count]; + + final boolean noNodes = nodes == null; + if (noNodes && count > 1) { + LOG.log(Level.WARNING, "Number of containers requested is {0} but node names is not null.", count); + } - for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) { + for (int i = 0; i < count; i++) { + long nextId = allocationRequestId.incrementAndGet(); containerRequests[i] = - new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relaxLocality, + new AMRMClient.ContainerRequest(resource, nodes, racks, pri, nextId, relaxLocality, StringUtils.isEmpty(nodeLabelExpression) ? null : nodeLabelExpression); + LOG.log(Level.FINE, "Creating ContainerRequest for allocationRequest id: {0}.", nextId); } - this.yarnContainerRequestHandler.onContainerRequest(containerRequests); + + this.yarnContainerRequestHandler.onContainerRequest(resourceRequestEvent.getRequestId(), containerRequests); + } + + private void removeRequest(final String requestId) { + LOG.log(Level.INFO, "YarnResourceRequestHandler.removeRequest for requestId: {0}", requestId); + this.yarnContainerRequestHandler.onContainerRequestRemove(requestId); } - private synchronized Resource getResource(final ResourceRequestEvent resourceRequestEvent) { + private Resource getResource(final ResourceRequestEvent resourceRequestEvent) { final Resource result = Records.newRecord(Resource.class); final int memory = getMemory(resourceRequestEvent.getMemorySize().get()); final int core = resourceRequestEvent.getVirtualCores().get(); - LOG.log(Level.FINEST, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core}); + LOG.log(Level.FINE, "Resource requested: memory = {0}, virtual core count = {1}.", new Object[]{memory, core}); result.setMemory(memory); result.setVirtualCores(core); return result; } - private synchronized Priority getPriority(final ResourceRequestEvent resourceRequestEvent) { + private static Priority getPriority(final ResourceRequestEvent resourceRequestEvent) { final Priority pri = Records.newRecord(Priority.class); pri.setPriority(resourceRequestEvent.getPriority().orElse(1)); return pri; } - private synchronized int getMemory(final int requestedMemory) { + private int getMemory(final int requestedMemory) { final int result; if (!this.registration.isPresent()) { LOG.log(Level.WARNING, "AM doesn't seem to be registered. Proceed with fingers crossed."); @@ -111,6 +139,4 @@ private synchronized int getMemory(final int requestedMemory) { } return result; } - - } diff --git a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java index 73f5c2cc94..1e05c93198 100644 --- a/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java +++ b/lang/java/reef-runtime-yarn/src/test/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandlerTest.java @@ -26,9 +26,13 @@ import org.apache.reef.tang.Tang; import org.apache.reef.tang.exceptions.InjectionException; import org.apache.reef.util.logging.LoggingScopeFactory; -import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; +import org.junit.Assert; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; /** * Tests for YarnResourceRequestHandler. @@ -46,11 +50,24 @@ public final class YarnResourceRequestHandlerTest { private class MockContainerRequestHandler implements YarnContainerRequestHandler { private AMRMClient.ContainerRequest[] requests; + private ConcurrentHashMap> indexedRequest = new ConcurrentHashMap<>(); + @Override public void onContainerRequest(final AMRMClient.ContainerRequest... containerRequests) { this.requests = containerRequests; } + @Override + public void onContainerRequest(final String requestId, final AMRMClient.ContainerRequest... containerRequests) { + this.indexedRequest.put(requestId, Arrays.asList(containerRequests)); + this.requests = containerRequests; + } + + @Override + public void onContainerRequestRemove(final String requestId) { + this.indexedRequest.remove(requestId); + } + AMRMClient.ContainerRequest[] getRequests() { return requests; }