Skip to content

Commit 1431cad

Browse files
authored
Enhance gRPC server side async support. (#741)
Change context and parent entry span propagation mechanism from gRPC ThreadLocal context to SkyWalking native dynamic field as new propagation mechanism, to better support async scenarios.
1 parent b4ad5b1 commit 1431cad

File tree

4 files changed

+37
-21
lines changed

4 files changed

+37
-21
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ Release Notes.
2323
* [doc] Add Spring Gateway Plugin document
2424
* [doc] Add 4 menu items guiding users to find important notices for Spring Annotation Plugin, Custom Trace Ignoring
2525
Plugin, Kotlin Coroutine Plugin, and Spring Gateway Plugin
26+
* Change context and parent entry span propagation mechanism from gRPC ThreadLocal context to SkyWalking native dynamic
27+
field as new propagation mechanism, to better support async scenarios.
2628

2729
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/222?closed=1)
2830

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/ServerInterceptor.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,6 @@
3636

3737
public class ServerInterceptor implements io.grpc.ServerInterceptor {
3838

39-
static final Context.Key<ContextSnapshot> CONTEXT_SNAPSHOT_KEY = Context.key("skywalking-grpc-context-snapshot");
40-
static final Context.Key<AbstractSpan> ACTIVE_SPAN_KEY = Context.key("skywalking-grpc-active-span");
41-
4239
@Override
4340
public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall<REQUEST, RESPONSE> call,
4441
Metadata headers, ServerCallHandler<REQUEST, RESPONSE> handler) {
@@ -59,15 +56,17 @@ public <REQUEST, RESPONSE> ServerCall.Listener<REQUEST> interceptCall(ServerCall
5956
ContextSnapshot contextSnapshot = ContextManager.capture();
6057
AbstractSpan asyncSpan = span.prepareForAsync();
6158

62-
Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan);
59+
//Context context = Context.current().withValues(CONTEXT_SNAPSHOT_KEY, contextSnapshot, ACTIVE_SPAN_KEY, asyncSpan);
6360

6461
ServerCall.Listener<REQUEST> listener = Contexts.interceptCall(
65-
context,
66-
new TracingServerCall<>(call),
62+
Context.current(),
63+
new TracingServerCall<>(call, contextSnapshot, asyncSpan),
6764
headers,
6865
(serverCall, metadata) -> new TracingServerCallListener<>(
6966
handler.startCall(serverCall, metadata),
70-
serverCall.getMethodDescriptor()
67+
serverCall.getMethodDescriptor(),
68+
contextSnapshot,
69+
asyncSpan
7170
)
7271
);
7372
ContextManager.stopSpan(asyncSpan);

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCall.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,38 +18,46 @@
1818

1919
package org.apache.skywalking.apm.plugin.grpc.v1.server;
2020

21-
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_CLOSE_OPERATION_NAME;
22-
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_MESSAGE_OPERATION_NAME;
23-
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.SERVER;
24-
2521
import io.grpc.ForwardingServerCall;
2622
import io.grpc.Metadata;
2723
import io.grpc.ServerCall;
2824
import io.grpc.Status;
2925
import org.apache.skywalking.apm.agent.core.context.ContextManager;
26+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
3027
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
3128
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
3229
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
3330
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
3431
import org.apache.skywalking.apm.plugin.grpc.v1.OperationNameFormatUtil;
3532

33+
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_CLOSE_OPERATION_NAME;
34+
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.RESPONSE_ON_MESSAGE_OPERATION_NAME;
35+
import static org.apache.skywalking.apm.plugin.grpc.v1.Constants.SERVER;
36+
3637
public class TracingServerCall<REQUEST, RESPONSE> extends ForwardingServerCall.SimpleForwardingServerCall<REQUEST, RESPONSE> {
3738

3839
private final String operationPrefix;
40+
private final ContextSnapshot contextSnapshot;
41+
private final AbstractSpan parentEntrySpan;
3942

40-
protected TracingServerCall(ServerCall<REQUEST, RESPONSE> delegate) {
43+
protected TracingServerCall(ServerCall<REQUEST, RESPONSE> delegate,
44+
final ContextSnapshot contextSnapshot,
45+
final AbstractSpan parentEntrySpan) {
4146
super(delegate);
4247
this.operationPrefix = OperationNameFormatUtil.formatOperationName(delegate.getMethodDescriptor()) + SERVER;
48+
this.contextSnapshot = contextSnapshot;
49+
this.parentEntrySpan = parentEntrySpan;
4350
}
4451

4552
@Override
4653
public void sendMessage(RESPONSE message) {
4754
// We just create the request on message span for server stream calls.
4855
if (!getMethodDescriptor().getType().serverSendsOneMessage()) {
49-
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME);
56+
final AbstractSpan span = ContextManager.createLocalSpan(
57+
operationPrefix + RESPONSE_ON_MESSAGE_OPERATION_NAME);
5058
span.setComponent(ComponentsDefine.GRPC);
5159
span.setLayer(SpanLayer.RPC_FRAMEWORK);
52-
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
60+
ContextManager.continued(contextSnapshot);
5361
try {
5462
super.sendMessage(message);
5563
} catch (Throwable t) {
@@ -68,7 +76,7 @@ public void close(Status status, Metadata trailers) {
6876
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + RESPONSE_ON_CLOSE_OPERATION_NAME);
6977
span.setComponent(ComponentsDefine.GRPC);
7078
span.setLayer(SpanLayer.RPC_FRAMEWORK);
71-
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
79+
ContextManager.continued(contextSnapshot);
7280
switch (status.getCode()) {
7381
case OK:
7482
break;
@@ -94,7 +102,7 @@ public void close(Status status, Metadata trailers) {
94102
break;
95103
}
96104
Tags.RPC_RESPONSE_STATUS_CODE.set(span, status.getCode().name());
97-
ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
105+
parentEntrySpan.asyncFinish();
98106

99107
try {
100108
super.close(status, trailers);

apm-sniffer/apm-sdk-plugin/grpc-1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/grpc/v1/server/TracingServerCallListener.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.grpc.MethodDescriptor;
2323
import io.grpc.ServerCall;
2424
import org.apache.skywalking.apm.agent.core.context.ContextManager;
25+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
2526
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
2627
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
2728
import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
@@ -34,11 +35,17 @@
3435
public class TracingServerCallListener<REQUEST> extends ForwardingServerCallListener.SimpleForwardingServerCallListener<REQUEST> {
3536
private final MethodDescriptor.MethodType methodType;
3637
private final String operationPrefix;
38+
private final ContextSnapshot contextSnapshot;
39+
private final AbstractSpan parentEntrySpan;
3740

38-
protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor) {
41+
protected TracingServerCallListener(ServerCall.Listener<REQUEST> delegate, MethodDescriptor<REQUEST, ?> descriptor,
42+
final ContextSnapshot contextSnapshot,
43+
final AbstractSpan parentEntrySpan) {
3944
super(delegate);
4045
this.methodType = descriptor.getType();
4146
this.operationPrefix = OperationNameFormatUtil.formatOperationName(descriptor) + SERVER;
47+
this.contextSnapshot = contextSnapshot;
48+
this.parentEntrySpan = parentEntrySpan;
4249
}
4350

4451
@Override
@@ -48,7 +55,7 @@ public void onMessage(REQUEST message) {
4855
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_MESSAGE_OPERATION_NAME);
4956
span.setComponent(ComponentsDefine.GRPC);
5057
span.setLayer(SpanLayer.RPC_FRAMEWORK);
51-
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
58+
ContextManager.continued(contextSnapshot);
5259
try {
5360
super.onMessage(message);
5461
} catch (Throwable t) {
@@ -67,15 +74,15 @@ public void onCancel() {
6774
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_CANCEL_OPERATION_NAME);
6875
span.setComponent(ComponentsDefine.GRPC);
6976
span.setLayer(SpanLayer.RPC_FRAMEWORK);
70-
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
77+
ContextManager.continued(contextSnapshot);
7178
try {
7279
super.onCancel();
7380
} catch (Throwable t) {
7481
ContextManager.activeSpan().log(t);
7582
throw t;
7683
} finally {
7784
ContextManager.stopSpan();
78-
ServerInterceptor.ACTIVE_SPAN_KEY.get().asyncFinish();
85+
parentEntrySpan.asyncFinish();
7986
}
8087
}
8188

@@ -84,7 +91,7 @@ public void onHalfClose() {
8491
final AbstractSpan span = ContextManager.createLocalSpan(operationPrefix + REQUEST_ON_HALF_CLOSE_OPERATION_NAME);
8592
span.setComponent(ComponentsDefine.GRPC);
8693
span.setLayer(SpanLayer.RPC_FRAMEWORK);
87-
ContextManager.continued(ServerInterceptor.CONTEXT_SNAPSHOT_KEY.get());
94+
ContextManager.continued(contextSnapshot);
8895
try {
8996
super.onHalfClose();
9097
} catch (Throwable t) {

0 commit comments

Comments
 (0)