-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
UnspecifiedThe issue is currently not fully specifiedThe issue is currently not fully specifiedquestionFurther information is requestedFurther information is requested
Description
In our processing we want to collect different information in form of operationOutcome issues, that we write into the job at every jobupdate.
A potential way for this would be a monadic structure here named acc for accumulator.
Here a quick llm generated example:
public record Acc<T>(
T value,
List<OperationOutcome.OperationOutcomeIssueComponent> issues
) {
public static <T> Acc<T> ok(T value) {
return new Acc<>(value, List.of());
}
public Acc<T> addInfo(String msg) {
return addIssue(issue(OperationOutcome.IssueSeverity.INFORMATION, msg));
}
public Acc<T> addWarning(String msg) {
return addIssue(issue(OperationOutcome.IssueSeverity.WARNING, msg));
}
public Acc<T> addError(String msg) {
return addIssue(issue(OperationOutcome.IssueSeverity.ERROR, msg));
}
private static OperationOutcome.OperationOutcomeIssueComponent issue(
OperationOutcome.IssueSeverity severity,
String msg
) {
return new OperationOutcome.OperationOutcomeIssueComponent()
.setSeverity(severity)
.setDiagnostics(msg);
}
public <R> Acc<R> map(Function<T, R> f) {
return new Acc<>(f.apply(value), issues);
}
public <R> Acc<R> flatMap(Function<T, Acc<R>> f) {
Acc<R> next = f.apply(value);
List<OperationOutcome.OperationOutcomeIssueComponent> merged =
new ArrayList<>(issues);
merged.addAll(next.issues());
return new Acc<>(next.value(), merged);
}
}With an example usage:
private Mono<BatchResult> processBatch(
PatientBatchWithConsent inputBatch,
UUID jobID,
GroupsToProcess groupsToProcess,
BatchState batchState
) {
UUID id = inputBatch.id();
logMemory(id);
return Mono.just(Acc.ok(inputBatch))
// Add info that we started
.map(acc -> acc.addInfo("Starting batch " + id))
// 1) Load patient compartment
.flatMap(acc ->
directResourceLoader.directLoadPatientCompartment(
groupsToProcess.directPatientCompartmentGroups(),
acc.value()
)
.map(Acc::ok)
.map(a -> a.addInfo("Loaded patient compartment for batch " + id))
.onErrorResume(e ->
Mono.just(acc.addError("Failed loading patient compartment: " + e.getMessage()))
)
)
// 2) Reference resolution
.flatMap(acc ->
referenceResolver.processSinglePatientBatch(
acc.value(),
groupsToProcess.allGroups()
)
.map(Acc::ok)
.map(a -> a.addInfo("Resolved references for batch " + id))
.onErrorResume(e ->
Mono.just(acc.addError("Reference resolution failed: " + e.getMessage()))
)
)
// 3) Cascading delete
.map(acc ->
acc.map(p -> cascadingDelete.handlePatientBatch(p, groupsToProcess.allGroups()))
.addInfo("Applied cascading delete for batch " + id)
)
// 4) Copy + Redact
.map(acc ->
acc.map(p -> batchCopierRedacter.transformBatch(p, groupsToProcess.allGroups()))
.addInfo("Applied copy/redact pipeline for batch " + id)
)
// 5) Write output bundle
.flatMap(acc ->
writeBatch(jobID, acc.value())
.thenReturn(acc.addInfo("Wrote NDJSON bundle for batch " + id))
.onErrorResume(e ->
Mono.just(acc.addError("Writing NDJSON failed: " + e.getMessage()))
)
)
// 6) Convert Acc<T> → BatchResult
.map(acc -> {
boolean hasErrors = acc.issues().stream()
.anyMatch(i -> i.getSeverity() == OperationOutcome.IssueSeverity.ERROR);
BatchState updatedState = hasErrors
? batchState.updateStatus(WorkUnitStatus.ERROR)
: batchState.updateStatus(WorkUnitStatus.FINISHED);
Set<ResourceGroupRelation> valid = hasErrors
? Set.of()
: acc.value().coreBundle().getValidResourceGroups();
return new BatchResult(
updatedState,
valid,
acc.issues()
);
});
}@alexanderkiel @bastianschaffer do you think we would need something along these lines or is this generally overkill?
Metadata
Metadata
Assignees
Labels
UnspecifiedThe issue is currently not fully specifiedThe issue is currently not fully specifiedquestionFurther information is requestedFurther information is requested