Skip to content

Commit

Permalink
Improve Adaptive Server Selection to penalize servers returning serve…
Browse files Browse the repository at this point in the history
…r side exceptions (apache#14029)

* PINOT-19249 ADSS penalize server with hardware issues WIP push

PINOT-19249 Adding more unit test cases

PINOT-19249 Addressing comments and changing code to cover edge cases

PINOT-19249 Code changes based on comments

PINOT-19249 Fixing test cases for ADSS penalizing servers with exceptions

* Fixing linter

* Empty-Commit

* Review comments - Adding more comments and delta to assertEquals
  • Loading branch information
kiruphabalu authored Sep 30, 2024
1 parent f07685d commit 3452ef9
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;

Expand Down Expand Up @@ -96,18 +97,44 @@ public Map<ServerRoutingInstance, ServerResponse> getFinalResponses()
// servers even if the query times out or if servers have not responded.
for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : _responseMap.entrySet()) {
ServerResponse response = entry.getValue();

// ServerResponse returns -1 if responseDelayMs is not set. This indicates that a response was not received
// from the server. Hence we set the latency to the timeout value.
long latency =
(response != null && response.getResponseDelayMs() >= 0) ? response.getResponseDelayMs() : _timeoutMs;
long latency;

// If server has not responded or if the server response has exceptions, the latency is set to timeout
if (hasServerNotResponded(response) || hasServerReturnedExceptions(response)) {
latency = _timeoutMs;
} else {
latency = response.getResponseDelayMs();
}
_serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, entry.getKey().getInstanceId(), latency);
}

_queryRouter.markQueryDone(_requestId);
}
}

/**
 * Checks whether the given server response carries at least one server-side exception.
 *
 * <p>Exceptions whose error code is classified as a client error by
 * {@code QueryException.isClientError} are ignored; any other exception code counts as a server-side
 * exception. A response without a data table is treated as having no exceptions.
 *
 * @param response the server response to inspect; must not be {@code null}
 * @return {@code true} if the response's data table holds an exception that is not a client error
 */
private boolean hasServerReturnedExceptions(ServerResponse response) {
  DataTable dataTable = response.getDataTable();
  if (dataTable == null) {
    return false;
  }

  // One exception that is not a client error is enough to flag the response; an empty exception map
  // simply falls through the loop.
  for (Integer errorCode : dataTable.getExceptions().keySet()) {
    if (!QueryException.isClientError(errorCode)) {
      return true;
    }
  }
  return false;
}

/**
 * Tells whether no response was received from the server.
 *
 * @param response the server response, possibly {@code null}
 * @return {@code true} if {@code response} is {@code null} or its response delay is negative
 */
private boolean hasServerNotResponded(ServerResponse response) {
  if (response == null) {
    return true;
  }
  // ServerResponse reports -1 when responseDelayMs was never set, i.e. nothing was received from the server.
  return response.getResponseDelayMs() < 0;
}

@Override
public String getServerStats() {
StringBuilder stringBuilder = new StringBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
Expand Down Expand Up @@ -201,6 +203,180 @@ public void testInvalidResponse()
queryServer.shutDown();
}

/**
 * Verifies that a response carrying a server-side exception ({@code SERVER_TABLE_MISSING_ERROR}) is
 * recorded with the timeout value as its latency, which raises the server's EMA latency.
 */
@Test
public void testLatencyForQueryServerException()
    throws Exception {
  long requestId = 123;
  DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
  dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
  Exception exception = new UnsupportedOperationException("Caught exception.");
  ProcessingException processingException =
      QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, exception);
  dataTable.addException(processingException);
  byte[] responseBytes = dataTable.toBytes();
  String serverId = SERVER_INSTANCE.getInstanceId();

  // Start the server
  QueryServer queryServer = getQueryServer(0, responseBytes);
  queryServer.start();

  try {
    // Send a query that returns a server-side exception and check that the latency is set to the timeout value.
    Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
    AsyncQueryResponse asyncQueryResponse =
        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
    Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses();
    assertEquals(response.size(), 1);
    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));

    _requestCount += 2;
    waitForStatsUpdate(_requestCount);
    Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);

    if (latencyBefore == null) {
      // No queries were run before this test, so latencyAfter must equal 666.334. This corresponds to the EWMA
      // value when a latency of 1000 (the timeout value) is recorded, which is what happens when a server-side
      // exception occurs.
      double serverEWMALatency = 666.334;
      // Leaving an error budget of 2%
      double delta = 13.32;
      assertEquals(latencyAfter, serverEWMALatency, delta);
    } else {
      assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be greater than " + latencyBefore);
    }
  } finally {
    // Shut down the server even when an assertion above fails, so it is not left running.
    queryServer.shutDown();
  }
}

