@@ -44,17 +44,19 @@ type OperationManager struct {
4444 LeaseStorage ports.OperationLeaseStorage
4545 Config OperationManagerConfig
4646 OperationDefinitionConstructors map [string ]operations.DefinitionConstructor
47+ SchedulerStorage ports.SchedulerStorage
4748 Logger * zap.Logger
4849}
4950
50- func New (flow ports.OperationFlow , storage ports.OperationStorage , operationDefinitionConstructors map [string ]operations.DefinitionConstructor , leaseStorage ports.OperationLeaseStorage , config OperationManagerConfig ) * OperationManager {
51+ func New (flow ports.OperationFlow , storage ports.OperationStorage , operationDefinitionConstructors map [string ]operations.DefinitionConstructor , leaseStorage ports.OperationLeaseStorage , config OperationManagerConfig , schedulerStorage ports. SchedulerStorage ) * OperationManager {
5152 return & OperationManager {
5253 Flow : flow ,
5354 Storage : storage ,
5455 OperationDefinitionConstructors : operationDefinitionConstructors ,
5556 OperationCancelFunctions : NewOperationCancelFunctions (),
5657 LeaseStorage : leaseStorage ,
5758 Config : config ,
59+ SchedulerStorage : schedulerStorage ,
5860 Logger : zap .L ().With (zap .String ("component" , "service" ), zap .String ("service" , "operation_manager" )),
5961 }
6062}
@@ -174,7 +176,17 @@ func (om *OperationManager) FinishOperation(ctx context.Context, op *operation.O
174176}
175177
176178func (om * OperationManager ) EnqueueOperationCancellationRequest (ctx context.Context , schedulerName , operationID string ) error {
177- err := om .Flow .EnqueueOperationCancellationRequest (ctx , ports.OperationCancellationRequest {
179+ _ , err := om .SchedulerStorage .GetScheduler (ctx , schedulerName )
180+ if err != nil {
181+ return fmt .Errorf ("failed to fetch scheduler from storage: %w" , err )
182+ }
183+
184+ _ , _ , err = om .Storage .GetOperation (ctx , schedulerName , operationID )
185+ if err != nil {
186+ return fmt .Errorf ("failed to fetch operation from storage: %w" , err )
187+ }
188+
189+ err = om .Flow .EnqueueOperationCancellationRequest (ctx , ports.OperationCancellationRequest {
178190 SchedulerName : schedulerName ,
179191 OperationID : operationID ,
180192 })
0 commit comments