diff --git a/cmd/kaf/group.go b/cmd/kaf/group.go index 86f1a7d0..11ca0c20 100644 --- a/cmd/kaf/group.go +++ b/cmd/kaf/group.go @@ -1,11 +1,11 @@ package main import ( + "context" "errors" "fmt" "os" "sort" - "time" "unicode" "text/tabwriter" @@ -19,6 +19,10 @@ import ( "github.com/Shopify/sarama" "github.com/spf13/cobra" + "strconv" + + "time" + "github.com/birdayz/kaf" ) @@ -112,86 +116,59 @@ func (r *resetHandler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.Cons func createGroupCommitOffsetCmd() *cobra.Command { var topic string - var offset int64 + var offset string var partition int32 res := &cobra.Command{ Use: "commit", Args: cobra.ExactArgs(1), Run: func(cmd *cobra.Command, args []string) { client := getClient() - config := getConfig() - admin := getClusterAdmin() group := args[0] - var memberID string + var off int64 - var b *sarama.Broker - - de, err := admin.DescribeConsumerGroups([]string{group}) + i, err := strconv.ParseInt(offset, 10, 64) if err != nil { - errorExit("Failed to fetch consumer group details: %v", err) - } - - rr := &sarama.JoinGroupRequest{ - GroupId: group, - MemberId: "", - SessionTimeout: int32(time.Second * 20 / time.Millisecond), - ProtocolType: de[0].ProtocolType, - } - - rr.AddGroupProtocol("", []byte{}) - - b, err = client.Coordinator(group) - if err != nil { - errorExit("Failed to find coordinator for group: %v", err) - } - _ = b.Open(config) - - resp, err := b.JoinGroup(rr) - if err != nil { - errorExit("Failed to join group: %v", err) - } - - if resp.Err != sarama.ErrNoError { - errorExit("Failed to join group: %v", resp.Err) - } + // No int -> try timestamp + t, err := time.Parse(time.RFC3339, offset) + if err != nil { + errorExit("offset is neither offset nor timestamp", nil) + } + _ = t - generationID := resp.GenerationId - memberID = resp.MemberId + o, err := client.GetOffset(topic, partition, t.UnixNano()/(int64(time.Millisecond)/int64(time.Nanosecond))) + if err != nil { + errorExit("Failed to determine offset for timestamp: %v", err) + } + off = o - commit := &sarama.OffsetCommitRequest{ - Version: 1, - ConsumerGroup: group, - ConsumerGroupGeneration: generationID, - ConsumerID: memberID, + fmt.Printf("Determined offset %v from timestamp.\n", off) + } else { + off = i } - commit.AddBlock(topic, partition, offset, 0, "reset by kaf") - commitResp, err := b.CommitOffset(commit) + g, err := sarama.NewConsumerGroupFromClient(group, client) if err != nil { - errorExit("Failed to commit: %v", err) - } - - for _, partitionErrors := range commitResp.Errors { - for _, _error := range partitionErrors { - if _error == sarama.ErrUnknownMemberId { - errorExit("Failed to commit: %v", _error) - } - } + errorExit("Failed to create consumer group: %v", err) } + err = g.Consume(context.Background(), []string{topic}, &resetHandler{ + topic: topic, + partition: partition, + offset: off, + client: client, + group: group, + }) if err != nil { - errorExit("Failed to set offset: %v", err) + errorExit("Failed to commit offset: %v", err) } - b.LeaveGroup(&sarama.LeaveGroupRequest{GroupId: group, MemberId: memberID}) - - fmt.Printf("Set offset to %v.", offset) + fmt.Printf("Set offset to %v.", off) }, } res.Flags().StringVarP(&topic, "topic", "t", "", "topic") - res.Flags().Int64VarP(&offset, "offset", "o", 0, "offset to commit") + res.Flags().StringVarP(&offset, "offset", "o", "", "offset to commit") res.Flags().Int32VarP(&partition, "partition", "p", 0, "partition") return res }