
Conversation

@florentianayuwono
Collaborator

@florentianayuwono florentianayuwono commented Nov 27, 2025

Overview

Extend the queue package with an implementation for consuming messages (webhooks).

Rationale

So that the planner can consume webhooks published by webhook-gateway.

Checklist

  • The PR is tagged with an appropriate label (urgent, trivial, senior-review-required, documentation).

@cbartz cbartz requested a review from Copilot November 27, 2025 06:42
Contributor

Copilot AI left a comment


Pull request overview

This PR adds AMQP consumer functionality to the planner service to consume webhook messages from the webhook-gateway service. The implementation enables tracking of GitHub workflow job lifecycle events (queued, in_progress, completed) in the database.

Key Changes:

  • Added AmqpConsumer implementation with message handling for webhook events
  • Integrated consumer service into the planner main application
  • Added comprehensive unit tests for consumer behavior

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 1 comment.

Reviewed files:

  • internal/queue/consumer.go: Implements the AMQP consumer with webhook processing logic for job lifecycle events
  • internal/queue/consumer_test.go: Comprehensive test suite covering consumer behavior and edge cases
  • internal/queue/types.go: Adds a Consume method to the AMQP channel interface and wrapper
  • internal/queue/producer_test.go: Updates the mock implementation with a Consume method stub
  • cmd/planner/main.go: Integrates the consumer service into the planner application startup
  • .github/workflows/planner_tests.yaml: Adds the RabbitMQ connection string for integration tests
  • internal/database/database.go: Minor formatting fix removing an unnecessary fmt.Sprintf


Collaborator

@cbartz cbartz left a comment


Thank you! Looks good. Some comments.

}

// parseToTime parses a string pointer to time.Time, returning zero time if nil or invalid.
func (c *AmqpConsumer) parseToTime(timeStr *string) time.Time {

nit: This looks like a helper function that could be used elsewhere too. It could be converted to a static helper function by passing the logger as an argument and removing the requirement for it to be a method on AmqpConsumer.
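A minimal sketch of that shape, assuming the package logs via log/slog and the timestamps are RFC 3339 as the tests suggest (the logger type and the log message are assumptions, not part of this PR):

```go
// Sketch: parseToTime as a package-level helper instead of a method on
// AmqpConsumer. Assumes a *slog.Logger; substitute whatever logger the
// package already uses.
package queue

import (
	"log/slog"
	"time"
)

// parseToTime parses an RFC 3339 string pointer, returning the zero time
// if the pointer is nil or the value does not parse.
func parseToTime(logger *slog.Logger, timeStr *string) time.Time {
	if timeStr == nil {
		return time.Time{}
	}
	t, err := time.Parse(time.RFC3339, *timeStr)
	if err != nil {
		logger.Warn("failed to parse timestamp", "value", *timeStr, "error", err)
		return time.Time{}
	}
	return t
}
```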


An integration test is missing. As discussed in the standup, you don't need to test integration with webhook-gateway here (we can do that in the charm integration test later).

msg.Nack(false, false) // don't requeue
return fmt.Errorf("failed to unmarshal message body: %w", err)
}
switch webhook.Action {

For a future story, please don't implement it now: we should filter for supported labels, as there are a lot of jobs that we will never fulfill (e.g. all the GitHub-hosted ones, with labels like "ubuntu-latest" or "ubuntu-24.04"). These would just fill up the database table. But the implementation of this still needs discussion.
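Purely as an illustration of the direction, not for this PR (the helper name and the idea of a configured label set are assumptions for that future discussion):

```go
// isSupportedJob reports whether any of the job's labels is one the planner
// can fulfil. Hypothetical sketch: the supported set would likely come from
// configuration rather than being hard-coded.
func isSupportedJob(labels []string, supported map[string]struct{}) bool {
	for _, label := range labels {
		if _, ok := supported[label]; ok {
			return true
		}
	}
	return false
}
```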

}
body, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal webhook: %v", err)

Comment on lines +51 to +68
rabbitURI := os.Getenv("RABBITMQ_CONNECT_STRING")
jobID := rand.Intn(1_000_000)
payload := map[string]any{
"action": "queued",
"workflow_job": map[string]any{
"id": jobID,
"labels": labels,
"status": "queued",
"created_at": time.Now().UTC().Format(time.RFC3339),
},
}
body, err := json.Marshal(payload)
if err != nil {
t.Fatalf("marshal webhook: %v", err)
}

publishWebhookMessage(t, rabbitURI, "webhook-queue", body)
time.Sleep(1 * time.Second)

Could we put this into a subfunction to increase readability? When I want to understand the test, it would be much simpler without the details; if I want to look at the details, I can look into the subfunction.
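For example, the arrange block above could collapse into a helper along these lines (the helper name is illustrative; it wraps the existing publishWebhookMessage):

```go
// publishQueuedJobWebhook builds a minimal "queued" workflow_job payload,
// publishes it to the given queue, and returns the generated job ID.
// Sketch only; assumes it lives next to the existing test helpers.
func publishQueuedJobWebhook(t *testing.T, rabbitURI, queue string, labels []string) int {
	t.Helper()
	jobID := rand.Intn(1_000_000)
	payload := map[string]any{
		"action": "queued",
		"workflow_job": map[string]any{
			"id":         jobID,
			"labels":     labels,
			"status":     "queued",
			"created_at": time.Now().UTC().Format(time.RFC3339),
		},
	}
	body, err := json.Marshal(payload)
	if err != nil {
		t.Fatalf("marshal webhook: %v", err)
	}
	publishWebhookMessage(t, rabbitURI, queue, body)
	return jobID
}
```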

Comment on lines +51 to +68

Using sleeps usually makes tests a bit brittle and unnecessarily slow. Is there another way to check that the message has been published? Maybe using an ack mechanism to ensure the message has been published?
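If the test publishes via amqp091-go directly, one option is publisher confirms: publish on a channel in confirm mode and wait for the broker's ack instead of sleeping. A sketch under that assumption (it only confirms the publish, so verifying the consumer side would still need e.g. polling the database):

```go
// publishWithConfirm publishes body via the default exchange to the queue and
// blocks until the broker confirms it, removing the fixed sleep.
// Sketch assuming amqp "github.com/rabbitmq/amqp091-go" and an open *amqp.Channel.
func publishWithConfirm(t *testing.T, ch *amqp.Channel, queue string, body []byte) {
	t.Helper()
	if err := ch.Confirm(false); err != nil {
		t.Fatalf("enable confirm mode: %v", err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	confirmation, err := ch.PublishWithDeferredConfirmWithContext(ctx, "", queue, false, false,
		amqp.Publishing{ContentType: "application/json", Body: body})
	if err != nil {
		t.Fatalf("publish webhook: %v", err)
	}
	if !confirmation.Wait() {
		t.Fatalf("broker nacked the webhook message")
	}
}
```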

Comment on lines +74 to +75
if p, ok := pressures[flavor]; ok && p > pressure {
return

Can we use an assert here to clearly see what the test expects?

Comment on lines 26 to 28
arrange: server is listening on the configured port and prepare request payload
act: send create flavor request and get flavor pressure request
assert: 201 Created and 200 OK with expected pressure value

I think this needs to be updated to reflect that we are putting something onto the queue.

c.queueName, // queue
"", // consumer tag, empty string means a unique random tag will be generated
false, // whether rabbitmq auto-acknowledges messages
false, // whether only this consumer can access the queue


This will have consequences if/when we scale up the planner. When set to true, this flag allows only one unit to consume the events, while if set to false, the events will be distributed across units.
If we don't plan to scale up, I'd rather set this flag to true to avoid potential race conditions.
Otherwise, we need to pay attention to race conditions.


I suggest setting it to true for now (don't allow the scale-up use case), and we can discuss our long-term approach in the design meeting.
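For reference, that is the fourth argument of Consume in amqp091-go; a sketch against the code quoted above (assuming the wrapper keeps the same signature):

```go
// Sketch: exclusive consumer, so only a single planner unit can consume from
// the queue at a time.
msgs, err := c.amqpChannel.Consume(
	c.queueName, // queue
	"",          // consumer tag; empty string lets the server generate a unique one
	false,       // autoAck: false, we ack/nack manually after processing
	true,        // exclusive: true, reject additional consumers on this queue
	false,       // noLocal: not supported by RabbitMQ, keep false
	false,       // noWait
	nil,         // args
)
```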

return fmt.Errorf("failed to put channel in confirm mode: %w", err)
}

_, err = ch.QueueDeclare(


I think we should use QueueDeclarePassive here to keep the responsibility for creating the queue with the producer only. It is usually good practice to keep that responsibility on one side.
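A sketch of what that would look like (the durable/autoDelete values must mirror whatever the producer declares, so treat them as assumptions):

```go
// Sketch: passive declaration fails if the queue does not already exist,
// leaving queue creation entirely to the producer.
_, err = ch.QueueDeclarePassive(
	c.queueName, // name
	true,        // durable: must match the producer's declaration
	false,       // autoDelete
	false,       // exclusive
	false,       // noWait
	nil,         // args
)
if err != nil {
	return fmt.Errorf("queue %q not available: %w", c.queueName, err)
}
```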


// consumeMsgs starts consuming messages from the AMQP queue.
func (c *AmqpConsumer) consumeMsgs() (<-chan amqp.Delivery, error) {
msgs, err := c.amqpChannel.Consume(


There's no QoS declared, and I don't know what the default behavior is.
QoS allows having more than one message in flight, which could have consequences for race conditions.
But consuming one event at a time will also reduce performance.
Probably to be discussed in the design meeting which approach we want to take.
For the MVP, consuming one event at a time is fine.


This is intended to be the PR for the MVP, so I'd suggest consuming one event at a time for now (and adding a follow-up discussion for the design meeting).
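If we go that way, a prefetch of one is a single call on the channel before starting to consume (sketch, amqp091-go):

```go
// Sketch: prefetch count of 1 so the broker delivers at most one
// unacknowledged message to this consumer at a time.
if err := ch.Qos(1, 0, false); err != nil {
	return fmt.Errorf("failed to set channel QoS: %w", err)
}
```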

// handleMessage processes a single AMQP message.
func (c *AmqpConsumer) handleMessage(ctx context.Context, msg amqp.Delivery) error {
webhook := WorkflowJobWebhook{}
if err := json.Unmarshal(msg.Body, &webhook); err != nil {


The message will contain headers, some of which will be used for tracing with OTel.
I see we're not consuming them here, even though they are injected by the producer.
@cbartz should we configure OTel and tracing as part of this story? If yes, then this is missing here.


No, we agreed to keep the observability part out of scope for this story, except for functionality that is very trivial and comes almost out of the box. I wanted to keep the scope of the MVP small.

func (c *AmqpConsumer) handleMessage(ctx context.Context, msg amqp.Delivery) error {
webhook := WorkflowJobWebhook{}
if err := json.Unmarshal(msg.Body, &webhook); err != nil {
msg.Nack(false, false) // don't requeue


If we don't requeue and there's no dead letter configured when the exchange/queue is created, then we lose the message.
We really need to make sure we're resilient and no messages are lost. Maybe we can inspect the queue configuration: if there's no dead letter, we should at least log the event (maybe in a dedicated log?).
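For context, a dead-letter target is set via queue arguments at declaration time (which, per the QueueDeclarePassive comment above, would be on the producer side); the exchange name below is a placeholder:

```go
// Sketch: declare the work queue with a dead-letter exchange so messages
// rejected with Nack(requeue=false) are routed there instead of being dropped.
// "webhook-dlx" is a placeholder; the DLX and its queue must be declared too.
_, err = ch.QueueDeclare(
	"webhook-queue", // name
	true,            // durable
	false,           // autoDelete
	false,           // exclusive
	false,           // noWait
	amqp.Table{"x-dead-letter-exchange": "webhook-dlx"},
)
```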

msg.Nack(false, false) // don't requeue
return fmt.Errorf("failed to unmarshal message body: %w", err)
}
switch webhook.Action {


The "waiting" action must be acknowledged rather than going through the default case.


The default should only be used when an unexpected value comes in. If there's a dead letter queue, the message will end up there and we'll be able to analyze the action value and the message.
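Put together, the switch could look roughly like this (sketch reusing the handlers from this PR; the exact treatment of "waiting" is still to be confirmed):

```go
switch webhook.Action {
case "queued":
	return c.insertJobToDB(ctx, webhook.WorkflowJob, msg)
case "in_progress":
	return c.updateJobStartedInDB(ctx, webhook.WorkflowJob, msg)
case "completed":
	return c.updateJobCompletedInDB(ctx, webhook.WorkflowJob, msg)
case "waiting":
	// Known action we deliberately ignore: acknowledge it so it does not
	// fall through to the unexpected-value path.
	return msg.Ack(false)
default:
	// Unexpected action: reject without requeue so it can land in a dead
	// letter queue (if configured) for later analysis.
	return msg.Nack(false, false)
}
```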

Comment on lines +113 to +118
case "queued": // other possible action: "waiting"
return c.insertJobToDB(ctx, webhook.WorkflowJob, msg)
case "in_progress":
return c.updateJobStartedInDB(ctx, webhook.WorkflowJob, msg)
case "completed":
return c.updateJobCompletedInDB(ctx, webhook.WorkflowJob, msg)


I don't really like passing the object (to ack and nack) so deep into the business logic. I'd rather manage every ack/nack from the handleMessage side (easier for maintenance and readability).
So I would use the potential error returned by the insert and update methods to decide whether I ack, or nack and requeue.


In the same spirit, I'd avoid passing the amqp.Delivery to the database methods and instead pass the message body/headers to them. That will be easier to unit test and will even allow decoupling the DB operations from AmqpConsumer.
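A sketch of that shape (the handler names are the ones from this PR; dropping amqp.Delivery from their signatures is the suggested change, and the requeue-on-error choice below is an assumption to be decided):

```go
// Sketch: handleMessage owns all ack/nack decisions; the DB methods only see
// the decoded webhook and return an error.
func (c *AmqpConsumer) handleMessage(ctx context.Context, msg amqp.Delivery) error {
	webhook := WorkflowJobWebhook{}
	if err := json.Unmarshal(msg.Body, &webhook); err != nil {
		_ = msg.Nack(false, false) // malformed payload, don't requeue
		return fmt.Errorf("failed to unmarshal message body: %w", err)
	}

	var err error
	switch webhook.Action {
	case "queued":
		err = c.insertJobToDB(ctx, webhook.WorkflowJob)
	case "in_progress":
		err = c.updateJobStartedInDB(ctx, webhook.WorkflowJob)
	case "completed":
		err = c.updateJobCompletedInDB(ctx, webhook.WorkflowJob)
	default:
		return msg.Nack(false, false) // unexpected action
	}
	if err != nil {
		_ = msg.Nack(false, true) // processing failed, requeue for retry
		return err
	}
	return msg.Ack(false)
}
```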
