Skip to content

Commit

Permalink
Merge pull request #63 from tulios/undefined-partitions-on-produce-re…
Browse files Browse the repository at this point in the history
…try-#62

 Prevent crash when re-producing after metadata refresh
  • Loading branch information
tulios authored May 18, 2018
2 parents 4d71d3d + d5b3dfc commit 6f86a56
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.0.1] - 2018-05-18
### Fixed
- Prevent crash when re-producing after metadata refresh #62

## [1.0.0] - 2018-05-14
## Changed
- Updated readme
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kafkajs",
"version": "1.0.0",
"version": "1.0.1",
"description": "A modern Apache Kafka client for node.js",
"author": "Tulio Ornelas <[email protected]>",
"main": "index.js",
Expand Down
9 changes: 7 additions & 2 deletions src/producer/sendMessages.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,13 @@ module.exports = ({ logger, cluster, partitioner }) => {
const partitions = partitionsPerLeader[broker.nodeId]
const topicData = createTopicData({ topic, partitions, messagesPerPartition })

const response = await broker.produce({ acks, timeout, compression, topicData })
responsePerBroker.set(broker, responseSerializer(response))
try {
const response = await broker.produce({ acks, timeout, compression, topicData })
responsePerBroker.set(broker, responseSerializer(response))
} catch (e) {
responsePerBroker.delete(broker)
throw e
}
})
}

Expand Down
30 changes: 29 additions & 1 deletion src/producer/sendMessages.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ describe('Producer > sendMessages', () => {
1: { nodeId: 1, produce: jest.fn(() => createProducerResponse(topic, 0)) },
2: { nodeId: 2, produce: jest.fn(() => createProducerResponse(topic, 1)) },
3: { nodeId: 3, produce: jest.fn(() => createProducerResponse(topic, 2)) },
4: { nodeId: 4, produce: jest.fn(() => createProducerResponse(topic, 1)) },
}
cluster = {
addTargetTopic: jest.fn(),
Expand Down Expand Up @@ -75,8 +76,8 @@ describe('Producer > sendMessages', () => {
expect(brokers[2].produce).toHaveBeenCalledTimes(1)
expect(brokers[3].produce).toHaveBeenCalledTimes(3)
expect(response).toEqual([
{ errorCode: 0, offset: '0', partition: 0, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '1', partition: 1, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '0', partition: 0, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '2', partition: 2, timestamp: '-1', topicName: 'topic-name' },
])
})
Expand Down Expand Up @@ -108,4 +109,31 @@ describe('Producer > sendMessages', () => {
expect(cluster.refreshMetadata).toHaveBeenCalled()
})
}

test('does not re-produce messages to brokers that are no longer leaders after metadata refresh', async () => {
const sendMessages = createSendMessages({ logger: newLogger(), cluster, partitioner })

brokers[2].produce
.mockImplementationOnce(() => {
const e = new Error('Some error broker 1')
e.type = 'NOT_LEADER_FOR_PARTITION'
throw e
})
.mockImplementationOnce(() => createProducerResponse(topic, 0))
cluster.findLeaderForPartitions
.mockImplementationOnce(() => partitionsPerLeader)
.mockImplementationOnce(() => ({
1: [0],
4: [1], // Broker 4 replaces broker 2 as leader for partition 1
3: [2],
}))

const response = await sendMessages({ topic, messages })

expect(response).toEqual([
{ errorCode: 0, offset: '0', partition: 0, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '2', partition: 2, timestamp: '-1', topicName: 'topic-name' },
{ errorCode: 0, offset: '1', partition: 1, timestamp: '-1', topicName: 'topic-name' },
])
})
})

0 comments on commit 6f86a56

Please sign in to comment.