Add Elsa.Kafka Module for Kafka Integration with Message Sending and Receiving Activities #6108

Merged
merged 35 commits into main from feat/kafka on Nov 18, 2024

Conversation

@sfmskywalker (Member) commented Nov 10, 2024

This PR introduces a new module, Elsa.Kafka, to support Kafka integration in Elsa Workflows. The module includes two key activities to enable Kafka-based messaging workflows:

  • SendMessage: Sends a message to a Kafka topic using a specified producer configuration.
  • MessageReceived: Triggers when a message is received on a specified topic or set of topics based on a given consumer configuration.

Key Features:

  1. Message Correlation:

    • Allows specifying a predicate in the MessageReceived activity to filter messages.
    • The predicate has access to a message variable representing the received message.
    • Messages can be handled as raw strings or parsed into types like ExpandoObject or custom .NET classes.
  2. Flexible Configuration:

    • Kafka producers and consumers can be defined via appsettings.json or through custom implementations of IConsumerDefinitionProvider and IProducerDefinitionProvider (a sketch follows this list).
    • Multiple implementations are supported to expand the set of available producers and consumers.
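For illustration, a custom consumer definition provider might look like the following sketch. The member name (GetConsumerDefinitionsAsync) and the shape of ConsumerDefinition are assumptions made for this example; consult the module source for the actual contract.

public class StaticConsumerDefinitionProvider : IConsumerDefinitionProvider
{
    // Hypothetical member; the real IConsumerDefinitionProvider signature may differ.
    public ValueTask<IEnumerable<ConsumerDefinition>> GetConsumerDefinitionsAsync(CancellationToken cancellationToken = default)
    {
        var definitions = new[]
        {
            // Property names mirror the appsettings.json sample below; treat them as illustrative.
            new ConsumerDefinition
            {
                Id = "consumer-2",
                Name = "Consumer 2",
                BootstrapServers = new[] { "localhost:9092" },
                GroupId = "group-2"
            }
        };
        return new(definitions);
    }
}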

Configuration

An example appsettings.json configuration sets up topics, producers, and consumers, specifying parameters such as bootstrap servers, available topics, and the correlating fields used for message filtering:

{
   "Kafka": {
      "Topics": [
         { "Id": "topic-1", "Name": "topic-1" },
         { "Id": "topic-2", "Name": "topic-2" }
      ],
      "Producers": [
         {
            "Id": "producer-1",
            "Name": "Producer 1",
            "BootstrapServers": [ "localhost:9092" ]
         }
      ],
      "Consumers": [
         {
            "Id": "consumer-1",
            "Name": "Consumer 1",
            "BootstrapServers": [ "localhost:9092" ],
            "GroupId": "group-1",
            "AutoOffsetReset": "earliest",
            "EnableAutoCommit": "true",
            "CorrelatingFields": [ "orderId", "customerId" ]
         }
      ]
   }
}
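
To enable the module in the host application, the Kafka feature is registered from Program.cs. The sketch below is hypothetical: the UseKafka extension method and the options lambda are assumptions based on Elsa's usual feature-registration pattern, and the real surface of KafkaFeature may differ.

services.AddElsa(elsa =>
{
    // Hypothetical wiring sketch; the actual KafkaFeature extension may differ.
    elsa.UseKafka(kafka =>
    {
        // Bind producer, consumer, and topic definitions from the "Kafka" section shown above.
        kafka.ConfigureOptions = options => configuration.GetSection("Kafka").Bind(options);
    });
});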

Sample Workflows

Producer Workflow Example

A sample ProducerWorkflow using the SendMessage activity:

public class ProducerWorkflow : WorkflowBase
{
    protected override void Build(IWorkflowBuilder builder)
    {
        builder.Name = "Producer Workflow";
        builder.Root = new SendMessage
        {
            Topic = new("topic-2"),
            ProducerDefinitionId = new("producer-1"),
            Content = new(() => new { OrderId = "1", CustomerId = "1" })
        };
    }
}
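
Note that, per the DefaultSerializers introduced in this PR, message content is serialized to JSON with camelCase property naming, so the anonymous object above should go out as { "orderId": "1", "customerId": "1" }.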

Consumer Workflow Example

A sample ConsumerWorkflow using the MessageReceived activity:

public class ConsumerWorkflow : WorkflowBase
{
    protected override void Build(IWorkflowBuilder builder)
    {
        var orderReceivedMessage = builder.WithVariable<ExpandoObject>();
        builder.Name = "Consumer Workflow";
        builder.Root = new Sequence
        {
            Activities =
            {
                new MessageReceived
                {
                    ConsumerDefinitionId = new("consumer-1"),
                    MessageType = new(typeof(ExpandoObject)),
                    Predicate = new(JavaScriptExpression.Create("message => message.OrderId == '1'")),
                    Result = new(orderReceivedMessage),
                    CanStartWorkflow = true
                },
                new WriteLine(c => JsonSerializer.Serialize(orderReceivedMessage.Get(c)))
            }
        };
    }
}

The module supports parsing messages as ExpandoObject or custom .NET types, allowing for flexible message processing in workflows.
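For instance, the same consumer can bind to a strongly typed message instead of an ExpandoObject. The Order record below is illustrative only (any JSON-deserializable .NET type should work), and the activity inputs follow the same pattern as the sample above:

public record Order(string OrderId, string CustomerId);

// Inside a workflow's Build method:
var order = builder.WithVariable<Order>();
builder.Root = new MessageReceived
{
    ConsumerDefinitionId = new("consumer-1"),
    MessageType = new(typeof(Order)),
    Result = new(order),
    CanStartWorkflow = true
};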

This addition provides essential Kafka messaging capabilities, expanding Elsa’s integration options with event-driven and message-based architectures.


Commits

• Introduced the Kafka module providing consumer integration and activities into the project. This includes new classes for consumer handling, configuration, and activities. An example setup using Docker Compose is also added to facilitate development and testing.
• Simplify the namespace usage for the Timestamp type within the KafkaTransportMessage record. This change eliminates the need for a separate using directive for Timestamp, enhancing code readability and maintainability.
• This commit introduces the capability to handle and trigger workflows based on Kafka transport messages. It adds a new handler, notifications, and updates the message stimulus to include correlating fields. Additionally, the Kafka consumers are now managed more modularly with updated startup tasks and mediator integration.
• Added support for Kafka integration in Elsa.Server.Web by setting up Kafka configurations in appsettings.json and updating Program.cs. Also, renamed `ConsumerConfigs` to `ConsumerDefinitions` in Kafka options for clarity.
• Introduced `IConsumerDefinitionEnumerator` for managing consumer definitions across providers and implemented it in the `ConsumerDefinitionEnumerator` class. Enhanced `KafkaFeature` to register these services and updated the `MessageReceived` activity to use a dropdown UI hint for consumer definitions. Improved `StartConsumersTask` by refactoring consumer definition retrieval logic.
• Introduce a new SendMessage activity for Kafka, enabling message publishing to specific topics. Refine the KafkaTransportMessage model by removing headers and timestamp fields. Adjust the StimulusSender logic to streamline the bookmark queuing process and fix key-value pairing in dropdown options. Update appsettings for corrected Kafka bootstrap server and topic configurations.
• Introduced interfaces and implementations for managing producer and topic definitions along with their respective enumerators and list providers. Updated the `SendMessage` activity to include producer selection and refactored consumer definition providers for better consistency.
• Renamed Kafka configuration properties for better consistency and readability across the codebase. Updated property names from `ProducerDefinitions` to `Producers`, `ConsumerDefinitions` to `Consumers`, and `TopicDefinitions` to `Topics`. Added a missing input attribute in `SendMessage.cs` and registered additional handlers in `KafkaFeature.cs`.
• Introduced a `DefaultSerializers` class for custom serialization and deserialization of Kafka messages. Updated the `MessageReceived` and `SendMessage` activities to use these custom serializers, and modified `KafkaOptions` to include them.
• Changed the serialization type from `ExpandoObject` to the actual type of the object to ensure proper serialization. This ensures that derived types are correctly handled during the serialization process.
• Introduced two new workflows, `ProducerWorkflow` and `ConsumerWorkflow`, for handling Kafka messages. Updated `DefaultSerializers` to use camelCase property naming and modified `appsettings.json` to include `topic-2` and format entries.
• This change enhances the log output by serializing messages to JSON format before writing them. The addition of System.Text.Json ensures that the message content is presented in a structured and standardized format in logs.
• Implemented HeaderCorrelationStrategy and NullCorrelationStrategy, and updated KafkaFeature to support customizable correlation strategies. Added correlation ID handling to Kafka transport messages and updated the configuration and handlers accordingly.
• Integrated ITenantAccessor into the provider to support tenant-specific context loading. Updated the constructor and LoadAsync method to retrieve the tenant information and use it for context-specific operations.
• This commit changes the SQL database provider from SQLite to MySQL and disables Kafka use. It also includes necessary adjustments such as adding MySQL handling in configuration and connection setups, updating `docker-compose` to include MySQL services, and referencing MySQL projects in the `.csproj` file.
• This commit introduces various UI property handlers across several features such as Python, JavaScript, CSharp, and Workflow features to enhance user interface property handling. It also updates the property UI handler resolution logic to better manage cases where providers are not available. Furthermore, adjustments were made in the server configuration to switch database providers and enable Kafka.
• Modified the logic to fetch property UI handlers by preloading them into a list and then filtering. This change improves readability and potentially performance by reducing repetitive service provider calls.
• Added the `IWorkerTopicSubscriber` interface and its implementation for managing topic subscriptions. Enhanced workers to bind triggers and bookmarks dynamically based on existing data. Updated several classes and methods to support topic-based subscriptions and headers.
• Extract trigger matching conditions into an `IsMatchAsync` method for reuse. This enhances code maintainability and readability by reducing redundancy. The new `GetTopic` helper method isolates the topic retrieval logic.
• This commit inserts the Name property in the returned object within the MassTransitActivityTypeProvider class. It ensures that the typeName is included, providing a clearer definition of the activity type.
• Added a new event handler for `BookmarksDeleted` to ensure removed bookmarks are processed correctly. Refactored the bookmark removal logic into a helper method to reduce code duplication and streamline the workflow.
• Refactored the `TriggerWorkflows` handler to use `IBookmarkQueue` instead of directly invoking `IBookmarkResumer`. This change aims to improve scalability by queueing bookmark resumption requests, enabling better load distribution and async processing. Added necessary helpers and configuration options to support this functionality.
• Simplify the constructor by removing the unused IBookmarkResumer dependency. This cleanup reduces potential confusion and improves code maintainability without impacting functionality.
• Introduced an `IsLocal` property on `MessageReceivedStimulus` for determining whether the message event is local to a specific workflow instance. Updated `BookmarkBinding` and related handler methods to utilize `CorrelationId` for local event matching. Removed the unused `CorrelatingFields` from the `MessageReceived` activity.
• Updated `GetWorker` methods to return a nullable `IWorker` to handle cases where a worker might not exist. Modified code to include null checks and conditional operations to prevent potential null reference exceptions when accessing worker methods.
• This commit introduces filtering for triggers and bookmarks based on the `MessageReceived` activity type name. It also adds an option to mark messages as local in the `SendMessage` activity, where local messages are delivered to the current workflow instance only. These changes enhance the management and targeted delivery of messages within the workflow framework.
• New topics "topic-3" and "topic-4" were added to the Kafka settings. Unused topic references were removed from the producers configuration to simplify and improve clarity.
• Introduced a predicate to the KafkaConsumerActivity using JavaScript expressions to filter messages based on OrderId. This ensures only relevant messages are processed in the workflow.
@sfmskywalker marked this pull request as ready for review on November 18, 2024, 12:41
@sfmskywalker merged commit f53e024 into main on Nov 18, 2024
2 checks passed
@sfmskywalker deleted the feat/kafka branch on November 18, 2024, 12:42