Skip to content

Commit

Permalink
Expiry per dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
tbekas committed Aug 7, 2024
1 parent 4f56f2a commit 8e1699b
Show file tree
Hide file tree
Showing 22 changed files with 467 additions and 639 deletions.
9 changes: 3 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,9 @@ The following options are available:
--api-bindaddr The REST API bind address [=127.0.0.1].
-p, --api-port The REST Api port [=8080].
--repo-kind Backend for main repo store (fs, sqlite) [=fs].
-q, --storage-quota The size of the total storage quota dedicated to the node [=8589934592].
-t, --block-ttl Default block timeout in seconds - 0 disables the ttl [=$DefaultBlockTtl].
--block-mi Time interval in seconds - determines frequency of block maintenance cycle: how
often blocks are checked for expiration and cleanup
[=$DefaultBlockMaintenanceInterval].
--block-mn Number of blocks to check every maintenance cycle [=1000].
-q, --storage-quota The size in bytes of the total storage quota dedicated to the node [=8589934592].
-t, --default-ttl Default dataset expiry [=1d].
--maintenance-interval Determines how frequently datasets are checked for expiration and cleanup [=5m].
-c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hardrives
[=0].
Expand Down
20 changes: 12 additions & 8 deletions codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type
restServer: RestServerRef
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
maintenance: DatasetMaintainer
taskpool: Taskpool

CodexPrivateKey* = libp2p.PrivateKey # alias
Expand Down Expand Up @@ -246,6 +246,9 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)

metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!")

repoData = case config.repoKind
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
.expect("Should create repo file data store!"))
Expand All @@ -256,21 +259,21 @@ proc new*(

repoStore = RepoStore.new(
repoDs = repoData,
metaDs = LevelDbDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create metadata store!"),
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl)
metaDs = metaDs,
quotaMaxBytes = config.storageQuota)

maintenance = BlockMaintainer.new(
maintenance = DatasetMaintainer.new(
repoStore,
interval = config.blockMaintenanceInterval,
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks)
metaDs,
defaultExpiry = config.defaultExpiry,
interval = config.maintenanceInterval)

peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)

