Skip to content

Commit 61e48bf

Browse files
authored
EventGate Lambda entrypoint OOP (#108)
1 parent f5985df commit 61e48bf

File tree

9 files changed

+270
-74
lines changed

9 files changed

+270
-74
lines changed

src/event_gate_lambda.py

Lines changed: 30 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
# limitations under the License.
1515
#
1616

17-
"""Event Gate Lambda function implementation."""
17+
"""
18+
This module contains the AWS Lambda entry point for the EventGate service.
19+
"""
20+
1821
import json
1922
import logging
2023
import os
@@ -24,6 +27,7 @@
2427
import boto3
2528
from botocore.exceptions import BotoCoreError, NoCredentialsError
2629

30+
from src.handlers.handler_api import HandlerApi
2731
from src.handlers.handler_token import HandlerToken
2832
from src.handlers.handler_topic import HandlerTopic
2933
from src.handlers.handler_health import HandlerHealth
@@ -34,14 +38,15 @@
3438
from src.writers.writer_postgres import WriterPostgres
3539
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV
3640

37-
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
3841

3942
# Initialize logger
40-
logger = logging.getLogger(__name__)
43+
root_logger = logging.getLogger()
44+
if not root_logger.handlers:
45+
root_logger.addHandler(logging.StreamHandler())
46+
4147
log_level = os.environ.get("LOG_LEVEL", "INFO")
42-
logger.setLevel(log_level)
43-
if not logger.handlers:
44-
logger.addHandler(logging.StreamHandler())
48+
root_logger.setLevel(log_level)
49+
logger = logging.getLogger(__name__)
4550
logger.debug("Initialized logger with level %s", log_level)
4651

4752
# Load main configuration
@@ -52,11 +57,6 @@
5257
config = json.load(file)
5358
logger.debug("Loaded main configuration")
5459

55-
# Load API definition
56-
with open(os.path.join(PROJECT_ROOT, "api.yaml"), "r", encoding="utf-8") as file:
57-
API = file.read()
58-
logger.debug("Loaded API definition")
59-
6060
# Initialize S3 client with SSL verification
6161
try:
6262
ssl_verify = config.get(SSL_CA_BUNDLE_KEY, True)
@@ -66,43 +66,35 @@
6666
logger.exception("Failed to initialize AWS S3 client")
6767
raise RuntimeError("AWS S3 client initialization failed") from exc
6868

69-
# Load access configuration
70-
ACCESS: Dict[str, list[str]] = {}
71-
if config["access_config"].startswith("s3://"):
72-
name_parts = config["access_config"].split("/")
73-
BUCKET_NAME = name_parts[2]
74-
BUCKET_OBJECT_KEY = "/".join(name_parts[3:])
75-
ACCESS = json.loads(aws_s3.Bucket(BUCKET_NAME).Object(BUCKET_OBJECT_KEY).get()["Body"].read().decode("utf-8"))
76-
else:
77-
with open(config["access_config"], "r", encoding="utf-8") as file:
78-
ACCESS = json.load(file)
79-
logger.debug("Loaded access configuration")
80-
81-
# Initialize token handler and load token public keys
82-
handler_token = HandlerToken(config).load_public_keys()
83-
8469
# Initialize EventGate writers
8570
writers = {
8671
"kafka": WriterKafka(config),
8772
"eventbridge": WriterEventBridge(config),
8873
"postgres": WriterPostgres(config),
8974
}
9075

91-
# Initialize topic handler and load topic schemas
92-
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token, writers).load_topic_schemas()
93-
94-
# Initialize health handler
76+
# Initialize EventGate handlers
77+
handler_token = HandlerToken(config).with_public_keys_queried()
78+
handler_topic = HandlerTopic(config, aws_s3, handler_token, writers).with_load_access_config().with_load_topic_schemas()
9579
handler_health = HandlerHealth(writers)
80+
handler_api = HandlerApi().with_api_definition_loaded()
9681

9782

98-
def get_api() -> Dict[str, Any]:
99-
"""Return the OpenAPI specification text."""
100-
return {"statusCode": 200, "body": API}
83+
# Route to handler function mapping
84+
ROUTE_MAP: Dict[str, Any] = {
85+
"/api": lambda _: handler_api.get_api(),
86+
"/token": lambda _: handler_token.get_token_provider_info(),
87+
"/health": lambda _: handler_health.get_health(),
88+
"/topics": lambda _: handler_topic.get_topics_list(),
89+
"/topics/{topic_name}": handler_topic.handle_request,
90+
"/terminate": lambda _: sys.exit("TERMINATING"),
91+
}
10192

10293

