diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java index 2ec90ab3b90..a9546cc054e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/AsyncQueryResponse.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.datatable.DataTable; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -96,11 +97,14 @@ public Map getFinalResponses() // servers even if the query times out or if servers have not responded. for (Map.Entry entry : _responseMap.entrySet()) { ServerResponse response = entry.getValue(); - - // ServerResponse returns -1 if responseDelayMs is not set. This indicates that a response was not received - // from the server. Hence we set the latency to the timeout value. - long latency = - (response != null && response.getResponseDelayMs() >= 0) ? response.getResponseDelayMs() : _timeoutMs; + long latency; + + // If server has not responded or if the server response has exceptions, the latency is set to timeout + if (hasServerNotResponded(response) || hasServerReturnedExceptions(response)) { + latency = _timeoutMs; + } else { + latency = response.getResponseDelayMs(); + } _serverRoutingStatsManager.recordStatsUponResponseArrival(_requestId, entry.getKey().getInstanceId(), latency); } @@ -108,6 +112,29 @@ public Map getFinalResponses() } } + private boolean hasServerReturnedExceptions(ServerResponse response) { + if (response.getDataTable() != null && response.getDataTable().getExceptions().size() > 0) { + DataTable dataTable = response.getDataTable(); + Map exceptions = dataTable.getExceptions(); + + // If Server response has exceptions in Datatable set the latency for timeout value. + for (Map.Entry exception : exceptions.entrySet()) { + // Check if the exceptions received are server side exceptions + if (!QueryException.isClientError(exception.getKey())) { + return true; + } + } + return false; + } + return false; + } + + private boolean hasServerNotResponded(ServerResponse response) { + // ServerResponse returns -1 if responseDelayMs is not set. This indicates that a response was not received + // from the server. Hence we set the latency to the timeout value. + return response == null || response.getResponseDelayMs() < 0; + } + @Override public String getServerStats() { StringBuilder stringBuilder = new StringBuilder( diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index cec413e4248..1b32149d064 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -27,9 +27,11 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; +import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; @@ -201,6 +203,180 @@ public void testInvalidResponse() queryServer.shutDown(); } + @Test + public void testLatencyForQueryServerException() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Exception exception = new UnsupportedOperationException("Caught exception."); + ProcessingException processingException = + QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, exception); + dataTable.addException(processingException); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a query with ServerSide exception and check if the latency is set to timeout value. + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // This means that no queries were run before this test. So we can just make sure that latencyAfter is equal to + //666.334. + // This corresponds to the EWMA value when a latency timeout value of 1000 is set. Latency set to timeout value + //when server side exception occurs. + double serverEWMALatency = 666.334; + // Leaving an error budget of 2% + double delta = 13.32; + assertEquals(latencyAfter, serverEWMALatency, delta); + } else { + assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be greater than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + + @Test + public void testLatencyForClientException() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Exception exception = new UnsupportedOperationException("Caught exception."); + ProcessingException processingException = + QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, exception); + dataTable.addException(processingException); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a query with client side errors. + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // Latency for the server with client side exception is assigned as serverResponse.getResponseDelayMs() and the + //calculated + // EWMLatency for the server will be less than serverResponse.getResponseDelayMs() + assertTrue(latencyAfter <= serverResponse.getResponseDelayMs()); + } else { + assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be lesser than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + + @Test + public void testLatencyForMultipleExceptions() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + Exception exception = new UnsupportedOperationException("Caught exception."); + ProcessingException processingException = + QueryException.getException(QueryException.QUERY_CANCELLATION_ERROR, exception); + ProcessingException processingServerException = + QueryException.getException(QueryException.SERVER_TABLE_MISSING_ERROR, exception); + dataTable.addException(processingServerException); + dataTable.addException(processingException); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a query with multiple exceptions. Make sure that the latency is set to timeout value even if a single + //server-side exception is seen. + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // This means that no queries where run before this test. So we can just make sure that latencyAfter is equal + //to 666.334. + // This corresponds to the EWMA value when a latency timeout value of 1000 is set. + double serverEWMALatency = 666.334; + // Leaving an error budget of 2% + double delta = 13.32; + assertEquals(latencyAfter, serverEWMALatency, delta); + } else { + assertTrue(latencyAfter > latencyBefore, latencyAfter + " should be greater than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + + @Test + public void testLatencyForNoException() + throws Exception { + long requestId = 123; + DataTable dataTable = DataTableBuilderFactory.getEmptyDataTable(); + dataTable.getMetadata().put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId)); + byte[] responseBytes = dataTable.toBytes(); + String serverId = SERVER_INSTANCE.getInstanceId(); + // Start the server + QueryServer queryServer = getQueryServer(0, responseBytes); + queryServer.start(); + + // Send a valid query and get latency + Double latencyBefore = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + AsyncQueryResponse asyncQueryResponse = + _queryRouter.submitQuery(requestId, "testTable", BROKER_REQUEST, ROUTING_TABLE, null, null, 1_000L); + Map response = asyncQueryResponse.getFinalResponses(); + assertEquals(response.size(), 1); + assertTrue(response.containsKey(OFFLINE_SERVER_ROUTING_INSTANCE)); + ServerResponse serverResponse = response.get(OFFLINE_SERVER_ROUTING_INSTANCE); + + _requestCount += 2; + waitForStatsUpdate(_requestCount); + Double latencyAfter = _serverRoutingStatsManager.fetchEMALatencyForServer(serverId); + + if (latencyBefore == null) { + // Latency for the server with no exceptions is assigned as serverResponse.getResponseDelayMs() and the calculated + // EWMLatency for the server will be less than serverResponse.getResponseDelayMs() + assertTrue(latencyAfter <= serverResponse.getResponseDelayMs()); + } else { + assertTrue(latencyAfter < latencyBefore, latencyAfter + " should be lesser than " + latencyBefore); + } + + // Shut down the server + queryServer.shutDown(); + } + @Test public void testNonMatchingRequestId() throws Exception {