@@ -365,4 +365,126 @@ void seekToOffsetAndReceive(VertxTestContext context) throws InterruptedExceptio
365
365
assertTrue (context .awaitCompletion (TEST_TIMEOUT , TimeUnit .SECONDS ));
366
366
}
367
367
368
+ @ Test
369
+ void seekToBeginningMultipleTopicsWithNotSuscribedTopic (VertxTestContext context ) throws InterruptedException , ExecutionException , TimeoutException {
370
+ String subscribedTopic = "seekToBeginningSubscribedTopic" ;
371
+ String notSubscribedTopic = "seekToBeginningNotSubscribedTopic" ;
372
+ kafkaCluster .createTopic (subscribedTopic , 1 , 1 );
373
+ kafkaCluster .createTopic (notSubscribedTopic , 1 , 1 );
374
+
375
+ JsonObject jsonConsumer = new JsonObject ()
376
+ .put ("name" , name )
377
+ .put ("format" , "json" );
378
+
379
+ JsonObject topics = new JsonObject ()
380
+ .put ("topics" , new JsonArray ().add (subscribedTopic ));
381
+
382
+ // create consumer
383
+ // subscribe to a topic
384
+ consumerService ()
385
+ .createConsumer (context , groupId , jsonConsumer )
386
+ .subscribeConsumer (context , groupId , name , topics );
387
+
388
+ CompletableFuture <Boolean > consume = new CompletableFuture <>();
389
+
390
+ // poll to subscribe
391
+ consumerService ()
392
+ .consumeRecordsRequest (groupId , name , BridgeContentType .KAFKA_JSON_JSON )
393
+ .as (BodyCodec .jsonObject ())
394
+ .send (ar -> consume .complete (true ));
395
+
396
+ consume .get (TEST_TIMEOUT , TimeUnit .SECONDS );
397
+
398
+ // seek
399
+ JsonArray partitions = new JsonArray ();
400
+ partitions .add (new JsonObject ().put ("topic" , subscribedTopic ).put ("partition" , 0 ));
401
+ partitions .add (new JsonObject ().put ("topic" , notSubscribedTopic ).put ("partition" , 0 ));
402
+
403
+ JsonObject root = new JsonObject ();
404
+ root .put ("partitions" , partitions );
405
+
406
+ CompletableFuture <Boolean > seek = new CompletableFuture <>();
407
+ seekService ().positionsBeginningRequest (groupId , name , root )
408
+ .sendJsonObject (root , ar -> {
409
+ context .verify (() -> {
410
+ assertTrue (ar .succeeded ());
411
+ HttpResponse <JsonObject > response = ar .result ();
412
+ HttpBridgeError error = HttpBridgeError .fromJson (response .body ());
413
+ assertEquals (HttpResponseStatus .NOT_FOUND .code (), response .statusCode ());
414
+ assertEquals (HttpResponseStatus .NOT_FOUND .code (), error .getCode ());
415
+ assertEquals ("No current assignment for partition " + notSubscribedTopic + "-0" , error .getMessage ());
416
+ });
417
+ seek .complete (true );
418
+ });
419
+ seek .get (TEST_TIMEOUT , TimeUnit .SECONDS );
420
+
421
+ // consumer deletion
422
+ consumerService ()
423
+ .deleteConsumer (context , groupId , name );
424
+
425
+ context .completeNow ();
426
+ assertTrue (context .awaitCompletion (TEST_TIMEOUT , TimeUnit .SECONDS ));
427
+ }
428
+
429
+ @ Test
430
+ void seekToOffsetMultipleTopicsWithNotSuscribedTopic (VertxTestContext context ) throws InterruptedException , ExecutionException , TimeoutException {
431
+ String subscribedTopic = "seekToOffseSubscribedTopic" ;
432
+ String notSubscribedTopic = "seekToOffsetNotSubscribedTopic" ;
433
+ kafkaCluster .createTopic (subscribedTopic , 1 , 1 );
434
+ kafkaCluster .createTopic (notSubscribedTopic , 1 , 1 );
435
+
436
+ JsonObject jsonConsumer = new JsonObject ()
437
+ .put ("name" , name )
438
+ .put ("format" , "json" );
439
+
440
+ JsonObject topics = new JsonObject ()
441
+ .put ("topics" , new JsonArray ().add (subscribedTopic ));
442
+
443
+ // create consumer
444
+ // subscribe to a topic
445
+ consumerService ()
446
+ .createConsumer (context , groupId , jsonConsumer )
447
+ .subscribeConsumer (context , groupId , name , topics );
448
+
449
+ CompletableFuture <Boolean > consume = new CompletableFuture <>();
450
+
451
+ // poll to subscribe
452
+ consumerService ()
453
+ .consumeRecordsRequest (groupId , name , BridgeContentType .KAFKA_JSON_JSON )
454
+ .as (BodyCodec .jsonObject ())
455
+ .send (ar -> consume .complete (true ));
456
+
457
+ consume .get (TEST_TIMEOUT , TimeUnit .SECONDS );
458
+
459
+ CompletableFuture <Boolean > seek = new CompletableFuture <>();
460
+ // seek
461
+ JsonArray offsets = new JsonArray ();
462
+ offsets .add (new JsonObject ().put ("topic" , subscribedTopic ).put ("partition" , 0 ).put ("offset" , 0 ));
463
+ offsets .add (new JsonObject ().put ("topic" , notSubscribedTopic ).put ("partition" , 0 ).put ("offset" , 0 ));
464
+
465
+ JsonObject root = new JsonObject ();
466
+ root .put ("offsets" , offsets );
467
+
468
+ seekService ()
469
+ .positionsRequest (groupId , name , root )
470
+ .sendJsonObject (root , ar -> {
471
+ context .verify (() -> {
472
+ assertTrue (ar .succeeded ());
473
+ HttpResponse <JsonObject > response = ar .result ();
474
+ HttpBridgeError error = HttpBridgeError .fromJson (response .body ());
475
+ assertEquals (HttpResponseStatus .NOT_FOUND .code (), response .statusCode ());
476
+ assertEquals (HttpResponseStatus .NOT_FOUND .code (), error .getCode ());
477
+ assertEquals ("No current assignment for partition " + notSubscribedTopic + "-0" , error .getMessage ());
478
+ });
479
+ seek .complete (true );
480
+ });
481
+ seek .get (TEST_TIMEOUT , TimeUnit .SECONDS );
482
+
483
+ // consumer deletion
484
+ consumerService ()
485
+ .deleteConsumer (context , groupId , name );
486
+
487
+ context .completeNow ();
488
+ assertTrue (context .awaitCompletion (TEST_TIMEOUT , TimeUnit .SECONDS ));
489
+ }
368
490
}
0 commit comments