Skip to content

Commit

Permalink
Expiry per dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
tbekas committed Feb 7, 2024
1 parent 532ceda commit d2666ac
Show file tree
Hide file tree
Showing 20 changed files with 492 additions and 676 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,10 @@ The following options are available:
-p, --api-port The REST Api port [=8080].
--repo-kind Backend for main repo store (fs, sqlite) [=fs].
-q, --storage-quota The size of the total storage quota dedicated to the node [=8589934592].
-t, --block-ttl Default block timeout in seconds - 0 disables the ttl [=$DefaultBlockTtl].
--block-mi Time interval in seconds - determines frequency of block maintenance cycle: how
often blocks are checked for expiration and cleanup
[=$DefaultBlockMaintenanceInterval].
--block-mn Number of blocks to check every maintenance cycle [=1000].
-t, --default-ttl Default dataset expiry in seconds [=$DefaultDefaultExpiry].
--maint-interval Maintenance interval in seconds - determines frequency of maintenance cycle:
how often datasets are checked for expiration and cleanup. Value 0 disables the
maintenance [=$DefaultMaintenanceInterval].
-c, --cache-size The size of the block cache, 0 disables the cache - might help on slow hardrives
[=0].
--persistence Enables persistence mechanism, requires an Ethereum node [=false].
Expand Down
23 changes: 12 additions & 11 deletions codex/codex.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type
restServer: RestServerRef
codexNode: CodexNodeRef
repoStore: RepoStore
maintenance: BlockMaintainer
maintenance: DatasetMaintainer

CodexPrivateKey* = libp2p.PrivateKey # alias
EthWallet = ethers.Wallet
Expand Down Expand Up @@ -238,31 +238,32 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)

metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!")

repoData = case config.repoKind
of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5)
.expect("Should create repo file data store!"))
of repoSQLite: Datastore(SQLiteDatastore.new($config.dataDir)
.expect("Should create repo SQLite data store!"))
of repoSQLite: Datastore(metaDs)

repoStore = RepoStore.new(
repoDs = repoData,
metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
.expect("Should create meta data store!"),
quotaMaxBytes = config.storageQuota,
blockTtl = config.blockTtl)
metaDs = metaDs,
quotaMaxBytes = config.storageQuota)

maintenance = BlockMaintainer.new(
maintenance = DatasetMaintainer.new(
repoStore,
interval = config.blockMaintenanceInterval,
numberOfBlocksPerInterval = config.blockMaintenanceNumberOfBlocks)
metaDs,
defaultExpiry = config.defaultExpiry,
interval = config.maintenanceInterval)

peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new()
blockDiscovery = DiscoveryEngine.new(repoStore, peerStore, network, discovery, pendingBlocks)
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery)
codexNode = CodexNodeRef.new(switch, store, maintenance, engine, erasure, discovery)
restServer = RestServerRef.new(
codexNode.initRestApi(config, repoStore),
initTAddress(config.apiBindAddress , config.apiPort),
Expand Down
31 changes: 12 additions & 19 deletions codex/conf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,8 @@ export units
export net
export
DefaultQuotaBytes,
DefaultBlockTtl,
DefaultBlockMaintenanceInterval,
DefaultNumberOfBlocksToMaintainPerInterval
DefaultDefaultExpiry,
DefaultMaintenanceInterval

const
codex_enable_api_debug_peers* {.booldefine.} = false
Expand Down Expand Up @@ -194,24 +193,18 @@ type
name: "storage-quota"
abbr: "q" }: NBytes

