Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-2052] Use Yarn allocation ID to keep track of allocated containers #1481

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lang/cs/Org.Apache.REEF.Bridge/Clr2JavaImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ namespace Org {
!EvaluatorRequestorClr2Java();
virtual void OnError(String^ message);
virtual void Submit(IEvaluatorRequest^ request);
virtual array<byte>^ GetDefinedRuntimes();
virtual void Remove(String^ requestId);
virtual array<byte>^ GetDefinedRuntimes();
};

public ref class TaskMessageClr2Java : public ITaskMessageClr2Java {
Expand Down
27 changes: 24 additions & 3 deletions lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,48 @@ namespace Org {
ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit");
JNIEnv *env = RetrieveEnv(_jvm);
jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor);
jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;Ljava/lang/String;)V");
jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(Ljava/lang/String;IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;Ljava/lang/String;)V");

ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit2");
if (jmidSubmit == NULL) {
fprintf(stdout, " jmidSubmit is NULL\n");
fflush(stdout);
return;
}
env->CallObjectMethod(
ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit3:RequestId:" + request->RequestId);
env->CallObjectMethod(
_jobjectEvaluatorRequestor,
jmidSubmit,
JavaStringFromManagedString(env, request->RequestId),
request->Number,
request->MemoryMegaBytes,
request->VirtualCore,
request->RelaxLocality,
request->RelaxLocality,
JavaStringFromManagedString(env, request->Rack),
JavaStringFromManagedString(env, request->RuntimeName),
JavaArrayListFromManagedList(env, request->NodeNames),
JavaStringFromManagedString(env, request->NodeLabelExpression));
ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit");
}

// Asks the Java-side evaluator requestor to remove the evaluator request
// identified by requestId, by invoking its `remove(String)` method over JNI.
//
// requestId: id of the evaluator request to remove; it is marshalled to a
//            Java string with JavaStringFromManagedString.
//
// If the Java method id cannot be resolved, a message is printed to stdout
// and the function returns without calling into Java.
void EvaluatorRequestorClr2Java::Remove(String^ requestId) {
  ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Remove");
  JNIEnv *env = RetrieveEnv(_jvm);
  jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor);
  jmethodID jmidRemove = env->GetMethodID(jclassEvaluatorRequestor, "remove", "(Ljava/lang/String;)V");

  if (jmidRemove == NULL) {
    // NOTE(review): this early return skips the LogStop call below - confirm
    // that an unmatched LogStart is acceptable for ManagedLog.
    fprintf(stdout, " jmidRemove is NULL\n");
    fflush(stdout);
    return;
  }

  // The method signature ends in 'V' (void return). The JNI Call<Type>Method
  // routine has to match the Java return type, so use CallVoidMethod rather
  // than CallObjectMethod.
  env->CallVoidMethod(
    _jobjectEvaluatorRequestor,
    jmidRemove,
    JavaStringFromManagedString(env, requestId));
  ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Remove");
}

array<byte>^ EvaluatorRequestorClr2Java::GetDefinedRuntimes() {
ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::GetDefinedRuntimes");
JNIEnv *env = RetrieveEnv(_jvm);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public interface IEvaluatorRequestorClr2Java : IClr2Java
{
void Submit(IEvaluatorRequest evaluatorRequest);

void Remove(string requestId);

byte[] GetDefinedRuntimes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ internal static IDictionary<string, IEvaluatorDescriptor> Evaluators

public void Submit(IEvaluatorRequest request)
{
LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}.",
request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality);
LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}, requestId {7}.",
request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality, request.RequestId);
lock (Evaluators)
{
for (var i = 0; i < request.Number; i++)
Expand Down Expand Up @@ -96,6 +96,11 @@ public void Submit(IEvaluatorRequest request)
Clr2Java.Submit(request);
}

/// <summary>
/// Removes the evaluator request with the given id by forwarding the call
/// to the Clr2Java bridge. This method does nothing else.
/// </summary>
/// <param name="requestId">Id of the evaluator request to remove.</param>
public void Remove(string requestId)
{
Clr2Java.Remove(requestId);
}

