Skip to content

Commit a252cf9

Browse files
authored
Merge pull request #7723 from freedomofpress/7707-source-conversation-truncated
feat(`api2`): support conversation truncation via `source_conversation_truncated` event
2 parents 8689ccd + 16b7336 commit a252cf9

File tree

5 files changed

+205
-24
lines changed

5 files changed

+205
-24
lines changed

API2.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ state "Cache and report success" as SuccessBranch {
179179
OK --> UpdateCache
180180
181181
UpdateCache : redis.set(event.id, OK, ttl)
182-
UpdateCache --> [*] : return (OK, delta)
182+
UpdateCache --> [*] : return (status, delta)
183183
}
184184
185185
Handler --> BadRequest

securedrop/journalist_app/api2/events.py

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from dataclasses import asdict
2+
from typing import List
23

34
from db import db
45
from journalist_app import utils
@@ -69,6 +70,7 @@ def process(self, event: Event, minor: int) -> EventResult:
6970
EventType.SOURCE_CONVERSATION_DELETED: self.handle_source_conversation_deleted,
7071
EventType.SOURCE_STARRED: self.handle_source_starred,
7172
EventType.SOURCE_UNSTARRED: self.handle_source_unstarred,
73+
EventType.SOURCE_CONVERSATION_TRUNCATED: self.handle_source_conversation_truncated,
7274
}[event.type]
7375
except KeyError:
7476
return EventResult(
@@ -118,12 +120,18 @@ def handle_item_deleted(event: Event, minor: int) -> EventResult:
118120
status=(EventStatusCode.Gone, None),
119121
)
120122

121-
utils.delete_file_object(item)
122-
return EventResult(
123-
event_id=event.id,
124-
status=(EventStatusCode.OK, None),
125-
items={event.target.item_uuid: None},
126-
)
123+
try:
124+
utils.delete_file_object(item)
125+
return EventResult(
126+
event_id=event.id,
127+
status=(EventStatusCode.OK, None),
128+
items={event.target.item_uuid: None},
129+
)
130+
except ValueError as exc:
131+
return EventResult(
132+
event_id=event.id,
133+
status=(EventStatusCode.InternalServerError, str(exc)),
134+
)
127135

128136
@staticmethod
129137
def handle_reply_sent(event: Event, minor: int) -> EventResult:
@@ -174,13 +182,19 @@ def handle_source_deleted(event: Event, minor: int) -> EventResult:
174182
# Mark as deleted all the items in the source's collection
175183
deleted_items = {item.uuid: None for item in source.collection}
176184

177-
utils.delete_collection(source.filesystem_id)
178-
return EventResult(
179-
event_id=event.id,
180-
status=(EventStatusCode.OK, None),
181-
sources={event.target.source_uuid: None},
182-
items=deleted_items,
183-
)
185+
try:
186+
utils.delete_collection(source.filesystem_id)
187+
return EventResult(
188+
event_id=event.id,
189+
status=(EventStatusCode.OK, None),
190+
sources={event.target.source_uuid: None},
191+
items=deleted_items,
192+
)
193+
except ValueError as exc:
194+
return EventResult(
195+
event_id=event.id,
196+
status=(EventStatusCode.InternalServerError, str(exc)),
197+
)
184198

185199
@staticmethod
186200
def handle_source_conversation_deleted(event: Event, minor: int) -> EventResult:
@@ -208,6 +222,7 @@ def handle_source_conversation_deleted(event: Event, minor: int) -> EventResult:
208222
# Mark as deleted all the items in the source's collection
209223
deleted_items = {item.uuid: None for item in source.collection}
210224

225+
# NB. Does not raise exceptions from `utils.delete_file_object()`.
211226
utils.delete_source_files(source.filesystem_id)
212227
db.session.refresh(source)
213228

@@ -218,6 +233,51 @@ def handle_source_conversation_deleted(event: Event, minor: int) -> EventResult:
218233
items=deleted_items,
219234
)
220235