blockTtl* {.
desc: "Default block timeout in seconds - 0 disables the ttl"
defaultValue: DefaultBlockTtl
defaultValueDesc: $DefaultBlockTtl
name: "block-ttl"
defaultExpiry* {.
desc: "Default dataset expiry in seconds"
defaultValue: DefaultDefaultExpiry
defaultValueDesc: $DefaultDefaultExpiry
name: "default-ttl"
abbr: "t" }: Duration

blockMaintenanceInterval* {.
desc: "Time interval in seconds - determines frequency of block maintenance cycle: how often blocks are checked for expiration and cleanup"
defaultValue: DefaultBlockMaintenanceInterval
defaultValueDesc: $DefaultBlockMaintenanceInterval
name: "block-mi" }: Duration

blockMaintenanceNumberOfBlocks* {.
desc: "Number of blocks to check every maintenance cycle"
defaultValue: DefaultNumberOfBlocksToMaintainPerInterval
defaultValueDesc: $DefaultNumberOfBlocksToMaintainPerInterval
name: "block-mn" }: int
maintenanceInterval* {.
desc: "Maintenance interval in seconds - determines frequency of maintenance cycle: how often datasets are checked for expiration and cleanup"
defaultValue: DefaultMaintenanceInterval
defaultValueDesc: $DefaultMaintenanceInterval
name: "maint-interval" }: Duration

cacheSize* {.
desc: "The size of the block cache, 0 disables the cache - might help on slow hardrives"
Expand Down
2 changes: 2 additions & 0 deletions codex/namespaces.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const
CodexMetaNamespace & "/ttl"
CodexBlockProofNamespace* = # Cid and Proof
CodexMetaNamespace & "/proof"
CodexDatasetMetadataNamespace* = # Dataset
CodexMetaNamespace & "/dataset"
CodexDhtNamespace* = "dht" # Dht namespace
CodexDhtProvidersNamespace* = # Dht providers namespace
CodexDhtNamespace & "/providers"
Expand Down
60 changes: 37 additions & 23 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type
switch: Switch
networkId: PeerId
blockStore: BlockStore
maintenance: DatasetMaintainer
engine: BlockExcEngine
erasure: Erasure
discovery: Discovery
Expand Down Expand Up @@ -152,18 +153,8 @@ proc updateExpiry*(
trace "Unable to fetch manifest for cid", manifestCid
return failure(error)

try:
let
ensuringFutures = Iter
.fromSlice(0..<manifest.blocksCount)
.mapIt(self.blockStore.ensureExpiry( manifest.treeCid, it, expiry ))
await allFuturesThrowing(ensuringFutures)
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
await self.maintenance.ensureExpiry(manifest.treeCid, expiry)

Check warning on line 156 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L156

Added line #L156 was not covered by tests

return success()

proc fetchBatched*(
self: CodexNodeRef,
Expand Down Expand Up @@ -233,6 +224,13 @@ proc retrieve*(

# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", cid

if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[cid])).errorOption:
return failure(err)

Check warning on line 232 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L232

Added line #L232 was not covered by tests

LPStream(StoreStream.new(self.blockStore, manifest, pad = false)).success
else:
let
Expand All @@ -251,6 +249,10 @@ proc retrieve*(
await stream.pushEof()

asyncSpawn streamOneBlock()

if err =? (await self.maintenance.trackExpiry(cid)).errorOption:
return failure(err)

LPStream(stream).success()

proc store*(
Expand Down Expand Up @@ -322,6 +324,12 @@ proc store*(
trace "Unable to store manifest"
return failure(err)

if err =? (await self.maintenance.trackExpiry(
treeCid,
manifest.blocksCount,
manifestsCids = @[manifestBlk.cid])).errorOption:
return failure(err)

Check warning on line 331 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L331

Added line #L331 was not covered by tests

info "Stored data", manifestCid = manifestBlk.cid,
treeCid = treeCid,
blocks = manifest.blocksCount,
Expand Down Expand Up @@ -515,24 +523,23 @@ proc onStore(
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))

proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
trace "Updating expiry for blocks", blocks = blocks.len

let ensureExpiryFutures = blocks.mapIt(self.blockStore.ensureExpiry(it.cid, expiry))
if updateExpiryErr =? (await allFutureResult(ensureExpiryFutures)).errorOption:
return failure(updateExpiryErr)

if not blocksCb.isNil and err =? (await blocksCb(blocks)).errorOption:
trace "Unable to process blocks", err = err.msg
return failure(err)
proc onBatch(blocks: seq[bt.Block]): Future[?!void] {.async.} =
if not blocksCb.isNil:
await blocksCb(blocks)
else:
success()

return success()
if err =? (await self.maintenance.trackExpiry(
manifest.treeCid,
manifest.blocksCount,
manifestsCids = @[cid])).errorOption:
return failure(err)

Check warning on line 536 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L536

Added line #L536 was not covered by tests

if blksIter =? builder.slotIndiciesIter(slotIdx) and
err =? (await self.fetchBatched(
manifest.treeCid,
blksIter,
onBatch = updateExpiry)).errorOption:
onBatch = onBatch)).errorOption:
trace "Unable to fetch blocks", err = err.msg
return failure(err)

Expand All @@ -544,6 +551,11 @@ proc onStore(
trace "Slot root mismatch", manifest = manifest.slotRoots[slotIdx.int], recovered = slotRoot.toSlotCid()
return failure(newException(CodexError, "Slot root mismatch"))

if err =? (await self.maintenance.ensureExpiry(
manifest.treeCid,
expiry)).errorOption:
return failure(err)

Check warning on line 557 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L557

Added line #L557 was not covered by tests

return success()

proc onProve(
Expand Down Expand Up @@ -698,6 +710,7 @@ proc new*(
T: type CodexNodeRef,
switch: Switch,
store: BlockStore,
maintenance: DatasetMaintainer,
engine: BlockExcEngine,
erasure: Erasure,
discovery: Discovery,
Expand All @@ -708,6 +721,7 @@ proc new*(
CodexNodeRef(
switch: switch,
blockStore: store,
maintenance: maintenance,
engine: engine,
erasure: erasure,
discovery: discovery,
Expand Down
26 changes: 1 addition & 25 deletions codex/stores/blockstore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ method getBlockAndProof*(self: BlockStore, treeCid: Cid, index: Natural): Future

method putBlock*(
self: BlockStore,
blk: Block,
ttl = Duration.none
blk: Block
): Future[?!void] {.base.} =
## Put a block to the blockstore
##
Expand Down Expand Up @@ -91,29 +90,6 @@ method getCidAndProof*(

raiseAssert("getCidAndProof not implemented!")

method ensureExpiry*(
self: BlockStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.base.} =
## Ensure that block's assosicated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##

raiseAssert("Not implemented!")

method ensureExpiry*(
self: BlockStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.base.} =
## Ensure that block's associated expiry is at least given timestamp
## If the current expiry is lower then it is updated to the given one, otherwise it is left intact
##

raiseAssert("Not implemented!")

method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} =
## Delete a block from the blockstore
##
Expand Down
24 changes: 1 addition & 23 deletions codex/stores/cachestore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool =

method putBlock*(
self: CacheStore,
blk: Block,
ttl = Duration.none): Future[?!void] {.async.} =
blk: Block): Future[?!void] {.async.} =
## Put a block to the blockstore
##

Expand All @@ -223,27 +222,6 @@ method putCidAndProof*(
self.cidAndProofCache[(treeCid, index)] = (blockCid, proof)
success()

method ensureExpiry*(
self: CacheStore,
cid: Cid,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's assosicated TTL in store - not applicable for CacheStore
##

discard # CacheStore does not have notion of TTL

method ensureExpiry*(
self: CacheStore,
treeCid: Cid,
index: Natural,
expiry: SecondsSince1970
): Future[?!void] {.async.} =
## Updates block's associated TTL in store - not applicable for CacheStore
##

discard # CacheStore does not have notion of TTL

method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
## Delete a block from the blockstore
##
Expand Down
8 changes: 8 additions & 0 deletions codex/stores/keyutils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const
CodexManifestKey* = Key.init(CodexManifestNamespace).tryGet
BlocksTtlKey* = Key.init(CodexBlocksTtlNamespace).tryGet
BlockProofKey* = Key.init(CodexBlockProofNamespace).tryGet
DatasetMetadataKey* = Key.init(CodexDatasetMetadataNamespace).tryGet
QuotaKey* = Key.init(CodexQuotaNamespace).tryGet
QuotaUsedKey* = (QuotaKey / "used").tryGet
QuotaReservedKey* = (QuotaKey / "reserved").tryGet
Expand All @@ -47,3 +48,10 @@ proc createBlockExpirationMetadataQueryKey*(): ?!Key =

proc createBlockCidAndProofMetadataKey*(treeCid: Cid, index: Natural): ?!Key =
(BlockProofKey / $treeCid).flatMap((k: Key) => k / $index)

proc createDatasetMetadataKey*(treeCid: Cid): ?!Key =
DatasetMetadataKey / $treeCid

proc createDatasetMetadataQueryKey*(): ?!Key =
let queryString = ? (DatasetMetadataKey / "*")
Key.init(queryString)
Loading

0 comments on commit d2666ac

Please sign in to comment.