Skip to content

Commit f155852

Browse files
pods' image policy set to always (for dealing with updates)
1 parent 76fe42a commit f155852

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

helm/beametl/templates/deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ spec:
2929
containers:
3030
#assumes GCP ADC is provided on the GKE cluster's pods for pubsub/bigtable access
3131
- name: beametl-container
32+
imagePullPolicy: Always
3233
image: {{ .Values.image }}
3334
#requires a file named "/secrets/ac.json" with service-acc creds
3435
volumeMounts:

helm/verne/templates/deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ spec:
8080

8181
- name: listener
8282
image: {{ .Values.image }}
83+
imagePullPolicy: Always
8384
resources:
8485
{{- with .Values.resourceOptions.listener }}
8586
requests:

sandbox/docker-compose.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ services:
2020
start_period: 15s
2121

2222
listener:
23-
build:
24-
context: ./listener
25-
dockerfile: Dockerfile
23+
# build:
24+
# context: ./listener
25+
# dockerfile: Dockerfile
26+
image: verne-listener:latest
2627
depends_on:
2728
verne-test:
2829
condition: service_healthy

services/listener/main.go

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ func localConfigs() {
6767
}
6868
hostIP := string(utils.TrimSpace(data))
6969
pubsubHost = utils.Sprintf("%s:%s", hostIP, pubsubPort)
70-
7170
if err := os.Setenv("PUBSUB_EMULATOR_HOST", pubsubHost); err != nil {
7271
utils.Log(utils.LOG_ERROR, utils.Sprintf("failed to set emulator HOST: %v", err))
7372
}
@@ -123,7 +122,10 @@ func ConnectMQTT() *mqtt.ClientOptions {
123122

124123
func SubscribeMQTT(client mqtt.Client) {
125124
token := client.Subscribe(MqttTopicPath, QOS, mqttMessageHandler)
126-
token.Wait()
125+
if token.Wait() && token.Error() != nil {
126+
utils.Log(utils.LOG_ERROR, utils.Sprintf("MQTT subscribe failed: %v", token.Error()))
127+
panic(token.Error())
128+
}
127129
utils.Log(utils.LOG_DEBUG, utils.Sprintf("subscribed to MQTT topic: %s", MqttTopicPath))
128130
}
129131

@@ -176,6 +178,21 @@ func confPubSub(projectID string) (context.Context, *pubsub.Client) {
176178
}
177179

178180
func publishTopic(msg []byte) {
179-
publisher.Publish(pubctx, &pubsub.Message{Data: msg})
180-
utils.Log(utils.LOG_INFO, utils.Sprintf("queued message for publishing to pubsub: %s", msg))
181+
payload := append([]byte(nil), msg...)
182+
utils.Log(utils.LOG_INFO, utils.Sprintf("queued message for publishing to pubsub.... %s", string(payload)))
183+
utils.Log(utils.LOG_DEBUG, utils.Sprintf("queued message bytes: %v", payload))
184+
185+
// publish and ack
186+
result := publisher.Publish(pubctx, &pubsub.Message{Data: payload})
187+
id, err := result.Get(pubctx)
188+
if err != nil {
189+
utils.Log(utils.LOG_ERROR, utils.Sprintf("failed to publish to pubsub: %v", err))
190+
return
191+
}
192+
utils.Log(utils.LOG_INFO, utils.Sprintf("message published to pubsub with id: %s", id))
181193
}
194+
195+
// func publishTopic(msg []byte) {
196+
// publisher.Publish(pubctx, &pubsub.Message{Data: msg})
197+
// utils.Log(utils.LOG_INFO, utils.Sprintf("queued message for publishing to pubsub: %s", msg))
198+
// }

0 commit comments

Comments
 (0)