Skip to content

Commit d6208f3

Browse files
authored
Merge pull request #594 from avison9/feature/parsingVesu
[DataHandler] Add background task for parsing Vesu
2 parents 916dfd1 + f0b2b06 commit d6208f3

File tree

4 files changed

+190
-0
lines changed

4 files changed

+190
-0
lines changed

apps/shared/background_tasks/data_handler/event_tasks.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,44 @@
33
"""
44

55
import logging
6+
import asyncio
67
from datetime import datetime
8+
import threading
79

810
from apps.shared.celery_conf import app
911
from data_handler.handlers.events.nostra.transform_events import NostraTransformer
1012
from data_handler.handlers.events.zklend.transform_events import ZklendTransformer
13+
from apps.data_handler.handlers.loan_states.vesu.events import VesuLoanEntity
1114

1215
logger = logging.getLogger(__name__)
1316

17+
CHUNK_SIZE = 5
18+
19+
"""in the event of mulitple loop run async method in a new thread to avoid event loop conflicts"""
20+
def run_async_in_thread(coro):
21+
"""Run an async coroutine in a new thread with its own event loop."""
22+
result = None
23+
exception = None
24+
25+
def run_coro():
26+
nonlocal result, exception
27+
loop = asyncio.new_event_loop()
28+
asyncio.set_event_loop(loop)
29+
try:
30+
result = loop.run_until_complete(coro)
31+
except Exception as e:
32+
exception = e
33+
finally:
34+
loop.close()
35+
36+
thread = threading.Thread(target=run_coro)
37+
thread.start()
38+
thread.join()
39+
40+
if exception:
41+
raise exception
42+
return result
43+
1444

1545
@app.task(name="process_zklend_events")
1646
def process_zklend_events():
@@ -96,3 +126,44 @@ def process_nostra_events():
96126
exc,
97127
exc_info=True,
98128
)
129+
130+
131+
@app.task(name="process_vesu_events", bind=True, max_retries=3)
132+
def process_vesu_events(self):
133+
"""
134+
Process and store Vesu protocol events.
135+
Fetches ModifyPosition events from the blockchain, updates user positions,
136+
and stores them in a mock database.
137+
"""
138+
start_time = datetime.utcnow()
139+
logger.info("Starting Vesu event processing")
140+
try:
141+
vesu_entity = VesuLoanEntity()
142+
run_async_in_thread(vesu_entity.update_positions_data())
143+
execution_time = (datetime.utcnow() - start_time).total_seconds()
144+
logger.info(
145+
"Successfully processed Vesu events in %.2fs (UTC). Blocks: %d to %d",
146+
execution_time,
147+
vesu_entity.last_processed_block - CHUNK_SIZE, # default as update_positions_data chunk_size
148+
vesu_entity.last_processed_block,
149+
)
150+
except (ValueError, TypeError, RuntimeError) as exc:
151+
execution_time = (datetime.utcnow() - start_time).total_seconds()
152+
logger.error(
153+
"Error processing Vesu events after %.2fs: %s",
154+
execution_time,
155+
exc,
156+
exc_info=True,
157+
)
158+
self.retry(countdown=60)
159+
except Exception as exc:
160+
execution_time = (datetime.utcnow() - start_time).total_seconds()
161+
logger.error(
162+
"Unexpected error processing Vesu events after %.2fs: %s",
163+
execution_time,
164+
exc,
165+
exc_info=True,
166+
)
167+
self.retry(countdown=60)
168+
169+

apps/shared/celery_conf.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@
6767
"task": "process_nostra_events",
6868
"schedule": crontab(minute=f"*/{CRONTAB_TIME}"),
6969
},
70+
"process_vesu_events_every_5_mins": {
71+
"task": "process_vesu_events",
72+
"schedule": crontab(minute="*/5"),
73+
},
7074
}
7175

7276
from apps.shared.background_tasks.data_handler.order_books_tasks import ekubo_order_book

apps/shared/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ psycopg2 = "^2.9.10"
2323
sqlalchemy-utils = "^0.41.2"
2424
pydantic-settings = "^2.8.1"
2525
asyncpg = "^0.30.0"
26+
celery = "^5.3"
2627

2728
[tool.poetry.group.dev.dependencies]
2829
pytest = "^8.3.3"
@@ -31,6 +32,7 @@ black = "^23.3.0"
3132
isort = "^5.12.0"
3233
mypy = "^1.3.0"
3334
flake8 = "^6.0.0"
35+
celery = "^5.3"
3436

3537

3638

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# import pytest
2+
# import time
3+
# import asyncio
4+
# from unittest.mock import Mock, AsyncMock, patch
5+
# from datetime import datetime
6+
# import logging
7+
# from apps.data_handler.handlers.loan_states.vesu.events import VesuLoanEntity
8+
# from apps.shared.background_tasks.data_handler.event_tasks import process_vesu_events
9+
10+
11+
# logging.basicConfig(level=logging.INFO)
12+
# logger = logging.getLogger(__name__)
13+
14+
# @pytest.fixture
15+
# def mock_vesu_entity():
16+
# """Fixture to mock VesuLoanEntity."""
17+
# mock = Mock(spec=VesuLoanEntity)
18+
# mock.last_processed_block = 654244 # Initial block
19+
# return mock
20+
21+
# @pytest.fixture
22+
# def caplog(caplog):
23+
# """Fixture to capture log output."""
24+
# caplog.set_level(logging.INFO, logger="apps.shared.background_tasks.data_handler.event_tasks")
25+
# return caplog
26+
27+
# def test_process_vesu_events_success(mock_vesu_entity, caplog, monkeypatch):
28+
# """Test successful execution of process_vesu_events."""
29+
# # Mock VesuLoanEntity Class
30+
# monkeypatch.setattr(
31+
# "apps.shared.background_tasks.data_handler.event_tasks.VesuLoanEntity",
32+
# Mock(return_value=mock_vesu_entity)
33+
# )
34+
# # Mock update_positions_data
35+
# async def mock_update():
36+
# mock_vesu_entity.last_processed_block = 654249
37+
# mock_vesu_entity.update_positions_data = AsyncMock(side_effect=mock_update)
38+
39+
# process_vesu_events()
40+
41+
# assert "Starting Vesu event processing" in caplog.text
42+
# assert "Successfully processed Vesu events" in caplog.text
43+
# assert "Blocks: 654244 to 654249" in caplog.text
44+
# assert "(UTC)" in caplog.text
45+
# assert mock_vesu_entity.last_processed_block == 654249
46+
47+
# def test_process_vesu_events_value_error(mock_vesu_entity, caplog, monkeypatch):
48+
# """Test handling of ValueError."""
49+
# monkeypatch.setattr(
50+
# "apps.shared.background_tasks.data_handler.event_tasks.VesuLoanEntity",
51+
# Mock(return_value=mock_vesu_entity)
52+
# )
53+
# mock_vesu_entity.update_positions_data = AsyncMock(side_effect=ValueError("Failed to fetch events from Starknet"))
54+
# # Mock retry to prevent Retry exception
55+
# with patch.object(process_vesu_events, "retry", side_effect=Exception("Retry mocked")):
56+
# with pytest.raises(Exception, match="Retry mocked"):
57+
# process_vesu_events()
58+
59+
# assert "Starting Vesu event processing" in caplog.text
60+
# assert "Error processing Vesu events" in caplog.text
61+
# assert "Failed to fetch events from Starknet" in caplog.text
62+
63+
# def test_process_vesu_events_unexpected_error(mock_vesu_entity, caplog, monkeypatch):
64+
# """Test handling of unexpected errors."""
65+
# monkeypatch.setattr(
66+
# "apps.shared.background_tasks.data_handler.event_tasks.VesuLoanEntity",
67+
# Mock(return_value=mock_vesu_entity)
68+
# )
69+
# mock_vesu_entity.update_positions_data = AsyncMock(side_effect=Exception("Unexpected Starknet error"))
70+
# # Mock retry to prevent Retry exception
71+
# with patch.object(process_vesu_events, "retry", side_effect=Exception("Retry mocked")):
72+
# with pytest.raises(Exception, match="Retry mocked"):
73+
# process_vesu_events()
74+
75+
# assert "Starting Vesu event processing" in caplog.text
76+
# assert "Unexpected error processing Vesu events" in caplog.text
77+
# assert "Unexpected Starknet error" in caplog.text
78+
79+
# def test_process_vesu_events_scheduled(mock_vesu_entity, caplog, monkeypatch):
80+
# """Test periodic execution of process_vesu_events with a 3-second interval."""
81+
# chunk_size = 5
82+
# # Mock VesuLoanEntity
83+
# monkeypatch.setattr(
84+
# "apps.shared.background_tasks.data_handler.event_tasks.VesuLoanEntity",
85+
# Mock(return_value=mock_vesu_entity)
86+
# )
87+
88+
# async def mock_update():
89+
# mock_vesu_entity.last_processed_block += chunk_size
90+
# mock_vesu_entity.update_positions_data = AsyncMock(side_effect=mock_update)
91+
92+
# # Simulate 3 runs with 10-second intervals
93+
# initial_block = mock_vesu_entity.last_processed_block
94+
# num_runs = 3
95+
# for i in range(num_runs):
96+
# process_vesu_events()
97+
# if i < num_runs - 1:
98+
# time.sleep(10)
99+
100+
101+
# expected_blocks = [
102+
# (initial_block, initial_block + chunk_size),
103+
# (initial_block + chunk_size, initial_block + 2 * chunk_size),
104+
# (initial_block + 2 * chunk_size, initial_block + 3 * chunk_size)
105+
# ]
106+
# for start_block, end_block in expected_blocks:
107+
# assert f"Blocks: {start_block} to {end_block}" in caplog.text
108+
109+
110+
# assert mock_vesu_entity.last_processed_block == initial_block + num_runs * chunk_size
111+
# assert "Starting Vesu event processing" in caplog.text
112+
# assert "Successfully processed Vesu events" in caplog.text
113+
# assert "(UTC)" in caplog.text

0 commit comments

Comments
 (0)