1313import org .apache .kafka .clients .consumer .KafkaConsumer ;
1414import org .apache .kafka .clients .producer .Producer ;
1515import org .apache .kafka .clients .producer .ProducerRecord ;
16- import org .apache .kafka .common .serialization .ByteArrayDeserializer ;
17- import org .apache .kafka .common .serialization .ByteArraySerializer ;
18- import org .apache .kafka .common .serialization .LongDeserializer ;
19- import org .apache .kafka .common .serialization .Serdes ;
16+ import org .apache .kafka .common .serialization .*;
2017import org .apache .kafka .common .serialization .Serdes .StringSerde ;
21- import org .apache .kafka .common .serialization .StringDeserializer ;
22- import org .apache .kafka .streams .KeyValue ;
23- import org .apache .kafka .streams .StreamsConfig ;
24- import org .apache .kafka .streams .TestInputTopic ;
25- import org .apache .kafka .streams .TestOutputTopic ;
26- import org .apache .kafka .streams .TopologyTestDriver ;
27- import org .apache .kafka .streams .kstream .Consumed ;
28- import org .apache .kafka .streams .kstream .KStream ;
29- import org .apache .kafka .streams .kstream .KTable ;
30- import org .apache .kafka .streams .kstream .Materialized ;
31- import org .apache .kafka .streams .kstream .Produced ;
32- import org .apache .kafka .streams .kstream .Transformer ;
18+ import org .apache .kafka .streams .*;
19+ import org .apache .kafka .streams .kstream .*;
3320import org .apache .kafka .streams .processor .ProcessorContext ;
3421import org .apache .kafka .streams .processor .WallclockTimestampExtractor ;
3522import org .apache .kafka .streams .test .TestRecord ;
36- import org .geotools .api .data .DataStoreFinder ;
37- import org .geotools .api .data .Query ;
38- import org .geotools .api .data .SimpleFeatureReader ;
39- import org .geotools .api .data .SimpleFeatureWriter ;
40- import org .geotools .api .data .Transaction ;
23+ import org .geotools .api .data .*;
4124import org .geotools .api .feature .simple .SimpleFeature ;
4225import org .geotools .api .feature .simple .SimpleFeatureType ;
43- import org .junit .AfterClass ;
4426import org .junit .Assert ;
45- import org .junit .BeforeClass ;
27+ import org .junit .ClassRule ;
4628import org .junit .Test ;
4729import org .locationtech .geomesa .features .ScalaSimpleFeature ;
30+ import org .locationtech .geomesa .kafka .KafkaContainerTest ;
4831import org .locationtech .geomesa .kafka .data .KafkaDataStore ;
4932import org .locationtech .geomesa .kafka .streams .GeoMesaMessage ;
5033import org .locationtech .geomesa .utils .geotools .FeatureUtils ;
5134import org .locationtech .geomesa .utils .geotools .SimpleFeatureTypes ;
5235import org .locationtech .geomesa .utils .geotools .converters .FastConverter ;
53- import org .slf4j .Logger ;
5436import org .slf4j .LoggerFactory ;
55- import org .testcontainers .containers .KafkaContainer ;
5637import org .testcontainers .containers .output .Slf4jLogConsumer ;
57- import org .testcontainers .utility . DockerImageName ;
38+ import org .testcontainers .kafka . KafkaContainer ;
5839
5940import java .nio .charset .StandardCharsets ;
6041import java .time .Duration ;
61- import java .util .ArrayList ;
62- import java .util .Arrays ;
63- import java .util .Collections ;
64- import java .util .Comparator ;
65- import java .util .Date ;
66- import java .util .HashMap ;
67- import java .util .List ;
68- import java .util .Map ;
69- import java .util .Properties ;
70- import java .util .Set ;
42+ import java .util .*;
7143import java .util .concurrent .ConcurrentHashMap ;
7244import java .util .stream .Collectors ;
7345
7446public class GeoMesaStreamsBuilderTest {
7547
76- private static final Logger logger = LoggerFactory .getLogger (GeoMesaStreamsBuilderTest .class );
77-
78- static KafkaContainer container = null ;
48+ @ ClassRule
49+ public static final KafkaContainer container =
50+ new KafkaContainer (KafkaContainerTest .KafkaImage ())
51+ .withLogConsumer (new Slf4jLogConsumer (LoggerFactory .getLogger ("kafka" )));
7952
8053 static final SimpleFeatureType sft =
8154 SimpleFeatureTypes .createImmutableType ("streams" , "name:String,age:Int,dtg:Date,*geom:Point:srid=4326" );
8255
8356 static final List <SimpleFeature > features = new ArrayList <>();
8457
85- static final Set <String > zkPaths = Collections .newSetFromMap (new ConcurrentHashMap <>());
58+ static final Set <String > catalogs = Collections .newSetFromMap (new ConcurrentHashMap <>());
8659
87- static String zookeepers () {
88- return String .format ("%s:%s" , container .getHost (), container .getMappedPort (KafkaContainer .ZOOKEEPER_PORT ));
89- }
9060 static String brokers () {
9161 return container .getBootstrapServers ();
9262 }
9363
94- public Map <String , String > getParams (String zkPath ) {
95- if (!zkPaths .add (zkPath )) {
96- throw new IllegalArgumentException ("zk path '" + zkPath + "' is reused between tests, may cause conflicts" );
97- }
98- Map <String , String > params = new HashMap <>();
99- params .put ("kafka.brokers" , brokers ());
100- params .put ("kafka.zookeepers" , zookeepers ());
101- params .put ("kafka.topic.partitions" , "1" );
102- params .put ("kafka.topic.replication" , "1" );
103- params .put ("kafka.consumer.read-back" , "Inf" );
104- params .put ("kafka.zk.path" , zkPath );
105- return params ;
106- }
107-
108- @ BeforeClass
109- public static void init () {
110- DockerImageName image =
111- DockerImageName .parse ("confluentinc/cp-kafka" )
112- .withTag (System .getProperty ("confluent.docker.tag" , "7.3.1" ));
113- container = new KafkaContainer (image );
114- container .start ();
115- container .followOutput (new Slf4jLogConsumer (logger ));
116-
64+ static {
11765 for (int i = 0 ; i < 10 ; i ++) {
11866 ScalaSimpleFeature sf = new ScalaSimpleFeature (sft , "id" + i , null , null );
11967 sf .setAttribute (0 , "name" + i );
@@ -124,11 +72,17 @@ public static void init() {
12472 }
12573 }
12674
127- @ AfterClass
128- public static void destroy () {
129- if (container != null ) {
130- container .stop ();
75+ public Map <String , String > getParams (String catalog ) {
76+ if (!catalogs .add (catalog )) {
77+ throw new IllegalArgumentException ("zk path '" + catalog + "' is reused between tests, may cause conflicts" );
13178 }
79+ Map <String , String > params = new HashMap <>();
80+ params .put ("kafka.brokers" , brokers ());
81+ params .put ("kafka.catalog.topic" , catalog );
82+ params .put ("kafka.topic.partitions" , "1" );
83+ params .put ("kafka.topic.replication" , "1" );
84+ params .put ("kafka.consumer.read-back" , "Inf" );
85+ return params ;
13286 }
13387
13488 @ Test
0 commit comments