Reusable concurrency lease system for AWS Step Functions — implements the Distributed Lease Pattern using Lambda and DynamoDB to coordinate parallel executions safely across distributed workflows.

GiamPy5/terraform-aws-sfn-concurrency-lease

terraform-aws-sfn-concurrency-lease

Composable Terraform module that adds safe concurrency control to AWS Step Functions.
It wraps a purpose-built Lambda function, DynamoDB table, and IAM wiring to implement the distributed lease pattern so you can throttle fan-out workloads without rewriting business logic.


Why this module?

  • Deterministic fan-out throttling – enforce a fixed number of parallel tasks across Step Functions Map states or nested workflows.
  • Drop-in state machine guards – ship pre-defined AcquireLease, optional CheckLeaseStatus + WaitForLease, and ReleaseLease state JSON that can be merged into existing definitions.
  • Production defaults – opinionated CloudWatch log retention, Powertools observability configuration, and optional DynamoDB autoscaling.
  • Transparent IAM – inline policies and managed policy attachments are emitted explicitly so platform teams can review grants before deploying.
  • Tested runtime – the bundled Lambda function has 100% unit test coverage, verified by moto-backed regression tests.

Quick start

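module "sfn_concurrency" {
  source = "GiamPy5/sfn-concurrency-lease/aws"

  name_prefix                = "analytics-pipeline"
  max_concurrent_leases      = 10
  max_lease_duration_seconds = 900

  # Optional: let the module create the table and Lambda package (both default to true)
  create_lambdas        = true
  create_dynamodb_table = true

  # Optional: reuse an external table
  # create_dynamodb_table = false
  # ddb_table_name        = "shared-concurrency-leases"

  # Point the guard states at the workflow's own states; the defaults
  # ("StartExecution" and "NextStep") do not exist in the definition below.
  sfn_post_acquire_lease_state  = "ProcessWork"
  end_state_after_release_lease = true

  kms_key_arn = aws_kms_key.lambda_env.arn
}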

Integrate the concurrency guard states into an existing Step Functions definition:

locals {
  state_machine_definition = jsonencode({
    StartAt = module.sfn_concurrency.sfn_acquire_lease_state_name
    States = merge(
      jsondecode(module.sfn_concurrency.acquire_lease_state),
      jsondecode(module.sfn_concurrency.check_lease_status_state),
      jsondecode(module.sfn_concurrency.wait_for_lease_state),
      {
        ProcessWork = {
          Type     = "Task"
          Resource = aws_lambda_function.worker.arn
          Next     = module.sfn_concurrency.sfn_release_lease_state_name
        }
      },
      jsondecode(module.sfn_concurrency.release_lease_state)
    )
  })
}

resource "aws_sfn_state_machine" "workflow" {
  name       = "analytics-workflow"
  role_arn   = aws_iam_role.workflow.arn
  definition = local.state_machine_definition
}

resource "aws_iam_role_policy" "sfn_lambda_invoke" {
  role   = aws_iam_role.workflow.id
  policy = module.sfn_concurrency.lambda_permissions
}

See a complete end-to-end setup in examples/complete.
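
To make the merged definition easier to reason about, here is a rough Python sketch of the retry loop that the AcquireLease, CheckLeaseStatus, and WaitForLease states form. The state names and the `$.acquireLease` result path are the module's documented defaults; the `status` field, the `acquired` value, the placeholder ARN, and the `walk` helper are assumptions made for illustration, not the module's actual output.

```python
def guard_states(post_acquire="ProcessWork", wait_seconds=5):
    """Return a hypothetical ASL fragment for the acquire / check / wait loop."""
    return {
        "AcquireLease": {
            "Type": "Task",
            "Resource": "arn:aws:lambda:REGION:ACCOUNT:function:lease-manager",  # placeholder
            "ResultPath": "$.acquireLease",
            "Next": "CheckLeaseStatus",
        },
        "CheckLeaseStatus": {
            "Type": "Choice",
            # Assumed response field; the real Lambda payload may differ.
            "Choices": [
                {
                    "Variable": "$.acquireLease.status",
                    "StringEquals": "acquired",
                    "Next": post_acquire,
                }
            ],
            "Default": "WaitForLease",
        },
        "WaitForLease": {"Type": "Wait", "Seconds": wait_seconds, "Next": "AcquireLease"},
    }


def walk(states, start, acquire_results):
    """Follow transitions, consuming one simulated acquire result per Task visit."""
    results = iter(acquire_results)
    visited, name, status = [], start, None
    while name in states:
        visited.append(name)
        state = states[name]
        if state["Type"] == "Task":
            status = next(results)
            name = state["Next"]
        elif state["Type"] == "Choice":
            rule = state["Choices"][0]
            name = rule["Next"] if status == rule["StringEquals"] else state["Default"]
        else:  # Wait state: pause, then retry the acquire call
            name = state["Next"]
    visited.append(name)  # first state outside the guard fragment
    return visited


path = walk(guard_states(), "AcquireLease", ["wait", "acquired"])
print(" -> ".join(path))
```

With one saturated attempt followed by a successful one, the walk passes through WaitForLease once and then leaves the guard for ProcessWork.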


Architecture

| Component | Purpose |
| --- | --- |
| AWS Lambda (lease-manager) | Handles acquire / release actions, counts active leases, and enforces TTLs. Ships with AWS Lambda Powertools logging, tracing, and metrics. |
| DynamoDB table | Stores leases as {PK, SK} items with a TTL attribute so expired leases are reclaimed automatically. |
| Terraform locals & outputs | Provide ready-made Step Functions state fragments and IAM policy JSON to embed in existing workflows. |

Execution flow:

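  1. The state machine enters the AcquireLease state and invokes the Lambda with action=acquire.
  2. The Lambda counts active leases (TTL > now). If the limit set by max_concurrent_leases is already reached it returns wait, and the optional CheckLeaseStatus and WaitForLease states pause for sfn_wait_seconds before retrying; otherwise the lease is granted.
  3. Business logic executes while holding the lease.
  4. The ReleaseLease state calls the same Lambda with action=release to free capacity. The release call is idempotent: if the lease has already disappeared through TTL expiry, the call still succeeds.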
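
The execution flow above can be sketched in plain Python. This is a minimal in-memory illustration, not the bundled Lambda's code: the `LeaseStore` class, the `acquired` and `released` status strings, and the response shape are assumptions made for the example. Only the `wait` result and the TTL-based counting come from the description above. The sketch also ignores the race between counting and writing that a DynamoDB-backed implementation has to handle.

```python
import time
import uuid
from dataclasses import dataclass, field


@dataclass
class LeaseStore:
    """In-memory stand-in for the lease table (hypothetical, for illustration)."""

    max_concurrent_leases: int = 10
    max_lease_duration_seconds: int = 900
    # lease_id -> expiry timestamp in epoch seconds, mirroring a TTL attribute
    leases: dict = field(default_factory=dict)

    def _active(self, now):
        # Only leases whose TTL lies in the future count against the limit;
        # expired entries may linger until they are physically deleted.
        return {k: ttl for k, ttl in self.leases.items() if ttl > now}

    def acquire(self, now=None):
        now = time.time() if now is None else now
        if len(self._active(now)) >= self.max_concurrent_leases:
            return {"status": "wait"}
        lease_id = str(uuid.uuid4())
        self.leases[lease_id] = now + self.max_lease_duration_seconds
        return {"status": "acquired", "lease_id": lease_id}

    def release(self, lease_id):
        # Idempotent: releasing an unknown or already-expired lease is not an error.
        self.leases.pop(lease_id, None)
        return {"status": "released"}


store = LeaseStore(max_concurrent_leases=2, max_lease_duration_seconds=10)
first = store.acquire(now=0)
second = store.acquire(now=0)
print(store.acquire(now=5)["status"])   # limit reached while both leases are live
store.release(first["lease_id"])
print(store.acquire(now=5)["status"])   # capacity freed by the release
```

Passing `now` explicitly keeps the example deterministic; a real handler would read the clock itself.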

Requirements

| Name | Version |
| --- | --- |
| terraform | ~> 1.6 |
| aws | ~> 6.0 |

Providers

| Name | Version |
| --- | --- |
| aws | ~> 6.0 |

Modules

| Name | Source | Version |
| --- | --- | --- |
| dynamodb_table | terraform-aws-modules/dynamodb-table/aws | ~> 5.2 |
| lambda | terraform-aws-modules/lambda/aws | ~> 8.1 |

Resources

| Name | Type |
| --- | --- |
| aws_caller_identity.current | data source |
| aws_dynamodb_table.existing_table | data source |
| aws_iam_policy_document.state_machine_permissions | data source |
| aws_region.current | data source |

Inputs

| Name | Description | Type | Default | Required |
| --- | --- | --- | --- | --- |
| cloudwatch_logs_retention_in_days | n/a | number | 7 | no |
| create | n/a | bool | true | no |
| create_dynamodb_table | n/a | bool | true | no |
| create_lambdas | n/a | bool | true | no |
| ddb_autoscaling_enabled | n/a | bool | false | no |
| ddb_autoscaling_read | n/a | object({ max_capacity = number }) | { "max_capacity": 1 } | no |
| ddb_autoscaling_write | n/a | object({ max_capacity = number }) | { "max_capacity": 1 } | no |
| ddb_billing_mode | n/a | string | "PAY_PER_REQUEST" | no |
| ddb_deletion_protection_enabled | n/a | bool | false | no |
| ddb_hash_key | n/a | string | "PK" | no |
| ddb_point_in_time_recovery_enabled | n/a | bool | false | no |
| ddb_range_key | n/a | string | "SK" | no |
| ddb_read_capacity | n/a | number | 10 | no |
| ddb_table_name | n/a | string | "" | no |
| ddb_ttl_attribute_name | n/a | string | "ttl" | no |
| ddb_write_capacity | n/a | number | 10 | no |
| end_state_after_release_lease | n/a | bool | false | no |
| kms_key_arn | n/a | string | "" | no |
| lambdas_tracing_enabled | n/a | bool | false | no |
| lease_prefix | n/a | string | "" | no |
| max_concurrent_leases | n/a | number | 100 | no |
| max_lease_duration_seconds | n/a | number | 600 | no |
| name_prefix | n/a | string | "concurrency-mgmt" | no |
| powertools_configuration | n/a | object({ metrics_namespace = optional(string, "terraform-aws-sfn-concurrency-lease"), metrics_disabled = optional(bool, false), trace_disabled = optional(bool, false), tracer_capture_response = optional(bool, true), tracer_capture_error = optional(bool, true), trace_middlewares = optional(list(string), []), logger_log_event = optional(bool, false), logger_sample_rate = optional(number, 0.1), log_deduplication = optional(bool, false), parameters_max_age = optional(number, 10), parameters_ssm_decrypt = optional(bool, false), dev_mode = optional(bool, false), log_level = optional(string, "INFO") }) | { "dev_mode": true, "log_deduplication": false, "log_level": "DEBUG", "logger_log_event": true, "logger_sample_rate": 0.1, "metrics_disabled": false, "metrics_namespace": "terraform-aws-sfn-concurrency-lease", "parameters_max_age": 10, "parameters_ssm_decrypt": false, "trace_disabled": false, "trace_middlewares": [], "tracer_capture_error": true, "tracer_capture_response": true } | no |
| sfn_acquire_lease_state_name | n/a | string | "AcquireLease" | no |
| sfn_check_lease_state_name | State name for the optional Choice state that inspects the acquire result. | string | "CheckLeaseStatus" | no |
| sfn_lease_id_jsonpath | JSONPath to extract the lease ID from the Step Functions context for the release step. | string | "$.lease_id" | no |
| sfn_lease_result_path | n/a | string | "$.acquireLease" | no |
| sfn_post_acquire_lease_state | n/a | string | "StartExecution" | no |
| sfn_post_release_lease_state | n/a | string | "NextStep" | no |
| sfn_release_lease_result_path | n/a | string | "$.releaseLease" | no |
| sfn_release_lease_state_name | n/a | string | "ReleaseLease" | no |
| sfn_resource_id_jsonpath | JSONPath to extract the resource ID from the Step Functions context for the acquire step. | string | "$.resource_id" | no |
| sfn_wait_seconds | Seconds the Wait state should pause before retrying an acquire call. | number | 5 | no |
| sfn_wait_state_name | State name for the optional Wait state that pauses before retrying an acquire. | string | "WaitForLease" | no |

Outputs

| Name | Description |
| --- | --- |
| acquire_lease_state | n/a |
| check_lease_status_state | n/a |
| dynamodb_table_arn | n/a |
| dynamodb_table_name | n/a |
| lambda_permissions | n/a |
| release_lease_state | n/a |
| sfn_acquire_lease_state_name | n/a |
| sfn_check_lease_state_name | n/a |
| sfn_release_lease_state_name | n/a |
| sfn_wait_state_name | n/a |
| wait_for_lease_state | n/a |

Example IAM integration

resource "aws_iam_role" "state_machine" {
  name               = "workflow-role"
  assume_role_policy = data.aws_iam_policy_document.sfn_assume_role.json
}

resource "aws_iam_role_policy" "invoke_lease_manager" {
  role   = aws_iam_role.state_machine.id
  policy = module.sfn_concurrency.lambda_permissions
}
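
For reviewers who want to see the kind of JSON involved, the sketch below builds the two documents this wiring relies on: a trust policy that lets Step Functions assume the role (the source of `sfn_assume_role` is not shown in this README), and an invoke policy of the sort `lambda_permissions` plausibly contains. Both helper functions, the example ARN, and the exact statement layout are assumptions; inspect the real output with `terraform output` or in the plan before relying on it.

```python
import json


def sfn_trust_policy():
    """Trust policy allowing the Step Functions service to assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "states.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def invoke_policy(function_arn):
    """Hypothetical least-privilege policy for invoking one Lambda function."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "lambda:InvokeFunction",
                # Cover both the unqualified ARN and any version or alias.
                "Resource": [function_arn, f"{function_arn}:*"],
            }
        ],
    }


# Placeholder ARN for illustration only.
arn = "arn:aws:lambda:eu-west-1:123456789012:function:analytics-pipeline-lease-manager"
print(json.dumps(invoke_policy(arn), indent=2))
```

Scoping the statement to a single function ARN keeps the grant easy to audit, which fits the module's goal of reviewable IAM.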

Local development

# 1. Create a virtual environment
python -m venv .venv
source .venv/bin/activate

# 2. Install test dependencies (pytest-cov is needed for the --cov option below)
pip install -U pip
pip install boto3 moto pytest pytest-cov aws-lambda-powertools

# 3. Run the regression suite (includes 100% coverage for the Lambda runtime)
pytest

# Optional: view coverage for the Lambda source directory
pytest --cov=src/lease_manager --cov-report=term-missing

Contributing

  1. Fork the repository and create a feature branch.
  2. Run terraform fmt and pytest before opening a PR.
  3. Update examples and documentation when changing or adding inputs.
  4. Submit the pull request with a concise summary of the change.

Bug reports and feature requests are welcome through GitHub issues. Please include Terraform version, AWS provider version, and reproduction steps when reporting problems.


License

This project is licensed under the Apache License 2.0. You are free to use, modify, and distribute the module in personal or commercial projects as long as you retain the license notice.
