fix: Improve error handling within Quay Importer #1893
base: main
Conversation
Fixes: trustification#1892
Invalid sources will show a proper error.
Disabling importer runs will show a "canceled" message.
Errors occurring after the list of SBOMs to be fetched is created, e.g. an expired tag, could stop an importer run. This has been fixed, and any such errors should be included in the report after the run completes.
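The "errors included in the report" behavior described above amounts to collecting per-item failures instead of short-circuiting the run. A minimal std-only Rust sketch of that pattern (hypothetical types, not the actual importer code):

```rust
// Hypothetical sketch: keep processing after individual failures and
// report all errors at the end, instead of aborting on the first one.
fn run_import(items: Vec<Result<&'static str, String>>) -> (Vec<&'static str>, Vec<String>) {
    let mut stored = Vec::new();
    let mut report = Vec::new();
    for item in items {
        match item {
            Ok(sbom) => stored.push(sbom), // successful fetch/store
            Err(e) => report.push(e),      // recorded, run continues
        }
    }
    (stored, report)
}

fn main() {
    let items = vec![Ok("sbom-a"), Err("expired tag".to_string()), Ok("sbom-b")];
    let (stored, report) = run_import(items);
    assert_eq!(stored, vec!["sbom-a", "sbom-b"]);
    assert_eq!(report, vec!["expired tag".to_string()]);
}
```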
Reviewer's Guide
Refactors the Quay importer walker to improve error handling by initializing a shared OCI client, restructuring fetch/store to log and collect errors per SBOM without aborting runs, converting SBOM and repository listing to fallible async streams with cancellation support, refining SBOM validation and data structures, and adding an integration test for invalid sources.
Hey @jcrossley3 - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `modules/importer/src/runner/quay/walker.rs:169` </location>
<code_context>
- async fn repositories(&self, page: Option<String>) -> impl Stream<Item = Repository> {
- stream::unfold(page, async |state| match state {
- None => None,
+ async fn repositories(&self, page: Option<String>) -> Result<Vec<Repository>, Error> {
+ stream::try_unfold(page, async |state| match state {
Some(page) => {
</code_context>
<issue_to_address>
repositories() now returns a Result and collects all repositories eagerly.
Collecting all repositories into a Vec may lead to high memory usage with large datasets. Consider if a streaming approach is more scalable for your use case.
Suggested implementation:
```rust
fn repositories(&self, page: Option<String>) -> impl Stream<Item = Result<Repository, Error>> + '_ {
    stream::try_unfold(page, move |state| async move {
        // A `None` state means the previous batch had no next page.
        let Some(page) = state else {
            return Ok(None);
        };
        if self.context.is_canceled().await {
            return Err(Error::Canceled);
        }
        let batch: Batch = self
            .client
            .get(self.importer.repositories_url(&page))
            .send()
            .await?
            .json()
            .await?;
        // Yield the whole page as one stream item; the next state is the
        // next page token (or `None` once pagination is exhausted).
        Ok(Some((
            stream::iter(batch.repositories.into_iter().map(Ok)),
            batch.next_page,
        )))
    })
    // Flatten the per-page streams into a single stream of repositories.
    .try_flatten()
}
```
- The sketch assumes `Batch` has `repositories: Vec<Repository>` and `next_page: Option<String>`; adjust the pagination handling to the actual structure of your `Batch` type.
- The `?` operator assumes the necessary conversions into `Error` (e.g. `From<reqwest::Error>`) exist.
- You may need to import `futures::stream::{self, Stream, StreamExt, TryStreamExt}`.
- Adjust error handling and streaming logic as needed for your actual use case.
</issue_to_address>
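The memory concern raised above comes down to eager `Vec` collection versus lazy pagination. A std-only sketch of the lazy shape, using a synchronous iterator in place of an async stream (all names and data hypothetical):

```rust
// Hypothetical sketch: yield repositories page by page instead of
// collecting every page into one Vec up front.
fn pages() -> Vec<Vec<u32>> {
    vec![vec![1, 2], vec![3], vec![4, 5]]
}

fn repositories() -> impl Iterator<Item = u32> {
    let mut page = 0;
    std::iter::from_fn(move || {
        let all = pages(); // stands in for one remote page fetch
        if page < all.len() {
            let batch = all[page].clone();
            page += 1;
            Some(batch)
        } else {
            None
        }
    })
    .flatten() // only one page's worth of items is materialized at a time
}

fn main() {
    // The caller can stop early without ever touching later pages.
    let first_three: Vec<u32> = repositories().take(3).collect();
    assert_eq!(first_three, vec![1, 2, 3]);
}
```

The async version with `try_unfold`/`try_flatten` has the same shape: each page is fetched only when the consumer pulls the next item.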
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

@@ Coverage Diff @@
##             main    #1893      +/-   ##
==========================================
- Coverage   68.06%   67.91%   -0.16%
==========================================
  Files         365      365
  Lines       23065    23152      +87
  Branches    23065    23152      +87
==========================================
+ Hits        15699    15723      +24
- Misses       6486     6548      +62
- Partials      880      881       +1

☔ View full report in Codecov by Sentry.
@sourcery-ai review
Hey @jcrossley3 - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `modules/importer/src/runner/quay/walker.rs:143` </location>
<code_context>
- async fn sboms(&self) -> Vec<Reference> {
- let tags: Vec<(Reference, u64)> = self
+ async fn sboms(&self) -> Result<Vec<Reference>, Error> {
+ let repositories = self
.repositories(Some(String::new()))
- .await
- .filter(|repo| future::ready(repo.is_public && self.modified_since(repo.last_modified)))
- .map(|repo| self.repository(repo.namespace, repo.name))
+ .try_fold(vec![], |mut acc, repo| async move {
+ if repo.is_public && self.modified_since(repo.last_modified) {
+ acc.push(self.repository(repo.namespace, repo.name));
+ }
+ Ok(acc)
+ })
+ .await?;
+ let tags: Vec<(Reference, u64)> = stream::iter(repositories)
.buffer_unordered(32) // TODO: make configurable
</code_context>
<issue_to_address>
The repositories stream now returns Results, but unwrap_or_default() is still used later.
Using unwrap_or_default() here may hide errors from repository results. Please handle errors explicitly to prevent silent failures.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
let tags: Vec<(Reference, u64)> = stream::iter(repositories)
.buffer_unordered(32) // TODO: make configurable
.filter_map(|repo| future::ready(repo.unwrap_or_default().sboms(&self.importer.source)))
.map(stream::iter)
.flatten()
.collect()
.await;
=======
        let tags: Vec<(Reference, u64)> = stream::iter(repositories)
            .buffer_unordered(32) // TODO: make configurable
            .filter_map(|repo_result| {
                future::ready(match repo_result {
                    // `sboms` already returns an Option, so don't wrap it again
                    Ok(repo) => repo.sboms(&self.importer.source),
                    Err(e) => {
                        log::warn!("Error retrieving repository: {e}");
                        None
                    }
                })
            })
            .map(stream::iter)
            .flatten()
            .collect()
            .await;
>>>>>>> REPLACE
</suggested_fix>
@sourcery-ai review
Hey @jcrossley3 - I've reviewed your changes - here's some feedback:
- In sboms(), the try_fold on the repositories stream will abort the entire run on the first API error—consider catching and reporting each repositories() error instead of propagating Err so that listing failures become skipped entries rather than early exit.
- SBOM references filtered out by too_big are silently dropped—consider logging or adding a report entry for size‐skipped items so they’re visible in the final report.
- You may want to check for cancellation earlier in sboms() (or during repository traversal) to allow long listing operations to be canceled promptly rather than only inside the import loop.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In sboms(), the try_fold on the repositories stream will abort the entire run on the first API error—consider catching and reporting each repositories() error instead of propagating Err so that listing failures become skipped entries rather than early exit.
- SBOM references filtered out by too_big are silently dropped—consider logging or adding a report entry for size‐skipped items so they’re visible in the final report.
- You may want to check for cancellation earlier in sboms() (or during repository traversal) to allow long listing operations to be canceled promptly rather than only inside the import loop.
## Individual Comments
### Comment 1
<location> `modules/importer/src/runner/quay/walker.rs:181` </location>
<code_context>
+ fn repositories(&self, page: Option<String>) -> impl Stream<Item = Result<Repository, Error>> {
</code_context>
<issue_to_address>
Switching to try_unfold/try_flatten changes error propagation for repository streaming.
This change causes the stream to stop at the first error instead of continuing. Please confirm if this behavior aligns with your requirements for handling partial results.
</issue_to_address>
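The "stops at the first error" semantics described above mirror how collecting an iterator of `Result`s into a `Result<Vec<_>, _>` short-circuits; keeping partial results requires splitting successes from failures instead. A std-only illustration (hypothetical data):

```rust
// Partial-result handling: separate the Oks from the Errs.
fn split(results: Vec<Result<u32, String>>) -> (Vec<u32>, Vec<String>) {
    let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
    (
        oks.into_iter().map(Result::unwrap).collect(),
        errs.into_iter().map(Result::unwrap_err).collect(),
    )
}

fn main() {
    let results = vec![Ok(1), Err("boom".to_string()), Ok(2)];

    // try_unfold/try_flatten-style semantics: the first Err aborts everything,
    // and Ok(2) is lost.
    let all: Result<Vec<u32>, String> = results.clone().into_iter().collect();
    assert_eq!(all, Err("boom".to_string()));

    // Partial-result semantics: keep the Oks, record the Errs for the report.
    let (oks, errs) = split(results);
    assert_eq!(oks, vec![1, 2]);
    assert_eq!(errs, vec!["boom".to_string()]);
}
```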
@sourcery-ai review
Hey @jcrossley3 - I've reviewed your changes - here's some feedback:
- Extract the hardcoded `32` concurrency limit in your stream buffering into a named constant or configuration to avoid magic numbers.
- Add a `context.is_canceled()` check inside the `sboms` pagination stream (right after fetching each page) so cancellation halts the importer immediately.
- Consider batching report updates for fetch and store operations instead of locking the mutex separately in each method to reduce contention and improve clarity.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Extract the hardcoded `32` concurrency limit in your stream buffering into a named constant or configuration to avoid magic numbers.
- Add a `context.is_canceled()` check inside the `sboms` pagination stream (right after fetching each page) so cancellation halts the importer immediately.
- Consider batching report updates for fetch and store operations instead of locking the mutex separately in each method to reduce contention and improve clarity.
## Individual Comments
### Comment 1
<location> `modules/importer/src/runner/quay/walker.rs:143` </location>
<code_context>
- } else {
- Some(reference)
- }
+ async fn sboms(&self) -> Result<Vec<Reference>, Error> {
+ self.repositories(Some(String::new()))
+ .try_filter(|v| future::ready(v.is_public && self.modified_since(v.last_modified)))
+ .map_ok(|v| self.repository(v.namespace, v.name))
+ .try_buffer_unordered(32)
+ .map_ok(|repo| {
+ stream::iter(
+ repo.sboms(&self.importer.source)
+ .into_iter()
+ .map(Ok::<_, Error>), // try_flatten expects Results
+ )
})
- .collect()
+ .try_flatten()
+ .try_filter_map(|sbom| future::ready(Ok(self.valid(&sbom).then_some(sbom.reference))))
+ .try_collect()
+ .await
}
</code_context>
<issue_to_address>
sboms() now returns a Result and uses try_stream combinators for error handling.
Note that with this approach, the process stops at the first error. If you need to return partial results, consider gathering successful results and handling errors separately.
</issue_to_address>
<suggested_fix>
<<<<<<< SEARCH
async fn sboms(&self) -> Result<Vec<Reference>, Error> {
self.repositories(Some(String::new()))
.try_filter(|v| future::ready(v.is_public && self.modified_since(v.last_modified)))
.map_ok(|v| self.repository(v.namespace, v.name))
.try_buffer_unordered(32)
.map_ok(|repo| {
stream::iter(
repo.sboms(&self.importer.source)
.into_iter()
.map(Ok::<_, Error>), // try_flatten expects Results
)
})
.try_flatten()
.try_filter_map(|sbom| future::ready(Ok(self.valid(&sbom).then_some(sbom.reference))))
.try_collect()
.await
}
=======
    /// Returns a tuple of (successful References, errors encountered)
    async fn sboms(&self) -> (Vec<Reference>, Vec<Error>) {
        use futures::stream::{StreamExt, TryStreamExt};
        let mut successes = Vec::new();
        let mut errors = Vec::new();
        let stream = self
            .repositories(Some(String::new()))
            .try_filter(|v| future::ready(v.is_public && self.modified_since(v.last_modified)))
            .map_ok(|v| self.repository(v.namespace, v.name))
            .try_buffer_unordered(32)
            .map_ok(|repo| {
                stream::iter(
                    repo.sboms(&self.importer.source)
                        .into_iter()
                        .map(Ok::<_, Error>),
                )
            })
            .try_flatten();
        // Pin the combinator chain so it can be driven with `next()`.
        let mut stream = std::pin::pin!(stream);
        while let Some(result) = stream.next().await {
            match result {
                Ok(sbom) => {
                    if self.valid(&sbom) {
                        successes.push(sbom.reference);
                    }
                }
                Err(e) => errors.push(e),
            }
        }
        (successes, errors)
    }
>>>>>>> REPLACE
</suggested_fix>
@sourcery-ai dismiss
I think there's a point in making this configurable. Hitting a rate limit and not being able to opt out of it might be a severe issue.
Also this one: to my understanding, these are remote calls, and there can be quite a lot of them. Having a cancellation point here seems to make sense.
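The cancellation point being discussed can be sketched with a shared atomic flag standing in for `context.is_canceled()`, checked between simulated remote calls (all types and names hypothetical, not the importer's actual API):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical sketch: check a shared flag before each page fetch so a
// long listing operation can be stopped between remote calls.
fn walk_pages(canceled: &AtomicBool, pages: &[&str]) -> Result<Vec<String>, String> {
    let mut seen = Vec::new();
    for page in pages {
        if canceled.load(Ordering::Relaxed) {
            return Err("canceled".to_string());
        }
        seen.push(page.to_string()); // stands in for the remote call
    }
    Ok(seen)
}

fn main() {
    let flag = Arc::new(AtomicBool::new(false));
    assert_eq!(
        walk_pages(&flag, &["p1", "p2"]),
        Ok(vec!["p1".to_string(), "p2".to_string()])
    );
    // Another task flips the flag; the next page check aborts the walk.
    flag.store(true, Ordering::Relaxed);
    assert_eq!(walk_pages(&flag, &["p1"]), Err("canceled".to_string()));
}
```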
Also accounting for bad data in junk repos, which could cause us to error before our list of SBOMs to ingest is created.
Done
Done
Summary by Sourcery
Improve error handling in the Quay importer by capturing non-fatal errors in the report, propagating cancellation and invalid source errors, and ensuring the run completes even when individual SBOM fetches or uploads fail.