Most architects and developers know the producer-consumer pattern. In this repository I want to show how Dapr bindings can be used to implement this pattern and how the Consumer can be scaled out with KEDA.
In Dapr you can use output and input bindings to send messages to and receive messages from a queue. When you decide on a queue technology such as Redis, RabbitMQ or Azure Service Bus queues, you usually have to use that technology's integration library in your code. With Dapr you integrate input and output bindings at a higher level of abstraction, and you don't need to know how the integration library works.
In this sample we create two microservices: the Consumer, which has an input binding, and the Producer, which has an output binding. We bind to Azure Service Bus queues and scale out the Consumer on demand, depending on how many messages are in the Azure Service Bus queue. We use KEDA to provide scaling metrics for the Horizontal Pod Autoscaler.
The Producer and Consumer are already implemented in ASP.NET Core 3.1. The application code is available as Docker images in my Docker Hub repository:
- Producer: m009/producer:0.1
- Consumer: m009/consumer:0.1
- Create a new Azure resource group or use an existing one:
az group create -n <your RG name> -l <location>
- Create a new Service Bus namespace:
az servicebus namespace create -n <namespace name> -g <your RG name> -l <location> --sku Basic
- To manage the namespace, list the connection string of the RootManageSharedAccessKey rule:
az servicebus namespace authorization-rule keys list -g <your RG name> --namespace-name <namespace name> --name RootManageSharedAccessKey
The output looks as follows:
{
  "aliasPrimaryConnectionString": null,
  "aliasSecondaryConnectionString": null,
  "keyName": "RootManageSharedAccessKey",
  "primaryConnectionString": "<connstr1>",
  "primaryKey": "<redacted>",
  "secondaryConnectionString": "<connstr2>",
  "secondaryKey": "<redacted>"
}
Create a base64 representation of the connection string (either use primary or secondary)
echo -n '<connstr1>' | base64
Update the Kubernetes secret in binding-deployment.yaml with the base64 encoded value.
- Create a Service Bus queue:
az servicebus queue create -n msgqueue -g <your RG name> --namespace-name <namespace name>
- To connect to the queue, create an authorization rule with Manage, Send and Listen rights:
az servicebus queue authorization-rule create -g <your RG name> --namespace-name <namespace name> --queue-name msgqueue --name manage --rights Manage Send Listen
Once the authorization rule is created, list its connection string as follows:
az servicebus queue authorization-rule keys list -g <your RG name> --namespace-name <namespace name> --queue-name msgqueue --name manage
The output looks as follows:
{
  "aliasPrimaryConnectionString": null,
  "aliasSecondaryConnectionString": null,
  "keyName": "manage",
  "primaryConnectionString": "<connstr1>",
  "primaryKey": "<redacted>",
  "secondaryConnectionString": "<connstr2>",
  "secondaryKey": "<redacted>"
}
Create a base64 representation of the connection string (either use primary or secondary)
echo -n '<connstr1>' | base64
Update the Kubernetes secret in consumer-scaling.yaml with the base64 encoded value.
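Both secrets depend on the exact bytes that are fed into base64. The following is a minimal sketch, not part of the repository: the connection string is a made-up placeholder, and the helper name `encode_secret` is hypothetical. It shows two things that can go wrong: a plain `echo` (without `-n`) encodes a trailing newline into the secret, and some `base64` builds wrap long output across several lines, which then has to be joined before it is pasted into the YAML file. `base64 --decode` is assumed to be available on your system.

```shell
#!/bin/sh
# Hypothetical placeholder value, not a real connection string.
CONN='Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=manage;SharedAccessKey=abc123'

# Encode without a trailing newline and remove any line wraps
# that the base64 tool may insert into long output.
encode_secret() {
  printf '%s' "$1" | base64 | tr -d '\n'
}

ENCODED=$(encode_secret "$CONN")
DECODED=$(printf '%s' "$ENCODED" | base64 --decode)

# Round-trip check: the decoded value must equal the input exactly.
if [ "$DECODED" = "$CONN" ]; then
  echo "round trip ok"
else
  echo "round trip failed" >&2
fi

# For comparison: echo without -n appends a newline,
# so the encoded value differs from the intended one.
WITH_NEWLINE=$(echo "$CONN" | base64 | tr -d '\n')
if [ "$WITH_NEWLINE" != "$ENCODED" ]; then
  echo "echo without -n encodes a different value"
fi
```

If the decoded secret does not match the connection string byte for byte, the binding or the scaler may fail to authenticate, so a round-trip check like this can save some debugging time.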
- Deploy the Dapr binding to your cluster:
kubectl apply -f binding-deployment.yaml
- Deploy the Consumer to your cluster:
kubectl apply -f consumer-deployment.yaml
- Check the daprd sidecar for errors:
kubectl logs -l app=consumer -c daprd
You should see that the sidecar has loaded the message-queue component successfully:
... time="2019-12-24T14:27:24Z" level=info msg="loaded component message-queue (bindings.azure.servicebusqueues)" ...
- Deploy the Producer to your cluster:
kubectl apply -f producer-deployment.yaml
- Get the public IP of the Producer service:
kubectl get service
- Open another shell and watch the logs of the Consumer:
kubectl logs -f -l app=consumer -c consumer
- Now invoke the REST endpoint of the Producer and set the parameters as follows:
{
  "count": 1,
  "intervalMilliseconds": 0
}
You can either browse the Swagger UI that is published through the public endpoint or simply use curl to invoke the endpoint. With the parameters above, the Producer creates a single message to test whether everything works. Watch the logs of the Consumer in the other shell.
curl -X POST "http://<public ip>/MessageProducer" -H "accept: */*" -H "Content-Type: application/json" -d "{\"count\":1,\"intervalMilliseconds\":0}"
You should see the following in the other shell:
Hello World -- Received at: 12/24/2019 14:42:42 -- Finished at: 12/24/2019 14:42:47
The Consumer accepts the message, waits 5 seconds and prints out the message. Try playing around with the Producer parameters. All messages are processed one after the other.
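Because each message takes 5 seconds and messages are processed one after the other, the time to drain the queue grows linearly with the number of messages. A rough back-of-the-envelope sketch, assuming messages are spread evenly over the Consumer pods and each pod handles one message at a time (the function name `drain_seconds` is hypothetical):

```shell
#!/bin/sh
# drain_seconds MESSAGES REPLICAS SECONDS_PER_MESSAGE
# Estimates how long it takes to work through a batch of messages.
drain_seconds() {
  # Each replica handles ceil(messages / replicas) messages sequentially.
  per_replica=$(( ($1 + $2 - 1) / $2 ))
  echo $(( per_replica * $3 ))
}

drain_seconds 50 1 5    # prints 250: one Consumer needs about 250 seconds
drain_seconds 50 10 5   # prints 25: ten Consumers need about 25 seconds
```

This is why a single Consumer quickly becomes the bottleneck when the Producer sends larger batches.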
- Now it's time to scale out the Consumer depending on how many messages are in the queue. In this example the scaling target is 5 messages per Consumer pod, so roughly one pod is added for every 5 queued messages. Deploy the KEDA ScaledObject to your cluster:
kubectl apply -f consumer-scaling.yaml
After the ScaledObject is deployed, wait a few seconds and list your pods. You will see either that the Consumer pod is Terminating or that no Consumer pod is running at all.
kubectl get pod
NAME                                     READY   STATUS    RESTARTS   AGE
dapr-operator-76888fdcb9-hhglf           1/1     Running   0          28h
dapr-placement-666b996945-f8f7x          1/1     Running   0          28h
dapr-sidecar-injector-744d97578f-6mm6l   1/1     Running   0          28h
producer-7f988ccd4c-gcxg6                2/2     Running   0          18m
Wait a moment, why does this happen? Take a look at the ScaledObject in consumer-scaling.yaml.
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: bindingconsumer-scaler
  labels:
    app: bindingconsumer
    deploymentName: consumer
spec:
  scaleTargetRef:
    deploymentName: consumer
  minReplicaCount: 0
  maxReplicaCount: 10
  triggers:
  - type: azure-servicebus
    metadata:
      queueName: msgqueue
      queueLength: '5'
    authenticationRef:
      name: trigger-auth-servicebus
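We specified that KEDA can scale in the Consumer to 0 pods (minReplicaCount) and scale out to a maximum of 10 pods (maxReplicaCount). Because the queue is currently empty, KEDA scales the Consumer down to zero.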
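The arithmetic behind this configuration can be sketched roughly as follows. The Horizontal Pod Autoscaler aims for an average of `queueLength` messages per pod, so the desired replica count is approximately the queue depth divided by 5, rounded up and clamped between `minReplicaCount` and `maxReplicaCount`. The function name `desired_replicas` is hypothetical, and this is a simplification: the real autoscaler also applies tolerances, polling intervals and cooldown periods that are ignored here.

```shell
#!/bin/sh
# desired_replicas MESSAGES TARGET_PER_POD MIN MAX
desired_replicas() {
  messages=$1
  target=$2
  min=$3
  max=$4
  # Integer ceiling division: ceil(messages / target).
  replicas=$(( (messages + target - 1) / target ))
  # Clamp to the configured bounds.
  if [ "$replicas" -lt "$min" ]; then replicas=$min; fi
  if [ "$replicas" -gt "$max" ]; then replicas=$max; fi
  echo "$replicas"
}

desired_replicas 0 5 0 10     # prints 0: empty queue, scale to zero
desired_replicas 1 5 0 10     # prints 1: a single message wakes one pod
desired_replicas 23 5 0 10    # prints 5
desired_replicas 50 5 0 10    # prints 10
desired_replicas 500 5 0 10   # prints 10: capped by maxReplicaCount
```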
- Now post a message again and see what happens:
curl -X POST "http://<public ip>/MessageProducer" -H "accept: */*" -H "Content-Type: application/json" -d "{\"count\":1,\"intervalMilliseconds\":0}"
You will see that a Consumer pod is being created or is already running:
consumer-fdd8b5997-whh46                 0/2     ContainerCreating   0          2s
dapr-operator-76888fdcb9-hhglf           1/1     Running             0          28h
dapr-placement-666b996945-f8f7x          1/1     Running             0          28h
dapr-sidecar-injector-744d97578f-6mm6l   1/1     Running             0          28h
producer-7f988ccd4c-gcxg6                2/2     Running             0          24m
- Now let's see what happens when you send 50 messages:
curl -X POST "http://<public ip>/MessageProducer" -H "accept: */*" -H "Content-Type: application/json" -d "{\"count\":50,\"intervalMilliseconds\":0}"
kubectl get pod
consumer-fdd8b5997-497ll                 0/2     ContainerCreating   0          5s
consumer-fdd8b5997-g7zsc                 2/2     Running             0          5s
consumer-fdd8b5997-vdmsx                 2/2     Running             0          5s
consumer-fdd8b5997-whh46                 2/2     Running             0          3m28s
dapr-operator-76888fdcb9-hhglf           1/1     Running             0          28h
dapr-placement-666b996945-f8f7x          1/1     Running             0          28h
dapr-sidecar-injector-744d97578f-6mm6l   1/1     Running             0          28h
producer-7f988ccd4c-gcxg6                2/2     Running             0          27m
You can see that KEDA scales out the Consumer pods. Now let's look at how the Horizontal Pod Autoscaler is configured:
kubectl get hpa
NAME                REFERENCE             TARGETS       MINPODS   MAXPODS   REPLICAS   AGE
keda-hpa-consumer   Deployment/consumer   0/5 (avg)     1         10        10         11m
Dapr bindings are described in Kubernetes as custom resources of kind Component.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: message-queue
spec:
  type: bindings.azure.servicebusqueues
  metadata:
  - name: connectionString
    secretKeyRef:
      name: servicebus-management
      key: servicebus-management-connectionstring
  - name: queueName
    value: "msgqueue"
In the spec section we specify the type of the binding, bindings.azure.servicebusqueues. In addition, the connection string of the Azure Service Bus instance and the name of the queue must be specified. The name of the binding is set in metadata.name. This is the name that we use in our code to communicate with the Dapr runtime.
The following secret is used to store the connection string.
apiVersion: v1
kind: Secret
metadata:
  name: servicebus-management
  labels:
    app: bindingconsumer
data:
  servicebus-management-connectionstring: "<your base64 encoded connection string>"
type: Opaque
When the Producer creates a message, it just calls the HTTP endpoint of the injected daprd sidecar and uses the name of the binding in the request URL path. Look at the code of the MessageProducerController.
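// Requires: System.Net, System.Net.Http, System.Text, Newtonsoft.Json, Newtonsoft.Json.Serialization
// HTTP port of the Dapr sidecar and the URL of the output binding.
var daprPort = "3500";
var daprUrl = $"http://localhost:{daprPort}/v1.0/bindings/message-queue";
// Create the client once instead of once per message.
using var client = new HttpClient();
for (var i = 0; i < produce.Count; i++)
{
    var msg = new Message
    {
        Text = "Hello World"
    };
    // The message goes into "data", the binding operation into "operation".
    var payload = new
    {
        data = msg,
        operation = "create"
    };
    var data = JsonConvert.SerializeObject(payload, new JsonSerializerSettings { ContractResolver = new CamelCasePropertyNamesContractResolver() });
    var result = await client.PostAsync(daprUrl, new StringContent(data, Encoding.UTF8, "application/json"));
    if (!result.IsSuccessStatusCode)
    {
        // Await the read so that the response body, not a Task, is returned.
        var text = await result.Content.ReadAsStringAsync();
        return StatusCode((int)HttpStatusCode.InternalServerError, text);
    }
    await Task.Delay(produce.IntervalMilliseconds);
}
return Ok();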
That's it! A POST request to http://localhost:3500/v1.0/bindings/message-queue is enough to put a message on the Azure Service Bus queue. The body of the request looks as follows:
{
  "data": "<your data>",
  "operation": "create"
}
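For a quick manual test, the same request can be sent with curl from a container that shares the pod's network with the daprd sidecar. This is only a sketch under assumptions: the helper names `binding_payload` and `send_to_binding` are hypothetical, the sidecar is assumed to listen on port 3500 as in the Producer code, and the `text` field mirrors the camel-cased `Message.Text` property. The helper does no JSON escaping, so it is only suitable for simple text.

```shell
#!/bin/sh
DAPR_URL="http://localhost:3500/v1.0/bindings/message-queue"

# Build the request body: the message goes into "data",
# the binding operation into "operation".
# Note: no JSON escaping is done, so avoid quotes or backslashes in the text.
binding_payload() {
  printf '{"data":{"text":"%s"},"operation":"create"}' "$1"
}

# Post one message to the output binding through the sidecar.
# This only works where the sidecar is reachable on localhost.
send_to_binding() {
  curl -s -X POST "$DAPR_URL" \
    -H "Content-Type: application/json" \
    -d "$(binding_payload "$1")"
}

# Print the body that would be sent.
binding_payload "Hello World"
echo

# Usage next to a running sidecar:
#   send_to_binding "Hello World"
```

Printing the payload first makes it easy to compare what is sent with the body format shown above before involving the sidecar.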
The Consumer must expose an endpoint whose URL path matches the name of the binding, and that endpoint must accept POST requests.
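[ApiController]
[Route("message-queue")] // The route must match the name of the binding.
public class MessageQueueController : ControllerBase
{
    // Dapr POSTs each message it receives from the queue to this endpoint.
    [HttpPost]
    public async Task<IActionResult> Process([FromBody] Message message)
    {
        var start = DateTime.Now;
        // Simulate 5 seconds of work per message.
        await Task.Delay(5 * 1000);
        var end = DateTime.Now;
        Console.WriteLine($"{message.Text} -- Received at: {start} -- Finished at: {end}");
        // Returning 200 OK tells Dapr that the message was handled.
        return Ok();
    }
}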