Skip to content

Commit 0742e6a

Browse files
authored
Merge pull request #9608 from BerriAI/litellm_use_redis_for_updates
[Reliability] - Reduce DB Deadlocks by storing spend updates in Redis and then committing to DB
2 parents 4879e1e + 7c93b19 commit 0742e6a

21 files changed

+2706
-428
lines changed

litellm/caching/redis_cache.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1045,3 +1045,109 @@ async def async_get_ttl(self, key: str) -> Optional[int]:
10451045
except Exception as e:
10461046
verbose_logger.debug(f"Redis TTL Error: {e}")
10471047
return None
1048+
1049+
async def async_rpush(
    self,
    key: str,
    values: List[Any],
    parent_otel_span: Optional[Span] = None,
    **kwargs,
) -> int:
    """
    Append one or more values to the list stored at ``key`` (Redis RPUSH).

    Args:
        key: The Redis key of the list.
        values: Values appended, in order, to the tail of the list.
        parent_otel_span: Optional parent OpenTelemetry span.

    Returns:
        int: The length of the list after the push operation.

    Raises:
        Exception: Re-raises whatever the underlying Redis client raised,
            after emitting a non-blocking failure log.
    """
    redis_client: Any = self.init_async_client()
    started_at = time.time()
    try:
        new_length = await redis_client.rpush(key, *values)
        ## LOGGING ##
        # Fire-and-forget so service telemetry never blocks the caller.
        elapsed = time.time() - started_at
        asyncio.create_task(
            self.service_logger_obj.async_service_success_hook(
                service=ServiceTypes.REDIS,
                duration=elapsed,
                call_type="async_rpush",
            )
        )
        return new_length
    except Exception as e:
        # NON blocking - notify users Redis is throwing an exception
        ## LOGGING ##
        elapsed = time.time() - started_at
        asyncio.create_task(
            self.service_logger_obj.async_service_failure_hook(
                service=ServiceTypes.REDIS,
                duration=elapsed,
                error=e,
                call_type="async_rpush",
            )
        )
        verbose_logger.error(
            f"LiteLLM Redis Cache RPUSH: - Got exception from REDIS : {str(e)}"
        )
        raise e
1099+
1100+
async def async_lpop(
    self,
    key: str,
    count: Optional[int] = None,
    parent_otel_span: Optional[Span] = None,
    **kwargs,
) -> Union[Any, List[Any]]:
    """
    Pop one or more items from the head of the list stored at ``key`` (Redis LPOP).

    Args:
        key: The Redis key of the list.
        count: When given, pop up to this many items (Redis then returns a
            list); when ``None``, a single item is popped.
        parent_otel_span: Optional parent OpenTelemetry span.

    Returns:
        The popped item(s). Byte payloads are decoded to UTF-8 strings when
        possible; undecodable payloads are returned as-is.
    """
    redis_client: Any = self.init_async_client()
    started_at = time.time()
    print_verbose(f"LPOP from Redis list: key: {key}, count: {count}")
    try:
        popped = await redis_client.lpop(key, count)
        ## LOGGING ##
        # Fire-and-forget so service telemetry never blocks the caller.
        elapsed = time.time() - started_at
        asyncio.create_task(
            self.service_logger_obj.async_service_success_hook(
                service=ServiceTypes.REDIS,
                duration=elapsed,
                call_type="async_lpop",
            )
        )

        # Best-effort decode of byte payloads; fall back to the raw value
        # whenever decoding fails.
        if isinstance(popped, bytes):
            try:
                return popped.decode("utf-8")
            except Exception:
                return popped
        if isinstance(popped, list):
            if all(isinstance(element, bytes) for element in popped):
                try:
                    return [element.decode("utf-8") for element in popped]
                except Exception:
                    return popped
        return popped
    except Exception as e:
        # NON blocking - notify users Redis is throwing an exception
        ## LOGGING ##
        elapsed = time.time() - started_at
        asyncio.create_task(
            self.service_logger_obj.async_service_failure_hook(
                service=ServiceTypes.REDIS,
                duration=elapsed,
                error=e,
                call_type="async_lpop",
            )
        )
        verbose_logger.error(
            f"LiteLLM Redis Cache LPOP: - Got exception from REDIS : {str(e)}"
        )
        raise e

litellm/constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
DEFAULT_IMAGE_HEIGHT = 300
1919
MAX_SIZE_PER_ITEM_IN_MEMORY_CACHE_IN_KB = 1024 # 1MB = 1024KB
2020
SINGLE_DEPLOYMENT_TRAFFIC_FAILURE_THRESHOLD = 1000 # Minimum number of requests to consider "reasonable traffic". Used for single-deployment cooldown logic.
21+
# Redis list key under which in-flight spend-update transactions are buffered
# before being flushed to the database in a batch.
REDIS_UPDATE_BUFFER_KEY = "litellm_spend_update_buffer"
# Upper bound on how many buffered entries are dequeued from Redis per flush.
MAX_REDIS_BUFFER_DEQUEUE_COUNT = 100
2123
#### RELIABILITY ####
2224
REPEATED_STREAMING_CHUNK_LIMIT = 100 # catch if model starts looping the same chunk while streaming. Uses high default to prevent false positives.
2325
#### Networking settings ####
@@ -443,3 +445,7 @@
443445

444446
UI_SESSION_TOKEN_TEAM_ID = "litellm-dashboard"
445447
LITELLM_PROXY_ADMIN_NAME = "default_user_id"
448+
449+
########################### DB CRON JOB NAMES ###########################
# Identifier for the recurring job that commits buffered spend updates to the DB.
DB_SPEND_UPDATE_JOB_NAME = "db_spend_update_job"
DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = 60  # 1 minute

litellm/proxy/_types.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,21 @@ class LitellmTableNames(str, enum.Enum):
144144
PROXY_MODEL_TABLE_NAME = "LiteLLM_ProxyModelTable"
145145

146146

147+
class Litellm_EntityType(enum.Enum):
    """
    Enum for types of entities on litellm.

    This enum allows specifying the type of entity that is being tracked in
    the database (e.g. when buffering spend updates per entity).
    """

    KEY = "key"
    USER = "user"
    # NOTE(review): presumably the external customer/end-user, as distinct
    # from an internal proxy USER — confirm against callers.
    END_USER = "end_user"
    TEAM = "team"
    TEAM_MEMBER = "team_member"
    ORGANIZATION = "organization"
161+
147162
def hash_token(token: str):
148163
import hashlib
149164

@@ -2719,3 +2734,16 @@ class DailyUserSpendTransaction(TypedDict):
27192734
completion_tokens: int
27202735
spend: float
27212736
api_requests: int
2737+
2738+
2739+
class DBSpendUpdateTransactions(TypedDict):
    """
    Internal Data Structure for buffering spend updates in Redis or in memory
    before committing them to the database.

    Each field holds per-entity pending spend amounts, keyed by the entity's
    identifier (presumably id -> spend delta; verify against the writers).
    Note: fields are required keys (TypedDict default total=True) whose
    values may be ``None``.
    """

    user_list_transactions: Optional[Dict[str, float]]
    end_user_list_transactions: Optional[Dict[str, float]]
    key_list_transactions: Optional[Dict[str, float]]
    team_list_transactions: Optional[Dict[str, float]]
    team_member_list_transactions: Optional[Dict[str, float]]
    org_list_transactions: Optional[Dict[str, float]]

0 commit comments

Comments
 (0)