
Random disconnections with RabbitMQ Broker (AWS) #2340


Description

@sharonkwong

I set up celery + beat containers in ECS and used AWS MQ as the broker. However, my setup seems very prone to disconnections, where celery workers are suddenly disconnected from MQ.

I end up seeing the logs below, and at some point celery stops retrying the connection and my entire background-task container stops working.

Error logs from celery:

Unrecoverable error: kombu.exceptions.OperationalError('_ssl.c:1000: The handshake operation timed out')

From /usr/local/lib/python3.10/site-packages/celery/worker/consumer/consumer.py:495:
consumer: Cannot connect to amqps://admin:**@instance_did.mq.us-west-2.on.aws:5671//: timed out.
Trying again in 16.00 seconds... (8/10)

Timed out waiting for UP message from <ForkProcess(ForkPoolWorker-313, started daemon)>

celery.beat.SchedulingError: Couldn't apply scheduled task scheduler-stress-test: _ssl.c:1000: The handshake operation timed out
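
The "_ssl.c:1000: The handshake operation timed out" text is raised by Python's ssl module, so the connection is dying during the TLS handshake itself, before any AMQP traffic. A raw-socket check from the same ECS task can show whether TLS to the broker is intermittently slow (a sketch; the host below is a placeholder, not my real endpoint):

import socket
import ssl

HOST = "b-XXXXXX.mq.us-west-2.on.aws"  # placeholder broker endpoint
PORT = 5671

context = ssl.create_default_context()
with socket.create_connection((HOST, PORT), timeout=10) as sock:
    # wrap_socket performs the TLS handshake; a hang here reproduces the
    # "handshake operation timed out" failure mode from the logs above.
    with context.wrap_socket(sock, server_hostname=HOST) as tls:
        print("TLS established:", tls.version())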

Here is my django settings.py config for celery:

import json

import boto3


# Function to fetch RabbitMQ credentials from AWS Secrets Manager
def get_rabbitmq_broker_url():
    """
    Fetch RabbitMQ credentials from AWS Secrets Manager and construct broker URL.
    Uses the same pattern as database password fetching.
    """
    if IS_LOCAL:
        return ""  # Local development uses Redis broker

    try:
        secrets_client = boto3.client("secretsmanager", region_name=AWS_REGION_NAME)
        print(
            f"allen about to fetch secret at {NON_LOCAL_ENVIRONMENT}/rabbitmq/credentials"
        )
        secret_response = secrets_client.get_secret_value(
            SecretId=f"{NON_LOCAL_ENVIRONMENT}/rabbitmq/credentials"
        )
        creds = json.loads(secret_response["SecretString"])

        # Construct the AMQP URL; use amqps:// for AWS MQ TLS connections
        host = creds["host"]  # now "b-XXXXXX.mq.us-west-2.on.aws:5671"
        username = creds["username"]
        password = creds["password"]
        vhost = creds.get("vhost", "/")

        # URL-encode the vhost if it's "/"
        if vhost == "/":
            encoded_vhost = "%2F"
        else:
            encoded_vhost = vhost

        if ":5671" in host:
            # Host already includes port, use as-is
            final_mq_url = f"amqps://{username}:{password}@{host}/{encoded_vhost}"
        else:
            # Host doesn't include port, add it
            final_mq_url = f"amqps://{username}:{password}@{host}:5671/{encoded_vhost}"

        print(
            f"Connecting to RabbitMQ: host={host}, username={username}, vhost={vhost}"
        )
        print(f"DEBUG: Celery broker URL: {final_mq_url}")
        return final_mq_url

    except Exception as e:
        print(f"Error fetching RabbitMQ credentials from Secrets Manager: {e}")
        return ""


# Celery Configuration
if IS_LOCAL:
    # Use Redis as both broker and result backend for local development
    CELERY_BROKER_URL = REDIS_URL
    print(f"Running with local Redis broker: {CELERY_BROKER_URL}")
else:
    # Use RabbitMQ in production
    CELERY_BROKER_URL = get_rabbitmq_broker_url()
    if not CELERY_BROKER_URL:
        raise RuntimeError("RabbitMQ broker URL not found in environment")
    print(f"Running with RabbitMQ broker: {CELERY_BROKER_URL}")

CELERY_RESULT_BACKEND = REDIS_URL
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = TIME_ZONE
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30 minutes

# Message persistence - ensures tasks survive broker restarts
CELERY_TASK_DEFAULT_DELIVERY_MODE = 2  # Persistent messages (celery setting: task_default_delivery_mode)

# Task publishing settings for reliability
CELERY_TASK_PUBLISH_RETRY = True  # Retry publishing failed tasks
CELERY_TASK_PUBLISH_RETRY_POLICY = {
    "max_retries": 3,
    "interval_start": 0,
    "interval_step": 1,
    "interval_max": 5,
}
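# With this policy, publish retries fire after roughly 0s, 1s and 2s
# (interval_start, then +interval_step per attempt, capped at interval_max).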

# Redis Cluster configuration to avoid CROSSSLOT errors
CELERY_REDIS_BACKEND_USE_REDIS_CLUSTER = True
CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = {
    "global_keyprefix": "{beat}:"  # Hash tag ensures all keys go to the same slot
}

# https://gist.github.com/fjsj/da41321ac96cf28a96235cb20e7236f6

# RabbitMQ Specific Celery Settings - only apply when not local
if not IS_LOCAL:
    CELERY_BROKER_TRANSPORT_OPTIONS = {
        "confirm_publish": True,  # Enable publisher confirmation for exactly-once delivery
        "max_retries": 3,
        "interval_start": 1,
        "interval_step": 2,
        "interval_max": 10,
        # Connection timeout settings - optimized for AWS MQ with native SSL
        "socket_timeout": 30,
        "connect_timeout": 30,
        "heartbeat": 60,
        # SSL-specific settings - simplified for amqps:// connections
        "ssl_handshake_timeout": 30,  # Increased for AWS MQ latency
        "ssl_timeout": 30,  # Increased timeout
        # Connection pool settings
        "socket_keepalive": True,  # Enable TCP keep-alive
        "socket_keepalive_options": {
            1: 30,
            2: 10,
            3: 3,
        },
        # Additional reliability settings
        "blocked_connection_timeout": 60,
        "connection_attempts": 3,
        "retry_on_timeout": True,  # Retry on timeout errors
        # Additional AWS MQ specific settings
        "failover_strategy": "round-robin",  # Better failover handling
    }

    # RabbitMQ connection settings for better reliability
    CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
    CELERY_BROKER_CONNECTION_RETRY = True
    CELERY_BROKER_CONNECTION_MAX_RETRIES = 10

    # Add connection pool settings - INCREASED for better throughput
    CELERY_BROKER_POOL_LIMIT = None
    CELERY_BROKER_CONNECTION_TIMEOUT = 30
    CELERY_BROKER_HEARTBEAT = 60

    # Channel management settings to prevent ChannelPromise issues
    CELERY_BROKER_CHANNEL_ERROR_RETRY = True  # Retry on channel errors

    # AWS recommended settings for Celery 5.5+ with RabbitMQ
    # These reduce memory pressure on the broker
    CELERY_TASK_CREATE_MISSING_QUEUES = False  # Prevent queue churn
    CELERY_WORKER_ENABLE_REMOTE_CONTROL = (
        True  # Enable for Flower monitoring (slight memory cost)
    )
    CELERY_TASK_DEFAULT_QUEUE_TYPE = (
        "quorum"  # Use quorum queues for better reliability
    )

    CELERY_BROKER_NATIVE_DELAYED_DELIVERY_QUEUE_TYPE = "classic"

    CELERY_BROKER_TRANSPORT_OPTIONS["queue_arguments"] = {
        "x-dead-letter-strategy": "at-least-once"
    }


CELERY_TASK_ACKS_LATE = True
CELERY_TASK_ACKS_ON_FAILURE_OR_TIMEOUT = True
# CELERY_TASK_REJECT_ON_WORKER_LOST = True


CELERY_EVENT_QUEUE_EXPIRES = 60.0
CELERY_EVENT_QUEUE_TTL = 3600  # 1 hour message expiration
CELERY_TASK_SEND_SENT_EVENT = True
CELERY_WORKER_SEND_TASK_EVENTS = True  # Ensure worker events for Flower

TASK_HTTP_CONNECT_TIMEOUT = 5
TASK_HTTP_READ_TIMEOUT = 60

# Celery Logging Configuration
# Disable Celery's built-in logging setup so it uses Django's configuration
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
CELERY_WORKER_LOG_COLOR = False  # Disable colors in production logs

# Task execution logging settings
CELERY_TASK_EAGER_PROPAGATES = True  # Propagate exceptions in eager mode
CELERY_TASK_STORE_EAGER_RESULT = True  # Store results even in eager mode

# Worker logging settings for better debugging
CELERY_WORKER_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
CELERY_WORKER_TASK_LOG_FORMAT = "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s"

# Additional debugging configuration
CELERY_TASK_ANNOTATIONS = {
    "*": {
        "preserve_name": True,  # Keep original task names for debugging
    }
}

# Enable task result metadata for better debugging
CELERY_RESULT_EXTENDED = True
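
All of these use the CELERY_ prefix, so they only take effect because the Celery app loads Django settings with that namespace. For reference, a minimal sketch of the standard wiring (module names are assumptions based on the -A server flag below):

# server/celery.py (assumed layout)
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")

app = Celery("server")
# Picks up every CELERY_-prefixed Django setting,
# e.g. CELERY_BROKER_URL -> broker_url.
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()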

My Dockerfile command to start celery:

CMD ["celery", "-A", "server", "worker", "-E", "--autoscale=2,1", "--without-heartbeat", "--max-tasks-per-child=1000", "--time-limit=600", "--loglevel=info", "-n", "worker@%h"]

And my main.tf for the MQ broker:

# RabbitMQ password is provided via variable (like other passwords in your infrastructure)

# Create security group for RabbitMQ
resource "aws_security_group" "rabbitmq" {
  name        = "${var.environment}-rabbitmq-sg"
  description = "Security group for RabbitMQ broker"
  vpc_id      = var.vpc_id

  # RabbitMQ AMQP port
  ingress {
    from_port   = 5671
    to_port     = 5671
    protocol    = "tcp"
    cidr_blocks = [var.vpc_cidr]
    description = "RabbitMQ AMQP over TLS"
  }

  # RabbitMQ Management Console (optional, for debugging)
  ingress {
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = [var.vpc_cidr]
    description = "RabbitMQ Management Console"
  }

  egress {
    from_port   = 0
    to_port     = 0
    protocol    = "-1"
    cidr_blocks = ["0.0.0.0/0"]
  }

  tags = {
    Name        = "${var.environment}-rabbitmq-sg"
    Environment = var.environment
  }

  lifecycle {
    create_before_destroy = true
  }
}

# Create the Amazon MQ broker running RabbitMQ
resource "aws_mq_broker" "rabbitmq" {
  broker_name        = "${var.environment}-project-rabbitmq"
  engine_type        = "RabbitMQ"
  engine_version     = "3.13"
  host_instance_type = var.instance_type
  security_groups    = [aws_security_group.rabbitmq.id]
  subnet_ids         = var.deployment_mode == "SINGLE_INSTANCE" ? [var.private_subnet_ids[0]] : var.private_subnet_ids

  deployment_mode = var.deployment_mode
  storage_type    = "ebs"

  # Configure the admin user
  user {
    username = var.rabbitmq_username
    password = var.rabbitmq_password
  }

  # Enable logging
  logs {
    general = true
  }

  # Maintenance window (adjust as needed)
  maintenance_window_start_time {
    day_of_week = "sunday"
    time_of_day = "02:00"
    time_zone   = "UTC"
  }

  # Auto minor version upgrade
  auto_minor_version_upgrade = true
  publicly_accessible        = false

  tags = {
    Name        = "${var.environment}-project-rabbitmq"
    Environment = var.environment
  }
}

# Store RabbitMQ credentials in AWS Secrets Manager
resource "aws_secretsmanager_secret" "rabbitmq_credentials" {
  name        = "${var.environment}/rabbitmq/credentials"
  description = "RabbitMQ broker credentials for ${var.environment}"

  tags = {
    Environment = var.environment
  }
}

resource "aws_secretsmanager_secret_version" "rabbitmq_credentials" {
  secret_id     = aws_secretsmanager_secret.rabbitmq_credentials.id
  secret_string = jsonencode({
    username = var.rabbitmq_username
    password = var.rabbitmq_password
    host     = replace(aws_mq_broker.rabbitmq.instances[0].endpoints[0], "amqps://", "")
    port     = "5671"
    vhost    = "/"
    # Add management API URL for Flower
    management_url = aws_mq_broker.rabbitmq.instances[0].console_url
  })

  depends_on = [aws_mq_broker.rabbitmq]
}

# Create CloudWatch log group for RabbitMQ logs
resource "aws_cloudwatch_log_group" "rabbitmq_logs" {
  name              = "/aws/amazonmq/broker/${aws_mq_broker.rabbitmq.broker_name}"
  retention_in_days = var.log_retention_days

  tags = {
    Environment = var.environment
  }
}
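
As a sanity check that this Terraform output matches what settings.py expects, the secret can be read back and its keys verified (a sketch; "staging" stands in for my NON_LOCAL_ENVIRONMENT value):

import json

import boto3

client = boto3.client("secretsmanager", region_name="us-west-2")
resp = client.get_secret_value(SecretId="staging/rabbitmq/credentials")
creds = json.loads(resp["SecretString"])

# settings.py's get_rabbitmq_broker_url() relies on these keys.
assert {"username", "password", "host", "vhost"} <= creds.keys()
print("host:", creds["host"])  # should already include :5671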
