Skip to content

Commit 71d3ee5

Browse files
authored
Fix: better (Redis) exception handling (#89)
Redis doesn't wrap all of the errors e.g. (retriable) IOError might be raised from an IO write (#88). With the refactored error handling we make sure these are logged and retried instead of causing pipeline crashes. Also not all specs from the suite were run on the CI, as some are tagged with redis. These require a real Redis server, no reason not to run them against CI as well.
1 parent dba90d8 commit 71d3ee5

File tree

7 files changed

+174
-53
lines changed

7 files changed

+174
-53
lines changed

.ci/docker-compose.override.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
version: '3'
2+
3+
services:
4+
logstash:
5+
network_mode: host

.ci/run.sh

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#!/bin/bash
2+
# This is intended to be run inside the docker container as the command of the docker-compose.
3+
4+
env
5+
6+
set -ex
7+
8+
jruby -rbundler/setup -S rspec -fd
9+
10+
jruby -rbundler/setup -S rspec -fd --tag redis

.travis.yml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,13 @@
11
import:
2-
- logstash-plugins/.ci:travis/[email protected]
2+
- logstash-plugins/.ci:travis/[email protected]
3+
4+
addons:
5+
apt:
6+
sources:
7+
- sourceline: 'ppa:chris-lea/redis-server'
8+
packages:
9+
- redis-server
10+
11+
before_install:
12+
- sudo service redis-server stop
13+
- sudo service redis-server start --bind 0.0.0.0

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
## 3.7.0
2+
- Fix: better (Redis) exception handling [#89](https://github.com/logstash-plugins/logstash-input-redis/pull/89)
3+
- Test: start running integration specs on CI
4+
15
## 3.6.1
26
- Fix: resolve crash when commands_map is set [#86](https://github.com/logstash-plugins/logstash-input-redis/pull/86)
37

lib/logstash/inputs/redis.rb

Lines changed: 62 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -107,26 +107,22 @@ def is_list_type?
107107

108108
# private
109109
def redis_params
110+
params = {
111+
:timeout => @timeout,
112+
:db => @db,
113+
:password => @password.nil? ? nil : @password.value,
114+
:ssl => @ssl
115+
}
116+
110117
if @path.nil?
111-
connectionParams = {
112-
:host => @host,
113-
:port => @port
114-
}
118+
params[:host] = @host
119+
params[:port] = @port
115120
else
116121
@logger.warn("Parameter 'path' is set, ignoring parameters: 'host' and 'port'")
117-
connectionParams = {
118-
:path => @path
119-
}
122+
params[:path] = @path
120123
end
121124

122-
baseParams = {
123-
:timeout => @timeout,
124-
:db => @db,
125-
:password => @password.nil? ? nil : @password.value,
126-
:ssl => @ssl
127-
}
128-
129-
return connectionParams.merge(baseParams)
125+
params
130126
end
131127

132128
def new_redis_instance
@@ -174,9 +170,12 @@ def queue_event(msg, output_queue, channel=nil)
174170

175171
# private
176172
def list_stop
177-
return if @redis.nil? || !@redis.connected?
173+
redis = @redis # might change during method invocation
174+
return if redis.nil? || !redis.connected?
178175

179-
@redis.quit rescue nil
176+
redis.quit rescue nil # does client.disconnect internally
177+
# check if input retried while executing
178+
list_stop unless redis.equal? @redis
180179
@redis = nil
181180
end
182181

@@ -186,15 +185,9 @@ def list_runner(output_queue)
186185
begin
187186
@redis ||= connect
188187
@list_method.call(@redis, output_queue)
189-
rescue ::Redis::BaseError => e
190-
info = { message: e.message, exception: e.class }
191-
info[:backtrace] = e.backtrace if @logger.debug?
192-
@logger.warn("Redis connection problem", info)
193-
# Reset the redis variable to trigger reconnect
194-
@redis = nil
195-
# this sleep does not need to be stoppable as its
196-
# in a while !stop? loop
197-
sleep 1
188+
rescue => e
189+
log_error(e)
190+
retry if reset_for_error_retry(e)
198191
end
199192
end
200193
end
@@ -248,18 +241,19 @@ def list_single_listener(redis, output_queue)
248241

249242
# private
250243
def subscribe_stop
251-
return if @redis.nil? || !@redis.connected?
252-
# if its a SubscribedClient then:
253-
# it does not have a disconnect method (yet)
254-
if @redis.subscribed?
244+
redis = @redis # might change during method invocation
245+
return if redis.nil? || !redis.connected?
246+
247+
if redis.subscribed?
255248
if @data_type == 'pattern_channel'
256-
@redis.punsubscribe
249+
redis.punsubscribe
257250
else
258-
@redis.unsubscribe
251+
redis.unsubscribe
259252
end
260-
else
261-
@redis.disconnect!
262253
end
254+
redis.close rescue nil # does client.disconnect
255+
# check if input retried while executing
256+
subscribe_stop unless redis.equal? @redis
263257
@redis = nil
264258
end
265259

@@ -268,15 +262,43 @@ def redis_runner
268262
begin
269263
@redis ||= connect
270264
yield
271-
rescue ::Redis::BaseError => e
272-
@logger.warn("Redis connection problem", :exception => e)
273-
# Reset the redis variable to trigger reconnect
274-
@redis = nil
275-
Stud.stoppable_sleep(1) { stop? }
276-
retry if !stop?
265+
rescue => e
266+
log_error(e)
267+
retry if reset_for_error_retry(e)
268+
end
269+
end
270+
271+
def log_error(e)
272+
info = { message: e.message, exception: e.class }
273+
info[:backtrace] = e.backtrace if @logger.debug?
274+
275+
case e
276+
when ::Redis::TimeoutError
277+
# expected for channels in case no data is available
278+
@logger.debug("Redis timeout, retrying", info)
279+
when ::Redis::BaseConnectionError, ::Redis::ProtocolError
280+
@logger.warn("Redis connection error", info)
281+
when ::Redis::BaseError
282+
@logger.error("Redis error", info)
283+
when ::LogStash::ShutdownSignal
284+
@logger.debug("Received shutdown signal")
285+
else
286+
info[:backtrace] ||= e.backtrace
287+
@logger.error("Unexpected error", info)
277288
end
278289
end
279290

291+
# @return [true] if operation is fine to retry
292+
def reset_for_error_retry(e)
293+
return if e.is_a?(::LogStash::ShutdownSignal)
294+
295+
# Reset the redis variable to trigger reconnect
296+
@redis = nil
297+
298+
Stud.stoppable_sleep(1) { stop? }
299+
!stop? # retry if not stop-ing
300+
end
301+
280302
# private
281303
def channel_runner(output_queue)
282304
redis_runner do
@@ -324,6 +346,4 @@ def pattern_channel_listener(output_queue)
324346
end
325347
end
326348

327-
# end
328-
329349
end end end # Redis Inputs LogStash

logstash-input-redis.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-redis'
4-
s.version = '3.6.1'
4+
s.version = '3.7.0'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "Reads events from a Redis instance"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/inputs/redis_spec.rb

Lines changed: 80 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@ def populate(key, event_count)
1717
end
1818

1919
def process(conf, event_count)
20-
events = input(conf) do |pipeline, queue|
21-
event_count.times.map{queue.pop}
20+
events = input(conf) do |_, queue|
21+
sleep 0.1 until queue.size >= event_count
22+
queue.size.times.map { queue.pop }
2223
end
23-
24-
expect(events.map{|evt| evt.get("sequence")}).to eq((0..event_count.pred).to_a)
24+
# due multiple workers we get events out-of-order in the output
25+
events.sort! { |a, b| a.get('sequence') <=> b.get('sequence') }
26+
expect(events[0].get('sequence')).to eq(0)
27+
expect(events[100].get('sequence')).to eq(100)
28+
expect(events[1000].get('sequence')).to eq(1000)
2529
end
2630

2731
# integration tests ---------------------
@@ -31,7 +35,6 @@ def process(conf, event_count)
3135
it "should read events from a list" do
3236
key = SecureRandom.hex
3337
event_count = 1000 + rand(50)
34-
# event_count = 100
3538
conf = <<-CONFIG
3639
input {
3740
redis {
@@ -163,7 +166,6 @@ def process(conf, event_count)
163166
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
164167
expect(command[0]).to eql :blpop
165168
expect(command[1]).to eql ['foo', 0]
166-
expect(command[2]).to eql 1
167169
end.and_return ['foo', "{\"foo1\":\"bar\""], nil
168170

169171
tt = Thread.new do
@@ -178,6 +180,69 @@ def process(conf, event_count)
178180
expect( queue.size ).to be > 0
179181
end
180182

183+
it 'keeps running when a connection error occurs' do
184+
raised = false
185+
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
186+
expect(command[0]).to eql :blpop
187+
unless raised
188+
raised = true
189+
raise Redis::CannotConnectError.new('test')
190+
end
191+
['foo', "{\"after\":\"raise\"}"]
192+
end
193+
194+
expect(subject.logger).to receive(:warn).with('Redis connection error',
195+
hash_including(:message=>"test", :exception=>Redis::CannotConnectError)
196+
).and_call_original
197+
198+
tt = Thread.new do
199+
sleep 2.0 # allow for retry (sleep) after handle_error
200+
subject.do_stop
201+
end
202+
203+
subject.run(queue)
204+
205+
tt.join
206+
207+
try(3) { expect( queue.size ).to be > 0 }
208+
end
209+
210+
context 'error handling' do
211+
212+
let(:config) do
213+
super().merge 'batch_count' => 2
214+
end
215+
216+
it 'keeps running when a (non-Redis) io error occurs' do
217+
raised = false
218+
allow(subject).to receive(:connect).and_return redis = double('redis')
219+
allow(redis).to receive(:blpop).and_return nil
220+
expect(redis).to receive(:evalsha) do
221+
unless raised
222+
raised = true
223+
raise IOError.new('closed stream')
224+
end
225+
[]
226+
end.at_least(1)
227+
redis
228+
allow(subject).to receive(:stop)
229+
230+
expect(subject.logger).to receive(:error).with('Unexpected error',
231+
hash_including(:message=>'closed stream', :exception=>IOError)
232+
).and_call_original
233+
234+
tt = Thread.new do
235+
sleep 2.0 # allow for retry (sleep) after handle_error
236+
subject.do_stop
237+
end
238+
239+
subject.run(queue)
240+
241+
tt.join
242+
end
243+
244+
end
245+
181246
context "when the batch size is greater than 1" do
182247
let(:batch_count) { 10 }
183248

@@ -233,9 +298,6 @@ def process(conf, event_count)
233298
end
234299

235300
it 'multiple close calls, calls to redis once' do
236-
# subject.use_redis(redis)
237-
# allow(redis).to receive(:blpop).and_return(['foo', 'l1'])
238-
# expect(redis).to receive(:connected?).and_return(connected.last)
239301
allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
240302
# allow_any_instance_of( Redis::Client ).to receive(:disconnect)
241303
quit_calls.each do |call|
@@ -249,6 +311,9 @@ def process(conf, event_count)
249311
end
250312

251313
context 'for the subscribe data_types' do
314+
315+
before { subject.register }
316+
252317
def run_it_thread(inst)
253318
Thread.new(inst) do |subj|
254319
subj.run(queue)
@@ -289,6 +354,8 @@ def close_thread(inst, rt)
289354
let(:data_type) { 'channel' }
290355
let(:quit_calls) { [:unsubscribe, :connection] }
291356

357+
before { subject.register }
358+
292359
context 'mocked redis' do
293360
it 'multiple stop calls, calls to redis once', type: :mocked do
294361
subject.do_stop
@@ -367,6 +434,10 @@ def close_thread(inst, rt)
367434

368435
["list", "channel", "pattern_channel"].each do |data_type|
369436
context data_type do
437+
# TODO pending
438+
# redis-rb ends up in a read wait loop since we do not use subscribe_with_timeout
439+
next unless data_type == 'list'
440+
370441
it_behaves_like "an interruptible input plugin", :redis => true do
371442
let(:config) { { 'key' => 'foo', 'data_type' => data_type } }
372443
end

0 commit comments

Comments
 (0)