
Expiry per dataset #678

Open: wants to merge 1 commit into master
Conversation

Contributor
@tbekas tbekas commented Jan 17, 2024

No description provided.

@tbekas tbekas changed the title Expiry per dataset WIP WIP Expiry per dataset Jan 17, 2024
@tbekas tbekas force-pushed the expiry-per-dataset branch 3 times, most recently from 5e5928a to 45ebcfe Compare January 17, 2024 18:03
@tbekas tbekas force-pushed the safe-block-deletion branch 2 times, most recently from 4ac3464 to 31f431b Compare January 18, 2024 15:28
@tbekas tbekas force-pushed the safe-block-deletion branch 4 times, most recently from fd45659 to d7053f7 Compare January 30, 2024 12:23
@tbekas tbekas force-pushed the expiry-per-dataset branch 2 times, most recently from cbc430e to d7b246a Compare January 31, 2024 16:53
@tbekas tbekas changed the title WIP Expiry per dataset Expiry per dataset Jan 31, 2024
@tbekas tbekas marked this pull request as ready for review January 31, 2024 16:55
@tbekas tbekas force-pushed the expiry-per-dataset branch 2 times, most recently from 44e1718 to d2666ac Compare February 7, 2024 15:51
README.md Outdated
-t, --default-ttl Default dataset expiry in seconds [=$DefaultDefaultExpiry].
--maint-interval Maintenance interval in seconds - determines frequency of maintenance cycle:
how often datasets are checked for expiration and cleanup. Value 0 disables the
maintenance [=$DefaultMaintenanceInterval].
Contributor
Doesn't seem like it's resolving the default here.

Contributor Author
You're right. I will change it to the literals 86400 (24 hours) for the TTL and 300 (5 minutes) for the maintenance interval.

Contributor
I think this used to work fine (in fact, previous versions of codex do resolve this fine); however, I'm seeing the same in #700. Looks like Duration isn't properly stringified.

README.md Outdated
often blocks are checked for expiration and cleanup
[=$DefaultBlockMaintenanceInterval].
--block-mn Number of blocks to check every maintenance cycle [=1000].
-t, --default-ttl Default dataset expiry in seconds [=86400].
Contributor
As mentioned, this should actually work with the constants, which is preferable, and it appears it broke some time ago, so a better solution would be to leave the constants in for now and figure out why they aren't resolving. We don't have to hold this PR because of it, however.

codex/node.nim Outdated
@@ -233,6 +224,13 @@ proc retrieve*(

# Retrieve all blocks of the dataset sequentially from the local store or network
trace "Creating store stream for manifest", cid

Contributor
Why do we need this here, is it to prevent retrieving expired datasets?

Contributor Author
No, whenever we store a new dataset we need to explicitly call trackExpiry so that all blocks within that dataset will get maintained.

Contributor
@dryajov dryajov left a comment
Let's get this merged with safe-block-deletion and I'll give it a more thorough review. There are lots of changes across main and these two branches, which makes reviewing this separately a bit hard.

@tbekas tbekas force-pushed the safe-block-deletion branch 3 times, most recently from fa36cd4 to a951bbf Compare February 12, 2024 23:17
@tbekas tbekas changed the base branch from safe-block-deletion to master June 5, 2024 17:41
Contributor
@benbierens benbierens left a comment
Docker image of this branch seems to be passing basic dist-tests.

codex/codex.nim Outdated
@@ -246,6 +246,9 @@ proc new*(
wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch)

metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace)
Contributor
metaDs seems to be defined again on line 262, but then as a LevelDbDs.

Contributor Author
Thanks for catching it 👌 We should be using a single metaDb


CodexProof.decode(bytes)

func `%`*(proof: CodexProof): JsonNode = % byteutils.toHex(proof.encode())
Contributor
I don't see how the changes in this file are connected to the expiry per dataset. A quick explain will do! :D

Contributor Author
This change includes switching the type of quota usage (used, reserved and available bytes) from a plain uint to NBytes. Then, to avoid converting back to uint everywhere, I also changed to NBytes wherever it was suitable, which includes the data model used for the REST endpoint, the RestRepoStore object type. And since it needs to be serialized properly on the endpoint, we need such an encoder.

if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
return failure(err)

return success()
Contributor
On finishing a successful delete of a dataset, should we delete that dataset's entry in the dataset-metadata datastore? Is this already handled somewhere? or is there a reason not to?

Contributor Author
Yeah, we do it when recording a checkpoint: we check whether progress has reached 100% and, if so, we remove the dataset metadata. Line 198.

return success()
else:
datasetMd.checkpoint.progress = index
return await self.deleteBatch(treeCid, datasetMd)
Contributor
Should this call itself, does this not risk huge callstacks for large datasets? Might we not instead 'simply' wait for the next cycle of superviseDatasetDeletion to delete more blocks?

Contributor Author
No, recursion is trampolined in Future. Basically you have an infinite™️ callstack with {.async.}.
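To illustrate the point, here is a standalone sketch (not code from this PR; it assumes chronos is available): each recursive call in an {.async.} proc suspends at an await and unwinds the native stack before continuing, so deep recursion does not overflow it.

```nim
import pkg/chronos

# Hypothetical example: an async proc that "recurses" a large number of
# times. The native stack unwinds at every suspension point, so the
# recursion is effectively trampolined through the event loop.
proc countDown(n: int): Future[void] {.async.} =
  if n == 0:
    return
  await sleepAsync(0.milliseconds)  # yield to the event loop first
  await countDown(n - 1)            # resumed from the loop, not a deep native call

waitFor countDown(100_000)
```

The same applies to deleteBatch above: it awaits datastore operations before recursing, so the callstack stays shallow regardless of dataset size.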

quotaMaxBytes: quotaMaxBytes,
blockTtl: blockTtl
)
export store, types, coders
Contributor
Splitting stuff up?! I like it. 👍

@tbekas tbekas force-pushed the expiry-per-dataset branch 6 times, most recently from 8b6d782 to 61617b8 Compare June 19, 2024 15:06
@tbekas tbekas force-pushed the expiry-per-dataset branch 2 times, most recently from deec57d to 4d39a5c Compare June 21, 2024 07:37
Comment on lines 320 to 321
except Exception as exc:
error "Unexpected error during maintenance", msg = exc.msg
Contributor
Exception should not be caught, because Defect, a derived type of Exception, is not catchable. Instead, use what was there previously:

except CancelledError as error:
  raise error
except CatchableError as exc:

Contributor
Also, there is a combination of exceptions being caught and errored Results being handled, which indicates there are some exceptions leaking in the underlying context, when they should be returned as an errored Result.

Ideally we should mark all of the routines that return a Result in the underlying context with {.raises:[].}. When the chronos v4 changes go in, we can also mark the async procs with {.async: (raises:[]).}

self.offset = 0
if (datasetMd.expiry < self.clock.now) and
(datasetMd.checkpoint.timestamp + self.retryDelay.seconds < self.clock.now):
asyncSpawn self.superviseDatasetDeletion(treeCid, datasetMd)
Contributor
Possibly we should track these futures using TrackedFutures so that we can
successfully cancel them on stop:

DatasetMaintainer* = object
  trackedFutures: TrackedFutures

proc new*(
  T: type DatasetMaintainer,
  # ...
): DatasetMaintainer =

  DatasetMaintainer(
    # ...
    trackedFutures: TrackedFutures.new(),
    # ...
  )

# Usage:
proc checkDatasets(self: DatasetMaintainer): Future[?!void] {.async.} =
  # ...
  discard self.superviseDatasetDeletion(treeCid, datasetMd).track(self)
  # ...

proc stop*(self: DatasetMaintainer): Future[void] {.async.} =
  await self.trackedFutures.cancelTracked()

Contributor Author
Good idea, adding it 👍

Comment on lines 167 to 189
await modify[DatasetMetadata](self.metaDs, key,
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
let datasetMd = DatasetMetadata(
expiry: max(currDatasetMd.expiry, minExpiry),
leavesCount: currDatasetMd.leavesCount,
manifestsCids: currDatasetMd.manifestsCids,
checkpoint: currDatasetMd.checkpoint
)
return datasetMd.some
else:
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " not found")
)
Contributor
This is a bit more readable (for me, at least)

Suggested change:
proc modifyData(maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
without currDatasetMd =? maybeCurrDatasetMd:
raise newException(CatchableError, "DatasetMetadata for treeCid " & $treeCid & " not found")
let datasetMd = DatasetMetadata(
expiry: max(currDatasetMd.expiry, minExpiry),
leavesCount: currDatasetMd.leavesCount,
manifestsCids: currDatasetMd.manifestsCids,
checkpoint: currDatasetMd.checkpoint
)
return datasetMd.some
await modify[DatasetMetadata](self.metaDs, key, modifyData)

I still don't think we should be raising exceptions here, because the underlying
implementations (defaultModifyImpl and defaultModifyGetImpl) simply try/except these and turn them into Results.

Contributor Author
@tbekas tbekas Jun 24, 2024
If we want a modify operation to be stopped, raising an exception is the only way to do that, and in this case we do want to stop it. The rest of the flow goes as expected: the exception gets turned into a Result and everything eventually gets logged at error level.

As for the first part of the comment, I can extract that anonymous proc into a named proc if you want, but I don't see how it automatically becomes more readable this way.

Member
If we want a modify operation to be stopped, raising an exception is the only way to do that. In this case we want to stop it.

You say this because of the contract for modifyGet, right? Cause there is currently no provisioning there for a modify operation to be aborted?

I suppose returning maybeCurrDatasetMd would be equivalent to a NOP, but sort of inefficient as it would still trigger a write to the underlying store?

Contributor
If we want a modify operation to be stopped, raising an exception is the only
way to do that

With Dmitriy's suggested change, returning a Result will be the way to stop an
operation, which is exactly what I had in mind.

I can extract that anonymous proc into a named proc if you want, however I
don't see how it automatically becomes more readable this way.

Understood. As an outside reader, I thought perhaps you might want to know what
is considered subjectively "more readable" for that reader.

Contributor Author
@tbekas tbekas Jul 31, 2024
@gmega

You say this because of the contract for modifyGet, right? Cause there is currently no provisioning there for a modify operation to be aborted?

It's not mentioned in the docs, but in the signature we return a Future, which implies it can be a failure.

I suppose returning maybeCurrDatasetMd would be equivalent to a NOP, but sort of inefficient as it would still trigger a write to the underlying store?

Yep, returning the original argument is a NOP, so the state in the datastore ends up essentially the same as when raising an error. The difference is that when an error is raised from the closure, it is propagated to the caller of modifyGet as failure(err). That should probably be documented.

Contributor Author
@emizzle

With Dmitriy's suggested change, returning a Result will be the way to stop an
operation, which is exactly what I had in mind.

Future already communicates the error; with Future[Result[T]] it becomes ambiguous where the error will be.

Also, I'm not sure why we are even talking about it. I'm not changing anything there.

Comment on lines 241 to 247
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
return failure(err)
Contributor
Why not simply delete the checkpoint here instead of including the delete logic
in the modify callback?

Contributor Author
@tbekas tbekas Jun 24, 2024
Most of the time we're updating the checkpoint here. Deletion happens conditionally, only as the last step of the process (recording 100% progress is equivalent to deleting the checkpoint along with the dataset metadata). So, answering the question: we're not deleting here because that would yield incorrect maintenance results (we would stop deleting blocks after the first batch and leave all the other blocks as garbage that would possibly never be deleted).

Contributor
answering the question we're not deleting because that would yield incorrect maintenance results (we would stop deleting blocks after the first batch and leave all the other blocks as garbage that will possibly never be deleted)

I'm not following this, can you elaborate?

Member
Datasets are not deleted at once, but in increments of size batchSize. Because the dataset may actually be a lot larger than the batch size, we are forced to store a deletion "cursor" (the checkpoint) so that the maintainer picks up from where it left off during the next maintenance cycle. For instance, a dataset with 10,000 blocks will require 10 maintenance cycles to be deleted (assuming the default batch size of 1,000 blocks), and the checkpoint will only be deleted after the last cycle.

I think this is perhaps more complicated than it needs to be. We should talk about whether or not maintaining fixed batch sizes really makes sense, because I think being able to kill a dataset at once would simplify things. I'm also advocating limiting concurrency by locking datasets that are undergoing garbage collection, so that any operation on the dataset gets forcefully reordered with respect to the ongoing deletion.
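The cycle described above can be sketched roughly as follows (a simplified illustration, not the PR's code; deleteBlocks, persistCheckpoint and removeDatasetMetadata are hypothetical stand-ins for the real procs):

```nim
# Simplified sketch of checkpointed deletion: each pass deletes up to
# batchSize blocks and persists a cursor, so a restart resumes from the
# last checkpoint instead of restarting from block zero.
proc deleteExpired(treeCid: Cid, md: var DatasetMetadata, batchSize: int) =
  var index = md.checkpoint.progress
  while index < md.leavesCount:
    let upper = min(index + batchSize, md.leavesCount)
    deleteBlocks(treeCid, index ..< upper)  # hypothetical helper
    index = upper
    md.checkpoint.progress = index
    persistCheckpoint(treeCid, md)          # hypothetical helper
  removeDatasetMetadata(treeCid)            # 100% progress: drop the entry
```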

Member
OK, looks like I haven't fully understood why we need checkpoints: the code tries to delete the entire dataset, updating the checkpoint at every batch. There is no actual interruption unless it comes from outside, so I'm not sure why this is needed.

Contributor Author
@tbekas tbekas Jul 31, 2024
Checkpoints are an optimization for storing a cursor in case of an interruption such as a node shutdown. We can then resume from where we left off (which may be useful for very large datasets). And yes, we try to delete all dataset blocks one after another.

Comment on lines 189 to 211
await self.metaDs.modify(key,
proc (maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
if currDatasetMd =? maybeCurrDatasetMd:
if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids:
raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid)

if currDatasetMd.checkpoint.progress > datasetMd.checkpoint.progress:
raise newException(CatchableError, "Progress should be increasing only, treeCid " & $treeCid)

if currDatasetMd.leavesCount <= datasetMd.checkpoint.progress:
DatasetMetadata.none
else:
datasetMd.some
else:
raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found")
Contributor
This is more readable imo:

Suggested change:
proc modifyData(maybeCurrDatasetMd: ?DatasetMetadata): Future[?DatasetMetadata] {.async.} =
without currDatasetMd =? maybeCurrDatasetMd:
raise newException(CatchableError, "Metadata for dataset with treeCid " & $treeCid & " not found")
if currDatasetMd.expiry != datasetMd.expiry or currDatasetMd.manifestsCids != datasetMd.manifestsCids:
raise newException(CatchableError, "Change in expiry detected, interrupting maintenance for dataset with treeCid " & $treeCid)
if currDatasetMd.checkpoint.progress > datasetMd.checkpoint.progress:
raise newException(CatchableError, "Progress should be increasing only, treeCid " & $treeCid)
if currDatasetMd.leavesCount <= datasetMd.checkpoint.progress:
DatasetMetadata.none
else:
datasetMd.some
await self.metaDs.modify(key, modifyData)

However, I also have a few comments:

  1. Returning DatasetMetadata.none seems like an odd way to indicate that the
    metadata should be deleted. Maybe it might be better to handle the delete
    logic later on when cleaning up?
  2. Why are we raising an exception for metadata not found here? It seems like it
    would be better placed for nim-datastore to handle that, and this
    predicate/callback would not be called because a failure would have been
    returned further up the call stack.
  3. This callback is try/excepted up the callstack and turned into a Result
    which eventually becomes the return value of modify. If we were to change
    the signature of this callback to return a Result instead of raising
    exceptions, then we know that returning failure(err) in the callback will
    become the returned value of modify and hence would be a lot easier to swallow
    as a reader.
  4. Since these Results are ultimately bubbled up to superviseDatasetDeletion,
    it's a good idea to type them properly so that they can be inspected and
    different failures can have different outcomes. For example, the "change in
    expiry" that is being checked sounds like it would be a nasty bug of unknown
    origin, so you may want to add a metric to it so you can monitor the
    occurrences better.

Contributor Author
@tbekas tbekas Jun 24, 2024
  1. Keep in mind that it has to be done in a concurrency-safe way, otherwise anomalies will occur in the datastore, like deleting still-used data or not deleting expired data. modify is the only way to perform concurrency-safe updates to the records, and it requires returning none if deletion is the intended result of the operation.
  2. An exception is raised because it would be an error situation if we're trying to record a checkpoint when there's no DatasetMetadata related to the given treeCid. If you would like to see the API for modify changed, please create an issue or maybe a PR in nim-datastore explaining in detail how such an API would look.
  3. This PR uses the existing API in nim-datastore. If you would like this API to be changed, please raise such an issue with a detailed explanation in the appropriate repo.
  4. No, a change in expiry would not mean a nasty bug. It would mean that there was an update to the dataset metadata, which could result from, for example, re-uploading a dataset just after it expired, when maintenance has already started but hasn't finished yet.

Member
Yeah... I'm seriously wondering if we shouldn't limit concurrency so we can make this easier to reason about. But let's talk about it.

Contributor
I believe @dryajov's comment addresses points 1-3. Regarding point 4, I think you missed the point: the comment was meant to provide reasoning about why typing exceptions is important. But with the change that Dmitriy suggested, a Result would be returned and exceptions would not need to be raised here.

Contributor Author
Once the API proposed by @dryajov is available in nim-datastore, I will use it, even though I think it's pretty much the same as the current one with additional unnecessary complications.

@tbekas tbekas force-pushed the expiry-per-dataset branch 2 times, most recently from 23e8122 to 0ab4b1c Compare June 24, 2024 10:18
Contributor Author
tbekas commented Jun 24, 2024

@emizzle please add a comment here when you finish reviewing this PR.

Contributor
@dryajov dryajov left a comment
Overall, I think this looks sound, but I can see that several aspects of the code (e.g. error handling and overall style) have caused some controversy.

I think there are several reasons for this:

  1. the inherent style of the read-modify-write pattern, like the complex return state and the use of closures, which can be hard to read and reason about
  2. the way we're doing error handling, and overall handling return values

Seeing this in practice, I do see the issues with the design of modifyGet. For example, relying on an Option to signal which operation to execute (update, delete or keep) limits error handling to Exceptions only. This isn't inherently wrong per se, but it makes the code harder to reason about and less consistent.

One way to address this would be to change modifyGet to be a bit more user friendly. For example by introducing a special return type structure that captures the semantics of the operations more closely, instead of relying on Option, which then opens the possibility of using Result to communicate error states more consistently.

Here is some pseudocode to better illustrate the idea and mull over the different possibilities.

type
  Operations = enum
    Keep,
    Update,
    Delete

  OpResult[T] = object
    case op: Operations
    of Keep: discard
    of Update:
      val: T
    of Delete: discard

  Function*[T, U] = proc(value: T): U {.raises: [CatchableError], gcsafe, closure.}
  ModifyGet* = Function[?seq[byte], ?!OpResult[seq[byte]]]

method modifyGet*(self: Datastore, key: Key, fn: ModifyGet): Future[?!seq[byte]] {.base, locks: "unknown".} =
  let maybeCurrentData = ... # get current data val
  without op =? fn(maybeCurrentData), err:
    return failure(...)

  case op.op:
  of Keep: ... # keep val
  of Update: doUpdate(key, op.val) # update entry
  of Delete: doDelete(key)         # delete entry

  success(...)

This makes it easier to reason about the implementation of the update fn, and allows for more consistent error handling style.

Given that this is a limitation of the underlying datastore, and not so much this code, I think we can move this PR forward, provided we address some of the other comments left by the other reviewers and myself. However, I would strongly suggest that we do think about improving modifyGet further and make the required changes asap.
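For comparison, a hypothetical call site under the proposed API might look like this (a sketch only, using the OpResult type from the pseudocode above; isExpired is a stand-in predicate):

```nim
# Hypothetical usage of the proposed modifyGet: the callback signals its
# intent (Keep / Update / Delete) explicitly via OpResult, and signals
# errors via Result instead of raising exceptions.
let res = await metaDs.modifyGet(key) do (curr: ?seq[byte]) -> ?!OpResult[seq[byte]]:
  without bytes =? curr:
    return failure("entry not found")          # error path, no exception
  if isExpired(bytes):                         # stand-in predicate
    return success(OpResult[seq[byte]](op: Delete))
  success(OpResult[seq[byte]](op: Keep))       # NOP without a redundant write
```

Note how the Keep case also avoids the redundant write that returning the original value through the current Option-based API would trigger.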

without queryKey =? createDatasetMetadataQueryKey(), err:
return failure(err)

without queryIter =? await query[DatasetMetadata](self.metaDs, Query.init(queryKey)), err:
Contributor
How big is this query going to be? We should be careful and perhaps use pagination if it gets too large.

Contributor Author
It depends on how many datasets we're storing. I would say that number will probably never exceed 1k in practice. But if you want, I can add pagination.


self.timer.start(onTimer, self.interval)
if self.interval.seconds > 0:
self.timer.start(onTimer, self.interval)
Contributor
I would avoid using the timer altogether and just use a regular async loop here?

Contributor Author
Do you mean something like this?

while self.notStopped:
  doStuff()
  await sleepAsync(self.delay)

Contributor Author

For now I left the timer, since I didn't get confirmation that what I proposed above is what you meant.

Contributor
dryajov commented Jul 29, 2024

One more comment: I think moving the closures to their own named functions does improve readability somewhat, but it is really marginal, so I'm either way on this and consider it more a matter of style than anything more substantive.

Contributor
@emizzle emizzle left a comment
I agree with the suggested change from @dryajov above, and believe this would clear up quite a lot of the readability and reasoning difficulties I experienced as an outside reader.

I want to reiterate my review motivation throughout this PR, so there is no ambiguity. When I write code, I'm very much motivated by these two things:

  1. It is readable
  2. It is easy to reason about

The main thing is that as a writer, consideration of these points means you are always trying to see what you've written from a reader's perspective. Why is this important? Because if code is not readable and is not easy to reason about, it becomes, to a degree, technical debt.

I understand that these two goals are highly subjective, but what I find valuable as a writer is when a reader provides feedback about those points. I consider myself an "outside reader" of this code since I'm less involved in the client on a daily basis, and all of the suggestions I've made are simply my subjective opinion on how to potentially improve those two things. It doesn't mean I think I'm "right" about any of it, because if there's anything I've learned, being right about everything is only good for digging ditches 😂

A positive takeaway from this PR is that we all are very passionate about our quality of work and Codex itself and that is something we should all be happy about ❤️

gmega (Member) left a comment


My main concerns are with all the concurrency issues I think we may still face in the absence of dataset-level locking, as well as a number of other places where we are open to more complexity because the datastore itself does not support transactional updates.

Resolved review threads: README.md (outdated), codex/codex.nim, codex/conf.nim (two threads)
DefaultNumberOfBlocksToMaintainPerInterval* = 1000
DefaultDefaultExpiry* = 24.hours
DefaultMaintenanceInterval* = 5.minutes
DefaultBatchSize* = 1000
A Member commented:

Hm... as I am reading through this, I am confused by the meaning of batch size in this context. It used to be the number of blocks I'm willing to go through in every garbage-collection cycle, but now that expiration is per dataset I can't understand what this means anymore (am I willing to look into 1000 datasets per cycle? That looks like too much. Is it still the total number of blocks? Then it's weird, because it means a dataset can be partially purged if it has more blocks than that). I'll eventually figure it out by reading the code, but this is no longer self-explanatory.

tbekas (Contributor, author) commented Jul 31, 2024

BatchSize is the number of blocks deleted before recording a checkpoint. If you want, I can either add docs here, or remove this param altogether along with the checkpoints and replace them using bisect.

tbekas (Contributor, author) commented:

Renaming to CheckpointLength and adding docs.
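To make the renamed parameter's semantics concrete, here is a minimal Python sketch (names and data structures are illustrative, not the actual Codex/Nim code) of deleting a dataset in fixed-size batches, recording a checkpoint after each batch so an interrupted run can resume where it left off:

```python
CHECKPOINT_LENGTH = 1000  # blocks deleted between checkpoint writes

def delete_dataset(block_cids, delete_block, checkpoints, dataset_id):
    """Delete a dataset in batches, persisting a progress cursor after each batch."""
    start = checkpoints.get(dataset_id, 0)  # resume where a previous run left off
    for i in range(start, len(block_cids), CHECKPOINT_LENGTH):
        for cid in block_cids[i:i + CHECKPOINT_LENGTH]:
            delete_block(cid)
        checkpoints[dataset_id] = i + CHECKPOINT_LENGTH  # record the checkpoint
    checkpoints.pop(dataset_id, None)  # fully purged: drop the cursor

# Example: 2500 blocks are purged in 3 batches of at most 1000.
deleted = []
delete_dataset(list(range(2500)), deleted.append, {}, "tree-cid")
```

If the process dies mid-deletion, the cursor left in `checkpoints` means the next maintenance cycle only redoes at most one batch of work.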

Comment on lines 241 to 247:
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
  return failure(err)
A Member commented:

Datasets are not deleted at once, but in increments of size batchSize. Because the dataset may actually be a lot larger than the batch size, we are forced to store a deletion "cursor" (the checkpoint) so that the maintainer picks up from where it left off during the next maintenance cycle. For instance, a dataset with 10,000 blocks will require 10 maintenance cycles to be deleted (assuming the default batch size of 1,000 blocks), and the checkpoint will only be deleted after the last cycle.

I think this is perhaps more complicated than it needs to be. We should talk about whether or not maintaining fixed batch sizes really makes sense, because I think being able to kill a dataset at once would simplify things. I'm also advocating limiting concurrency by locking datasets that are undergoing garbage collection, so that any operation on the dataset gets forcefully reordered with respect to the ongoing deletion.
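The locking idea can be sketched in Python asyncio (purely illustrative; the client would use chronos in Nim): a map from dataset id to a lock that both the maintainer and regular block operations acquire, so work on the same dataset is serialized while different datasets stay concurrent:

```python
import asyncio
from collections import defaultdict

class DatasetLocks:
    """One lock per dataset id: GC and user operations on the same dataset
    are serialized (reordered), while different datasets stay concurrent."""
    def __init__(self):
        self._locks = defaultdict(asyncio.Lock)

    def lock(self, dataset_id):
        return self._locks[dataset_id]

locks = DatasetLocks()

async def garbage_collect(dataset_id, events):
    async with locks.lock(dataset_id):
        events.append("gc:" + dataset_id)

async def fetch_block(dataset_id, events):
    async with locks.lock(dataset_id):
        events.append("fetch:" + dataset_id)

async def main():
    ev = []
    # Both operations target the same dataset, so they cannot interleave.
    await asyncio.gather(garbage_collect("ds1", ev), fetch_block("ds1", ev))
    return ev

events = asyncio.run(main())
```

A real implementation would also need to evict unused locks from the map to avoid unbounded growth.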

Resolved review thread: codex/stores/maintenance.nim (outdated)
Comment on lines 241 to 247:
if err =? (await self.recordCheckpoint(treeCid, datasetMd)).errorOption:
  return failure(err)
A Member commented:

OK, it looks like I haven't fully understood why we need checkpoints: the code tries to delete the entire dataset, updating the checkpoint at every batch. There is no actual interruption unless it comes from outside, so I'm not sure why this is needed.

if numberReceived < self.numberOfBlocksPerInterval:
  self.offset = 0
if (datasetMd.expiry < self.clock.now) and
    (datasetMd.checkpoint.timestamp + self.retryDelay.seconds < self.clock.now):
A Member commented:

Yeah OK I really need to understand this better:

  1. when do we expect deletes to fail;
  2. why is retrying a strategy (assumes a transient condition).

tbekas (Contributor, author) replied:

  1. There can be multiple reasons, e.g. failing to open the db file because too many file descriptors are open.
  2. Because point 1 can happen, or the node can shut down midway through a dataset deletion.
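The condition in the snippet above boils down to a small predicate. A Python sketch (the retry-delay value here is hypothetical; all times are Unix seconds):

```python
RETRY_DELAY = 15 * 60  # seconds between retry attempts; hypothetical default

def eligible_for_deletion(expiry, last_checkpoint, now, retry_delay=RETRY_DELAY):
    """A dataset is picked up by the maintainer when it has expired AND the
    previous (possibly failed or interrupted) attempt is older than the
    retry delay, so failed deletes are retried without busy-looping."""
    return expiry < now and last_checkpoint + retry_delay < now
```

The retry delay is what turns transient failures (point 1) and mid-deletion shutdowns (point 2) into a bounded amount of repeated work per cycle.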

  res = StoreResult(kind: Stored, used: blk.data.len.NBytes)
  if err =? (await self.repoDs.put(blkKey, blk.data)).errorOption:
    raise err

  (md.some, res)
)

- proc tryDeleteBlock*(self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low): Future[?!DeleteResult] {.async.} =
+ proc tryDeleteBlock*(self: RepoStore, cid: Cid): Future[?!DeleteResult] {.async.} =
  if cid.isEmpty:
A Member commented:

The fact that metadata and block state are not updated transactionally makes me a bit anxious. I can see for instance that there is room for metadata to be left behind after a block deletion.
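The hazard can be made concrete with a sketch (Python, hypothetical structures; this shows the general pattern, not what the PR actually does): a two-step delete that crashes between removing the block and removing its metadata leaves an orphan, which an idempotent repair sweep can later finish off:

```python
def delete_block(store, metadata, cid, crash_between=False):
    """Non-transactional two-step delete: block data first, metadata second.
    A crash between the steps leaves metadata without its block."""
    store.pop(cid, None)
    if crash_between:
        return  # simulate the process dying between the two writes
    metadata.pop(cid, None)

def sweep_orphans(store, metadata):
    """Idempotent repair pass: drop metadata whose block is already gone."""
    for cid in [c for c in metadata if c not in store]:
        metadata.pop(cid)

# A crash strands the metadata entry for "cid1"...
store, metadata = {"cid1": b"data"}, {"cid1": {"size": 4}}
delete_block(store, metadata, "cid1", crash_between=True)
# ...until a later sweep cleans it up.
sweep_orphans(store, metadata)
```

Ordering matters: deleting the data first and the metadata last means a crash can only strand metadata (which the sweep can detect), never unreferenced block data with no record of it.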

tbekas (Contributor, author) replied:

Not sure how to address this.

@emizzle emizzle dismissed their stale review August 1, 2024 00:26

The remainder of the points I raised can be handled with a separate PR in nim-datastore

benbierens (Contributor) left a comment

sha-0ab4b1c branch passes basic dist-tests.

@tbekas tbekas force-pushed the expiry-per-dataset branch 2 times, most recently from 8e1699b to 950c729 Compare August 7, 2024 14:29
@gmega gmega added the Client See https://miro.com/app/board/uXjVNZ03E-c=/ for details label Sep 18, 2024
@gmega gmega assigned benbierens and unassigned tbekas Sep 18, 2024
Labels: Client (see https://miro.com/app/board/uXjVNZ03E-c=/ for details)
6 participants