Skip to content

Commit a46d445

Browse files
authored
feat: supporting meta field in store (#2609)
1 parent f8184a6 commit a46d445

File tree

12 files changed

+286
-36
lines changed

12 files changed

+286
-36
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
const ContentScriptVersion_4* =
2+
"""
3+
ALTER TABLE IF EXISTS messages_backup RENAME TO messages;
4+
ALTER TABLE messages RENAME TO messages_backup;
5+
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
6+
7+
CREATE TABLE IF NOT EXISTS messages (
8+
pubsubTopic VARCHAR NOT NULL,
9+
contentTopic VARCHAR NOT NULL,
10+
payload VARCHAR,
11+
version INTEGER NOT NULL,
12+
timestamp BIGINT NOT NULL,
13+
id VARCHAR NOT NULL,
14+
messageHash VARCHAR NOT NULL,
15+
storedAt BIGINT NOT NULL,
16+
meta VARCHAR,
17+
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
18+
) PARTITION BY RANGE (storedAt);
19+
20+
DO $$
21+
DECLARE
22+
min_storedAt numeric;
23+
max_storedAt numeric;
24+
min_storedAtSeconds integer = 0;
25+
max_storedAtSeconds integer = 0;
26+
partition_name TEXT;
27+
create_partition_stmt TEXT;
28+
BEGIN
29+
SELECT MIN(storedAt) into min_storedAt
30+
FROM messages_backup;
31+
32+
SELECT MAX(storedAt) into max_storedAt
33+
FROM messages_backup;
34+
35+
min_storedAtSeconds := min_storedAt / 1000000000;
36+
max_storedAtSeconds := max_storedAt / 1000000000;
37+
38+
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
39+
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
40+
' PARTITION OF messages FOR VALUES FROM (' ||
41+
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
42+
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
43+
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
44+
END IF;
45+
END $$;
46+
47+
INSERT INTO messages (
48+
pubsubTopic,
49+
contentTopic,
50+
payload,
51+
version,
52+
timestamp,
53+
id,
54+
messageHash,
55+
storedAt
56+
)
57+
SELECT pubsubTopic,
58+
contentTopic,
59+
payload,
60+
version,
61+
timestamp,
62+
id,
63+
messageHash,
64+
storedAt
65+
FROM messages_backup;
66+
67+
DROP TABLE messages_backup;
68+
69+
UPDATE version SET version = 4 WHERE version = 3;
70+
71+
"""

migrations/message_store_postgres/pg_migration_manager.nim

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import content_script_version_1, content_script_version_2, content_script_version_3
1+
import
2+
content_script_version_1, content_script_version_2, content_script_version_3,
3+
content_script_version_4
24

35
type MigrationScript* = object
46
version*: int
@@ -12,6 +14,7 @@ const PgMigrationScripts* =
1214
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
1315
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
1416
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
17+
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
1518
]
1619

1720
proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =

tests/testlib/wakucore.nim

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,27 @@ export waku_core.DefaultPubsubTopic, waku_core.DefaultContentTopic
4747
proc fakeWakuMessage*(
4848
payload: string | seq[byte] = "TEST-PAYLOAD",
4949
contentTopic = DefaultContentTopic,
50-
meta = newSeq[byte](),
50+
meta: string | seq[byte] = newSeq[byte](),
5151
ts = now(),
5252
ephemeral = false,
5353
): WakuMessage =
5454
var payloadBytes: seq[byte]
55+
var metaBytes: seq[byte]
56+
5557
when payload is string:
5658
payloadBytes = toBytes(payload)
5759
else:
5860
payloadBytes = payload
5961