10394
def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any]:
10495
"""
10596
AWS Lambda entry point. Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
97+
10698
Args:
10799
event: The event data from API Gateway.
108100
_context: The mandatory context argument for AWS Lambda invocation (unused).
@@ -113,26 +105,11 @@ def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any
113105
"""
114106
try:
115107
resource = event.get("resource", "").lower()
116-
if resource == "/api":
117-
return get_api()
118-
if resource == "/token":
119-
return handler_token.get_token_provider_info()
120-
if resource == "/health":
121-
return handler_health.get_health()
122-
if resource == "/topics":
123-
return handler_topic.get_topics_list()
124-
if resource == "/topics/{topic_name}":
125-
method = event.get("httpMethod")
126-
if method == "GET":
127-
return handler_topic.get_topic_schema(event["pathParameters"]["topic_name"].lower())
128-
if method == "POST":
129-
return handler_topic.post_topic_message(
130-
event["pathParameters"]["topic_name"].lower(),
131-
json.loads(event["body"]),
132-
handler_token.extract_token(event.get("headers", {})),
133-
)
134-
if resource == "/terminate":
135-
sys.exit("TERMINATING")
108+
route_function = ROUTE_MAP.get(resource)
109+
110+
if route_function:
111+
return route_function(event)
112+
136113
return build_error_response(404, "route", "Resource not found")
137114
except (KeyError, json.JSONDecodeError, ValueError, AttributeError, TypeError, RuntimeError) as request_exc:
138115
logger.exception("Request processing error: %s", request_exc)

src/handlers/handler_api.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
#
2+
# Copyright 2026 ABSA Group Limited
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
"""
18+
This module provides the HandlerApi class for serving the OpenAPI specification.
19+
"""
20+
21+
import logging
22+
import os
23+
from typing import Dict, Any
24+
25+
logger = logging.getLogger(__name__)
26+
27+
28+
class HandlerApi:
29+
"""
30+
HandlerApi manages the OpenAPI specification endpoint.
31+
"""
32+
33+
def __init__(self):
34+
self.api_spec: str = ""
35+
36+
def with_api_definition_loaded(self) -> "HandlerApi":
37+
"""
38+
Load the OpenAPI specification from api.yaml file.
39+
40+
Returns:
41+
HandlerApi: The current instance with loaded API definition.
42+
Raises:
43+
RuntimeError: If loading or reading the API specification fails.
44+
"""
45+
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
46+
api_path = os.path.join(project_root, "api.yaml")
47+
48+
try:
49+
with open(api_path, "r", encoding="utf-8") as file:
50+
self.api_spec = file.read()
51+
52+
if not self.api_spec:
53+
raise ValueError("API specification file is empty")
54+
55+
logger.debug("Loaded API definition from %s", api_path)
56+
return self
57+
except (FileNotFoundError, PermissionError, ValueError) as exc:
58+
logger.exception("Failed to load or read API specification from %s", api_path)
59+
raise RuntimeError("API specification initialization failed") from exc
60+
61+
def get_api(self) -> Dict[str, Any]:
62+
"""
63+
Return the OpenAPI specification.
64+
65+
Returns:
66+
Dict[str, Any]: API Gateway response with OpenAPI spec.
67+
"""
68+
logger.debug("Handling GET API")
69+
return {
70+
"statusCode": 200,
71+
"headers": {"Content-Type": "application/yaml"},
72+
"body": self.api_spec,
73+
}

src/handlers/handler_token.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,11 @@ def _refresh_keys_if_needed(self) -> None:
7070
return
7171
try:
7272
logger.debug("Token public keys are stale, refreshing now")
73-
self.load_public_keys()
73+
self.with_public_keys_queried()
7474
except RuntimeError:
7575
logger.warning("Token public key refresh failed, using existing keys")
7676

