Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions migrations/message_store_postgres/content_script_version_4.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
## Migration script: schema version 3 -> 4.
##
## Rebuilds the `messages` table as a table partitioned by RANGE on
## `storedAt`, adding the new nullable `meta` column. Outline:
##   1. The leading `ALTER TABLE IF EXISTS` recovers from a previously
##      interrupted run of this migration by restoring the backup table to
##      its original name before starting over.
##   2. The current (non-partitioned) table is renamed to `messages_backup`
##      and its primary key dropped so the constraint name can be reused.
##   3. The new partitioned `messages` table is created.
##   4. A DO block creates a single partition covering the [MIN, MAX] range
##      of `storedAt` found in the backed-up rows. On an empty table MIN/MAX
##      are NULL; the NULL propagates into the *_Seconds variables, so the
##      `> 0` guard skips partition creation. The statement is fully built
##      by string concatenation — DDL cannot take bind parameters, so it is
##      EXECUTEd without a USING clause (the previous `USING partition_name,
##      min_storedAt, max_storedAt` bound values the command never
##      referenced).
##   5. All rows are copied into the partitioned table (`meta` stays NULL
##      for historical rows), the backup is dropped, and the stored schema
##      version is bumped to 4.
const ContentScriptVersion_4* =
  """
ALTER TABLE IF EXISTS messages_backup RENAME TO messages;
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;

CREATE TABLE IF NOT EXISTS messages (
        pubsubTopic VARCHAR NOT NULL,
        contentTopic VARCHAR NOT NULL,
        payload VARCHAR,
        version INTEGER NOT NULL,
        timestamp BIGINT NOT NULL,
        id VARCHAR NOT NULL,
        messageHash VARCHAR NOT NULL,
        storedAt BIGINT NOT NULL,
        meta VARCHAR,
        CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
    ) PARTITION BY RANGE (storedAt);

DO $$
DECLARE
  min_storedAt numeric;
  max_storedAt numeric;
  min_storedAtSeconds integer = 0;
  max_storedAtSeconds integer = 0;
  partition_name TEXT;
  create_partition_stmt TEXT;
BEGIN
  SELECT MIN(storedAt) INTO min_storedAt FROM messages_backup;
  SELECT MAX(storedAt) INTO max_storedAt FROM messages_backup;

  min_storedAtSeconds := min_storedAt / 1000000000;
  max_storedAtSeconds := max_storedAt / 1000000000;

  partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
  create_partition_stmt := 'CREATE TABLE ' || partition_name ||
                           ' PARTITION OF messages FOR VALUES FROM (' ||
                           min_storedAt || ') TO (' || (max_storedAt + 1) || ')';

  IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
    EXECUTE create_partition_stmt;
  END IF;
END $$;

INSERT INTO messages (
      pubsubTopic,
      contentTopic,
      payload,
      version,
      timestamp,
      id,
      messageHash,
      storedAt
    )
    SELECT pubsubTopic,
           contentTopic,
           payload,
           version,
           timestamp,
           id,
           messageHash,
           storedAt
    FROM messages_backup;

DROP TABLE messages_backup;

UPDATE version SET version = 4 WHERE version = 3;
"""
5 changes: 4 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import content_script_version_1, content_script_version_2, content_script_version_3
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4

type MigrationScript* = object
version*: int
Expand All @@ -12,6 +14,7 @@ const PgMigrationScripts* =
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
11 changes: 9 additions & 2 deletions tests/testlib/wakucore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,27 @@ export waku_core.DefaultPubsubTopic, waku_core.DefaultContentTopic
proc fakeWakuMessage*(
payload: string | seq[byte] = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
meta = newSeq[byte](),
meta: string | seq[byte] = newSeq[byte](),
ts = now(),
ephemeral = false,
): WakuMessage =
var payloadBytes: seq[byte]
var metaBytes: seq[byte]

when payload is string:
payloadBytes = toBytes(payload)
else:
payloadBytes = payload

when meta is string:
metaBytes = toBytes(meta)
else:
metaBytes = meta

WakuMessage(
payload: payloadBytes,
contentTopic: contentTopic,
meta: meta,
meta: metaBytes,
version: 2,
timestamp: ts,
ephemeral: ephemeral,
Expand Down
4 changes: 3 additions & 1 deletion tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ suite "Postgres driver":

asyncTest "Insert a message":
const contentTopic = "test-content-topic"
const meta = "test meta"

let msg = fakeWakuMessage(contentTopic = contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)

let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
Expand All @@ -75,6 +76,7 @@ suite "Postgres driver":
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)
assert toHex(actualMsg.meta) == toHex(msg.meta)

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
Expand Down
55 changes: 55 additions & 0 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,61 @@ suite "Postgres driver - queries":
check:
filteredMessages == expected[2 .. 3]

