diff --git a/orderbook/usecase/active_orders_cache.go b/orderbook/usecase/active_orders_cache.go new file mode 100644 index 000000000..a878febe6 --- /dev/null +++ b/orderbook/usecase/active_orders_cache.go @@ -0,0 +1,92 @@ +package orderbookusecase + +import ( + "fmt" + "sync" + + lru "github.com/hashicorp/golang-lru/v2" + orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook" +) + +// cacheKey represents the composite key for the active orders cache +type cacheKey struct { + poolID uint64 + userAddress string +} + +// activeOrdersCacheEntry represents a single cache entry containing active orders +type activeOrdersCacheEntry struct { + Orders []orderbookdomain.Order +} + +// activeOrdersCache is a thread-safe LRU cache for active orders +type activeOrdersCache struct { + cache *lru.Cache[cacheKey, activeOrdersCacheEntry] + mu sync.RWMutex + // poolEntries tracks which cache keys belong to which pool for bulk invalidation + poolEntries map[uint64]map[string]struct{} +} + +// newActiveOrdersCache creates a new active orders cache with the specified size +func newActiveOrdersCache(size int) (*activeOrdersCache, error) { + cache, err := lru.New[cacheKey, activeOrdersCacheEntry](size) + if err != nil { + return nil, fmt.Errorf("failed to create LRU cache: %w", err) + } + + return &activeOrdersCache{ + cache: cache, + poolEntries: make(map[uint64]map[string]struct{}), + }, nil +} + +// get retrieves active orders for a given pool ID and user address +func (c *activeOrdersCache) get(poolID uint64, userAddress string) (activeOrdersCacheEntry, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + key := cacheKey{poolID: poolID, userAddress: userAddress} + return c.cache.Get(key) +} + +// set stores active orders for a given pool ID and user address +func (c *activeOrdersCache) set(poolID uint64, userAddress string, entry activeOrdersCacheEntry) { + c.mu.Lock() + defer c.mu.Unlock() + + key := cacheKey{poolID: poolID, userAddress: userAddress} + + // Track this key for the pool + if _, exists := c.poolEntries[poolID]; !exists { + c.poolEntries[poolID] = make(map[string]struct{}) + } + c.poolEntries[poolID][userAddress] = struct{}{} + + c.cache.Add(key, entry) +} + +// invalidatePool removes all cached entries for a given pool ID +func (c *activeOrdersCache) invalidatePool(poolID uint64) { + c.mu.Lock() + defer c.mu.Unlock() + + // Get all keys for this pool + if keys, exists := c.poolEntries[poolID]; exists { + // Remove each key from the cache + for userAddress := range keys { + key := cacheKey{poolID: poolID, userAddress: userAddress} + c.cache.Remove(key) + } + // Remove the pool entry tracking + delete(c.poolEntries, poolID) + } +} + +// clear removes all entries from the cache +func (c *activeOrdersCache) clear() { + c.mu.Lock() + defer c.mu.Unlock() + + c.cache.Purge() + c.poolEntries = make(map[uint64]map[string]struct{}) +} diff --git a/orderbook/usecase/export_test.go b/orderbook/usecase/export_test.go index 46e910432..c61467afa 100644 --- a/orderbook/usecase/export_test.go +++ b/orderbook/usecase/export_test.go @@ -15,5 +15,9 @@ func (o *OrderbookUseCaseImpl) SetFetchActiveOrdersEveryDuration(duration time.D // ProcessOrderBookActiveOrders is an alias of processOrderBookActiveOrders for testing purposes func (o *OrderbookUseCaseImpl) ProcessOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, bool, error) { - return o.processOrderBookActiveOrders(ctx, orderBook, ownerAddress) + return o.processOrderbookActiveOrders(ctx, orderBook, ownerAddress) +} + +func (o *OrderbookUseCaseImpl) DisableCache() { + o.activeOrdersCache = nil } diff --git a/orderbook/usecase/orderbook_usecase.go b/orderbook/usecase/orderbook_usecase.go index ca5074049..0eb2cdaa0 100644 --- a/orderbook/usecase/orderbook_usecase.go +++ b/orderbook/usecase/orderbook_usecase.go @@ -27,6 +27,7 @@ type OrderbookUseCaseImpl struct { poolsUsecease mvc.PoolsUsecase tokensUsecease mvc.TokensUsecase logger log.Logger + activeOrdersCache *activeOrdersCache } var _ mvc.OrderBookUsecase = &OrderbookUseCaseImpl{} @@ -36,6 +37,8 @@ const ( maxQueryTicks = 500 // Max number of ticks cancels to query at a time maxQueryTicksCancels = 100 + // Default size for the active orders cache + defaultCacheSize = 100_000 ) // New creates a new orderbook use case. @@ -46,12 +49,20 @@ func New( tokensUsecease mvc.TokensUsecase, logger log.Logger, ) *OrderbookUseCaseImpl { + activeOrdersCache, err := newActiveOrdersCache(defaultCacheSize) + if err != nil { + logger.Error("failed to create active orders cache", zap.Error(err)) + // Continue without cache + activeOrdersCache = nil + } + return &OrderbookUseCaseImpl{ orderbookRepository: orderbookRepository, orderBookClient: orderBookClient, poolsUsecease: poolsUsecease, tokensUsecease: tokensUsecease, logger: logger, + activeOrdersCache: activeOrdersCache, } } @@ -80,6 +91,12 @@ func (o *OrderbookUseCaseImpl) ProcessPool(ctx context.Context, pool ingesttypes return fmt.Errorf("pool has no orderbook data %d", poolID) } + // Invalidate cache entries for this pool since its state is changing + // If Active orders time continues to be of importance, we can optimize this. + if o.activeOrdersCache != nil { + o.activeOrdersCache.invalidatePool(poolID) + } + // Update the orderbook client with the orderbook pool ID. ticks := cosmWasmPoolModel.Data.Orderbook.Ticks if len(ticks) == 0 { @@ -164,25 +181,31 @@ func (o *OrderbookUseCaseImpl) GetActiveOrdersStream(ctx context.Context, addres for _, orderbook := range orderbooks { go func(orderbook domain.CanonicalOrderBooksResult) { - limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address) - if len(limitOrders) == 0 && err == nil { - return // skip empty orders - } - - if err != nil { - telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc() - o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", orderbook.PoolID), zap.Any("err", err)) - } - + // guard for context cancellation select { - case c <- orderbookdomain.OrderbookResult{ - PoolID: orderbook.PoolID, - IsBestEffort: isBestEffort, - LimitOrders: limitOrders, - Error: err, - }: case <-ctx.Done(): return + default: + limitOrders, isBestEffort, err := o.processOrderbookActiveOrders(ctx, orderbook, address) + if err != nil { + telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc() + o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", orderbook.PoolID), zap.Any("err", err)) + c <- orderbookdomain.OrderbookResult{ + IsBestEffort: false, + PoolID: orderbook.PoolID, + LimitOrders: nil, + Error: err, + } + return + } + // if len is 0, skip sending to channel + if len(limitOrders) > 0 { + c <- orderbookdomain.OrderbookResult{ + IsBestEffort: isBestEffort, + PoolID: orderbook.PoolID, + LimitOrders: limitOrders, + } + } } }(orderbook) } @@ -227,7 +250,17 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri // Process orderbooks concurrently for _, orderbook := range orderbooks { go func(orderbook domain.CanonicalOrderBooksResult) { - limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address) + limitOrders, isBestEffort, err := o.processOrderbookActiveOrders(ctx, orderbook, address) + if err != nil { + results <- orderbookdomain.OrderbookResult{ + IsBestEffort: isBestEffort, + PoolID: orderbook.PoolID, + LimitOrders: nil, + Error: err, + } + return + } + results <- orderbookdomain.OrderbookResult{ IsBestEffort: isBestEffort, PoolID: orderbook.PoolID, @@ -260,23 +293,47 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri return finalResults, isBestEffort, nil } -// processOrderBookActiveOrders fetches and processes the active orders for a given orderbook. -// It returns the active formatted limit orders and an error if any. -// Errors if: -// - failed to fetch active orders -// - failed to fetch metadata by chain denom -// - failed to create limit order -// -// For every order, if an error occurs processing the order, it is skipped rather than failing the entire process. -// This is a best-effort process. -func (o *OrderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, bool, error) { - if err := orderbook.Validate(); err != nil { +func (o *OrderbookUseCaseImpl) processOrderbookActiveOrders(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, address string) ([]orderbookdomain.LimitOrder, bool, error) { + rawOrders, err := o.getRawActiveOrders(ctx, orderbook, address) + if err != nil { return nil, false, err } + return o.processRawOrders(orderbook, rawOrders) +} + +func (o *OrderbookUseCaseImpl) getRawActiveOrders(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.Order, error) { + if o.activeOrdersCache != nil { + if entry, found := o.activeOrdersCache.get(orderbook.PoolID, ownerAddress); found { + return entry.Orders, nil + } + } + + rawOrders, err := o.getRawActiveOrdersFromChain(ctx, orderbook, ownerAddress) + if err != nil { + return nil, err + } + + // Cache the raw orders if successful + if o.activeOrdersCache != nil { + o.activeOrdersCache.set(orderbook.PoolID, ownerAddress, activeOrdersCacheEntry{ + Orders: rawOrders, + }) + } + + return rawOrders, nil +} + +// getActiveOrdersFromChain fetches raw active orders from the chain for a given orderbook and address. +// It returns the raw orders, a best-effort flag, and any error that occurred. +func (o *OrderbookUseCaseImpl) getRawActiveOrdersFromChain(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.Order, error) { + if err := orderbook.Validate(); err != nil { + return nil, err + } + orders, count, err := o.orderBookClient.GetActiveOrders(ctx, orderbook.ContractAddress, ownerAddress) if err != nil { - return nil, false, types.FailedToGetActiveOrdersError{ + return nil, types.FailedToGetActiveOrdersError{ ContractAddress: orderbook.ContractAddress, OwnerAddress: ownerAddress, Err: err, @@ -285,9 +342,15 @@ func (o *OrderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, // There are orders to process for given orderbook if count == 0 { - return nil, false, nil + return nil, nil } + return orders, nil +} + +// processRawOrders processes raw orders into formatted limit orders. +// It returns the processed limit orders and any error that occurred. +func (o *OrderbookUseCaseImpl) processRawOrders(orderbook domain.CanonicalOrderBooksResult, orders []orderbookdomain.Order) ([]orderbookdomain.LimitOrder, bool, error) { // Create a slice to store the results results := make([]orderbookdomain.LimitOrder, 0, len(orders)) diff --git a/orderbook/usecase/orderbook_usecase_test.go b/orderbook/usecase/orderbook_usecase_test.go index 01eb9cbc5..1497f5362 100644 --- a/orderbook/usecase/orderbook_usecase_test.go +++ b/orderbook/usecase/orderbook_usecase_test.go @@ -3,6 +3,7 @@ package orderbookusecase_test import ( "context" "errors" + "fmt" "sort" "strings" "testing" @@ -413,6 +414,7 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrdersStream() { if tc.setupMocks != nil { tc.setupMocks(ctx, cancel, usecase, &orderbookrepositorysitory, &client, &poolsUsecase, &tokensusecase, &callcount) } + usecase.DisableCache() // Call the method under test orders := usecase.GetActiveOrdersStream(ctx, tc.address) @@ -420,7 +422,7 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrdersStream() { // Wait for the ticker to push the orders if tc.expectedCallCount > 1 { usecase.SetFetchActiveOrdersEveryDuration(tc.tickerDuration) - time.Sleep(tc.tickerDuration) + time.Sleep(tc.tickerDuration + time.Millisecond*10) } // Collect results from the stream @@ -549,7 +551,7 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() { poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = s.GetAllCanonicalOrderbookPoolIDsFunc( nil, s.NewCanonicalOrderBooksResult(1, "A"), - s.NewCanonicalOrderBooksResult(1, "B"), + s.NewCanonicalOrderBooksResult(2, "B"), ) grpcclient.GetActiveOrdersCb = func(ctx context.Context, contractAddress string, ownerAddress string) (orderbookdomain.Orders, uint64, error) { @@ -646,6 +648,10 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() { sort.SliceStable(orders, func(i, j int) bool { return orders[i].OrderId < orders[j].OrderId }) + orderDebug := []string{} + for _, order := range orders { + orderDebug = append(orderDebug, fmt.Sprintf("%s-%d", order.OrderbookAddress, order.OrderId)) + } // Assert the results if tc.expectedError != nil { @@ -654,7 +660,7 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() { } else { s.Assert().NoError(err) s.Assert().Equal(tc.expectedIsBestEffort, isBestEffort) - s.Assert().Equal(tc.expectedOrders, orders) + s.Assert().Equal(tc.expectedOrders, orders, orderDebug) } }) } @@ -697,7 +703,7 @@ func (s *OrderbookUsecaseTestSuite) TestProcessOrderBookActiveOrders() { order: newLimitOrder().WithOrderbookAddress("A"), ownerAddress: "osmo1h5la3t4y8cljl34lsqdszklvcn053u4ryz9qr78v64rsxezyxwlsdelsdr", expectedError: nil, - expectedOrders: nil, + expectedOrders: []orderbookdomain.LimitOrder{}, expectedIsBestEffort: false, }, {