62+
when meta is string:
63+
metaBytes = toBytes(meta)
64+
else:
65+
metaBytes = meta
66+
6067
WakuMessage(
6168
payload: payloadBytes,
6269
contentTopic: contentTopic,
63-
meta: meta,
70+
meta: metaBytes,
6471
version: 2,
6572
timestamp: ts,
6673
ephemeral: ephemeral,

tests/waku_archive/test_driver_postgres.nim

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ suite "Postgres driver":
5454

5555
asyncTest "Insert a message":
5656
const contentTopic = "test-content-topic"
57+
const meta = "test meta"
5758

58-
let msg = fakeWakuMessage(contentTopic = contentTopic)
59+
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
5960

6061
let computedDigest = computeDigest(msg)
6162
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
@@ -75,6 +76,7 @@ suite "Postgres driver":
7576
assert toHex(computedDigest.data) == toHex(digest)
7677
assert toHex(actualMsg.payload) == toHex(msg.payload)
7778
assert toHex(computedHash) == toHex(hash)
79+
assert toHex(actualMsg.meta) == toHex(msg.meta)
7880

7981
asyncTest "Insert and query message":
8082
const contentTopic1 = "test-content-topic-1"

tests/waku_archive/test_driver_postgres_query.nim

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,61 @@ suite "Postgres driver - queries":
133133
check:
134134
filteredMessages == expected[2 .. 3]
135135

136+
asyncTest "single content topic with meta field":
137+
## Given
138+
const contentTopic = "test-content-topic"
139+
140+
let expected =
141+
@[
142+
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
143+
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
144+
fakeWakuMessage(
145+
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
146+
),
147+
fakeWakuMessage(
148+
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
149+
),
150+
fakeWakuMessage(
151+
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
152+
),
153+
fakeWakuMessage(
154+
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
155+
),
156+
fakeWakuMessage(
157+
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
158+
),
159+
fakeWakuMessage(
160+
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
161+
),
162+
]
163+
var messages = expected
164+
165+
shuffle(messages)
166+
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
167+
168+
for msg in messages:
169+
require (
170+
await driver.put(
171+
DefaultPubsubTopic,
172+
msg,
173+
computeDigest(msg),
174+
computeMessageHash(DefaultPubsubTopic, msg),
175+
msg.timestamp,
176+
)
177+
).isOk()
178+
179+
## When
180+
let res = await driver.getMessages(
181+
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
182+
)
183+
184+
## Then
185+
assert res.isOk(), res.error
186+
187+
let filteredMessages = res.tryGet().mapIt(it[1])
188+
check:
189+
filteredMessages == expected[2 .. 3]
190+
136191
asyncTest "single content topic - descending order":
137192
## Given
138193
const contentTopic = "test-content-topic"

tests/waku_archive/test_driver_sqlite.nim

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ suite "SQLite driver":
3232
test "insert a message":
3333
## Given
3434
const contentTopic = "test-content-topic"
35+
const meta = "test meta"
3536

3637
let driver = newSqliteArchiveDriver()
3738

38-
let msg = fakeWakuMessage(contentTopic = contentTopic)
39+
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
3940
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)
4041

4142
## When
@@ -51,9 +52,9 @@ suite "SQLite driver":
5152
check:
5253
storedMsg.len == 1
5354
storedMsg.all do(item: auto) -> bool:
54-
let (pubsubTopic, msg, _, _, hash) = item
55-
msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
56-
hash == msgHash
55+
let (pubsubTopic, actualMsg, _, _, hash) = item
56+
actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
57+
hash == msgHash and msg.meta == actualMsg.meta
5758

5859
## Cleanup
5960
(waitFor driver.close()).expect("driver to close")

tests/waku_archive/test_driver_sqlite_query.nim

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,67 @@ suite "SQLite driver - query by content topic":
116116
## Cleanup
117117
(await driver.close()).expect("driver to close")
118118

119+
asyncTest "single content topic with meta field":
120+
## Given
121+
const contentTopic = "test-content-topic"
122+
123+
let driver = newSqliteArchiveDriver()
124+
125+
let expected =
126+
@[
127+
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
128+
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
129+
fakeWakuMessage(
130+
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
131+
),
132+
fakeWakuMessage(
133+
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
134+
),
135+
fakeWakuMessage(
136+
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
137+
),
138+
fakeWakuMessage(
139+
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
140+
),
141+
fakeWakuMessage(
142+
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
143+
),
144+
fakeWakuMessage(
145+
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
146+
),
147+
]
148+
var messages = expected
149+
150+
shuffle(messages)
151+
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)
152+
153+
for msg in messages:
154+
require (
155+
await driver.put(
156+
DefaultPubsubTopic,
157+
msg,
158+
computeDigest(msg),
159+
computeMessageHash(DefaultPubsubTopic, msg),
160+
msg.timestamp,
161+
)
162+
).isOk()
163+
164+
## When
165+
let res = await driver.getMessages(
166+
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
167+
)
168+
169+
## Then
170+
check:
171+
res.isOk()
172+
173+
let filteredMessages = res.tryGet().mapIt(it[1])
174+
check:
175+
filteredMessages == expected[2 .. 3]
176+
177+
## Cleanup
178+
(await driver.close()).expect("driver to close")
179+
119180
asyncTest "single content topic - descending order":
120181
## Given
121182
const contentTopic = "test-content-topic"

waku/factory/validator_signed.nim

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
3434
ctx.update(msg.payload)
3535
ctx.update(msg.contentTopic.toBytes())
3636
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
37+
# ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field
3738
ctx.update(
3839
if msg.ephemeral:
3940
@[1.byte]

waku/waku_archive/driver/postgres_driver/migrations.nim

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import
99
logScope:
1010
topics = "waku archive migration"
1111

12-
const SchemaVersion* = 3 # increase this when there is an update in the database schema
12+
const SchemaVersion* = 4 # increase this when there is an update in the database schema
1313

1414
proc breakIntoStatements*(script: string): seq[string] =
1515
## Given a full migration script, that can potentially contain a list

0 commit comments

Comments
 (0)