asyncTest "single content topic with meta field":
  ## Given
  const contentTopic = "test-content-topic"

  # Two messages on the default topic (indices 0-1) followed by six on the
  # queried topic (indices 2-7); every message carries a distinct meta value.
  var expected = @[
    fakeWakuMessage(@[0.byte], ts = ts(0), meta = "meta-0"),
    fakeWakuMessage(@[1.byte], ts = ts(10), meta = "meta-1"),
  ]
  for i in 2 .. 7:
    expected.add fakeWakuMessage(
      @[i.byte], contentTopic = contentTopic, ts = ts(i * 10), meta = "meta-" & $i
    )

  # Insert in random order so that result ordering is produced by the query,
  # not by the insertion sequence.
  var messages = expected
  shuffle(messages)
  debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

  for msg in messages:
    let putRes = await driver.put(
      DefaultPubsubTopic,
      msg,
      computeDigest(msg),
      computeMessageHash(DefaultPubsubTopic, msg),
      msg.timestamp,
    )
    require putRes.isOk()

  ## When
  let res = await driver.getMessages(
    contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
  )

  ## Then
  assert res.isOk(), res.error

  # The first ascending page of size 2 must hold the two oldest matching
  # messages, meta fields included.
  let filteredMessages = res.tryGet().mapIt(it[1])
  check:
    filteredMessages == expected[2 .. 3]

asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"
Expand Down
9 changes: 5 additions & 4 deletions tests/waku_archive/test_driver_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ suite "SQLite driver":
test "insert a message":
## Given
const contentTopic = "test-content-topic"
const meta = "test meta"

let driver = newSqliteArchiveDriver()

let msg = fakeWakuMessage(contentTopic = contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)

## When
Expand All @@ -51,9 +52,9 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do(item: auto) -> bool:
let (pubsubTopic, msg, _, _, hash) = item
msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
hash == msgHash
let (pubsubTopic, actualMsg, _, _, hash) = item
actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
hash == msgHash and msg.meta == actualMsg.meta

## Cleanup
(waitFor driver.close()).expect("driver to close")
61 changes: 61 additions & 0 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,67 @@ suite "SQLite driver - query by content topic":
## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "single content topic with meta field":
  ## Given
  const contentTopic = "test-content-topic"

  let driver = newSqliteArchiveDriver()

  # Two messages on the default topic (indices 0-1) followed by six on the
  # queried topic (indices 2-7); every message carries a distinct meta value.
  var expected = @[
    fakeWakuMessage(@[0.byte], ts = ts(0), meta = "meta-0"),
    fakeWakuMessage(@[1.byte], ts = ts(10), meta = "meta-1"),
  ]
  for i in 2 .. 7:
    expected.add fakeWakuMessage(
      @[i.byte], contentTopic = contentTopic, ts = ts(i * 10), meta = "meta-" & $i
    )

  # Insert in random order so that result ordering is produced by the query,
  # not by the insertion sequence.
  var messages = expected
  shuffle(messages)
  debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

  for msg in messages:
    let putRes = await driver.put(
      DefaultPubsubTopic,
      msg,
      computeDigest(msg),
      computeMessageHash(DefaultPubsubTopic, msg),
      msg.timestamp,
    )
    require putRes.isOk()

  ## When
  let res = await driver.getMessages(
    contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
  )

  ## Then
  check:
    res.isOk()

  # The first ascending page of size 2 must hold the two oldest matching
  # messages, meta fields included.
  let filteredMessages = res.tryGet().mapIt(it[1])
  check:
    filteredMessages == expected[2 .. 3]

  ## Cleanup
  (await driver.close()).expect("driver to close")

asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"
Expand Down
1 change: 1 addition & 0 deletions waku/factory/validator_signed.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
# ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment addresses the doubt if meta field should be included in the hash.
Not sure if to delete the comment or not, it's ugly but I feel that it can be useful

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion there's no need to add meta to hash calculation but better to confirm with @jm-clius

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"as the signature goes in the meta field" — what do you mean? Which signature are we talking about here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm referring to the signed validators feature. In order to validate a message, looks like it expects the message hash to be signed and the signature to be attached to the WakuMessage's meta field.
https://github.com/waku-org/nwaku/blob/6382dedb42d0dda10c3eebcc243e31dbd8f8ceac/waku/factory/validator_signed.nim#L70-L73

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then yes indeed you can't add meta to the hash otherwise it would break this feature.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please note that the hash for the signed validator feature is an application-level message hash. It is different from the internal Waku Message hash.
For the signed validator feature, the meta field cannot be included (as pointed out, because it contains the very signature over the application-level hash). This is specified here: https://github.com/vacp2p/rfc-index/blob/main/status/raw/simple-scaling.md#dos-protection
For Waku Message hashes (the ones used in the Store and elsewhere) the meta field forms an essential part in order to allow applications to differentiate messages that may be exactly similar in all other fields: https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/14/message.md#deterministic-message-hashing

ctx.update(
if msg.ephemeral:
@[1.byte]
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 3 # increase this when there is an update in the database schema
const SchemaVersion* = 4 # increase this when there is an update in the database schema; it should match the highest version listed in PgMigrationScripts (currently ContentScriptVersion_4)

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
Loading