Skip to content

Commit d34b95f

Browse files
authored
feat: add nsq e2e image (#175)
Signed-off-by: Matt Ulmer <[email protected]>
1 parent 196e145 commit d34b95f

File tree

4 files changed

+123
-0
lines changed

4 files changed

+123
-0
lines changed

e2e/images/nsq/Dockerfile

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
FROM golang:1.22
2+
WORKDIR /cmd
3+
4+
COPY go.mod go.sum ./
5+
RUN go mod download
6+
7+
COPY *.go ./
8+
9+
RUN CGO_ENABLED=0 GOOS=linux go build -o cmd .
10+
11+
ENTRYPOINT ["./cmd"]

e2e/images/nsq/go.mod

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module github.com/kedacore/test-tools/e2e/images/nsq
2+
3+
go 1.22.0
4+
5+
require (
6+
github.com/golang/snappy v0.0.1 // indirect
7+
github.com/nsqio/go-nsq v1.1.0 // indirect
8+
)

e2e/images/nsq/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
2+
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
3+
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
4+
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=

e2e/images/nsq/main.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"log"
7+
"os"
8+
"os/signal"
9+
"syscall"
10+
11+
"github.com/nsqio/go-nsq"
12+
)
13+
14+
type Handler struct{}
15+
16+
func (h *Handler) HandleMessage(m *nsq.Message) error {
17+
log.Printf("Received message: %s", m.Body)
18+
return nil
19+
}
20+
21+
func nsqConsumer(config *nsq.Config, nsqlookupdHTTPAddress, topic, channel string) error {
22+
consumer, err := nsq.NewConsumer(topic, channel, config)
23+
if err != nil {
24+
return err
25+
}
26+
27+
consumer.AddHandler(&Handler{})
28+
29+
err = consumer.ConnectToNSQLookupd(nsqlookupdHTTPAddress)
30+
if err != nil {
31+
return err
32+
}
33+
34+
sigChan := make(chan os.Signal, 1)
35+
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
36+
<-sigChan
37+
38+
consumer.Stop()
39+
40+
return nil
41+
}
42+
43+
func nsqProducer(config *nsq.Config, nsqdTCPAddress string, topic string, messageCount int) error {
44+
producer, err := nsq.NewProducer(nsqdTCPAddress, config)
45+
if err != nil {
46+
return err
47+
}
48+
49+
responseChan := make(chan *nsq.ProducerTransaction, messageCount)
50+
for i := 0; i < messageCount; i++ {
51+
err := producer.PublishAsync(topic, []byte(fmt.Sprintf("%d", i)), responseChan)
52+
if err != nil {
53+
return err
54+
}
55+
}
56+
57+
for i := 0; i < messageCount; i++ {
58+
trans := <-responseChan
59+
if trans.Error != nil {
60+
return trans.Error
61+
}
62+
}
63+
64+
producer.Stop()
65+
66+
return nil
67+
}
68+
69+
func main() {
70+
mode := flag.String("mode", "", "consumer or producer")
71+
topic := flag.String("topic", "", "topic name")
72+
channel := flag.String("channel", "", "channel name")
73+
nsqlookupdHTTPAddress := flag.String("nsqlookupd-http-address", "", "nsqlookupd HTTP address")
74+
messageCount := flag.Int("message-count", 1, "number of messages to send")
75+
nsqdTCPAddress := flag.String("nsqd-tcp-address", "", "nsqd TCP address")
76+
flag.Parse()
77+
78+
config := nsq.NewConfig()
79+
80+
switch *mode {
81+
case "consumer":
82+
log.Println("Consumer mode")
83+
if *topic == "" || *channel == "" || *nsqlookupdHTTPAddress == "" {
84+
log.Fatalf("topic, channel, and nsqlookupd-http-address are required\n")
85+
}
86+
if err := nsqConsumer(config, *nsqlookupdHTTPAddress, *topic, *channel); err != nil {
87+
log.Fatalf("read from nsq failed: %w\n", err)
88+
}
89+
case "producer":
90+
log.Println("Producer mode")
91+
if *topic == "" || *nsqdTCPAddress == "" {
92+
log.Fatalf("topic and nsqd-tcp-address are required\n")
93+
}
94+
if err := nsqProducer(config, *nsqdTCPAddress, *topic, *messageCount); err != nil {
95+
log.Fatalf("write to nsq failed: %w\n", err)
96+
}
97+
default:
98+
log.Fatalf("unknown mode: %s\n", *mode)
99+
}
100+
}

0 commit comments

Comments
 (0)