diff --git a/Makefile b/Makefile index 36dbcf2..cd0fc0e 100644 --- a/Makefile +++ b/Makefile @@ -33,6 +33,10 @@ test-client: test-client-p256 test-client-ed25519 test-client-p256: test-private-p256.pem openssl rand 1024 > random.bin poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin + openssl rand 1024 > random.bin + poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin + poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin + poetry run aggrec_client --http-key-id test-p256 --http-key-file $< random.bin test-client-ed25519: test-private-ed25519.pem openssl rand 1024 > random.bin diff --git a/aggrec/aggregates.py b/aggrec/aggregates.py index c811590..48fa39d 100644 --- a/aggrec/aggregates.py +++ b/aggrec/aggregates.py @@ -31,6 +31,16 @@ description="The number of aggregates stored", ) +aggregates_by_creator_counter = meter.create_counter( + "aggregates.counter_by_creator", + description="The number of aggregates per creator", +) + +aggregates_duplicates_counter = meter.create_counter( + "aggregates.duplicates_counter", + description="The number of duplicate aggregates received", +) + METADATA_HTTP_HEADERS = [ "User-Agent", @@ -107,6 +117,11 @@ def get_http_headers(request: Request, covered_components_headers: List[str]) -> return res +def get_aggregate_location(aggregate_id: ObjectId) -> str: + """Get aggregate location""" + return f"/api/v1/aggregates/{aggregate_id}" + + def get_new_aggregate_event_message(metadata: AggregateMetadata, settings: Settings) -> dict: """Get new aggregate event message""" return { @@ -246,8 +261,15 @@ async def create_aggregate( http_headers = get_http_headers(request, res.covered_components.keys()) + # if we receive an aggregate already seen, return existing metadata + if metadata := AggregateMetadata.objects(content_digest=content_digest).first(): + logger.warning("Received duplicate aggregate from %s", creator) + aggregates_duplicates_counter.add(1, {"aggregate_type": aggregate_type.value, "creator": creator}) + metadata_location = get_aggregate_location(metadata.id) + return Response(status_code=status.HTTP_201_CREATED, headers={"Location": metadata_location}) + aggregate_id = ObjectId() - location = f"/api/v1/aggregates/{aggregate_id}" + metadata_location = get_aggregate_location(aggregate_id) span.set_attribute("aggregate.id", str(aggregate_id)) span.set_attribute("aggregate.type", aggregate_type.value) @@ -273,6 +295,7 @@ async def create_aggregate( creator=creator, http_headers=http_headers, content_type=content_type, + content_digest=content_digest, s3_bucket=s3_bucket, ) @@ -304,6 +327,7 @@ async def create_aggregate( logger.info("Metadata saved: %s", metadata.id) aggregates_counter.add(1, {"aggregate_type": aggregate_type.value}) + aggregates_by_creator_counter.add(1, {"aggregate_type": aggregate_type.value, "creator": creator}) async with request.app.get_mqtt_client() as mqtt_client: with tracer.start_as_current_span("mqtt.publish"): @@ -312,7 +336,7 @@ async def create_aggregate( json.dumps(get_new_aggregate_event_message(metadata, request.app.settings)), ) - return Response(status_code=status.HTTP_201_CREATED, headers={"Location": location}) + return Response(status_code=status.HTTP_201_CREATED, headers={"Location": metadata_location}) @router.get( @@ -372,7 +396,7 @@ async def get_aggregate_payload( async with request.app.get_s3_client() as s3_client: s3_obj = await s3_client.get_object(Bucket=metadata.s3_bucket, Key=metadata.s3_object_key) - metadata_location = f"/api/v1/aggregates/{aggregate_id}" + metadata_location = get_aggregate_location(metadata.id) return StreamingResponse( content=s3_obj["Body"], diff --git a/aggrec/db_models.py b/aggrec/db_models.py index 2e8045b..794aa8f 100644 --- a/aggrec/db_models.py +++ b/aggrec/db_models.py @@ -19,6 +19,7 @@ class AggregateMetadata(Document): content_type = StringField() content_length = IntField() + content_digest = StringField(unique=True, sparse=True) s3_bucket = StringField() s3_object_key = StringField()