236+
@staticmethod
237+
def handle_source_conversation_truncated(event: Event, minor: int) -> EventResult:
238+
"""
239+
A `source_conversation_truncated` event involves deleting all the items
240+
in the source's collection with interaction counts less than or equal to
241+
the specified upper bound, assumed to be the interaction count of the last
242+
item known to the client. This achieves the same consistency as a
243+
`source_conversation_deleted` event without requiring its strict
244+
versioning.
245+
"""
246+
247+
try:
248+
source = Source.query.filter(Source.uuid == event.target.source_uuid).one()
249+
except NoResultFound:
250+
return EventResult(
251+
event_id=event.id,
252+
status=(
253+
EventStatusCode.Gone,
254+
None,
255+
),
256+
)
257+
258+
deleted: List[ItemUUID] = []
259+
for item in source.collection:
260+
if item.interaction_count <= event.data.upper_bound:
261+
try:
262+
utils.delete_file_object(item)
263+
except ValueError:
264+
# `utils.delete_file_object()` is non-atomic: it guarantees
265+
# database deletion but not filesystem deletion. The former
266+
# is all we need for consistency with the client, and the
267+
# latter will be caught by monitoring for "disconnected"
268+
# submissions.
269+
pass
270+
271+
deleted.append(item.uuid)
272+
273+
db.session.refresh(source)
274+
return EventResult(
275+
event_id=event.id,
276+
status=(EventStatusCode.OK, None),
277+
sources={source.uuid: source},
278+
items={item_uuid: None for item_uuid in deleted},
279+
)
280+
221281
@staticmethod
222282
def handle_source_starred(event: Event, minor: int) -> EventResult:
223283
try:

securedrop/journalist_app/api2/types.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class EventType(StrEnum):
3636
ITEM_SEEN = auto()
3737
SOURCE_DELETED = auto()
3838
SOURCE_CONVERSATION_DELETED = auto()
39+
SOURCE_CONVERSATION_TRUNCATED = auto()
3940
SOURCE_STARRED = auto()
4041
SOURCE_UNSTARRED = auto()
4142

@@ -52,6 +53,7 @@ class EventStatusCode(IntEnum):
5253
Conflict = 409
5354
# The target UUID doesn't exist and it was a deletion request
5455
Gone = 410
56+
InternalServerError = 500
5557
NotImplemented = 501
5658

5759

@@ -144,7 +146,21 @@ def __post_init__(self) -> None:
144146
raise ValueError("reply must be a non-empty string")
145147

146148

147-
EVENT_DATA_TYPES = {EventType.REPLY_SENT: ReplySentData}
149+
@dataclass(frozen=True)
150+
class SourceConversationTruncatedData(EventData):
151+
# An upper bound of n means "delete items with interaction counts (sparsely)
152+
# up to and including n".
153+
upper_bound: int
154+
155+
def __post_init__(self) -> None:
156+
if self.upper_bound < 0:
157+
raise ValueError("upper_bound must be non-negative")
158+
159+
160+
EVENT_DATA_TYPES = {
161+
EventType.REPLY_SENT: ReplySentData,
162+
EventType.SOURCE_CONVERSATION_TRUNCATED: SourceConversationTruncatedData,
163+
}
148164

149165

150166
@dataclass(frozen=True)

securedrop/models.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,12 @@ def query_options(cls, base: Optional[Load] = None) -> Tuple[Load, ...]:
279279
base.joinedload(cls.seen_messages).joinedload(SeenMessage.journalist), # type: ignore[attr-defined]
280280
)
281281

282+
@property
283+
def interaction_count(self) -> int:
284+
# Extract interaction_count from filename
285+
# (format: {interaction_count}-{journalist_filename}-*)
286+
return int(self.filename.split("-")[0])
287+
282288
@property
283289
def is_file(self) -> bool:
284290
return self.filename.endswith("doc.gz.gpg") or self.filename.endswith("doc.zip.gpg")
@@ -293,10 +299,6 @@ def to_api_v2(self, minor: int) -> Dict[str, Any]:
293299
else: # is_message
294300
seen_by = [m.journalist.uuid for m in self.seen_messages if m.journalist]
295301

296-
# Extract interaction_count from filename
297-
# (format: {interaction_count}-{journalist_filename}-*)
298-
interaction_count = int(self.filename.split("-")[0])
299-
300302
data = {
301303
"kind": "file" if self.is_file else "message",
302304
"uuid": self.uuid,
@@ -308,7 +310,7 @@ def to_api_v2(self, minor: int) -> Dict[str, Any]:
308310
}
309311

310312
if minor >= 2:
311-
data["interaction_count"] = interaction_count
313+
data["interaction_count"] = self.interaction_count
312314

313315
return data
314316

@@ -409,11 +411,13 @@ def query_options(cls, base: Optional[Load] = None) -> Tuple[Load, ...]:
409411
base.joinedload(cls.seen_replies).joinedload(SeenReply.journalist), # type: ignore[attr-defined]
410412
)
411413

412-
def to_api_v2(self, minor: int) -> Dict[str, Any]:
414+
@property
415+
def interaction_count(self) -> int:
413416
# Extract interaction_count from filename
414-
# (format: {interaction_count}-{journalist_filename}-reply.gpg)
415-
interaction_count = int(self.filename.split("-")[0])
417+
# (format: {interaction_count}-{journalist_filename}-*)
418+
return int(self.filename.split("-")[0])
416419