prover = if config.prover:
if not fileAccessible($config.circomR1cs, {AccessFlags.Read}) and
endsWith($config.circomR1cs, ".r1cs"):
Expand Down Expand Up @@ -306,6 +309,7 @@ proc new*(
codexNode = CodexNodeRef.new(
switch = switch,
networkStore = store,
maintenance = maintenance,
engine = engine,
prover = prover,
discovery = discovery,
Expand Down
31 changes: 12 additions & 19 deletions codex/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ export units, net, codextypes, logutils

export
DefaultQuotaBytes,
DefaultBlockTtl,
DefaultBlockMaintenanceInterval,
DefaultNumberOfBlocksToMaintainPerInterval
DefaultDefaultExpiry,
DefaultMaintenanceInterval

proc defaultDataDir*(): string =
let dataDir = when defined(windows):
Expand Down Expand Up @@ -209,24 +208,18 @@ type
name: "storage-quota"
abbr: "q" }: NBytes

blockTtl* {.
desc: "Default block timeout in seconds - 0 disables the ttl"
defaultValue: DefaultBlockTtl
defaultValueDesc: $DefaultBlockTtl
name: "block-ttl"
defaultExpiry* {.
desc: "Default dataset expiry in seconds"
defaultValue: DefaultDefaultExpiry
defaultValueDesc: $DefaultDefaultExpiry
name: "default-ttl"
abbr: "t" }: Duration

blockMaintenanceInterval* {.
desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup"
defaultValue: DefaultBlockMaintenanceInterval
defaultValueDesc: $DefaultBlockMaintenanceInterval
name: "block-mi" }: Duration

blockMaintenanceNumberOfBlocks* {.
desc: "Number of blocks to check every maintenance cycle"
defaultValue: DefaultNumberOfBlocksToMaintainPerInterval
defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval
name: "block-mn" }: int
maintenanceInterval* {.
desc: "Determines how frequently datasets are checked for expiration and cleanup"
defaultValue: DefaultMaintenanceInterval
defaultValueDesc: $DefaultMaintenanceInterval
name: "maintenance-interval" }: Duration

cacheSize* {.
desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives"
Expand Down
2 changes: 2 additions & 0 deletions codex/namespaces.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const
CodexMetaNamespace & "/ttl"
CodexBlockProofNamespace* = # Cid and Proof
CodexMetaNamespace & "/proof"
CodexDatasetMetadataNamespace* = # Dataset
CodexMetaNamespace & "/dataset"
CodexDhtNamespace* = "dht" # Dht namespace
CodexDhtProvidersNamespace* = # Dht providers namespace
CodexDhtNamespace & "/providers"
Expand Down
55 changes: 33 additions & 22 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type
switch: Switch
networkId: PeerId
networkStore: NetworkStore
maintenance: DatasetMaintainer
engine: BlockExcEngine
prover: ?Prover
discovery: Discovery
Expand Down Expand Up @@ -155,17 +156,8 @@ proc updateExpiry*(
trace "Unable to fetch manifest for cid", manifestCid
return failure(error)

try:
let
ensuringFutures = Iter[int].new(0..<manifest.blocksCount)
.mapIt(self.networkStore.localStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
await self.maintenance.ensureExpiry(manifest.treeCid, expiry)

Check warning on line 159 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L159

Added line #L159 was not covered by tests

return success()

proc fetchBatched*(
self: CodexNodeRef,
Expand Down Expand Up @@ -274,6 +266,13 @@ proc streamEntireDataset(

# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", manifestCid

if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[manifestCid])).errorOption:
return failure(err)

Check warning on line 274 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L274

Added line #L274 was not covered by tests

LPStream(StoreStream.new(self.networkStore, manifest, pad = false)).success

proc retrieve*(
Expand Down Expand Up @@ -361,6 +360,12 @@ proc store*(
error "Unable to store manifest"
return failure(err)

if err =? (await self.maintenance.trackExpiry(
treeCid,
manifest.blocksCount,
manifestsCids = @[manifestBlk.cid])).errorOption:
return failure(err)

Check warning on line 367 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L367

Added line #L367 was not covered by tests

info "Stored data", manifestCid = manifestBlk.cid,
treeCid = treeCid,
blocks = manifest.blocksCount,
Expand Down Expand Up @@ -566,18 +571,17 @@ proc onStore(
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))

proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
trace "Updating expiry for blocks", blocks = blocks.len

let ensureExpiryFutures = blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)

if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption:
trace "Unable to process blocks", err = err.msg
return failure(err)
proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
if not blocksCb.isNil:
await blocksCb(blocks)
else:
success()

return success()
if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[cid])).errorOption:
return failure(err)

without indexer =? manifest.verifiableStrategy.init(
0, manifest.blocksCount - 1, manifest.numSlots).catch, err:
Expand All @@ -591,7 +595,7 @@ proc onStore(
if err =? (await self.fetchBatched(
manifest.treeCid,
blksIter,
onBatch = updateExpiry)).errorOption:
onBatch = onBatch)).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)

Expand All @@ -607,6 +611,11 @@ proc onStore(

trace "Slot successfully retrieved and reconstructed"

if err =? (await self.maintenance.ensureExpiry(
manifest.treeCid,
expiry)).errorOption:
return failure(err)

Check warning on line 617 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L617

Added line #L617 was not covered by tests

return success()

proc onProve(
Expand Down Expand Up @@ -771,6 +780,7 @@ proc new*(
T: type CodexNodeRef,
switch: Switch,
networkStore: NetworkStore,
maintenance: DatasetMaintainer,
engine: BlockExcEngine,
discovery: Discovery,
prover = Prover.none,
Expand All @@ -782,6 +792,7 @@ proc new*(
CodexNodeRef(
switch: switch,
networkStore: networkStore,
maintenance: maintenance,
engine: engine,
prover: prover,
discovery: discovery,
Expand Down
27 changes: 3 additions & 24 deletions codex/stores/blockstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future
raiseAssert("getBlockAndProof not implemented!")

method putBlock*(
self: BlockStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.base.} =
self: BlockStore,
blk: Block
): Future[?!void] {.base.} =
## Put a block to the blockstore
##

Expand All @@ -89,27 +89,6 @@ method getCidAndProof*(

raiseAssert("getCidAndProof not implemented!")

method ensureExpiry*(
self: BlockStore,
cid: Cid,
expiry: SecondsSince1970): Future[?!void] {.base.} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##

raiseAssert("Not implemented!")

method ensureExpiry*(
self: BlockStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970): Future[?!void] {.base.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##

raiseAssert("Not implemented!")

method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore
##
Expand Down
24 changes: 1 addition & 23 deletions codex/stores/cachestore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =

method putBlock*(
self: CacheStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
blk: Block): Future[?!void] {.async.} =
## Put a block to the blockstore
##

Expand All @@ -209,27 +208,6 @@ method putCidAndProof*(
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
success()

method ensureExpiry*(
self: CacheStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's assosicated TTL in store - not applicable for CacheStore
##

discard # CacheStore does not have notion of TTL

method ensureExpiry*(
self: CacheStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's associated TTL in store - not applicable for CacheStore
##

discard # CacheStore does not have notion of TTL

method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
Expand Down
8 changes: 8 additions & 0 deletions codex/stores/keyutils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet
DatasetMetadataKey* = Key.init(CodexDatasetMetadataNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet
Expand All @@ -47,3 +48,10 @@ proc createBlockExpirationMetadataQueryKey*(): ?!Key =

proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key =
(BlockProofKey / $treeCid).flatMap((k: Key) => k / $index)

proc createDatasetMetadataKey*(treeCid: Cid): ?!Key =
DatasetMetadataKey / $treeCid

proc createDatasetMetadataQueryKey*(): ?!Key =
let queryString = ? (DatasetMetadataKey / "*")
Key.init(queryString)
Loading

0 comments on commit 8e1699b

Please sign in to comment.