public EvaluatorRequestBuilder NewBuilder()
{
return new EvaluatorRequestBuilder();
Expand Down
41 changes: 28 additions & 13 deletions lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using Org.Apache.REEF.Driver.Bridge.Events;
using Org.Apache.REEF.Utilities.Logging;

namespace Org.Apache.REEF.Driver.Evaluator
{
Expand All @@ -28,11 +30,16 @@ namespace Org.Apache.REEF.Driver.Evaluator
[DataContract]
internal class EvaluatorRequest : IEvaluatorRequest
{
internal EvaluatorRequest() : this(number: 0, megaBytes: 0)
{
}
internal EvaluatorRequest()
: this(number: 0, megaBytes: 0)

internal EvaluatorRequest(int number, int megaBytes) : this(number: number, megaBytes: megaBytes, core: 1)
private static readonly Logger Logger = Logger.GetLogger(typeof(EvaluatorRequest));

internal EvaluatorRequest(int number, int megaBytes)
: this(
number: number,
megaBytes: megaBytes,
core: 1)
{
}

Expand Down Expand Up @@ -92,19 +99,21 @@ internal EvaluatorRequest(
ICollection<string> nodeNames,
bool relaxLocality)
: this(
number: number,
megaBytes: megaBytes,
core: core,
rack: rack,
evaluatorBatchId: evaluatorBatchId,
runtimeName: string.Empty,
nodeNames: nodeNames,
relaxLocality: relaxLocality,
nodeLabelExpression: string.Empty)
requestId: string.Empty,
number: number,
megaBytes: megaBytes,
core: core,
rack: rack,
evaluatorBatchId: evaluatorBatchId,
runtimeName: string.Empty,
nodeNames: nodeNames,
relaxLocality: relaxLocality,
nodeLabelExpression: string.Empty)
{
}

internal EvaluatorRequest(
string requestId,
int number,
int megaBytes,
int core,
Expand All @@ -115,9 +124,12 @@ internal EvaluatorRequest(
bool relaxLocality,
string nodeLabelExpression)
{
Logger.Log(Level.Verbose, "EvaluatorRequest constructor: RequestId {0}, Number: {1}, Priority: {2}.", requestId, number, priority);
RequestId = requestId;
Number = number;
MemoryMegaBytes = megaBytes;
VirtualCore = core;
Priority = priority;
Rack = rack;
EvaluatorBatchId = evaluatorBatchId;
RuntimeName = runtimeName;
Expand All @@ -126,6 +138,9 @@ internal EvaluatorRequest(
NodeLabelExpression = nodeLabelExpression;
}

/// <summary>
/// Id of this evaluator request, as passed to the constructor.
/// </summary>
[DataMember]
public string RequestId { get; private set; }

[DataMember]
public int MemoryMegaBytes { get; private set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public sealed class EvaluatorRequestBuilder

internal EvaluatorRequestBuilder(IEvaluatorRequest request)
{
RequestId = request.RequestId;
Number = request.Number;
MegaBytes = request.MemoryMegaBytes;
VirtualCore = request.VirtualCore;
Expand All @@ -46,6 +47,7 @@ internal EvaluatorRequestBuilder(IEvaluatorRequest request)

internal EvaluatorRequestBuilder()
{
RequestId = "RequestId-" + Guid.NewGuid().ToString("N");
Number = 1;
VirtualCore = 1;
MegaBytes = 64;
Expand All @@ -57,10 +59,23 @@ internal EvaluatorRequestBuilder()
_nodeLabelExpression = string.Empty;
}

public string RequestId { get; private set; }

public int Number { get; private set; }
public int MegaBytes { get; private set; }
public int VirtualCore { get; private set; }

/// <summary>
/// Set the request id for the evaluator request.
/// </summary>
/// <param name="requestId">The id to assign to the evaluator request being built.</param>
/// <returns>This builder, so that calls can be chained.</returns>
public EvaluatorRequestBuilder SetRequestId(string requestId)
{
RequestId = requestId;
return this;
}

/// <summary>
/// Set the number of evaluators to request.
/// </summary>
Expand Down Expand Up @@ -177,7 +192,9 @@ public EvaluatorRequestBuilder SetNodeLabelExpression(string nodeLabelExpression
/// <returns></returns>
public IEvaluatorRequest Build()
{
return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality, nodeLabelExpression: _nodeLabelExpression);
return new EvaluatorRequest(RequestId, Number, MegaBytes, VirtualCore, rack: _rackName,
evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames,
relaxLocality: _relaxLocality, nodeLabelExpression: _nodeLabelExpression);
}
}
}
5 changes: 5 additions & 0 deletions lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ namespace Org.Apache.REEF.Driver.Evaluator
/// </summary>
public interface IEvaluatorRequest
{
/// <summary>
/// Evaluator request id.
/// </summary>
string RequestId { get; }

/// <summary>
/// Memory for the Evaluator in megabytes.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ public interface IEvaluatorRequestor
/// <param name="request"></param>
void Submit(IEvaluatorRequest request);

/// <summary>
/// Remove an evaluator request for specified request id.
/// </summary>
/// <param name="requestId">Request Id to be removed.</param>
void Remove(string requestId);

/// <summary>
/// Returns a builder for new Evaluator requests.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ public void Submit(IEvaluatorRequest request)
// but can't throw exception here as Driver calls this method before cancellation flow can be initiated.
}

/// <summary>
/// No-op: this test requestor does not remove evaluator requests.
/// </summary>
/// <param name="requestId">Ignored.</param>
public void Remove(string requestId)
{
// For this test we don't actually remove the evaluator request.
}

public EvaluatorRequestBuilder NewBuilder()
{
var builder = Activator.CreateInstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public sealed class TestHelloDriver : IObserver<IAllocatedEvaluator>,
private static readonly Logger Logger = Logger.GetLogger(typeof(TestHelloDriver));
private readonly IEvaluatorRequestor _evaluatorRequestor;

private const string RequestIdPrefix = "RequestId-";

/// <summary>
/// Specify if the desired node names is relaxed
/// </summary>
Expand Down Expand Up @@ -90,7 +92,9 @@ public void OnNext(IDriverStarted driverStarted)
{
Logger.Log(Level.Info, "Received IDriverStarted, numberOfContainers: {0}", _numberOfContainers);

var requestId = RequestIdPrefix + Guid.NewGuid();
_evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
.SetRequestId(requestId)
.SetMegabytes(64)
.SetNumber(_numberOfContainers)
.SetRelaxLocality(_relaxLocality)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public void TestHelloREEFOnLocal()
int driverMemory = 1024;
string testFolder = DefaultRuntimeFolder + TestId;
TestRun(GetRuntimeConfigurationForLocal(numberOfContainers, testFolder), driverMemory);
ValidateSuccessForLocalRuntime(numberOfContainers, testFolder: testFolder);
CleanUp(testFolder);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ public void submit(final int evaluatorsNumber,
final String runtimeName,
final ArrayList<String> nodeNames,
final String nodeLabelExpression) {
submit("", evaluatorsNumber, memory, virtualCore, relaxLocality, rack, runtimeName, nodeNames, nodeLabelExpression);
}
public void submit(final String requestId,
final int evaluatorsNumber,
final int memory,
final int virtualCore,
final boolean relaxLocality,
final String rack,
final String runtimeName,
final ArrayList<String> nodeNames,
final String nodeLabelExpression) {
if (this.isBlocked) {
throw new RuntimeException("Cannot request additional Evaluator, this is probably because " +
"the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433.");
Expand All @@ -84,6 +95,7 @@ public void submit(final int evaluatorsNumber,
clrEvaluatorsNumber += evaluatorsNumber;

final EvaluatorRequest request = EvaluatorRequest.newBuilder()
.setRequestId(requestId)
.setNumber(evaluatorsNumber)
.setMemory(memory)
.setNumberOfCores(virtualCore)
Expand All @@ -93,11 +105,17 @@ public void submit(final int evaluatorsNumber,
.setNodeLabelExpression(nodeLabelExpression)
.build();

LOG.log(Level.FINE, "submitting evaluator request {0}", request);
LOG.log(Level.FINE, "EvaluatorRequestorBridge.submit(), requestId {0}, evaluatorsNumber:{1}",
new Object[] {requestId, evaluatorsNumber});
jevaluatorRequestor.submit(request);
}
}

/**
 * Logs the removal request at INFO level and forwards it to the wrapped
 * Java evaluator requestor.
 *
 * NOTE(review): unlike submit(...), this method does not consult isBlocked
 * before delegating - confirm that is intended.
 *
 * @param evaluatorRequestId id of the evaluator request to remove.
 */
public void remove(final String evaluatorRequestId) {
LOG.log(Level.INFO, "Received remove request for id {0}.", evaluatorRequestId);
jevaluatorRequestor.remove(evaluatorRequestId);
}

/**
 * @return the current value of clrEvaluatorsNumber, the counter that
 *         submit(...) increases by the number of evaluators requested.
 */
public int getEvaluatorNumber() {
return clrEvaluatorsNumber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.runtime.common.driver.EvaluatorRequestorImpl;

import javax.inject.Inject;

Expand Down Expand Up @@ -49,6 +50,16 @@ public void submit(final EvaluatorRequest req) {
this.driverServiceClient.onEvaluatorRequest(req);
}

/**
 * Requests removal of a previously submitted evaluator request.
 *
 * No dedicated removal call is made here. The removal is encoded as an
 * EvaluatorRequest whose number is EvaluatorRequestorImpl.REMOVE_FLAG and
 * whose request id is the id to remove; that request is then handed to the
 * driver service client like any other evaluator request.
 *
 * @param requestId id of the evaluator request to remove.
 */
@Override
public void remove(final String requestId) {
  // The driver service will need to properly handle the remove flag.
  this.driverServiceClient.onEvaluatorRequest(
      EvaluatorRequest.newBuilder()
          .setNumber(EvaluatorRequestorImpl.REMOVE_FLAG)
          .setRequestId(requestId)
          .build());
}

@Override
public EvaluatorRequest.Builder newRequest() {
return new DriverClientEvaluatorRequestor.Builder();
Expand Down
Loading