@tanujnay112
Contributor

Description of changes

Summarize the changes made by this PR.

  • Improvements & Bug fixes
    • ...
  • New functionality
    • ...

Test plan

How are these changes tested?

  • Tests pass locally with pytest for python, yarn test for js, cargo test for rust

Migration plan

Are there any migrations, or any forwards/backwards compatibility changes needed in order to make sure this change deploys reliably?

Observability plan

What is the plan to instrument and monitor this change?

Documentation Changes

Are all docstrings for user-facing APIs updated if required? Do we need to make documentation changes in the docs section?

@tanujnay112 tanujnay112 marked this pull request as ready for review November 14, 2025 21:30
@github-actions

Reviewer Checklist

Please leverage this checklist to ensure your code review is thorough before approving

Testing, Bugs, Errors, Logs, Documentation

  • Can you think of any use case in which the code does not behave as intended? Have they been tested?
  • Can you think of any inputs or external events that could break the code? Is user input validated and safe? Have they been tested?
  • If appropriate, are there adequate property based tests?
  • If appropriate, are there adequate unit tests?
  • Should any logging, debugging, tracing information be added or removed?
  • Are error messages user-friendly?
  • Have all documentation changes needed been made?
  • Have all non-obvious changes been commented?

System Compatibility

  • Are there any potential impacts on other parts of the system or backward compatibility?
  • Does this change intersect with any items on our roadmap, and if so, is there a plan for fitting them together?

Quality

  • Is this code of an unexpectedly high quality (Readability, Modularity, Intuitiveness)?

Contributor Author

tanujnay112 commented Nov 14, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@github-actions

⚠️ The Helm chart was updated without a version bump. Your changes will only be published if the version field in k8s/distributed-chroma/Chart.yaml is updated.

@propel-code-bot
Contributor

propel-code-bot bot commented Nov 14, 2025

Purge Nonce Plumbing from Core Services, Limiting It to s3heap Internals

This PR completely removes the nonce concept from every service except the s3-heap implementation, where it was originally introduced for optimistic concurrency. By deleting the column, proto fields, API handling, and any read/write logic, the nonce is now an internal detail of s3heap only. This simplifies the task data model, shrinks payloads, and eliminates a class of latent bugs related to stale‐nonce mismatches. A forward-compatible DB migration drops the column, and all Go/Rust models, protobuf stubs, and tests are updated accordingly.

Key Changes

• Dropped nonce column via migration 20251114125442_drop_nonce.sql and updated Go/Rust DAOs
• Regenerated protobufs after removing nonce from Task-related messages
• Refactored coordinator, sysdb, GC, scheduler, and related Rust types to stop reading/writing nonce
• Adjusted gRPC servers/clients, k8s manifests, Tiltfile, and CI configs to new schema
• Kept nonce logic untouched inside s3heap, isolating it from the public surface

Affected Areas

• Database schema & migrations
• Go & Rust data models (sysdb, coordinator)
• Protobuf/IDL definitions and generated code
• gRPC service handlers and clients
• Task execution & scheduling pathways
• CI/CD tooling, manifests, and tests

This summary was automatically generated by @propel-code-bot

Comment on lines 265 to +268
suite.NotNil(taskResp)
originalTaskID := taskResp.Id
suite.T().Logf("Created fully initialized task: %s", originalTaskID)

// STEP 2: Directly UPDATE database to make task partial (simulate Phase 3 failure)
// Set lowest_live_nonce = NULL to simulate the task being stuck
_, err = db.Exec(`UPDATE public.tasks SET lowest_live_nonce = NULL WHERE task_id = $1`, originalTaskID)
suite.NoError(err, "Should be able to corrupt task in database")
suite.T().Logf("Made task partial by setting lowest_live_nonce = NULL")
// TODO: Uncomment after proto regeneration
// originalTaskID := taskResp.Id
suite.T().Skip("Test requires proto regeneration for AttachFunctionResponse.Id field")

[BestPractice]

Test skip without proper validation: The test is skipped unconditionally with suite.T().Skip() immediately after creating the task response, making the entire test setup meaningless. Either remove the test or implement proper proto field validation before skipping.

// Bad - unconditional skip after setup
suite.NotNil(taskResp)
// TODO: Uncomment after proto regeneration
// originalTaskID := taskResp.Id
suite.T().Skip("Test requires proto regeneration for AttachFunctionResponse.Id field")

// Better - skip early or add validation
if taskResp.Id == "" {
    // Skip stops the test immediately, so no explicit return is needed
    suite.T().Skip("Test requires proto regeneration")
}
originalTaskID := taskResp.Id
// ... continue test

File: go/pkg/sysdb/coordinator/heap_client_integration_test.go
Line: 268

Comment on lines +512 to +516
/// Note: This service is currently not fully functional due to nonce removal
pub async fn entrypoint() {
eprintln!("Heap tender service is not currently implemented");
eprintln!("The heap scheduling functionality was removed");
std::process::exit(1);

[BestPractice]

Dead code after unconditional process exit: The entrypoint() function always exits with std::process::exit(1), making it impossible to return normally. This prevents proper cleanup and makes the function signature (async fn) misleading.

// Current - unreachable return
pub async fn entrypoint() {
    eprintln!("Heap tender service is not currently implemented");
    eprintln!("The heap scheduling functionality was removed");
    std::process::exit(1);
} // implicit return is unreachable

// Better - explicit never return or remove async
pub fn entrypoint() -> ! {
    eprintln!("Heap tender service is not currently implemented");
    eprintln!("The heap scheduling functionality was removed");
    std::process::exit(1)
}

File: rust/s3heap-service/src/lib.rs
Line: 516

Comment on lines 248 to +249
pub async fn tend_to_heap(&self) -> Result<(), Error> {
let (witness, cursor, tended) = self.read_and_coalesce_dirty_log().await?;
if !tended.is_empty() {
let collection_ids = tended.iter().map(|t| t.0).collect::<Vec<_>>();
let scheduled = self
.sysdb
.clone()
.peek_schedule_by_collection_id(&collection_ids)
.await?;
let triggerables: Vec<Option<Schedule>> = scheduled
.into_iter()
.map(|s: ScheduleEntry| -> Result<_, Error> {
let triggerable = Triggerable {
partitioning: s3heap::UnitOfPartitioningUuid::new(s.collection_id.0),
scheduling: s3heap::UnitOfSchedulingUuid::new(s.attached_function_id),
};
if let Some(next_scheduled) = s.when_to_run {
let schedule = Schedule {
triggerable,
next_scheduled,
nonce: s.attached_function_run_nonce.0,
};
Ok(Some(schedule))
} else {
Ok(None)
}
})
.collect::<Result<Vec<_>, _>>()?;
let triggerables: Vec<Schedule> = triggerables.into_iter().flatten().collect();
if !triggerables.is_empty() {
self.writer.push(&triggerables).await?;
}
}
if let Some(witness) = witness.as_ref() {
self.cursor
.save(&HEAP_TENDER_CURSOR_NAME, &cursor, witness)
.await?;
} else {
self.cursor
.init(&HEAP_TENDER_CURSOR_NAME, cursor.clone())
.await?;
}
Ok(())
Err(Error::Internal("Not implemented".to_string()))

[BestPractice]

Unimplemented function returns error instead of using proper Rust patterns: The tend_to_heap() function immediately returns an error string instead of using unimplemented!() or todo!() macros, which provide better stack traces and clearer intent.

// Current - generic error message
pub async fn tend_to_heap(&self) -> Result<(), Error> {
    Err(Error::Internal("Not implemented".to_string()))
}

// Better - explicit unimplemented with context
pub async fn tend_to_heap(&self) -> Result<(), Error> {
    unimplemented!("tend_to_heap removed after nonce removal - see TODO(tanujnay112)")
}

File: rust/s3heap-service/src/lib.rs
Line: 249

Comment on lines +167 to +174
// Finish marks work as complete
func (s *attachedFunctionDb) Finish(id uuid.UUID) error {
now := time.Now()
result := s.db.Model(&dbmodel.AttachedFunction{}).
Where("id = ?", id).
Where("is_deleted = false").
Where("lowest_live_nonce IS NULL"). // Only update if still NULL (2PC marker)
UpdateColumns(map[string]interface{}{
"lowest_live_nonce": lowestLiveNonce,
"updated_at": now,
"updated_at": now,

[BestPractice]

Finish method updates without state validation: The Finish() method issues the update without first checking that the attached function is in a valid state. If RowsAffected == 0, it returns ErrAttachedFunctionNotFound, which covers missing and soft-deleted rows (the query filters on is_deleted = false), but nothing validates that the attached function is actually ready to be finished (e.g., not already finished, not in an invalid state).

// Current - minimal validation
result := s.db.Model(&dbmodel.AttachedFunction{}).
    Where("id = ?", id).
    Where("is_deleted = false").
    UpdateColumns(map[string]interface{}{
        "updated_at": now,
    })

// Consider adding state validation
// 1. Verify the function exists and is in expected state
// 2. Check if it's already finished
// 3. Validate any preconditions before marking complete
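
The preconditions listed above can be sketched as follows. This is a minimal illustration, not the DAO's actual code: an in-memory map stands in for the database, and the `Finished` flag and `ErrAlreadyFinished` error are hypothetical names introduced here. Only `ErrAttachedFunctionNotFound` and the `is_deleted` filter come from the code under review.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var (
	// ErrAttachedFunctionNotFound mirrors the error named in the review comment.
	ErrAttachedFunctionNotFound = errors.New("attached function not found")
	// ErrAlreadyFinished is a hypothetical error added for this sketch.
	ErrAlreadyFinished = errors.New("attached function already finished")
)

// attachedFunction is a simplified, hypothetical row model.
type attachedFunction struct {
	IsDeleted bool
	Finished  bool // hypothetical state flag, not a column from the PR
	UpdatedAt time.Time
}

// store is an in-memory stand-in for the database table.
type store struct {
	rows map[string]*attachedFunction
}

// Finish validates state before marking the row complete, so callers can
// tell "missing or deleted" apart from "already finished".
func (s *store) Finish(id string) error {
	row, ok := s.rows[id]
	if !ok || row.IsDeleted {
		return ErrAttachedFunctionNotFound
	}
	if row.Finished {
		return ErrAlreadyFinished
	}
	row.Finished = true
	row.UpdatedAt = time.Now()
	return nil
}

func main() {
	s := &store{rows: map[string]*attachedFunction{"a": {}}}
	fmt.Println(s.Finish("a"))       // <nil>
	fmt.Println(s.Finish("a"))       // attached function already finished
	fmt.Println(s.Finish("missing")) // attached function not found
}
```

In the real DAO the same distinction would probably need either an extra predicate in the update or a follow-up read when `RowsAffected == 0`; which one fits depends on schema details not shown in this thread.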

File: go/pkg/sysdb/metastore/db/dao/task.go
Line: 174

@blacksmith-sh
Contributor

blacksmith-sh bot commented Nov 14, 2025

Found 1 test failure on Blacksmith runners:

Failure

Test: s3heap-service::test_k8s_integration_00_heap_tender/test_k8s_integration_cursor_advances_on_subsequent_runs

schedule.Nonce == testMinimalUUIDv7.String() && // Should use minimal UUID
schedule.NextScheduled != nil
})).Return(nil).Once()


[CriticalError]

While you've correctly removed the mock for UpdateLowestLiveNonce, a related mock for heapClient.Push in this same test (TestAttachFunction_SuccessfulCreation_WithHeapService) appears to be incorrect after the refactoring. It expects a specific nonce value (testMinimalUUIDv7.String()), but the refactored AttachFunction no longer sets the Nonce field on the Schedule protobuf, causing it to default to "". This will likely cause the test to fail.

A similar issue exists in TestAttachFunction_RecoveryFlow_HeapFailureThenSuccess.

The mocks should be updated to expect an empty string for the nonce to align with the removal of the nonce logic.
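
As a rough illustration of the updated expectation, the predicate handed to the mock's argument matcher could look like the sketch below. The `Schedule` struct is a hypothetical stand-in for the generated protobuf type (only the two fields the matcher inspects are modeled), and `scheduleMatchesAfterNonceRemoval` is an illustrative name, not something in the repository.

```go
package main

import "fmt"

// Schedule is a hypothetical stand-in for the generated Schedule protobuf.
type Schedule struct {
	Nonce         string
	NextScheduled *int64 // placeholder for the real timestamp type
}

// scheduleMatchesAfterNonceRemoval accepts a schedule only when the nonce is
// left at its default empty string and a next run time is present.
func scheduleMatchesAfterNonceRemoval(s *Schedule) bool {
	return s != nil && s.Nonce == "" && s.NextScheduled != nil
}

func main() {
	next := int64(1763155800)

	// Nonce left unset, as the refactored AttachFunction is described as doing.
	fmt.Println(scheduleMatchesAfterNonceRemoval(&Schedule{NextScheduled: &next})) // true

	// A schedule that still carries a nonce no longer matches.
	fmt.Println(scheduleMatchesAfterNonceRemoval(&Schedule{Nonce: "stale", NextScheduled: &next})) // false
}
```

The existing `schedule.Nonce == testMinimalUUIDv7.String()` condition in both affected tests would then become a comparison against `""`, with the `NextScheduled != nil` check left as it is.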

File: go/pkg/sysdb/coordinator/create_task_test.go
Line: 264

Comment on lines +488 to +492
// let cutoff_time = chrono::DateTime::<chrono::Utc>::from(
// attached_function_soft_delete_absolute_cutoff_time,
// );

// self.prune_heap_across_shards(cutoff_time).await;

[BestPractice]

Since the prune_heap_across_shards function has been removed, this commented-out code block is now dead code and can be removed for better clarity.

File: rust/garbage_collector/src/garbage_collector_component.rs
Line: 492

Comment on lines +361 to +362
// TODO: Uncomment after proto regeneration
suite.T().Skip("Test requires proto regeneration for AttachFunctionResponse.Id field")

[TestCoverage]

Critical: Test will always be skipped because the skip is unconditional

Same issue as the previous test - the skip is placed after task creation, guaranteeing the actual cleanup/recreation logic never runs.

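// Move skip to beginning or remove it
// (needsProtoRegeneration is a placeholder condition)
if needsProtoRegeneration {
    // Skip stops the test immediately, so no explicit return is needed
    suite.T().Skip("Test requires proto regeneration")
}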

File: go/pkg/sysdb/coordinator/heap_client_integration_test.go
Line: 362
