Skip to content
This repository was archived by the owner on Nov 20, 2020. It is now read-only.

Commit 7bc4f82

Browse files
authored
Merge pull request #154 from eulerfx/fix_152
ensure offset commit loop reset on rebalance
2 parents 5b8ab28 + e6ad245 commit 7bc4f82

File tree

7 files changed

+194
-28
lines changed

7 files changed

+194
-28
lines changed

RELEASE_NOTES.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
### 0.1.2-alpha - 03.07.2017
2+
3+
* Fixed consumer offset commit [bug](https://github.com/jet/kafunk/issues/152) wherein after a rebalance a consumer gets assigned a new partition
4+
which doesn't receive any messages for longer than the offset retention period, the offsets would be lost.
5+
This would only happen after a rebalance not after initial join.
6+
17
### 0.1.1-alpha - 25.05.2017
28

39
* Snappy compression.

src/kafunk/Consumer.fs

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,10 @@ module Consumer =
570570
let! state = Group.stateInternal c.groupMember
571571
return consumerStateFromGroupMemberState state.state }
572572

573+
/// Returns the stream of consumer states as of the invocation, including the current state.
574+
let states (c:Consumer) : AsyncSeq<ConsumerState> =
575+
Group.states c.groupMember |> AsyncSeq.map (consumerStateFromGroupMemberState)
576+
573577
let private combineFetchResponses
574578
(r1:(ConsumerMessageSet[] * (Partition * HighwaterMarkOffset)[]) option)
575579
(r2:(ConsumerMessageSet[] * (Partition * HighwaterMarkOffset)[]) option) =
@@ -862,32 +866,32 @@ module Consumer =
862866
(c:Consumer)
863867
(commitInterval:TimeSpan)
864868
(handler:ConsumerState -> ConsumerMessageSet -> Async<unit>) : Async<unit> = async {
865-
866869
let assignedPartitions = state c |> Async.map (fun s -> s.assignments)
867-
868-
use commitQueue = Offsets.createPeriodicCommitQueue (commitInterval, assignedPartitions, commitOffsets c)
869-
870-
let! currentOffsets = async {
871-
let! assignedPartitions = assignedPartitions
872-
let! currentOffsets = fetchOffsets c.conn c.config.groupId [| c.config.topic, assignedPartitions |]
873-
return
874-
currentOffsets
875-
|> Seq.choose (fun (t,os) -> if t = c.config.topic then Some os else None)
876-
|> Seq.concat
877-
|> Seq.where (fun (_,o) -> o <> -1L)
878-
|> Seq.toArray }
879-
880-
// commit current offets so that they're in-memory, and will be committed periodically
881-
// even if no messages are consumed
882-
Offsets.enqueuePeriodicCommit commitQueue currentOffsets
883-
870+
use commitQueue = Offsets.createPeriodicCommitQueue (commitInterval, assignedPartitions, commitOffsets c)
871+
// commit current offets on group rebalance, including first join,
872+
// so that they're committed periodically even if no messages are consumed
873+
let commitCurrentOffsetsOnGroupJoinProc =
874+
states c
875+
|> AsyncSeq.iterAsync (fun s -> async {
876+
let! currentOffsets = async {
877+
let! currentOffsets = fetchOffsets c.conn c.config.groupId [| c.config.topic, s.assignments |]
878+
return
879+
currentOffsets
880+
|> Seq.choose (fun (t,os) -> if t = c.config.topic then Some os else None)
881+
|> Seq.concat
882+
|> Seq.where (fun (_,o) -> o <> -1L)
883+
|> Seq.toArray }
884+
Offsets.enqueuePeriodicCommit commitQueue currentOffsets })
884885
let handler s ms = async {
885886
do! handler s ms
886-
Offsets.enqueuePeriodicCommit commitQueue (ConsumerMessageSet.commitPartitionOffsets ms) }
887-
888-
do! consume c handler
889-
890-
do! Offsets.flushPeriodicCommit commitQueue }
887+
Offsets.enqueuePeriodicCommit commitQueue (ConsumerMessageSet.commitPartitionOffsets ms) }
888+
do!
889+
Async.parallel2
890+
(async {
891+
do! consume c handler
892+
do! Offsets.flushPeriodicCommit commitQueue },
893+
commitCurrentOffsetsOnGroupJoinProc)
894+
|> Async.Ignore }
891895

892896
/// Closes the consumer and leaves the consumer group.
893897
/// This causes all underlying streams to complete.

