Directory contents are as follows:
- `/web` - The GAS web app files
- `/ann` - Annotator files
- `/util` - Utility scripts/apps for notifications, archival, and restoration
- `/aws` - AWS user data files
The system decouples the web server frontend from the annotation backend: the web application publishes job requests asynchronously, so each component can scale independently and faults are isolated.
For the frontend, an Application Load Balancer distributes HTTPS traffic across multiple EC2 instances in an Auto Scaling group, ensuring high availability and efficient resource utilization. The Auto Scaling group, configured with a minimum of two instances and a maximum of ten, provides elasticity to handle varying job requests from users while maintaining a baseline capacity for consistent performance.
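As a rough illustration (not the exact deployment configuration), a target-tracking scaling policy for the web tier could be attached to the Auto Scaling group with boto3 as below; the group name, target-group resource label, and target value are placeholders.

```python
import boto3

autoscaling = boto3.client("autoscaling", region_name="us-east-1")

# Target-tracking policy: keep roughly 100 ALB requests per web instance,
# letting the group grow toward its maximum of ten under load.
autoscaling.put_scaling_policy(
    AutoScalingGroupName="gas-web-asg",            # placeholder group name
    PolicyName="gas-web-request-tracking",
    PolicyType="TargetTrackingScaling",
    TargetTrackingConfiguration={
        "PredefinedMetricSpecification": {
            "PredefinedMetricType": "ALBRequestCountPerTarget",
            # Format: app/<lb-name>/<lb-id>/targetgroup/<tg-name>/<tg-id>
            "ResourceLabel": "app/gas-alb/0123456789abcdef/targetgroup/gas-web-tg/0123456789abcdef",
        },
        "TargetValue": 100.0,
    },
)
```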
On the backend, the annotation service is similarly designed for scalability and timely job processing. The annotators use a polling mechanism and a webhook pattern built on Amazon SNS and SQS; the annotator fleet has its own load balancer and Auto Scaling group, which scales in response to the SQS queue depth (the number of pending annotation jobs).
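A minimal sketch of the annotator's polling loop is shown below, assuming the job-request queue is subscribed to an SNS topic (the queue name, region, and the `run_annotation` helper are hypothetical).

```python
import json

import boto3

sqs = boto3.resource("sqs", region_name="us-east-1")
queue = sqs.get_queue_by_name(QueueName="jycchien_job_requests")  # hypothetical queue name

while True:
    # Long poll: wait up to 20 seconds for messages instead of hammering the API
    messages = queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=10)
    for message in messages:
        # SNS wraps the original payload in a "Message" field
        envelope = json.loads(message.body)
        job = json.loads(envelope["Message"])
        try:
            run_annotation(job)   # hypothetical worker function
            message.delete()      # delete only after the job is safely handed off
        except Exception:
            # Leave the message on the queue; it becomes visible again after
            # the visibility timeout and can be retried by another annotator.
            pass
```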
The system incorporates several reliability features, including health checks for instances and automated instance replacement. Error budgets are implicitly managed through the auto scaling policies, which maintain a minimum of two healthy instances. Observability is addressed through AWS's built-in CloudWatch, allowing tracking of key metrics such as instance health, request counts, and latency.
The architecture leverages Amazon S3 for storing input annotation files, results, and logs, while DynamoDB provides persistent storage of job metadata and user interactions. To enhance scalability and reliability, I implemented asynchronous inter-process communication and serverless workflows for various system functions. For data archival and restoration, I created serverless workflows using AWS Step Functions and Lambda that integrate with Stripe's payment API, managing the lifecycle of users' account data and uploaded files between S3 and Glacier based on user tier.
The design of this periodic background archival task is based on a Flask app that exposes an endpoint named `/archive`. This webhook endpoint accepts periodic POST requests from the SNS `results_archive` topic. The delivery of these POST requests is driven by an AWS Step Functions state machine started from `run.py`. After an annotation job is completed, `run.py` starts a state machine execution that waits out the time window during which free users are allowed to download the `.annot.vcf` file and then publishes the payload to SNS `results_archive`. Finally, inside `archive_app.py` the `/archive` endpoint long-polls the archive message from SQS `results_archive` and performs the archival tasks based on the user's role.
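As an illustration (not the exact code in `run.py`), kicking off one independent state-machine execution per completed job could look like the sketch below; the state machine ARN and the input fields are assumptions. The state machine itself consists of a Wait state (the free-user download window) followed by an SNS Publish task targeting `results_archive`.

```python
import json
import uuid

import boto3

# Placeholder ARN for the archival state machine (Wait state + SNS Publish task).
STATE_MACHINE_ARN = "arn:aws:states:us-east-1:123456789012:stateMachine:gas_archive"

sfn = boto3.client("stepfunctions", region_name="us-east-1")

def schedule_archival(job_id, user_id, s3_key_result_file):
    """Start one independent execution per completed annotation job."""
    sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        name=f"archive-{job_id}-{uuid.uuid4().hex[:8]}",  # execution names must be unique
        input=json.dumps({
            "job_id": job_id,
            "user_id": user_id,
            "s3_key_result_file": s3_key_result_file,
        }),
    )
```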
Initially I considered having the webhook `archive_app.py` listen to the SNS `job_results` topic and parse the required data from the request body (`request.data`), but I decided against it for scalability and fault-tolerance reasons. Since SNS notifications are delivered only once, it is safer to retrieve the persisted payload from SQS than to parse it directly from the SNS push. A dedicated `results_archive` SNS topic and SQS queue also handle sudden spikes in archival workload more gracefully, since the queue can distribute tasks across multiple consumers (and we may not want a single SNS topic serving multiple purposes).
Additionally, the push-based `results_archive` SNS/SQS pair is more horizontally scalable and cost-effective than continuous polling, since the latter consumes CPU and network resources even when there are no tasks. A Step Functions state machine can be triggered many times concurrently (and individual states can scale as needed), and each execution is independent. As a result, whenever the webhook `archive_app.py` receives an HTTP POST request from SNS `results_archive` (after an execution finishes its Wait state), the endpoint can start the archival task based on the user's role.
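A minimal sketch of the `/archive` webhook pattern is shown below, assuming the standard SNS subscription-confirmation handshake; the queue name, region, `user_role` field, and `archive_to_glacier` helper are placeholders rather than the project's actual identifiers.

```python
import json

import boto3
import requests
from flask import Flask, request

app = Flask(__name__)

REGION = "us-east-1"                        # assumed region
ARCHIVE_QUEUE = "jycchien_results_archive"  # hypothetical queue name

@app.route("/archive", methods=["POST"])
def archive():
    body = json.loads(request.data)

    # SNS asks the endpoint to confirm its subscription the first time
    if body.get("Type") == "SubscriptionConfirmation":
        requests.get(body["SubscribeURL"])
        return "", 200

    # On a notification, drain the persisted queue rather than trusting the push payload
    sqs = boto3.resource("sqs", region_name=REGION)
    queue = sqs.get_queue_by_name(QueueName=ARCHIVE_QUEUE)
    for message in queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=10):
        job = json.loads(json.loads(message.body)["Message"])
        if job.get("user_role") == "free_user":   # hypothetical field name
            archive_to_glacier(job)               # hypothetical helper
        message.delete()
    return "", 200
```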
The restoration process is as follows (a sketch of the Glacier retrieval fallback and the restore Lambda follows this list):

- In `web/views.py`, the endpoint `/subscribe` sends a `POST` request to update the user's profile to premium after the free_user has filled out the subscription form run by Stripe (A15). This endpoint then triggers the SNS thawing topic `jycchien_results_thaw`, which sends a message to the thawing queue `jycchien_results_thaw`.
- `util/thaw/thaw_app.py` is a webhook triggered by the SNS thaw topic. When it receives a `POST` request from the thawing topic, it polls messages from the thawing queue and does the following:
  - Extract `user_id` from a received message in order to find all archives belonging to this now-premium user: `dynamodb.query()` the `job_id` (referred to as `annotation_job_id`) and `results_file_archive_id` of the archived annotation jobs the user submitted as a free_user.
  - For each of these `results_file_archive_id`s, initiate an archive-retrieval job on Glacier with the SNS restoration topic `jycchien_results_restore` attached. This lets Glacier publish a restoration message containing `annotation_job_id` as `JobDescription` when the thawing job is completed and the thawed annotation result file is ready to be downloaded.
    - In this script, `glacier.initiate_job()` defaults to expedited retrieval; if that raises `InsufficientCapacityException`, the call is retried with standard retrieval.
  - Delete the processed message from the thaw queue.
- `util/restore/restore.py` is a Lambda function triggered by the SNS restoration topic `jycchien_results_restore`; it polls messages from the restoration queue listening to that topic. When the function is triggered, it:
  - Extracts `annotation_job_id`, the Glacier `JobId`, and `ArchiveId` from a received message.
  - Uses the Glacier `JobId` included in the restoration message to retrieve the bytes of the un-archived file.
  - Uses `dynamodb.get_item()` to get `s3_key_result_file` for the corresponding `annotation_job_id`.
  - Puts the bytes back into S3 at `s3_key_result_file` using `s3.put_object()`.
  - Deletes the archive in Glacier for each processed restoration message with `glacier.delete_archive()`.
  - Updates the corresponding DynamoDB item to remove the `results_file_archive_id` field so that the front-end pages display correctly.
  - Deletes the message from the restoration queue.
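The sketch below illustrates the two pieces referenced above: the expedited-to-standard fallback for `glacier.initiate_job()` and the restore Lambda handler. The vault name, bucket, table name, key schema, region, and queue name are placeholders rather than the project's actual values.

```python
import json

import boto3
from botocore.exceptions import ClientError

REGION = "us-east-1"                                               # assumed region
VAULT = "gas-archives"                                             # hypothetical vault name
RESTORE_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:jycchien_results_restore"
RESULTS_BUCKET = "gas-results"                                     # hypothetical bucket
JOBS_TABLE = "jycchien_annotations"                                # hypothetical table name

glacier = boto3.client("glacier", region_name=REGION)
s3 = boto3.client("s3", region_name=REGION)
dynamodb = boto3.resource("dynamodb", region_name=REGION)


def initiate_thaw(archive_id, annotation_job_id):
    """Start an archive-retrieval job, falling back from expedited to standard."""
    params = {
        "Type": "archive-retrieval",
        "ArchiveId": archive_id,
        "Description": annotation_job_id,  # surfaces later as JobDescription
        "SNSTopic": RESTORE_TOPIC_ARN,     # Glacier notifies this topic on completion
        "Tier": "Expedited",
    }
    try:
        return glacier.initiate_job(vaultName=VAULT, jobParameters=params)
    except ClientError as e:
        if e.response["Error"]["Code"] == "InsufficientCapacityException":
            params["Tier"] = "Standard"
            return glacier.initiate_job(vaultName=VAULT, jobParameters=params)
        raise


def lambda_handler(event, context):
    """Triggered by the restoration SNS topic; drains the restoration queue."""
    sqs = boto3.resource("sqs", region_name=REGION)
    queue = sqs.get_queue_by_name(QueueName="jycchien_results_restore")  # hypothetical
    table = dynamodb.Table(JOBS_TABLE)

    for message in queue.receive_messages(WaitTimeSeconds=10, MaxNumberOfMessages=10):
        note = json.loads(json.loads(message.body)["Message"])
        job_id = note["JobDescription"]  # the annotation_job_id set in initiate_thaw()

        # Fetch the thawed bytes from Glacier
        output = glacier.get_job_output(vaultName=VAULT, jobId=note["JobId"])

        # Look up where the result file should live in S3 (assumed "job_id" key schema)
        item = table.get_item(Key={"job_id": job_id})["Item"]
        s3.put_object(Bucket=RESULTS_BUCKET,
                      Key=item["s3_key_result_file"],
                      Body=output["body"].read())

        # Clean up: remove the Glacier archive and the archive id in DynamoDB
        glacier.delete_archive(vaultName=VAULT, archiveId=note["ArchiveId"])
        table.update_item(Key={"job_id": job_id},
                          UpdateExpression="REMOVE results_file_archive_id")
        message.delete()

    return {"statusCode": 200}
```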
The restoration process is designed for scalability: Lambda scales up the number of execution environments in response to more concurrent triggers. Implementing it as a Lambda function is also cost-effective and efficient in terms of input/output operations, since serverless architectures have low operational overhead and Lambda execution environments are spun up and torn down transiently.
In terms of reliability, which is probably the most interesting part, I hesitated between SNS-to-Lambda and SNS-to-SQS-to-Lambda. The primary advantage of placing SQS between SNS and Lambda is reprocessing: if the Lambda fails to process an event for some reason (e.g. a timeout or an insufficient memory footprint), we can increase the timeout (to a maximum of 15 minutes) or the memory allocation and restart polling to reprocess the older events. This would not be possible with SNS-to-Lambda alone. However, I decided to use SNS-to-Lambda while also subscribing an SQS queue to that SNS topic (`jycchien_results_restore`). This way I preserve the persistence provided by SQS and keep the simplicity of SNS-to-Lambda: the restoration topic serves as a pure trigger (the event SNS sends to the Lambda is ignored) and the Lambda does the long polling itself.
Reference: https://stackoverflow.com/questions/42656485/sns-to-lambda-vs-sns-to-sqs-to-lambda
Disclaimer: Copyright (C) 2015-2024 Vas Vasiliadis, University of Chicago
