@@ -227,6 +227,53 @@ public Response postStatement(
227227 return withCompressionConfiguration (Response .ok (query .getInitialQueryResults (uriInfo , xForwardedProto , xPrestoPrefixUrl , binaryResults )), compressionEnabled ).build ();
228228 }
229229
230+ /**
231+ * HTTP endpoint for submitting queries to the Presto Coordinator.
232+ * Presto performs lazy execution. The submission of a query returns
233+ * a placeholder for the result set, but the query gets
234+ * scheduled/dispatched only when the client polls for results.
235+ * This endpoint accepts a pre-minted queryId and slug, instead of
236+ * generating it.
237+ * @param statement The statement or sql query string submitted
238+ * @param queryId Pre-minted query ID to associate with this query
239+ * @param slug Pre-minted slug to protect this query
240+ * @param xForwardedProto Forwarded protocol (http or https)
241+ * @param servletRequest The http request
242+ * @param uriInfo {@link javax.ws.rs.core.UriInfo}
243+ * @return {@link javax.ws.rs.core.Response} HTTP response code
244+ */
245+ @ POST
246+ @ Path ("/v1/statement/queued/{queryId}" )
247+ @ Produces (APPLICATION_JSON )
248+ public Response postStatement (
249+ String statement ,
250+ @ PathParam ("queryId" ) QueryId queryId ,
251+ @ QueryParam ("slug" ) String slug ,
252+ @ DefaultValue ("false" ) @ QueryParam ("binaryResults" ) boolean binaryResults ,
253+ @ HeaderParam (X_FORWARDED_PROTO ) String xForwardedProto ,
254+ @ HeaderParam (PRESTO_PREFIX_URL ) String xPrestoPrefixUrl ,
255+ @ Context HttpServletRequest servletRequest ,
256+ @ Context UriInfo uriInfo )
257+ {
258+ if (isNullOrEmpty (statement )) {
259+ throw badRequest (BAD_REQUEST , "SQL statement is empty" );
260+ }
261+
262+ abortIfPrefixUrlInvalid (xPrestoPrefixUrl );
263+
264+ // TODO: For future cases we may want to start tracing from client. Then continuation of tracing
265+ // will be needed instead of creating a new trace here.
266+ SessionContext sessionContext = new HttpRequestSessionContext (
267+ servletRequest ,
268+ sqlParserOptions ,
269+ tracerProviderManager .getTracerProvider (),
270+ Optional .of (sessionPropertyManager ));
271+ Query query = new Query (statement , sessionContext , dispatchManager , queryResultsProvider , 0 , queryId , slug );
272+ queries .put (query .getQueryId (), query );
273+
274+ return withCompressionConfiguration (Response .ok (query .getInitialQueryResults (uriInfo , xForwardedProto , xPrestoPrefixUrl , binaryResults )), compressionEnabled ).build ();
275+ }
276+
230277 /**
231278 * HTTP endpoint for re-processing a failed query
232279 * @param queryId Query Identifier of the query to be retried
@@ -455,21 +502,34 @@ private static final class Query
455502 private final DispatchManager dispatchManager ;
456503 private final LocalQueryProvider queryProvider ;
457504 private final QueryId queryId ;
458- private final String slug = "x" + randomUUID (). toString (). toLowerCase ( ENGLISH ). replace ( "-" , "" ) ;
505+ private final String slug ;
459506 private final AtomicLong lastToken = new AtomicLong ();
460507 private final int retryCount ;
461508
462509 @ GuardedBy ("this" )
463510 private ListenableFuture <?> querySubmissionFuture ;
464511
465512 public Query (String query , SessionContext sessionContext , DispatchManager dispatchManager , LocalQueryProvider queryResultsProvider , int retryCount )
513+ {
514+ this (query , sessionContext , dispatchManager , queryResultsProvider , retryCount , dispatchManager .createQueryId (), createSlug ());
515+ }
516+
517+ public Query (
518+ String query ,
519+ SessionContext sessionContext ,
520+ DispatchManager dispatchManager ,
521+ LocalQueryProvider queryResultsProvider ,
522+ int retryCount ,
523+ QueryId queryId ,
524+ String slug )
466525 {
467526 this .query = requireNonNull (query , "query is null" );
468527 this .sessionContext = requireNonNull (sessionContext , "sessionContext is null" );
469528 this .dispatchManager = requireNonNull (dispatchManager , "dispatchManager is null" );
470529 this .queryProvider = requireNonNull (queryResultsProvider , "queryExecutor is null" );
471- this .queryId = dispatchManager .createQueryId ();
472530 this .retryCount = retryCount ;
531+ this .queryId = requireNonNull (queryId , "queryId is null" );
532+ this .slug = requireNonNull (slug , "slug is null" );
473533 }
474534
475535 /**
@@ -640,6 +700,11 @@ private QueryResults createQueryResults(long token, UriInfo uriInfo, String xFor
640700 dispatchInfo .getWaitingForPrerequisitesTime ());
641701 }
642702
703+ private static String createSlug ()
704+ {
705+ return "x" + randomUUID ().toString ().toLowerCase (ENGLISH ).replace ("-" , "" );
706+ }
707+
643708 private URI getNextUri (long token , UriInfo uriInfo , String xForwardedProto , String xPrestoPrefixUrl , DispatchInfo dispatchInfo , boolean binaryResults )
644709 {
645710 // if failed, query is complete
0 commit comments