420+
def to_api_v2(self, minor: int) -> Dict[str, Any]:
417421
data = {
418422
"kind": "reply",
419423
"uuid": self.uuid,
@@ -425,7 +429,7 @@ def to_api_v2(self, minor: int) -> Dict[str, Any]:
425429
}
426430

427431
if minor >= 2:
428-
data["interaction_count"] = interaction_count
432+
data["interaction_count"] = self.interaction_count
429433

430434
return data
431435

securedrop/tests/test_journalist_api2.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,3 +1106,104 @@ def test_api_minor_versions(journalist_app, journalist_api_token, test_files, mi
11061106

11071107
else:
11081108
assert "events" not in resp.json
1109+
1110+
1111+
def test_api2_source_conversation_truncated(
1112+
journalist_app,
1113+
journalist_api_token,
1114+
test_files,
1115+
):
1116+
"""
1117+
Test processing of the "source_conversation_truncated" event.
1118+
Items with interaction_count <= upper_bound must be deleted.
1119+
Items with interaction_count > upper_bound must remain.
1120+
"""
1121+
with journalist_app.test_client() as app:
1122+
source = test_files["source"]
1123+
1124+
# Ensure we have at least one submission and at least one reply
1125+
assert len(test_files["submissions"]) >= 1
1126+
assert len(test_files["replies"]) >= 1
1127+
1128+
# Fetch index to get current versions and interaction counts
1129+
index = app.get(
1130+
url_for("api2.index"),
1131+
headers=get_api_headers(journalist_api_token),
1132+
)
1133+
assert index.status_code == 200
1134+
1135+
# Collect all item UUIDs, then fetch them to build a map of item_uuid -> interaction_count
1136+
item_uuids = [item.uuid for item in (test_files["submissions"] + test_files["replies"])]
1137+
1138+
batch_resp = app.post(
1139+
url_for("api2.data"),
1140+
json={"items": item_uuids},
1141+
headers=get_api_headers(journalist_api_token),
1142+
)
1143+
assert batch_resp.status_code == 200
1144+
data = batch_resp.json
1145+
1146+
initial_counts = {
1147+
item_uuid: item["interaction_count"] for item_uuid, item in data["items"].items()
1148+
}
1149+
1150+
# Choose a bound that deletes some but not all items
1151+
# Pick the median interaction_count so we get both outcomes
1152+
sorted_counts = sorted(initial_counts.values())
1153+
upper_bound = sorted_counts[len(sorted_counts) // 2]
1154+
1155+
source_version = index.json["sources"][source.uuid]
1156+
1157+
event = Event(
1158+
id="999001",
1159+
target=SourceTarget(source_uuid=source.uuid, version=source_version),
1160+
type=EventType.SOURCE_CONVERSATION_TRUNCATED,
1161+
data={"upper_bound": upper_bound},
1162+
)
1163+
1164+
response = app.post(
1165+
url_for("api2.data"),
1166+
json={"events": [asdict(event)]},
1167+
headers=get_api_headers(journalist_api_token),
1168+
)
1169+
assert response.status_code == 200
1170+
1171+
status_code, msg = response.json["events"][event.id]
1172+
# The handler deliberately swallows ValueError from delete_file_object()
1173+
# (filesystem-deletion failures), so it reports OK (200) whenever the source exists.
1174+
# The test_files fixtures never cause delete_file_object() to raise anyway,
1175+
# so OK (200) is expected.
1176+
assert status_code == 200
1177+
1178+
# Verify item-wise results
1179+
returned_items = response.json["items"]
1180+
assert isinstance(returned_items, dict)
1181+
1182+
for item_uuid, count in initial_counts.items():
1183+
if count <= upper_bound:
1184+
# Must be returned as deleted: {uuid: None}
1185+
assert item_uuid in returned_items
1186+
assert returned_items[item_uuid] is None
1187+
# Also confirm removal in DB
1188+
assert (
1189+
Submission.query.filter(Submission.uuid == item_uuid).one_or_none()
1190+
or Reply.query.filter(Reply.uuid == item_uuid).one_or_none()
1191+
) is None
1192+
else:
1193+
# Must not be deleted
1194+
assert (
1195+
Submission.query.filter(Submission.uuid == item_uuid).one_or_none()
1196+
or Reply.query.filter(Reply.uuid == item_uuid).one_or_none()
1197+
) is not None
1198+
1199+
# Source must still exist
1200+
assert Source.query.filter(Source.uuid == source.uuid).one_or_none() is not None
1201+
1202+
# Resubmission must yield "Already Reported" (208)
1203+
res2 = app.post(
1204+
url_for("api2.data"),
1205+
json={"events": [asdict(event)]},
1206+
headers=get_api_headers(journalist_api_token),
1207+
)
1208+
assert res2.status_code == 200
1209+
assert res2.json["events"][event.id][0] == 208

0 commit comments

Comments
 (0)