Skip to content

Add a cache for active orders #615

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: v28.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions orderbook/usecase/active_orders_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package orderbookusecase

import (
"fmt"
"sync"

lru "github.com/hashicorp/golang-lru/v2"
orderbookdomain "github.com/osmosis-labs/sqs/domain/orderbook"
)

// cacheKey represents the composite key for the active orders cache
type cacheKey struct {
poolID uint64
userAddress string
}

// activeOrdersCacheEntry represents a single cache entry containing active orders
type activeOrdersCacheEntry struct {
Orders []orderbookdomain.Order
}

// activeOrdersCache is a thread-safe LRU cache for active orders
type activeOrdersCache struct {
cache *lru.Cache[cacheKey, activeOrdersCacheEntry]
mu sync.RWMutex
// poolEntries tracks which cache keys belong to which pool for bulk invalidation
poolEntries map[uint64]map[string]struct{}
}

// newActiveOrdersCache creates a new active orders cache with the specified size
func newActiveOrdersCache(size int) (*activeOrdersCache, error) {
cache, err := lru.New[cacheKey, activeOrdersCacheEntry](size)
if err != nil {
return nil, fmt.Errorf("failed to create LRU cache: %w", err)
}

return &activeOrdersCache{
cache: cache,
poolEntries: make(map[uint64]map[string]struct{}),
}, nil
}

// get retrieves active orders for a given pool ID and user address
func (c *activeOrdersCache) get(poolID uint64, userAddress string) (activeOrdersCacheEntry, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

key := cacheKey{poolID: poolID, userAddress: userAddress}
return c.cache.Get(key)
}

// set stores active orders for a given pool ID and user address
func (c *activeOrdersCache) set(poolID uint64, userAddress string, entry activeOrdersCacheEntry) {
c.mu.Lock()
defer c.mu.Unlock()

key := cacheKey{poolID: poolID, userAddress: userAddress}

// Track this key for the pool
if _, exists := c.poolEntries[poolID]; !exists {
c.poolEntries[poolID] = make(map[string]struct{})
}
c.poolEntries[poolID][userAddress] = struct{}{}

c.cache.Add(key, entry)
}

// invalidatePool removes all cached entries for a given pool ID
func (c *activeOrdersCache) invalidatePool(poolID uint64) {
c.mu.Lock()
defer c.mu.Unlock()

// Get all keys for this pool
if keys, exists := c.poolEntries[poolID]; exists {
// Remove each key from the cache
for userAddress := range keys {
key := cacheKey{poolID: poolID, userAddress: userAddress}
c.cache.Remove(key)
}
// Remove the pool entry tracking
delete(c.poolEntries, poolID)
}
}

