Description
Describe the bug
When using pg_chameleon with a MySQL source that experiences temporary connectivity issues, the read daemon crashes even though `on_error_read: continue` is configured. Similar to #69.
To Reproduce
Steps to reproduce the behavior:
- Configure pg_chameleon with `on_error_read: continue` in the source configuration
- Start replication
- Shut down the MySQL server after `__init_read_replica` has finished but before `batch_data` processing is finished
- The connection times out
- Instead of continuing and attempting to reconnect as configured, the read daemon crashes completely
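For context, a minimal source configuration sketch showing where the setting lives (host, port, and source name are illustrative placeholders; only `on_error_read` is the key under discussion):

```yaml
sources:
  mysql:                          # placeholder source name
    db_conn:
      host: "mysql.example.com"   # placeholder
      port: "3306"
    on_error_read: continue       # keep the read daemon alive on connection errors
```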
Expected behavior
When `on_error_read: continue` is set, the read daemon should catch connection timeout errors, log them, and attempt to reconnect rather than crashing.
Environment:
- OS: Debian
- MySQL Version: 8.0.40
- PostgreSQL Version: 17
- Python Version: 3.11
- Cloud-hosted database: No (self-hosted)
Additional context
The issue was previously addressed in commit 9b8e98a (for issue #69), but the fix is incomplete. While `on_error_read` is checked in `__init_read_replica`, the error raised in the `read_replica` method when it calls `__read_replica_stream` is not caught.
Log snippet showing the crash:
```
2025-02-19 00:23:50 MainProcess ERROR: Read process alive: False - Replay process alive: True
2025-02-19 00:23:50 MainProcess ERROR: Stack trace: Traceback (most recent call last):
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pymysql/connections.py", line 649, in connect
    sock = socket.create_connection(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/socket.py", line 851, in create_connection
    raise exceptions[0]
  File "/usr/lib/python3.11/socket.py", line 836, in create_connection
    sock.connect(sa)
TimeoutError: [Errno 110] Connection timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pg_chameleon/lib/global_lib.py", line 535, in read_replica
    self.mysql_source.read_replica()
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pg_chameleon/lib/mysql_lib.py", line 1527, in read_replica
    replica_data=self.__read_replica_stream(batch_data)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pg_chameleon/lib/mysql_lib.py", line 1290, in __read_replica_stream
    for binlogevent in my_stream:
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pymysqlreplication/binlogstream.py", line 587, in fetchone
    self.__connect_to_stream()
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pymysqlreplication/binlogstream.py", line 349, in __connect_to_stream
    self._stream_connection = self.pymysql_wrapper(**self.__connection_settings)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pymysql/connections.py", line 361, in __init__
    self.connect()
  File "/home/REDACTED/chameleon/lib/python3.11/site-packages/pymysql/connections.py", line 716, in connect
    raise exc
pymysql.err.OperationalError: (2003, "Can't connect to MySQL server on 'REDACTED' ([Errno 110] Connection timed out)")
2025-02-19 00:23:50 MainProcess ERROR: Read daemon crashed. Terminating the replay daemon.
2025-02-19 00:23:50 MainProcess INFO: Replica process for source mysql ended
```
`on_error_read` is read correctly, as shown here (pg_chameleon/pg_chameleon/lib/mysql_lib.py, lines 870 to 881 at 5458575):

```python
def __init_read_replica(self):
    """
    The method calls the pre-steps required by the read replica method.
    """
    self.replica_conn = {}
    self.source_config = self.sources[self.source]
    try:
        exit_on_error = True if self.source_config["on_error_read"]=='exit' else False
    except KeyError:
        exit_on_error = True
```
However, the call to `__read_replica_stream` in `read_replica` is not wrapped in a try/except block that would respect this setting (pg_chameleon/pg_chameleon/lib/mysql_lib.py, lines 1499 to 1527 at 5458575):

```python
def read_replica(self):
    """
    The method gets the batch data from PostgreSQL.
    If the batch data is not empty then method read_replica_stream is executed to get the rows from
    the mysql replica stored into the PostgreSQL database.
    When the method exits the replica_data list is decomposed in the master_data (log name, position and last event's timestamp).
    If the flag close_batch is set then the master status is saved in PostgreSQL the batch id returned by the method is
    is saved in the class variable id_batch.
    This variable is used to determine whether the old batch should be closed or not.
    If the variable is not empty then the previous batch gets closed with a simple update of the processed flag.
    """
    skip = self.__init_read_replica()
    if skip:
        self.logger.warning("Couldn't connect to the source database for reading the replica. Ignoring.")
    else:
        self.pg_engine.set_source_status("running")
        replica_paused = self.pg_engine.get_replica_paused()
        if replica_paused:
            self.logger.info("Read replica is paused")
            self.pg_engine.set_read_paused(True)
        else:
            self.pg_engine.set_read_paused(False)
            batch_data = self.pg_engine.get_batch_data()
            if len(batch_data)>0:
                id_batch=batch_data[0][0]
                self.logger.debug("Batch data %s " % (batch_data))
                replica_data=self.__read_replica_stream(batch_data)
```
A suggested fix would be to wrap the `__read_replica_stream` call in a try/except block that catches connection errors and handles them in accordance with the `on_error_read` setting: log and retry when it is `continue`, re-raise when it is `exit`.
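Conceptually, the pattern could look like the minimal, self-contained sketch below. Everything here is a hypothetical stand-in, not pg_chameleon's actual API: `ConnectionLost` substitutes for `pymysql.err.OperationalError` / `TimeoutError`, and `read_replica_once` plays the role of the `__read_replica_stream` call site inside `read_replica`.

```python
class ConnectionLost(Exception):
    """Stand-in for pymysql.err.OperationalError / socket TimeoutError."""

def read_replica_once(read_stream, exit_on_error, log):
    """Run one read cycle; swallow connection errors when configured to continue.

    read_stream   -- callable standing in for self.__read_replica_stream(batch_data)
    exit_on_error -- the flag derived from on_error_read ('exit' -> True)
    log           -- a list standing in for self.logger
    """
    try:
        return read_stream()
    except ConnectionLost as exc:
        if exit_on_error:
            # on_error_read: exit (the default) -- propagate and crash as before
            raise
        # on_error_read: continue -- log and let the caller retry/reconnect
        log.append("connection lost, will retry: %s" % exc)
        return None

# Simulate the failure mode from the stack trace above
log = []

def failing_stream():
    raise ConnectionLost("[Errno 110] Connection timed out")

# With on_error_read: continue the daemon logs the error and stays alive
result = read_replica_once(failing_stream, exit_on_error=False, log=log)
```

The key point is that the except clause sits around the stream-reading call itself, so a timeout raised mid-batch reaches the same `exit_on_error` decision that `__init_read_replica` already makes for the initial connection.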