Skip to content

Commit

Permalink
fix group commit & introduce date resets
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Jan 21, 2020
1 parent 207b685 commit dd3750f
Showing 1 changed file with 34 additions and 57 deletions.
91 changes: 34 additions & 57 deletions cmd/kaf/group.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"context"
"errors"
"fmt"
"os"
"sort"
"time"
"unicode"

"text/tabwriter"
Expand All @@ -19,6 +19,10 @@ import (
"github.com/Shopify/sarama"
"github.com/spf13/cobra"

"strconv"

"time"

"github.com/birdayz/kaf"
)

Expand Down Expand Up @@ -112,86 +116,59 @@ func (r *resetHandler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.Cons

func createGroupCommitOffsetCmd() *cobra.Command {
var topic string
var offset int64
var offset string
var partition int32
res := &cobra.Command{
Use: "commit",
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
config := getConfig()
admin := getClusterAdmin()

group := args[0]

var memberID string
var off int64

var b *sarama.Broker

de, err := admin.DescribeConsumerGroups([]string{group})
i, err := strconv.ParseInt(offset, 10, 64)
if err != nil {
errorExit("Failed to fetch consumer group details: %v", err)
}

rr := &sarama.JoinGroupRequest{
GroupId: group,
MemberId: "",
SessionTimeout: int32(time.Second * 20 / time.Millisecond),
ProtocolType: de[0].ProtocolType,
}

rr.AddGroupProtocol("", []byte{})

b, err = client.Coordinator(group)
if err != nil {
errorExit("Failed to find coordinator for group: %v", err)
}
_ = b.Open(config)

resp, err := b.JoinGroup(rr)
if err != nil {
errorExit("Failed to join group: %v", err)
}

if resp.Err != sarama.ErrNoError {
errorExit("Failed to join group: %v", resp.Err)
}
// No int -> try timestamp
t, err := time.Parse(time.RFC3339, offset)
if err != nil {
errorExit("offset is neither offset nor timestamp", nil)
}
_ = t

generationID := resp.GenerationId
memberID = resp.MemberId
o, err := client.GetOffset(topic, partition, t.UnixNano()/(int64(time.Millisecond)/int64(time.Nanosecond)))
if err != nil {
errorExit("Failed to determine offset for timestamp: %v", err)
}
off = o

commit := &sarama.OffsetCommitRequest{
Version: 1,
ConsumerGroup: group,
ConsumerGroupGeneration: generationID,
ConsumerID: memberID,
fmt.Printf("Determined offset %v from timestamp.\n", off)
} else {
off = i
}
commit.AddBlock(topic, partition, offset, 0, "reset by kaf")

commitResp, err := b.CommitOffset(commit)
g, err := sarama.NewConsumerGroupFromClient(group, client)
if err != nil {
errorExit("Failed to commit: %v", err)
}

for _, partitionErrors := range commitResp.Errors {
for _, _error := range partitionErrors {
if _error == sarama.ErrUnknownMemberId {
errorExit("Failed to commit: %v", _error)
}
}
errorExit("Failed to create consumer group: %v", err)
}

err = g.Consume(context.Background(), []string{topic}, &resetHandler{
topic: topic,
partition: partition,
offset: off,
client: client,
group: group,
})
if err != nil {
errorExit("Failed to set offset: %v", err)
errorExit("Failed to commit offset: %v", err)
}

b.LeaveGroup(&sarama.LeaveGroupRequest{GroupId: group, MemberId: memberID})

fmt.Printf("Set offset to %v.", offset)
fmt.Printf("Set offset to %v.", off)
},
}
res.Flags().StringVarP(&topic, "topic", "t", "", "topic")
res.Flags().Int64VarP(&offset, "offset", "o", 0, "offset to commit")
res.Flags().StringVarP(&offset, "offset", "o", "", "offset to commit")
res.Flags().Int32VarP(&partition, "partition", "p", 0, "partition")
return res
}
Expand Down

0 comments on commit dd3750f

Please sign in to comment.