Skip to content

Commit 0d3f465

Browse files
authored
HTTP-187 HTTP content tracing (#189)
Signed-off-by: davidradl <[email protected]>
1 parent 125c9ec commit 0d3f465

File tree

15 files changed

+544
-22
lines changed

15 files changed

+544
-22
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## [Unreleased]
4+
- Allow config control of log HTTP request, response and header logging content
45

56
- allow format options to be applied to the http response decoding.
67
- change deserialize method so it can work with Flink 2

README.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,26 @@ an example of a customised grant type token request. The supplied `token request
564564
a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will
565565
be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`.
566566

567+
## Logging the http content
568+
Debug level logging has been added for class `com.getindata.connectors.http.internal.HttpLogger`. To enable this, alter the log4j properties.
569+
This logging puts out log entries for the HTTP requests and responses. This can be useful for diagnostics to confirm that HTTP requests have been issued and what
570+
that HTTP responses or an exception has occurred (for example connection Refused).
571+
572+
Logging HTTP may not be appropriate for production systems; where sensitive information is not allowed into the logs. But in development environments it is useful
573+
to be able to see HTTP content. Sensitive information can occur in the headers for example authentication tokens and passwords. Also the HTTP request and response bodies
574+
could sensitive. The default minimal logging should be used in production. For development, you can specify config option `gid.connector.http.logging.level`.
575+
This dictates the amount of content that debug logging will show around HTTP calls; the valid values are:
576+
577+
| log level | Request method | URI | HTTP Body | Response status code | Headers |
578+
|-------------|----------------|-----|-----------|----------------------|---------|
579+
| MIN | Y | Y | N | Y | N |
580+
| REQRESPONSE | Y | Y | Y | Y | N |
581+
| MAX | Y | Y | Y | Y | Y |
582+
583+
Notes:
584+
- you can customize what is traced for lookups using the `gid.connector.http.source.lookup.request-callback`.
585+
- where there is an N in the table the output is obfuscated.
586+
567587
### Restrictions at this time
568588
* No authentication is applied to the token request.
569589
* The processing does not use the refresh token if it present.
@@ -577,6 +597,7 @@ be requested if the current time is later than the cached token expiry time minu
577597
| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. |
578598
| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ |
579599
| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. |
600+
| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. |
580601
| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. |
581602
| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). |
582603
| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following <a href="#lookup-cache">Lookup Cache</a> section for more details. |
@@ -624,6 +645,7 @@ be requested if the current time is later than the cached token expiry time minu
624645
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
625646
| format | required | Specify what format to use. |
626647
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
648+
| gid.connector.http.logging.level | optional | Logging levels for HTTP content. Valid values are `MIN` (the default), `REQRESPONSE` and `MAX`. |
627649
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
628650
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
629651
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
import java.net.http.HttpHeaders;
4+
import java.net.http.HttpRequest;
5+
import java.net.http.HttpResponse;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Properties;
9+
import java.util.StringJoiner;
10+
11+
import lombok.extern.slf4j.Slf4j;
12+
13+
import com.getindata.connectors.http.internal.table.lookup.HttpLookupSourceRequestEntry;
14+
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.HTTP_LOGGING_LEVEL;
15+
16+
@Slf4j
17+
public class HttpLogger {
18+
19+
private final HttpLoggingLevelType httpLoggingLevelType;
20+
21+
public static HttpLogger getHttpLogger(Properties properties) {
22+
return new HttpLogger(properties);
23+
}
24+
25+
public void logRequest(HttpRequest httpRequest) {
26+
log.debug(createStringForRequest(httpRequest));
27+
}
28+
29+
public void logResponse(HttpResponse<String> response) {
30+
log.debug(createStringForResponse(response));
31+
}
32+
33+
public void logRequestBody(String body) {
34+
log.debug(createStringForBody(body));
35+
}
36+
37+
public void logExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) {
38+
log.debug(createStringForExceptionResponse(request, e));
39+
}
40+
41+
private HttpLogger(Properties properties) {
42+
String code = (String) properties.get(HTTP_LOGGING_LEVEL);
43+
this.httpLoggingLevelType = HttpLoggingLevelType.valueOfStr(code);
44+
}
45+
46+
String createStringForRequest(HttpRequest httpRequest) {
47+
String headersForLog = getHeadersForLog(httpRequest.headers());
48+
return String.format("HTTP %s Request: URL: %s, Headers: %s",
49+
httpRequest.method(),
50+
httpRequest.uri().toString(),
51+
headersForLog
52+
);
53+
}
54+
55+
private String getHeadersForLog(HttpHeaders httpHeaders) {
56+
if (httpHeaders == null) return "None";
57+
Map<String, List<String>> headersMap = httpHeaders.map();
58+
if (headersMap.isEmpty()) return "None";
59+
if (this.httpLoggingLevelType == HttpLoggingLevelType.MAX) {
60+
StringJoiner headers = new StringJoiner(";");
61+
for (Map.Entry<String, List<String>> reqHeaders : headersMap.entrySet()) {
62+
StringJoiner values = new StringJoiner(";");
63+
for (String value : reqHeaders.getValue()) {
64+
values.add(value);
65+
}
66+
String header = reqHeaders.getKey() + ":[" + values + "]";
67+
headers.add(header);
68+
}
69+
return headers.toString();
70+
}
71+
return "***";
72+
}
73+
74+
String createStringForResponse(HttpResponse<String> response) {
75+
String headersForLog = getHeadersForLog(response.headers());
76+
77+
String bodyForLog = "***";
78+
if (response.body() == null || response.body().isEmpty()) {
79+
bodyForLog = "None";
80+
} else {
81+
if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
82+
bodyForLog = response.body().toString();
83+
}
84+
}
85+
return String.format("HTTP %s Response: URL: %s,"
86+
+ " Response Headers: %s, status code: %s, Response Body: %s",
87+
response.request().method(),
88+
response.uri(),
89+
headersForLog,
90+
response.statusCode(),
91+
bodyForLog
92+
);
93+
}
94+
95+
private String createStringForExceptionResponse(HttpLookupSourceRequestEntry request, Exception e) {
96+
HttpRequest httpRequest = request.getHttpRequest();
97+
return String.format("HTTP %s Exception Response: URL: %s Exception %s",
98+
httpRequest.method(),
99+
httpRequest.uri(),
100+
e
101+
);
102+
}
103+
104+
String createStringForBody(String body) {
105+
String bodyForLog = "***";
106+
if (body == null || body.isEmpty()) {
107+
bodyForLog = "None";
108+
} else {
109+
if (this.httpLoggingLevelType != HttpLoggingLevelType.MIN) {
110+
bodyForLog = body.toString();
111+
}
112+
}
113+
return bodyForLog;
114+
}
115+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.getindata.connectors.http.internal;
2+
3+
public enum HttpLoggingLevelType {
4+
MIN,
5+
REQRESPONSE,
6+
MAX;
7+
8+
public static HttpLoggingLevelType valueOfStr(String code) {
9+
if (code == null) {
10+
return MIN;
11+
} else {
12+
return valueOf(code);
13+
}
14+
}
15+
}