src/kafunk/Group.fs

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ and GroupConfig = {
7777
and [<NoEquality;NoComparison;AutoSerializable(false)>] GroupMember = internal {
7878
conn : KafkaConn
7979
config : GroupConfig
80-
state : MVar<GroupMemberStateWrapper>
80+
state : SVar<GroupMemberStateWrapper>
8181
}
8282

8383
/// The action to take upon leaving the group.
@@ -150,17 +150,23 @@ module Group =
150150

151151
/// Leaves a group, sending a leave group request to Kafka.
152152
let leave (gm:GroupMember) = async {
153-
let! state = MVar.get gm.state
153+
let! state = SVar.get gm.state
154154
return! leaveInternal gm state }
155155

156156
let internal stateInternal (gm:GroupMember) =
157-
gm.state |> MVar.get
157+
gm.state |> SVar.get
158158

159159
/// Returns the group member state.
160160
let state (gm:GroupMember) = async {
161161
let! state = stateInternal gm
162162
return state.state }
163163

164+
/// Returns a stream of group member states as of the invocation, including the current state.
165+
let states (gm:GroupMember) =
166+
gm.state
167+
|> SVar.tap
168+
|> AsyncSeq.map (fun s -> s.state)
169+
164170
/// Joins a group.
165171
let join (gm:GroupMember) (prevState:GroupMemberState option, prevErrorCode:ErrorCode option) = async {
166172

@@ -287,7 +293,7 @@ module Group =
287293
let cts = CancellationTokenSource.CreateLinkedTokenSource (ct, state.state.closed)
288294
Async.Start (heartbeatProcess, cts.Token)
289295

290-
let! _ = gm.state |> MVar.put state
296+
let! _ = gm.state |> SVar.put state
291297
return () }
292298

293299

@@ -360,6 +366,9 @@ module Group =
360366

361367
return! joinSyncHeartbeat (prevState |> Option.map (fun s -> s.memberId), prevErrorCode) }
362368