// clear removes all entries from the cache
func (c *activeOrdersCache) clear() {

Check failure on line 86 in orderbook/usecase/active_orders_cache.go

View workflow job for this annotation

GitHub Actions / Run linter

func `(*activeOrdersCache).clear` is unused (unused)
c.mu.Lock()
defer c.mu.Unlock()

c.cache.Purge()
c.poolEntries = make(map[uint64]map[string]struct{})
}
6 changes: 5 additions & 1 deletion orderbook/usecase/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,9 @@ func (o *OrderbookUseCaseImpl) SetFetchActiveOrdersEveryDuration(duration time.D

// ProcessOrderBookActiveOrders is an alias of processOrderBookActiveOrders for testing purposes
func (o *OrderbookUseCaseImpl) ProcessOrderBookActiveOrders(ctx context.Context, orderBook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, bool, error) {
return o.processOrderBookActiveOrders(ctx, orderBook, ownerAddress)
return o.processOrderbookActiveOrders(ctx, orderBook, ownerAddress)
}

func (o *OrderbookUseCaseImpl) DisableCache() {
o.activeOrdersCache = nil
}
123 changes: 93 additions & 30 deletions orderbook/usecase/orderbook_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type OrderbookUseCaseImpl struct {
poolsUsecease mvc.PoolsUsecase
tokensUsecease mvc.TokensUsecase
logger log.Logger
activeOrdersCache *activeOrdersCache
}

var _ mvc.OrderBookUsecase = &OrderbookUseCaseImpl{}
Expand All @@ -36,6 +37,8 @@ const (
maxQueryTicks = 500
// Max number of ticks cancels to query at a time
maxQueryTicksCancels = 100
// Default size for the active orders cache
defaultCacheSize = 100_000
)

// New creates a new orderbook use case.
Expand All @@ -46,12 +49,20 @@ func New(
tokensUsecease mvc.TokensUsecase,
logger log.Logger,
) *OrderbookUseCaseImpl {
activeOrdersCache, err := newActiveOrdersCache(defaultCacheSize)
if err != nil {
logger.Error("failed to create active orders cache", zap.Error(err))
// Continue without cache
activeOrdersCache = nil
}

return &OrderbookUseCaseImpl{
orderbookRepository: orderbookRepository,
orderBookClient: orderBookClient,
poolsUsecease: poolsUsecease,
tokensUsecease: tokensUsecease,
logger: logger,
activeOrdersCache: activeOrdersCache,
}
}

Expand Down Expand Up @@ -80,6 +91,12 @@ func (o *OrderbookUseCaseImpl) ProcessPool(ctx context.Context, pool ingesttypes
return fmt.Errorf("pool has no orderbook data %d", poolID)
}

// Invalidate cache entries for this pool since its state is changing
// If Active orders time continues to be of importance, we can optimize this.
if o.activeOrdersCache != nil {
o.activeOrdersCache.invalidatePool(poolID)
}

// Update the orderbook client with the orderbook pool ID.
ticks := cosmWasmPoolModel.Data.Orderbook.Ticks
if len(ticks) == 0 {
Expand Down Expand Up @@ -164,25 +181,31 @@ func (o *OrderbookUseCaseImpl) GetActiveOrdersStream(ctx context.Context, addres

for _, orderbook := range orderbooks {
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address)
if len(limitOrders) == 0 && err == nil {
return // skip empty orders
}

if err != nil {
telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", orderbook.PoolID), zap.Any("err", err))
}

// guard for context cancellation
select {
case c <- orderbookdomain.OrderbookResult{
PoolID: orderbook.PoolID,
IsBestEffort: isBestEffort,
LimitOrders: limitOrders,
Error: err,
}:
case <-ctx.Done():
return
default:
limitOrders, isBestEffort, err := o.processOrderbookActiveOrders(ctx, orderbook, address)
if err != nil {
telemetry.ProcessingOrderbookActiveOrdersErrorCounter.Inc()
o.logger.Error(telemetry.ProcessingOrderbookActiveOrdersErrorMetricName, zap.Any("pool_id", orderbook.PoolID), zap.Any("err", err))
c <- orderbookdomain.OrderbookResult{
IsBestEffort: false,
PoolID: orderbook.PoolID,
LimitOrders: nil,
Error: err,
}
return
}
// if len is 0, skip sending to channel
if len(limitOrders) > 0 {
c <- orderbookdomain.OrderbookResult{
IsBestEffort: isBestEffort,
PoolID: orderbook.PoolID,
LimitOrders: limitOrders,
}
}
}
}(orderbook)
}
Expand Down Expand Up @@ -227,7 +250,17 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri
// Process orderbooks concurrently
for _, orderbook := range orderbooks {
go func(orderbook domain.CanonicalOrderBooksResult) {
limitOrders, isBestEffort, err := o.processOrderBookActiveOrders(ctx, orderbook, address)
limitOrders, isBestEffort, err := o.processOrderbookActiveOrders(ctx, orderbook, address)
if err != nil {
results <- orderbookdomain.OrderbookResult{
IsBestEffort: isBestEffort,
PoolID: orderbook.PoolID,
LimitOrders: nil,
Error: err,
}
return
}

results <- orderbookdomain.OrderbookResult{
IsBestEffort: isBestEffort,
PoolID: orderbook.PoolID,
Expand Down Expand Up @@ -260,23 +293,47 @@ func (o *OrderbookUseCaseImpl) GetActiveOrders(ctx context.Context, address stri
return finalResults, isBestEffort, nil
}

// processOrderBookActiveOrders fetches and processes the active orders for a given orderbook.
// It returns the active formatted limit orders and an error if any.
// Errors if:
// - failed to fetch active orders
// - failed to fetch metadata by chain denom
// - failed to create limit order
//
// For every order, if an error occurs processing the order, it is skipped rather than failing the entire process.
// This is a best-effort process.
func (o *OrderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.LimitOrder, bool, error) {
if err := orderbook.Validate(); err != nil {
func (o *OrderbookUseCaseImpl) processOrderbookActiveOrders(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, address string) ([]orderbookdomain.LimitOrder, bool, error) {
rawOrders, err := o.getRawActiveOrders(ctx, orderbook, address)
if err != nil {
return nil, false, err
}

return o.processRawOrders(orderbook, rawOrders)
}

func (o *OrderbookUseCaseImpl) getRawActiveOrders(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.Order, error) {
if o.activeOrdersCache != nil {
if entry, found := o.activeOrdersCache.get(orderbook.PoolID, ownerAddress); found {
return entry.Orders, nil
}
}

rawOrders, err := o.getRawActiveOrdersFromChain(ctx, orderbook, ownerAddress)
if err != nil {
return nil, err
}

// Cache the raw orders if successful
if o.activeOrdersCache != nil {
o.activeOrdersCache.set(orderbook.PoolID, ownerAddress, activeOrdersCacheEntry{
Orders: rawOrders,
})
}

return rawOrders, nil
}

// getActiveOrdersFromChain fetches raw active orders from the chain for a given orderbook and address.
// It returns the raw orders, a best-effort flag, and any error that occurred.
func (o *OrderbookUseCaseImpl) getRawActiveOrdersFromChain(ctx context.Context, orderbook domain.CanonicalOrderBooksResult, ownerAddress string) ([]orderbookdomain.Order, error) {
if err := orderbook.Validate(); err != nil {
return nil, err
}

orders, count, err := o.orderBookClient.GetActiveOrders(ctx, orderbook.ContractAddress, ownerAddress)
if err != nil {
return nil, false, types.FailedToGetActiveOrdersError{
return nil, types.FailedToGetActiveOrdersError{
ContractAddress: orderbook.ContractAddress,
OwnerAddress: ownerAddress,
Err: err,
Expand All @@ -285,9 +342,15 @@ func (o *OrderbookUseCaseImpl) processOrderBookActiveOrders(ctx context.Context,

// There are orders to process for given orderbook
if count == 0 {
return nil, false, nil
return nil, nil
}

return orders, nil
}

// processRawOrders processes raw orders into formatted limit orders.
// It returns the processed limit orders and any error that occurred.
func (o *OrderbookUseCaseImpl) processRawOrders(orderbook domain.CanonicalOrderBooksResult, orders []orderbookdomain.Order) ([]orderbookdomain.LimitOrder, bool, error) {
// Create a slice to store the results
results := make([]orderbookdomain.LimitOrder, 0, len(orders))

Expand Down
14 changes: 10 additions & 4 deletions orderbook/usecase/orderbook_usecase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package orderbookusecase_test
import (
"context"
"errors"
"fmt"
"sort"
"strings"
"testing"
Expand Down Expand Up @@ -413,14 +414,15 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrdersStream() {
if tc.setupMocks != nil {
tc.setupMocks(ctx, cancel, usecase, &orderbookrepositorysitory, &client, &poolsUsecase, &tokensusecase, &callcount)
}
usecase.DisableCache()

// Call the method under test
orders := usecase.GetActiveOrdersStream(ctx, tc.address)

// Wait for the ticker to push the orders
if tc.expectedCallCount > 1 {
usecase.SetFetchActiveOrdersEveryDuration(tc.tickerDuration)
time.Sleep(tc.tickerDuration)
time.Sleep(tc.tickerDuration + time.Millisecond*10)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always had a race condition here before

}

// Collect results from the stream
Expand Down Expand Up @@ -549,7 +551,7 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() {
poolsUsecase.GetAllCanonicalOrderbookPoolIDsFunc = s.GetAllCanonicalOrderbookPoolIDsFunc(
nil,
s.NewCanonicalOrderBooksResult(1, "A"),
s.NewCanonicalOrderBooksResult(1, "B"),
s.NewCanonicalOrderBooksResult(2, "B"),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-existing error, started causing issue post-cache since cache is keyed by pool ID

)

grpcclient.GetActiveOrdersCb = func(ctx context.Context, contractAddress string, ownerAddress string) (orderbookdomain.Orders, uint64, error) {
Expand Down Expand Up @@ -646,6 +648,10 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() {
sort.SliceStable(orders, func(i, j int) bool {
return orders[i].OrderId < orders[j].OrderId
})
orderDebug := []string{}
for _, order := range orders {
orderDebug = append(orderDebug, fmt.Sprintf("%s-%d", order.OrderbookAddress, order.OrderId))
}

// Assert the results
if tc.expectedError != nil {
Expand All @@ -654,7 +660,7 @@ func (s *OrderbookUsecaseTestSuite) TestGetActiveOrders() {
} else {
s.Assert().NoError(err)
s.Assert().Equal(tc.expectedIsBestEffort, isBestEffort)
s.Assert().Equal(tc.expectedOrders, orders)
s.Assert().Equal(tc.expectedOrders, orders, orderDebug)
}
})
}
Expand Down Expand Up @@ -697,7 +703,7 @@ func (s *OrderbookUsecaseTestSuite) TestProcessOrderBookActiveOrders() {
order: newLimitOrder().WithOrderbookAddress("A"),
ownerAddress: "osmo1h5la3t4y8cljl34lsqdszklvcn053u4ryz9qr78v64rsxezyxwlsdelsdr",
expectedError: nil,
expectedOrders: nil,
expectedOrders: []orderbookdomain.LimitOrder{},
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ITs an internal method, so doesn't matter

expectedIsBestEffort: false,
},
{
Expand Down
Loading