src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ public final class HttpConnectorConfigConstants {
106106
public static final String SOURCE_PROXY_PASSWORD =
107107
SOURCE_LOOKUP_PREFIX + "proxy.password";
108108

109+
public static final String HTTP_LOGGING_LEVEL =
110+
GID_CONNECTOR_HTTP + "logging.level";
111+
109112
public static final String SINK_HTTP_TIMEOUT_SECONDS =
110113
GID_CONNECTOR_HTTP + "sink.request.timeout";
111114

@@ -118,6 +121,7 @@ public final class HttpConnectorConfigConstants {
118121
public static final String SINK_HTTP_WRITER_THREAD_POOL_SIZE =
119122
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";
120123

124+
121125
// -----------------------------------------------------
122126

123127

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/BatchRequestSubmitter.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,18 +58,18 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
5858
}
5959

6060
var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
61-
String previousReqeustMethod = requestsToSubmit.get(0).method;
61+
String previousRequestMethod = requestsToSubmit.get(0).method;
6262
List<HttpSinkRequestEntry> requestBatch = new ArrayList<>(httpRequestBatchSize);
6363

6464
for (var entry : requestsToSubmit) {
6565
if (requestBatch.size() == httpRequestBatchSize
66-
|| !previousReqeustMethod.equalsIgnoreCase(entry.method)) {
66+
|| !previousRequestMethod.equalsIgnoreCase(entry.method)) {
6767
// break batch and submit
6868
responseFutures.add(sendBatch(endpointUrl, requestBatch));
6969
requestBatch.clear();
7070
}
7171
requestBatch.add(entry);
72-
previousReqeustMethod = entry.method;
72+
previousRequestMethod = entry.method;
7373
}
7474

7575
// submit anything that left
@@ -84,9 +84,9 @@ int getBatchSize() {
8484

8585
private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
8686
String endpointUrl,
87-
List<HttpSinkRequestEntry> reqeustBatch) {
87+
List<HttpSinkRequestEntry> requestBatch) {
8888

89-
HttpRequest httpRequest = buildHttpRequest(reqeustBatch, URI.create(endpointUrl));
89+
HttpRequest httpRequest = buildHttpRequest(requestBatch, URI.create(endpointUrl));
9090
return httpClient
9191
.sendAsync(
9292
httpRequest.getHttpRequest(),
@@ -102,19 +102,19 @@ private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
102102
);
103103
}
104104

105-
private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, URI endpointUri) {
105+
private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> requestBatch, URI endpointUri) {
106106

107107
try {
108-
var method = reqeustBatch.get(0).method;
109-
List<byte[]> elements = new ArrayList<>(reqeustBatch.size());
108+
var method = requestBatch.get(0).method;
109+
List<byte[]> elements = new ArrayList<>(requestBatch.size());
110110

111111
BodyPublisher publisher;
112112
// By default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons
113113
// into the HTTP body without any context.
114114
// What we do here is we pack every Json/byteArray into Json Array hence '[' and ']'
115115
// at the end, and we separate every element with comma.
116116
elements.add(BATCH_START_BYTES);
117-
for (HttpSinkRequestEntry entry : reqeustBatch) {
117+
for (HttpSinkRequestEntry entry : requestBatch) {
118118
elements.add(entry.element);
119119
elements.add(BATCH_ELEMENT_DELIM_BYTES);
120120
}

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/JavaNetSinkHttpClient.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import com.getindata.connectors.http.HttpPostRequestCallback;
1616
import com.getindata.connectors.http.internal.HeaderPreprocessor;
17+
import com.getindata.connectors.http.internal.HttpLogger;
1718
import com.getindata.connectors.http.internal.SinkHttpClient;
1819
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
1920
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
@@ -40,6 +41,8 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {
4041

4142
private final RequestSubmitter requestSubmitter;
4243

44+
private final Properties properties;
45+
4346
public JavaNetSinkHttpClient(
4447
Properties properties,
4548
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
@@ -69,6 +72,7 @@ public JavaNetSinkHttpClient(
6972
properties,
7073
headersAndValues
7174
);
75+
this.properties = properties;
7276
}
7377

7478
@Override
@@ -98,10 +102,9 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
98102
for (var response : responses) {
99103
var sinkRequestEntry = response.getHttpRequest();
100104
var optResponse = response.getResponse();
101-
105+
HttpLogger.getHttpLogger(properties).logResponse(response.getResponse().get());
102106
httpPostRequestCallback.call(
103107
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
104-
105108
// TODO Add response processor here and orchestrate it with statusCodeChecker.
106109
if (optResponse.isEmpty() ||
107110
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {

src/main/java/com/getindata/connectors/http/internal/sink/httpclient/PerRequestSubmitter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public PerRequestSubmitter(
2828
HttpClient httpClient) {
2929

3030
super(properties, headersAndValues, httpClient);
31+
3132
}
3233

3334
@Override
@@ -40,8 +41,7 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
4041

4142
for (var entry : requestToSubmit) {
4243
HttpRequest httpRequest = buildHttpRequest(entry, endpointUri);
43-
var response = httpClient
44-
.sendAsync(
44+
var response = httpClient.sendAsync(
4545
httpRequest.getHttpRequest(),
4646
HttpResponse.BodyHandlers.ofString())
4747
.exceptionally(ex -> {

0 commit comments

Comments
 (0)