
unable to connect from consumer to DB2 #71

@workingMids

Description


Hi,
I am using this connector to read messages from a consumer and write them into DB2 with some checks.
I am calling the runSinkConnector function, passing the config below, and I have implemented the etl function, but it is never called.

```js
var config = {
  kafka: {
    //zkConStr: "localhost:2181/",
    kafkaHost: brokerList,
    logger: null,
    groupId: 'kafka',
    clientName: "kc-sequelize-test-name",
    workerPerPartition: 1,
    options: {
      sessionTimeout: 8000,
      protocol: ["roundrobin"],
      fromOffset: "earliest", //latest
      fetchMaxBytes: 1024 * 100,
      fetchMinBytes: 1,
      fetchMaxWaitMs: 10,
      heartbeatInterval: 250,
      retryMinTimeout: 250,
      requireAcks: 1,
      //ackTimeoutMs: 100,
      //partitionerType: 3
    }
  },
  topic: topic,
  partitions: 1,
  maxTasks: 1,
  pollInterval: 2000,
  produceKeyed: true,
  produceCompressionType: 0,
  connector: {
    options: {
      host: "XXXX",
      port: 50001,
      dialect: "sqlite",
      pool: {
        max: 5,
        min: 0,
        idle: 10000
      },
      storage: null
    },
    database: "XXX",
    user: "XXXX",
    password: "XXXX",
    maxPollCount: 50,
    table: "table nameXXXX",
    incrementingColumnName: "id"
  },
  http: {
    port: 8000,
    middlewares: []
  },
  enableMetrics: true,
  batch: {
    batchSize: 100,
    commitEveryNBatch: 1,
    concurrency: 1,
    commitSync: true
  }
};

const etlFunc = (messageValue, callback) => {

  //type is an example json format field
  if (messageValue.type === "publish") {
    console.log(messageValue);
    return callback(null, {
      id: messageValue.payload.id,
      name: messageValue.payload.name
    });
  }

  if (messageValue.type === "unpublish") {
    return callback(null, null); //null value will cause deletion
  }

  callback(new Error("unknown messageValue.type"));
};

const converter = ConverterFactory.createSinkSchemaConverter(tableSchema, etlFunc);

runSinkConnector(config, [converter], onError).then(config => {
  //runs forever until: config.stop();
});

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': config1.broker_List,
  'security.protocol': 'SASL_SSL',
  'ssl.endpoint.identification.algorithm': 'https',
  'sasl.mechanism': 'PLAIN',
  'sasl.username': config1.sasl_username,
  'sasl.password': config1.sasl_password,
  'enable.auto.commit': false,
  'rebalance_cb': function (err, assignment) {

    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // Note: this can throw when you are disconnected. Take care and wrap it in
      // a try catch if that matters to you
      this.assign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      // Same as above
      this.unassign();
    } else {
      // We had a real error
      console.error(err);
    }

  }
}, {});

consumer.on('data', async function (data) {

  if (data && typeof data !== 'undefined') {
    try {
      consumer.pause([topic]);
      // function to persist the data
      await processMessage(data, data.offset);
      consumer.commit();
      consumer.resume([topic]);
    } catch (error) {
      console.log('data consuming error', error);
    }
  }
});
```

`processMessage` is a function that stores records into DB2 with some checks. It is being called, but the query is not executing.
Am I using this connector the right way? I did not understand how to use the connector for my task.
Could you write more detail about using a custom broker and describe your test example?
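To make the debugging question concrete, here is a rough sketch of how a `processMessage` could be structured so the DB2 write path is testable in isolation. All names in it (`createProcessMessage`, `dbClient.upsert`, `dbClient.remove`, `tableName`) are hypothetical placeholders, not part of this connector's API; a real implementation would call a sequelize model or DB2 driver instead of the injected stub.

```javascript
// Hypothetical sketch (not this connector's API): a factory that takes an
// injected DB client, so the parse/branch/write logic can be verified
// without a broker or a live DB2 connection.
function createProcessMessage(dbClient, tableName) {
  return async function processMessage(data, offset) {
    // node-rdkafka delivers the message value as a Buffer
    const value = JSON.parse(data.value.toString());

    if (value.type === "publish") {
      await dbClient.upsert(tableName, {
        id: value.payload.id,
        name: value.payload.name
      });
    } else if (value.type === "unpublish") {
      await dbClient.remove(tableName, value.payload.id);
    } else {
      throw new Error("unknown message type: " + value.type);
    }
    return offset;
  };
}
```

Wiring an in-memory stub in place of `dbClient` confirms the parsing and branching work; if the stub version behaves but the DB2 version does not, the failure is in the database call itself (connection, dialect, or table name) rather than in the consumer loop.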