369+
// let internal generationsInternal (gm:GroupMember) =
370+
// SVar.tap gm.state
371+
363372
let internal generationsInternal (gm:GroupMember) =
364373
let rec loop () = asyncSeq {
365374
let! state = stateInternal gm
@@ -385,8 +394,22 @@ module Group =
385394
let createJoin (conn:KafkaConn) (config:GroupConfig) = async {
386395
let gm =
387396
{ GroupMember.conn = conn
388-
state = MVar.create ()
397+
state = SVar.create ()
389398
config = config }
399+
390400
let! _ = join gm (None, None)
401+
402+
// let rec rejoinProc () = async {
403+
// let! state = stateInternal gm
404+
// let! action = IVar.get state.closed
405+
// match action with
406+
// | GroupLeaveAction.LeaveAndRejoin ec ->
407+
// let! _ = join gm (Some state.state, Some ec)
408+
// return! rejoinProc ()
409+
// | GroupLeaveAction.LeaveGroup ->
410+
// return () }
411+
//
412+
// let! _ = Async.StartChild (rejoinProc ())
413+
391414
return gm }
392415

src/kafunk/Utility/SVar.fs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
[<AutoOpen>]
2+
module internal Kafunk.SVar
3+
4+
open FSharp.Control
5+
open System.Threading
6+
7+
/// A stream variable.
8+
type SVar<'a> = private {
9+
state : MVar<'a>
10+
tail : SVarNode<'a> ref }
11+
12+
and SVarNode<'a> = {
13+
ivar : IVar<('a * SVarNode<'a>)> }
14+
15+
/// Operations on stream variables.
16+
module SVar =
17+
18+
let private createNode () =
19+
{ ivar = IVar.create () }
20+
21+
let private putNode (s:SVar<'a>) (a:'a) =
22+
let newTail = createNode ()
23+
let tail = Interlocked.Exchange(s.tail, newTail)
24+
IVar.put (a,newTail) tail.ivar
25+
26+
/// Creates an empty stream variable.
27+
let create () : SVar<'a> =
28+
{ state = MVar.create () ; tail = ref (createNode ()) }
29+
30+
/// Puts a new value into the stream variable.
31+
let put (a:'a) (s:SVar<'a>) : Async<unit> = async {
32+
let! a = MVar.put a s.state
33+
do putNode s a
34+
return () }
35+
36+
/// Atomically updates a value in a stream variable.
37+
let updateAsync (f:'a -> Async<'a>) (s:SVar<'a>) : Async<'a> =
38+
s.state
39+
|> MVar.updateAsync (fun a -> async {
40+
let! a' = f a
41+
do putNode s a'
42+
return a' })
43+
44+
/// Atomically updates a value in a stream variable.
45+
let updateStateAsync (f:'a -> Async<'a * 's>) (s:SVar<'a>) : Async<'s> =
46+
s.state
47+
|> MVar.updateStateAsync (fun a -> async {
48+
let! a',st = f a
49+
do putNode s a'
50+
return a',st })
51+
52+
/// Gets the current value of the stream variable.
53+
let get (s:SVar<'a>) : Async<'a> =
54+
MVar.get s.state
55+
56+
let rec private tapImpl (s:SVarNode<'a>) : AsyncSeq<'a> =
57+
asyncSeq {
58+
let! (a,tl) = IVar.get s.ivar
59+
yield a
60+
yield! tapImpl tl }
61+
62+
/// Returns a stream corresponding to changes in the stream variable from the time of invocation, starting with the
63+
/// current value, if any.
64+
let tap (s:SVar<'a>) : AsyncSeq<'a> =
65+
let curr = MVar.getFastUnsafe s.state
66+
let tail =
67+
let tail = !s.tail
68+
match curr with
69+
| None -> tail
70+
| Some a -> { ivar = IVar.createFull (a,tail) }
71+
tapImpl tail
72+
73+
/// Publishes an error to all waiting readers of a tapped stream.
74+
let error ex (s:SVar<'a>) =
75+
let tail = !s.tail
76+
IVar.error ex tail.ivar

src/kafunk/kafunk.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
<Compile Include="Utility\Crc.fs" />
6969
<Compile Include="Utility\Async.fs" />
7070
<Compile Include="Utility\MVar.fs" />
71+
<Compile Include="Utility\SVar.fs" />
7172
<Compile Include="Utility\BoundedMb.fs" />
7273
<Compile Include="Utility\AsyncSeq.fs" />
7374
<Compile Include="Utility\Faults.fs" />

tests/kafunk.Tests/SVarTests.fs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
module SVarTests
2+
3+
open System
4+
open Kafunk
5+
open Kafunk.Testing
6+
open Kafunk.SVar
7+
open FSharp.Control
8+
open NUnit.Framework
9+
10+
let asyncSeqToList (s:AsyncSeq<'a>) =
11+
let ls =
12+
s
13+
|> AsyncSeq.toListAsync
14+
|> Async.timeoutOption (TimeSpan.FromMilliseconds 100.0)
15+
|> Async.RunSynchronously
16+
match ls with
17+
| Some ls -> ls
18+
| None -> failwith "AsyncSeq timed out!"
19+
20+
[<Test>]
21+
let ``SVar.tap should return stream of all values put after tap`` () =
22+
for put in [ [] ; [1] ; [1;2;3;4] ] do
23+
let v = SVar.create ()
24+
let stream = SVar.tap v
25+
async {
26+
for x in put do
27+
do! SVar.put x v } |> Async.RunSynchronously
28+
let actual = stream |> AsyncSeq.take put.Length |> asyncSeqToList
29+
shouldEqual put actual None
30+
31+
[<Test>]
32+
let ``SVar.tap should return current value`` () =
33+
let v = SVar.create ()
34+
let expected = 1
35+
SVar.put expected v |> Async.RunSynchronously
36+
let expected = [expected]
37+
let actual = SVar.tap v |> AsyncSeq.take 1 |> asyncSeqToList
38+
shouldEqual expected actual None
39+
40+
[<Test>]
41+
let ``SVar.tap should return current and subsequent values`` () =
42+
for expected in [ [1] ; [1;2] ; [1;2;3;4;5] ] do
43+
let v = SVar.create ()
44+
SVar.put 0 v |> Async.RunSynchronously // skipped
45+
SVar.put expected.Head v |> Async.RunSynchronously
46+
let tapped = SVar.tap v
47+
async {
48+
for x in expected.Tail do
49+
do! SVar.put x v } |> Async.RunSynchronously
50+
let actual = tapped |> AsyncSeq.take expected.Length |> asyncSeqToList
51+
shouldEqual expected actual None
52+
53+
54+
55+

tests/kafunk.Tests/kafunk.Tests.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
<Compile Include="AsyncTests.fs" />
7777
<Compile Include="FaultsTests.fs" />
7878
<Compile Include="AssignmentStrategyTests.fs" />
79+
<Compile Include="SVarTests.fs" />
7980
<None Include="Refs.fsx" />
8081
<Compile Include="Metadata.fsx" />
8182
<None Include="Producer.fsx" />

0 commit comments

Comments
 (0)