2929import io .trino .gateway .ha .router .OAuth2GatewayCookie ;
3030import io .trino .gateway .ha .router .QueryHistoryManager ;
3131import io .trino .gateway .ha .router .RoutingManager ;
32+ import io .trino .gateway .ha .router .TrinoRequestUser ;
3233import io .trino .gateway .proxyserver .ProxyResponseHandler .ProxyResponse ;
3334import jakarta .annotation .PreDestroy ;
3435import jakarta .servlet .http .HttpServletRequest ;
4344import java .util .Arrays ;
4445import java .util .HashMap ;
4546import java .util .List ;
47+ import java .util .Optional ;
4648import java .util .concurrent .ExecutorService ;
4749
4850import static com .google .common .collect .ImmutableList .toImmutableList ;
5860import static io .airlift .http .client .Request .Builder .preparePost ;
5961import static io .airlift .http .client .StaticBodyGenerator .createStaticBodyGenerator ;
6062import static io .airlift .jaxrs .AsyncResponseHandler .bindAsyncResponse ;
61- import static io .trino .gateway .ha .handler .ProxyUtils .AUTHORIZATION ;
6263import static io .trino .gateway .ha .handler .ProxyUtils .QUERY_TEXT_LENGTH_FOR_HISTORY ;
6364import static io .trino .gateway .ha .handler .ProxyUtils .SOURCE_HEADER ;
64- import static io .trino .gateway .ha .handler .ProxyUtils .getQueryUser ;
65- import static io .trino .gateway .ha .handler .QueryIdCachingProxyHandler .USER_HEADER ;
6665import static jakarta .ws .rs .core .MediaType .TEXT_PLAIN_TYPE ;
6766import static jakarta .ws .rs .core .Response .Status .BAD_GATEWAY ;
6867import static jakarta .ws .rs .core .Response .Status .OK ;
@@ -86,6 +85,7 @@ public class ProxyRequestHandler
8685 private final boolean addXForwardedHeaders ;
8786 private final List <String > statementPaths ;
8887 private final boolean includeClusterInfoInResponse ;
88+ private final TrinoRequestUser .TrinoRequestUserProvider trinoRequestUserProvider ;
8989
9090 @ Inject
9191 public ProxyRequestHandler (
@@ -97,6 +97,7 @@ public ProxyRequestHandler(
9797 this .httpClient = requireNonNull (httpClient , "httpClient is null" );
9898 this .routingManager = requireNonNull (routingManager , "routingManager is null" );
9999 this .queryHistoryManager = requireNonNull (queryHistoryManager , "queryHistoryManager is null" );
100+ trinoRequestUserProvider = new TrinoRequestUser .TrinoRequestUserProvider (haGatewayConfiguration .getRequestAnalyzerConfig ());
100101 cookiesEnabled = GatewayCookieConfigurationPropertiesProvider .getInstance ().isEnabled ();
101102 asyncTimeout = haGatewayConfiguration .getRouting ().getAsyncTimeout ();
102103 addXForwardedHeaders = haGatewayConfiguration .getRouting ().isAddXForwardedHeaders ();
@@ -173,7 +174,8 @@ private void performRequest(
173174 FluentFuture <ProxyResponse > future = executeHttp (request );
174175
175176 if (statementPaths .stream ().anyMatch (request .getUri ().getPath ()::startsWith ) && request .getMethod ().equals (HttpMethod .POST )) {
176- future = future .transform (response -> recordBackendForQueryId (request , response ), executor );
177+ Optional <String > username = trinoRequestUserProvider .getInstance (servletRequest ).getUser ();
178+ future = future .transform (response -> recordBackendForQueryId (request , response , username ), executor );
177179 if (includeClusterInfoInResponse ) {
178180 cookieBuilder .add (new NewCookie .Builder ("trinoClusterHost" ).value (remoteUri .getHost ()).build ());
179181 }
@@ -250,11 +252,11 @@ private static WebApplicationException badRequest(String message)
250252 .build ());
251253 }
252254
253- private ProxyResponse recordBackendForQueryId (Request request , ProxyResponse response )
255+ private ProxyResponse recordBackendForQueryId (Request request , ProxyResponse response , Optional < String > username )
254256 {
255257 log .debug ("For Request [%s] got Response [%s]" , request .getUri (), response .body ());
256258
257- QueryHistoryManager .QueryDetail queryDetail = getQueryDetailsFromRequest (request );
259+ QueryHistoryManager .QueryDetail queryDetail = getQueryDetailsFromRequest (request , username );
258260
259261 log .debug ("Extracting proxy destination : [%s] for request : [%s]" , queryDetail .getBackendUrl (), request .getUri ());
260262
@@ -276,12 +278,12 @@ private ProxyResponse recordBackendForQueryId(Request request, ProxyResponse res
276278 return response ;
277279 }
278280
279- public static QueryHistoryManager .QueryDetail getQueryDetailsFromRequest (Request request )
281+ public static QueryHistoryManager .QueryDetail getQueryDetailsFromRequest (Request request , Optional < String > username )
280282 {
281283 QueryHistoryManager .QueryDetail queryDetail = new QueryHistoryManager .QueryDetail ();
282284 queryDetail .setBackendUrl (getRemoteTarget (request .getUri ()));
283285 queryDetail .setCaptureTime (System .currentTimeMillis ());
284- queryDetail . setUser ( getQueryUser ( request . getHeader ( USER_HEADER ), request . getHeader ( AUTHORIZATION )) );
286+ username . ifPresent ( queryDetail :: setUser );
285287 queryDetail .setSource (request .getHeader (SOURCE_HEADER ));
286288
287289 String queryText = new String (((StaticBodyGenerator ) request .getBodyGenerator ()).getBody (), UTF_8 );
0 commit comments