/**
 * Verifies that a response carrying only a client-side exception ({@code QUERY_CANCELLATION_ERROR}) is
 * recorded with its actual response delay rather than the timeout value.
 */
@Test
public void testLatencyForClientException()
    throws Exception {
  long requestId = 123;
  DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
  dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
  Exception exception = new UnsupportedOperationException("Caught exception.");
  ProcessingException processingException =
      QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, exception);
  dataTable.addException(processingException);
  byte[] responseBytes = dataTable.toBytes();
  String serverId = SERVER_INSTANCE.getInstanceId();

  // Start the server
  QueryServer queryServer = getQueryServer(0, responseBytes);
  queryServer.start();

  try {
    // Send a query that returns a client-side error.
    Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);

    AsyncQueryResponse asyncQueryResponse =
        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
    Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses();
    assertEquals(response.size(), 1);
    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
    ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);

    _requestCount += 2;
    waitForStatsUpdate(_requestCount);

    Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);

    if (latencyBefore == null) {
      // For a client-side exception the recorded latency is serverResponse.getResponseDelayMs(), so the
      // calculated EWMA latency for the server must not exceed serverResponse.getResponseDelayMs().
      assertTrue(latencyAfter <= serverResponse.getResponseDelayMs());
    } else {
      assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be lesser than " + latencyBefore);
    }
  } finally {
    // Shut down the server even when an assertion above fails, so it is not left running.
    queryServer.shutDown();
  }
}

/**
 * Verifies that a response carrying both a client-side exception ({@code QUERY_CANCELLATION_ERROR}) and a
 * server-side exception ({@code SERVER_TABLE_MISSING_ERROR}) is recorded with the timeout value as its
 * latency: a single server-side exception is enough.
 */
@Test
public void testLatencyForMultipleExceptions()
    throws Exception {
  long requestId = 123;
  DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
  dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
  Exception exception = new UnsupportedOperationException("Caught exception.");
  ProcessingException processingException =
      QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, exception);
  ProcessingException processingServerException =
      QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, exception);
  dataTable.addException(processingServerException);
  dataTable.addException(processingException);
  byte[] responseBytes = dataTable.toBytes();
  String serverId = SERVER_INSTANCE.getInstanceId();

  // Start the server
  QueryServer queryServer = getQueryServer(0, responseBytes);
  queryServer.start();

  try {
    // Send a query that returns multiple exceptions. The latency must be set to the timeout value even if only a
    // single server-side exception is seen.
    Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
    AsyncQueryResponse asyncQueryResponse =
        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
    Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses();
    assertEquals(response.size(), 1);
    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));

    _requestCount += 2;
    waitForStatsUpdate(_requestCount);
    Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);

    if (latencyBefore == null) {
      // No queries were run before this test, so latencyAfter must equal 666.334. This corresponds to the EWMA
      // value when a latency of 1000 (the timeout value) is recorded.
      double serverEWMALatency = 666.334;
      // Leaving an error budget of 2%
      double delta = 13.32;
      assertEquals(latencyAfter, serverEWMALatency, delta);
    } else {
      assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be greater than " + latencyBefore);
    }
  } finally {
    // Shut down the server even when an assertion above fails, so it is not left running.
    queryServer.shutDown();
  }
}

/**
 * Verifies that a response without any exception is recorded with its actual response delay rather than
 * the timeout value.
 */
@Test
public void testLatencyForNoException()
    throws Exception {
  long requestId = 123;
  DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable();
  dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
  byte[] responseBytes = dataTable.toBytes();
  String serverId = SERVER_INSTANCE.getInstanceId();

  // Start the server
  QueryServer queryServer = getQueryServer(0, responseBytes);
  queryServer.start();

  try {
    // Send a valid query and get the latency
    Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);
    AsyncQueryResponse asyncQueryResponse =
        _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L);
    Map<ServerRoutingInstance, ServerResponse> response = asyncQueryResponse.getFinalResponses();
    assertEquals(response.size(), 1);
    assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE));
    ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE);

    _requestCount += 2;
    waitForStatsUpdate(_requestCount);
    Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId);

    if (latencyBefore == null) {
      // With no exceptions the recorded latency is serverResponse.getResponseDelayMs(), so the calculated EWMA
      // latency for the server must not exceed serverResponse.getResponseDelayMs().
      assertTrue(latencyAfter <= serverResponse.getResponseDelayMs());
    } else {
      assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be lesser than " + latencyBefore);
    }
  } finally {
    // Shut down the server even when an assertion above fails, so it is not left running.
    queryServer.shutDown();
  }
}

@Test
public void testNonMatchingRequestId()
throws Exception {
Expand Down

0 comments on commit 3452ef9

Please sign in to comment.