StreamKafkaReader提供了kafka中流数据的能力。在底层实现上,StreamKafkaReader获取kafka对应topic下的数据,并转换为DataX传输协议传递给Writer。
kafka中的数据应为符合要求的json格式。
StreamKafkaReader实现了从kafka中读取数据并转为DataX协议的功能。目前StreamKafkaReader支持功能如下:
-
支持且仅支持读取符合要求的json格式,例如:[{"value":"hadh5" , "type":"string"} , {"value":324 , "type":"long"}]。
我们暂时不能做到:
-
在读取的同时会阻塞writer的写入,暂时无法处理
-
不能同时处理多个topic数据
-
jobkafkatomysql.json
{ "job": { "setting": { "speed": { "byte":10485760 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "streamkafkareader", "parameter": { "topic" : "testdata", "groupid" : "datax", "servers" : "localhost:9092", } }, "writer": { "name":"mysqlwriter", "parameter":{ "column":[ "name", "isreg", "number" ], "connection":[ { "jdbcUrl":"jdbc:mysql://localhost:3306/test2", "table":[ "testTable" ] } ], "password":"root", "username":"root" } } } ] } }
jobkafka.json
{ "job": { "setting": { "speed": { "byte":10485760 }, "errorLimit": { "record": 0, "percentage": 0.02 } }, "content": [ { "reader": { "name": "streamkafkareader", "parameter": { "topic" : "testdata", "groupid" : "datax", "servers" : "localhost:9092", } }, "writer": { "name": "streamkafkawriter", "parameter": { "topic" : "testdata2", "servers" : "localhost:9092", "print": true, "encoding": "UTF-8" } } } ] } }
-
topic
-
描述:从kafka对应的topic读取数据
-
必选:是
-
默认值:无
-
-
groupid
-
描述:kafka消费者的groupid
-
必选:是
-
默认值:无
-
-
servers
-
描述:kafka服务端的地址端口。
-
必选:是
-
默认值:无
-
-
执行以下命令后,会等待kafka通道中的数据,需要停止时,在命令行输入exit并回车,即停止从kafka读取数据
python datax.py jobkafkatomysql.json
使用命令行发送数据到kafka
datax控制台输入exit退出监听
数据写入mysql
python datax.py jobkafka.json
同理,这里需要使用exit停止kafka监听,此配置文件从topic(testdata)中读取数据,传入topic(testdata2)中。
向testdata中发送数据
从testdata2中接收数据
略
datax是用于离线数据同步的工具,kafka处理的是实时流数据,所以当以kafka作为数据源时,必须要等待kafkareader将需要的数据从kafka中读取完毕,才能使用writer插件写入要导入的数据库或文件系统。这里的暂时没有实现边读边写。
略