77-
def load_public_keys(self) -> "HandlerToken":
77+
def with_public_keys_queried(self) -> "HandlerToken":
7878
"""
7979
Load token public keys from the configured URL.
8080
Returns:

src/handlers/handler_topic.py

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@
2323
from typing import Dict, Any
2424

2525
import jwt
26+
from boto3.resources.base import ServiceResource
2627
from jsonschema import validate
2728
from jsonschema.exceptions import ValidationError
2829

2930
from src.handlers.handler_token import HandlerToken
31+
from src.utils.conf_path import CONF_DIR
3032
from src.utils.utils import build_error_response
3133
from src.writers.writer import Writer
3234

3335
logger = logging.getLogger(__name__)
34-
log_level = os.environ.get("LOG_LEVEL", "INFO")
35-
logger.setLevel(log_level)
3636

3737

3838
class HandlerTopic:
@@ -42,24 +42,48 @@ class HandlerTopic:
4242

4343
def __init__(
4444
self,
45-
conf_dir: str,
46-
access_config: Dict[str, list[str]],
45+
config: Dict[str, Any],
46+
aws_s3: ServiceResource,
4747
handler_token: HandlerToken,
4848
writers: Dict[str, Writer],
4949
):
50-
self.conf_dir = conf_dir
51-
self.access_config = access_config
50+
self.config = config
51+
self.aws_s3 = aws_s3
5252
self.handler_token = handler_token
5353
self.writers = writers
54+
self.access_config: Dict[str, list[str]] = {}
5455
self.topics: Dict[str, Dict[str, Any]] = {}
5556

56-
def load_topic_schemas(self) -> "HandlerTopic":
57+
def with_load_access_config(self) -> "HandlerTopic":
58+
"""
59+
Load access control configuration from S3 or local file.
60+
Returns:
61+
HandlerTopic: The current instance with loaded access config.
62+
"""
63+
access_path = self.config["access_config"]
64+
logger.debug("Loading access configuration from %s", access_path)
65+
66+
if access_path.startswith("s3://"):
67+
name_parts = access_path.split("/")
68+
bucket_name = name_parts[2]
69+
bucket_object_key = "/".join(name_parts[3:])
70+
self.access_config = json.loads(
71+
self.aws_s3.Bucket(bucket_name).Object(bucket_object_key).get()["Body"].read().decode("utf-8")
72+
)
73+
else:
74+
with open(access_path, "r", encoding="utf-8") as file:
75+
self.access_config = json.load(file)
76+
77+
logger.debug("Loaded access configuration")
78+
return self
79+
80+
def with_load_topic_schemas(self) -> "HandlerTopic":
5781
"""
5882
Load topic schemas from configuration files.
5983
Returns:
6084
HandlerTopic: The current instance with loaded topic schemas.
6185
"""
62-
topic_schemas_dir = os.path.join(self.conf_dir, "topic_schemas")
86+
topic_schemas_dir = os.path.join(CONF_DIR, "topic_schemas")
6387
logger.debug("Loading topic schemas from %s", topic_schemas_dir)
6488

6589
with open(os.path.join(topic_schemas_dir, "runs.json"), "r", encoding="utf-8") as file:
@@ -85,7 +109,29 @@ def get_topics_list(self) -> Dict[str, Any]:
85109
"body": json.dumps(list(self.topics)),
86110
}
87111

88-
def get_topic_schema(self, topic_name: str) -> Dict[str, Any]:
112+
def handle_request(self, event: Dict[str, Any]) -> Dict[str, Any]:
113+
"""
114+
Handle GET/POST requests for /topics/{topic_name} resource.
115+
116+
Args:
117+
event: The API Gateway event containing path parameters, method, body, and headers.
118+
Returns:
119+
Dict[str, Any]: API Gateway response.
120+
"""
121+
topic_name = event["pathParameters"]["topic_name"].lower()
122+
method = event.get("httpMethod")
123+
124+
if method == "GET":
125+
return self._get_topic_schema(topic_name)
126+
if method == "POST":
127+
return self._post_topic_message(
128+
topic_name,
129+
json.loads(event["body"]),
130+
self.handler_token.extract_token(event.get("headers", {})),
131+
)
132+
return build_error_response(404, "route", "Resource not found")
133+
134+
def _get_topic_schema(self, topic_name: str) -> Dict[str, Any]:
89135
"""
90136
Return the JSON schema for a specific topic.
91137
Args:
@@ -104,7 +150,7 @@ def get_topic_schema(self, topic_name: str) -> Dict[str, Any]:
104150
"body": json.dumps(self.topics[topic_name]),
105151
}
106152

107-
def post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
153+
def _post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
108154
"""
109155
Validate auth and schema; dispatch message to all writers.
110156
Args:
@@ -114,11 +160,16 @@ def post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], tok
114160
Returns:
115161
Dict[str, Any]: API Gateway response indicating success or failure.
116162
Raises:
163+
RuntimeError: If access configuration is not loaded.
117164
jwt.PyJWTError: If token decoding fails.
118165
ValidationError: If message validation fails.
119166
"""
120167
logger.debug("Handling POST TopicMessage(%s)", topic_name)
121168

169+
if not self.access_config:
170+
logger.error("Access configuration not loaded")
171+
raise RuntimeError("Access configuration not loaded")
172+
122173
try:
123174
token: Dict[str, Any] = self.handler_token.decode_jwt(token_encoded)
124175
except jwt.PyJWTError: # type: ignore[attr-defined]

0 commit comments

Comments
 (0)