Skip to content

Commit 9db4451

Browse files
author
Matthew Barnes
committed
database: Add DBClient.UpdateOperationDoc method
1 parent 682ad91 commit 9db4451

File tree

2 files changed

+61
-0
lines changed

2 files changed

+61
-0
lines changed

internal/database/cache.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,17 @@ func (c *Cache) CreateOperationDoc(ctx context.Context, doc *OperationDocument)
114114
return nil
115115
}
116116

117+
func (c *Cache) UpdateOperationDoc(ctx context.Context, operationID string, callback func(*OperationDocument) bool) (bool, error) {
118+
// Make sure lookup keys are lowercase.
119+
key := strings.ToLower(operationID)
120+
121+
if doc, ok := c.operation[key]; ok {
122+
return callback(doc), nil
123+
}
124+
125+
return false, ErrNotFound
126+
}
127+
117128
func (c *Cache) DeleteOperationDoc(ctx context.Context, operationID string) error {
118129
// Make sure lookup keys are lowercase.
119130
key := strings.ToLower(operationID)

internal/database/database.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ type DBClient interface {
6969

7070
GetOperationDoc(ctx context.Context, operationID string) (*OperationDocument, error)
7171
CreateOperationDoc(ctx context.Context, doc *OperationDocument) error
72+
UpdateOperationDoc(ctx context.Context, operationID string, callback func(*OperationDocument) bool) (bool, error)
7273
DeleteOperationDoc(ctx context.Context, operationID string) error
7374

7475
// GetSubscriptionDoc retrieves a SubscriptionDocument from the database given the subscriptionID.
@@ -355,6 +356,55 @@ func (d *CosmosDBClient) CreateOperationDoc(ctx context.Context, doc *OperationD
355356
return nil
356357
}
357358

359+
// UpdateOperationDoc updates an operation document by first fetching the document and
360+
// passing it to the provided callback for modifications to be applied. It then attempts to
361+
// replace the existing document with the modified document and an "etag" precondition. Upon
362+
// a precondition failure the function repeats for a limited number of times before giving up.
363+
//
364+
// The callback function should return true if modifications were applied, signaling to proceed
365+
// with the document replacement. The boolean return value reflects this: returning true if the
366+
// document was successfully replaced, or false with or without an error to indicate no change.
367+
func (d *CosmosDBClient) UpdateOperationDoc(ctx context.Context, operationID string, callback func(*OperationDocument) bool) (bool, error) {
368+
var err error
369+
370+
pk := azcosmos.NewPartitionKeyString(operationsPartitionKey)
371+
372+
options := &azcosmos.ItemOptions{}
373+
374+
for try := 0; try < 5; try++ {
375+
var doc *OperationDocument
376+
var data []byte
377+
378+
doc, err = d.GetOperationDoc(ctx, operationID)
379+
if err != nil {
380+
return false, err
381+
}
382+
383+
if !callback(doc) {
384+
return false, nil
385+
}
386+
387+
data, err = json.Marshal(doc)
388+
if err != nil {
389+
return false, fmt.Errorf("failed to marshal Operations container item for '%s': %w", operationID, err)
390+
}
391+
392+
options.IfMatchEtag = &doc.ETag
393+
_, err = d.operations.ReplaceItem(ctx, pk, doc.ID, data, options)
394+
if err == nil {
395+
return true, nil
396+
}
397+
398+
var responseError *azcore.ResponseError
399+
err = fmt.Errorf("failed to replace Operations container item for '%s': %w", operationID, err)
400+
if !errors.As(err, &responseError) || responseError.StatusCode != http.StatusPreconditionFailed {
401+
return false, err
402+
}
403+
}
404+
405+
return false, err
406+
}
407+
358408
// DeleteOperationDoc deletes the asynchronous operation document for the given
359409
// operation ID from the "operations" container
360410
func (d *CosmosDBClient) DeleteOperationDoc(ctx context.Context, operationID string) error {

0 commit comments

Comments
 (0)