Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion API2.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ state "Cache and report success" as SuccessBranch {
OK --> UpdateCache

UpdateCache : redis.set(event.id, OK, ttl)
UpdateCache --> [*] : return (OK, delta)
UpdateCache --> [*] : return (status, delta)
}

Handler --> BadRequest
Expand Down
86 changes: 73 additions & 13 deletions securedrop/journalist_app/api2/events.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from dataclasses import asdict
from typing import List

from db import db
from journalist_app import utils
Expand Down Expand Up @@ -69,6 +70,7 @@ def process(self, event: Event, minor: int) -> EventResult:
EventType.SOURCE_CONVERSATION_DELETED: self.handle_source_conversation_deleted,
EventType.SOURCE_STARRED: self.handle_source_starred,
EventType.SOURCE_UNSTARRED: self.handle_source_unstarred,
EventType.SOURCE_CONVERSATION_TRUNCATED: self.handle_source_conversation_truncated,
}[event.type]
except KeyError:
return EventResult(
Expand Down Expand Up @@ -118,12 +120,18 @@ def handle_item_deleted(event: Event, minor: int) -> EventResult:
status=(EventStatusCode.Gone, None),
)

utils.delete_file_object(item)
return EventResult(
event_id=event.id,
status=(EventStatusCode.OK, None),
items={event.target.item_uuid: None},
)
try:
utils.delete_file_object(item)
return EventResult(
event_id=event.id,
status=(EventStatusCode.OK, None),
items={event.target.item_uuid: None},
)
except ValueError as exc:
return EventResult(
event_id=event.id,
status=(EventStatusCode.InternalServerError, str(exc)),
)

@staticmethod
def handle_reply_sent(event: Event, minor: int) -> EventResult:
Expand Down Expand Up @@ -174,13 +182,19 @@ def handle_source_deleted(event: Event, minor: int) -> EventResult:
# Mark as deleted all the items in the source's collection
deleted_items = {item.uuid: None for item in source.collection}

utils.delete_collection(source.filesystem_id)
return EventResult(
event_id=event.id,
status=(EventStatusCode.OK, None),
sources={event.target.source_uuid: None},
items=deleted_items,
)
try:
utils.delete_collection(source.filesystem_id)
return EventResult(
event_id=event.id,
status=(EventStatusCode.OK, None),
sources={event.target.source_uuid: None},
items=deleted_items,
)
except ValueError as exc:
return EventResult(
event_id=event.id,
status=(EventStatusCode.InternalServerError, str(exc)),
)

@staticmethod
def handle_source_conversation_deleted(event: Event, minor: int) -> EventResult:
Expand Down Expand Up @@ -208,6 +222,7 @@ def handle_source_conversation_deleted(event: Event, minor: int) -> EventResult:
# Mark as deleted all the items in the source's collection
deleted_items = {item.uuid: None for item in source.collection}

# NB. Does not raise exceptions from `utils.delete_file_object()`.
utils.delete_source_files(source.filesystem_id)
db.session.refresh(source)

Expand All @@ -218,6 +233,51 @@ def handle_source_conversation_deleted(event: Event, minor: int) -> EventResult:
items=deleted_items,
)

@staticmethod
def handle_source_conversation_truncated(event: Event, minor: int) -> EventResult:
"""
A `source_conversation_truncated` event involves deleting all the items
in the source's collection with interaction counts less than or equal to
the specified upper bound, assumed to be the last item known to the
client. This achieves the same consistency as a
`source_conversation_deleted` event without requiring its strict
versioning.
"""

try:
source = Source.query.filter(Source.uuid == event.target.source_uuid).one()
except NoResultFound:
return EventResult(
event_id=event.id,
status=(
EventStatusCode.Gone,
None,
),
)

deleted: List[ItemUUID] = []
for item in source.collection:
if item.interaction_count <= event.data.upper_bound:
try:
utils.delete_file_object(item)
except ValueError:
# `utils.delete_file_object()` is non-atomic: it guarantees
# database deletion but not filesystem deletion. The former
# is all we need for consistency with the client, and the
# latter will be caught by monitoring for "disconnected"
# submissions.
pass

deleted.append(item.uuid)

db.session.refresh(source)
return EventResult(
event_id=event.id,
status=(EventStatusCode.OK, None),
sources={source.uuid: source},
items={item_uuid: None for item_uuid in deleted},
)

@staticmethod
def handle_source_starred(event: Event, minor: int) -> EventResult:
try:
Expand Down
18 changes: 17 additions & 1 deletion securedrop/journalist_app/api2/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class EventType(StrEnum):
ITEM_SEEN = auto()
SOURCE_DELETED = auto()
SOURCE_CONVERSATION_DELETED = auto()
SOURCE_CONVERSATION_TRUNCATED = auto()
SOURCE_STARRED = auto()
SOURCE_UNSTARRED = auto()

Expand All @@ -52,6 +53,7 @@ class EventStatusCode(IntEnum):
Conflict = 409
# The target UUID doesn't exist and it was a deletion request
Gone = 410
InternalServerError = 500
NotImplemented = 501


Expand Down Expand Up @@ -144,7 +146,21 @@ def __post_init__(self) -> None:
raise ValueError("reply must be a non-empty string")


EVENT_DATA_TYPES = {EventType.REPLY_SENT: ReplySentData}
@dataclass(frozen=True)
class SourceConversationTruncatedData(EventData):
# An upper bound of n means "delete items with interaction counts (sparsely)
# up to and including n".
upper_bound: int

def __post_init__(self) -> None:
if self.upper_bound < 0:
raise ValueError("upper_bound must be non-negative")


EVENT_DATA_TYPES = {
EventType.REPLY_SENT: ReplySentData,
EventType.SOURCE_CONVERSATION_TRUNCATED: SourceConversationTruncatedData,
}


