74
74
import java .util .concurrent .atomic .AtomicBoolean ;
75
75
import java .util .stream .Collectors ;
76
76
import java .util .stream .IntStream ;
77
-
78
77
import org .jetbrains .annotations .NotNull ;
79
78
import org .junit .jupiter .api .BeforeAll ;
80
79
import org .junit .jupiter .api .Test ;
@@ -205,75 +204,80 @@ void receivesMessage() throws Exception {
205
204
void observesMessage () throws Exception {
206
205
String messageBody = "observesMessage-payload" ;
207
206
SendResult <Object > sendResult = sqsTemplate
208
- .send (to -> to .queue (OBSERVES_MESSAGE_QUEUE_NAME ).payload (messageBody ));
207
+ .send (to -> to .queue (OBSERVES_MESSAGE_QUEUE_NAME ).payload (messageBody ));
209
208
logger .debug ("Sent message to queue {} with messageBody {}" , OBSERVES_MESSAGE_QUEUE_NAME , messageBody );
210
209
assertThat (latchContainer .observesMessageLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
211
- await ()
212
- .atMost (10 , TimeUnit .SECONDS )
213
- .untilAsserted (() ->
214
- TestObservationRegistryAssert .then (observationRegistry )
215
- .hasHandledContextsThatSatisfy (contexts -> {
216
- ObservationContextAssert .then (getContextWithContextualNameEqualTo (contexts , "observes_message_test_queue send" ))
210
+ await ().atMost (10 , TimeUnit .SECONDS ).untilAsserted (() -> TestObservationRegistryAssert .then (observationRegistry )
211
+ .hasHandledContextsThatSatisfy (contexts -> {
212
+ ObservationContextAssert
213
+ .then (getContextWithContextualNameEqualTo (contexts , "observes_message_test_queue send" ))
217
214
.hasNameEqualTo ("spring.aws.sqs.template" )
218
215
.isInstanceOf (SqsTemplateObservation .Context .class )
219
216
.hasLowCardinalityKeyValue (
220
- AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_OPERATION
221
- .asString (),
222
- "publish" )
217
+ AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_OPERATION
218
+ .asString (),
219
+ "publish" )
223
220
.hasLowCardinalityKeyValue (
224
- AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_DESTINATION_NAME
225
- .asString (),
226
- OBSERVES_MESSAGE_QUEUE_NAME )
221
+ AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_DESTINATION_NAME
222
+ .asString (),
223
+ OBSERVES_MESSAGE_QUEUE_NAME )
227
224
.hasLowCardinalityKeyValue (
228
- AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_DESTINATION_KIND
229
- .asString (),
230
- "queue" )
225
+ AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_DESTINATION_KIND
226
+ .asString (),
227
+ "queue" )
231
228
.hasLowCardinalityKeyValue (
232
- AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_SYSTEM
233
- .asString (),
234
- "sqs" )
229
+ AbstractTemplateObservation .Documentation .LowCardinalityTags .MESSAGING_SYSTEM
230
+ .asString (),
231
+ "sqs" )
235
232
.hasHighCardinalityKeyValue (
236
- AbstractTemplateObservation .Documentation .HighCardinalityTags .MESSAGE_ID .asString (),
237
- sendResult .messageId ().toString ())
233
+ AbstractTemplateObservation .Documentation .HighCardinalityTags .MESSAGE_ID .asString (),
234
+ sendResult .messageId ().toString ())
238
235
.doesNotHaveParentObservation ();
239
- ObservationContextAssert .then (getContextWithContextualNameEqualTo (contexts , "observes_message_test_queue receive" ))
236
+ ObservationContextAssert
237
+ .then (getContextWithContextualNameEqualTo (contexts , "observes_message_test_queue receive" ))
240
238
.hasNameEqualTo ("spring.aws.sqs.listener" )
241
239
.isInstanceOf (SqsListenerObservation .Context .class )
242
240
.hasLowCardinalityKeyValue (
243
- AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_OPERATION
244
- .asString (),
245
- "receive" )
241
+ AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_OPERATION
242
+ .asString (),
243
+ "receive" )
246
244
.hasLowCardinalityKeyValue (
247
- AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_SOURCE_NAME
248
- .asString (),
249
- "observes_message_test_queue" )
245
+ AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_SOURCE_NAME
246
+ .asString (),
247
+ "observes_message_test_queue" )
250
248
.hasLowCardinalityKeyValue (
251
- AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_SOURCE_KIND
252
- .asString (),
253
- "queue" )
249
+ AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_SOURCE_KIND
250
+ .asString (),
251
+ "queue" )
254
252
.hasLowCardinalityKeyValue (
255
- AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_SYSTEM
256
- .asString (),
257
- "sqs" )
253
+ AbstractListenerObservation .Documentation .LowCardinalityTags .MESSAGING_SYSTEM
254
+ .asString (),
255
+ "sqs" )
258
256
.hasHighCardinalityKeyValue (
259
- AbstractListenerObservation .Documentation .HighCardinalityTags .MESSAGE_ID .asString (),
260
- sendResult .messageId ().toString ())
257
+ AbstractListenerObservation .Documentation .HighCardinalityTags .MESSAGE_ID .asString (),
258
+ sendResult .messageId ().toString ())
261
259
.doesNotHaveHighCardinalityKeyValueWithKey (
262
- SqsListenerObservation .Documentation .HighCardinalityTags .MESSAGE_GROUP_ID
263
- .asString ())
260
+ SqsListenerObservation .Documentation .HighCardinalityTags .MESSAGE_GROUP_ID
261
+ .asString ())
264
262
.doesNotHaveParentObservation ();
265
- ObservationContextAssert .then (getContextWithName (contexts , "listener.process" ))
263
+ ObservationContextAssert .then (getContextWithName (contexts , "listener.process" ))
266
264
.hasParentObservationContextMatching (
267
- contextView -> contextView .getName ().equals ("spring.aws.sqs.listener" ));
268
- }));
265
+ contextView -> contextView .getName ().equals ("spring.aws.sqs.listener" ));
266
+ }));
269
267
}
270
268
271
269
private Observation .@ NotNull Context getContextWithName (List <Observation .Context > contexts , String name ) {
272
- return contexts .stream ().filter (context -> context .getName ().equals (name )).findFirst ().orElseThrow (() -> new AssertionError ("Could not find context with name " + name ));
270
+ return contexts .stream ().filter (context -> context .getName ().equals (name )).findFirst ()
271
+ .orElseThrow (() -> new AssertionError ("Could not find context with name " + name ));
273
272
}
274
273
275
- private Observation .@ NotNull Context getContextWithContextualNameEqualTo (List <Observation .Context > contexts , String contextualName ) {
276
- return contexts .stream ().filter (context -> context .getContextualName () != null && context .getContextualName ().equals (contextualName )).findFirst ().orElseThrow (() -> new AssertionError ("Could not find context with contextual name " + contextualName ));
274
+ private Observation .@ NotNull Context getContextWithContextualNameEqualTo (List <Observation .Context > contexts ,
275
+ String contextualName ) {
276
+ return contexts .stream ()
277
+ .filter (context -> context .getContextualName () != null
278
+ && context .getContextualName ().equals (contextualName ))
279
+ .findFirst ()
280
+ .orElseThrow (() -> new AssertionError ("Could not find context with contextual name " + contextualName ));
277
281
}
278
282
279
283
@ Test
@@ -282,27 +286,25 @@ void observesError() throws Exception {
282
286
sqsTemplate .send (to -> to .queue (OBSERVES_ERROR_QUEUE_NAME ).payload (messageBody ));
283
287
logger .debug ("Sent message to queue {} with messageBody {}" , OBSERVES_ERROR_QUEUE_NAME , messageBody );
284
288
assertThat (latchContainer .observesErrorLatch .await (10 , TimeUnit .SECONDS )).isTrue ();
285
- await ()
286
- .atMost (10 , TimeUnit .SECONDS )
287
- .untilAsserted (() ->
288
- TestObservationRegistryAssert .then (observationRegistry )
289
- .hasHandledContextsThatSatisfy (contexts -> {
290
- ObservationContextAssert .then (getContextWithContextualNameEqualTo (contexts , "observes_error_test_queue send" )).hasNameEqualTo ("spring.aws.sqs.template" )
291
- .isInstanceOf (AbstractTemplateObservation .Context .class )
292
- .doesNotHaveParentObservation ();
293
- List <Observation .Context > receivingContexts = contexts .stream ()
294
- .filter (context -> context .getContextualName () != null && context .getContextualName ().equals ("observes_error_test_queue receive" ))
289
+ await ().atMost (10 , TimeUnit .SECONDS ).untilAsserted (() -> TestObservationRegistryAssert .then (observationRegistry )
290
+ .hasHandledContextsThatSatisfy (contexts -> {
291
+ ObservationContextAssert
292
+ .then (getContextWithContextualNameEqualTo (contexts , "observes_error_test_queue send" ))
293
+ .hasNameEqualTo ("spring.aws.sqs.template" )
294
+ .isInstanceOf (AbstractTemplateObservation .Context .class ).doesNotHaveParentObservation ();
295
+ List <Observation .Context > receivingContexts = contexts .stream ()
296
+ .filter (context -> context .getContextualName () != null
297
+ && context .getContextualName ().equals ("observes_error_test_queue receive" ))
295
298
.toList ();
296
- ObservationContextAssert .then (receivingContexts .get (0 )).hasNameEqualTo ("spring.aws.sqs.listener" )
297
- .isInstanceOf (AbstractListenerObservation .Context .class )
298
- .doesNotHaveParentObservation (). assertThatError ().isInstanceOf (RuntimeException .class )
299
+ ObservationContextAssert .then (receivingContexts .get (0 )).hasNameEqualTo ("spring.aws.sqs.listener" )
300
+ .isInstanceOf (AbstractListenerObservation .Context .class ). doesNotHaveParentObservation ()
301
+ .assertThatError ().isInstanceOf (RuntimeException .class )
299
302
.hasMessage ("Expected exception from observes-error" );
300
- ObservationContextAssert .then (receivingContexts .get (1 )).hasNameEqualTo ("spring.aws.sqs.listener" )
303
+ ObservationContextAssert .then (receivingContexts .get (1 )).hasNameEqualTo ("spring.aws.sqs.listener" )
301
304
.isInstanceOf (AbstractListenerObservation .Context .class )
302
305
.hasContextualNameEqualTo ("observes_error_test_queue receive" )
303
306
.doesNotHaveParentObservation ().doesNotHaveError ();
304
- })
305
- );
307
+ }));
306
308
}
307
309
308
310
@ Test
0 commit comments