@dataclass(frozen=True)
Expand Down
22 changes: 13 additions & 9 deletions securedrop/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ def query_options(cls, base: Optional[Load] = None) -> Tuple[Load, ...]:
base.joinedload(cls.seen_messages).joinedload(SeenMessage.journalist), # type: ignore[attr-defined]
)

@property
def interaction_count(self) -> int:
# Extract interaction_count from filename
# (format: {interaction_count}-{journalist_filename}-*)
return int(self.filename.split("-")[0])

@property
def is_file(self) -> bool:
return self.filename.endswith("doc.gz.gpg") or self.filename.endswith("doc.zip.gpg")
Expand All @@ -293,10 +299,6 @@ def to_api_v2(self, minor: int) -> Dict[str, Any]:
else: # is_message
seen_by = [m.journalist.uuid for m in self.seen_messages if m.journalist]

# Extract interaction_count from filename
# (format: {interaction_count}-{journalist_filename}-*)
interaction_count = int(self.filename.split("-")[0])

data = {
"kind": "file" if self.is_file else "message",
"uuid": self.uuid,
Expand All @@ -308,7 +310,7 @@ def to_api_v2(self, minor: int) -> Dict[str, Any]:
}

if minor >= 2:
data["interaction_count"] = interaction_count
data["interaction_count"] = self.interaction_count

return data

Expand Down Expand Up @@ -409,11 +411,13 @@ def query_options(cls, base: Optional[Load] = None) -> Tuple[Load, ...]:
base.joinedload(cls.seen_replies).joinedload(SeenReply.journalist), # type: ignore[attr-defined]
)

def to_api_v2(self, minor: int) -> Dict[str, Any]:
@property
def interaction_count(self) -> int:
# Extract interaction_count from filename
# (format: {interaction_count}-{journalist_filename}-reply.gpg)
interaction_count = int(self.filename.split("-")[0])
# (format: {interaction_count}-{journalist_filename}-*)
return int(self.filename.split("-")[0])

def to_api_v2(self, minor: int) -> Dict[str, Any]:
data = {
"kind": "reply",
"uuid": self.uuid,
Expand All @@ -425,7 +429,7 @@ def to_api_v2(self, minor: int) -> Dict[str, Any]:
}

if minor >= 2:
data["interaction_count"] = interaction_count
data["interaction_count"] = self.interaction_count

return data

Expand Down
101 changes: 101 additions & 0 deletions securedrop/tests/test_journalist_api2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,104 @@ def test_api_minor_versions(journalist_app, journalist_api_token, test_files, mi

else:
assert "events" not in resp.json


def test_api2_source_conversation_truncated(
journalist_app,
journalist_api_token,
test_files,
):
"""
Test processing of the "source_conversation_truncated" event.
Items with interaction_count <= upper_bound must be deleted.
Items with interaction_count > upper_bound must remain.
"""
with journalist_app.test_client() as app:
source = test_files["source"]

# Ensure we have submissions/replies and interaction_count fields
assert len(test_files["submissions"]) >= 1
assert len(test_files["replies"]) >= 1

# Fetch index to get current versions and interaction counts
index = app.get(
url_for("api2.index"),
headers=get_api_headers(journalist_api_token),
)
assert index.status_code == 200

# Build a map of item_uuid -> interaction_count
item_uuids = [item.uuid for item in (test_files["submissions"] + test_files["replies"])]

batch_resp = app.post(
url_for("api2.data"),
json={"items": item_uuids},
headers=get_api_headers(journalist_api_token),
)
assert batch_resp.status_code == 200
data = batch_resp.json

initial_counts = {
item_uuid: item["interaction_count"] for item_uuid, item in data["items"].items()
}

# Choose a bound that deletes some but not all items
# Pick the median interaction_count so we get both outcomes
sorted_counts = sorted(initial_counts.values())
upper_bound = sorted_counts[len(sorted_counts) // 2]

source_version = index.json["sources"][source.uuid]

event = Event(
id="999001",
target=SourceTarget(source_uuid=source.uuid, version=source_version),
type=EventType.SOURCE_CONVERSATION_TRUNCATED,
data={"upper_bound": upper_bound},
)

response = app.post(
url_for("api2.data"),
json={"events": [asdict(event)]},
headers=get_api_headers(journalist_api_token),
)
assert response.status_code == 200

status_code, msg = response.json["events"][event.id]
# Because some deletes may fail (simulated) and some succeed, the handler
# returns 200 if all succeed.
# The test_files fixtures never cause delete_file_object() to raise,
# so OK (200) is expected.
assert status_code == 200

# Verify item-wise results
returned_items = response.json["items"]
assert isinstance(returned_items, dict)

for item_uuid, count in initial_counts.items():
if count <= upper_bound:
# Must be returned as deleted: {uuid: None}
assert item_uuid in returned_items
assert returned_items[item_uuid] is None
# Also confirm removal in DB
assert (
Submission.query.filter(Submission.uuid == item_uuid).one_or_none()
or Reply.query.filter(Reply.uuid == item_uuid).one_or_none()
) is None
else:
# Must not be deleted
assert (
Submission.query.filter(Submission.uuid == item_uuid).one_or_none()
or Reply.query.filter(Reply.uuid == item_uuid).one_or_none()
) is not None

# Source must still exist
assert Source.query.filter(Source.uuid == source.uuid).one_or_none() is not None

# Resubmission must yield "Already Reported" (208)
res2 = app.post(
url_for("api2.data"),
json={"events": [asdict(event)]},
headers=get_api_headers(journalist_api_token),
)
assert res2.status_code == 200
assert res2.json["events"][event.id][0] == 208