We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
This is the logic of my code.
async fn create_data_events(&self, captured_tables: HashSet<TableId>) -> core::result::Result<(), MySqlConnectorError> { let pool = &self.context.pool; let sender = &self.sender; let max_concurrent_tasks = 2; let semaphore = Arc::new(Semaphore::new(max_concurrent_tasks)); let mut handlers = Vec::new(); for table_id in captured_tables { let table = self.context.schema.table_for(&table_id).unwrap().clone(); let connection = pool.get_conn().await?; let sender_clone = sender.clone(); // Self::export_data(connection, table_id, table, sender.clone()).await?; let semaphore_clone = semaphore.clone(); let join_handler = tokio::spawn(async move { let _permit = semaphore_clone.acquire().await.unwrap(); Self::export_data(connection, table_id, table, sender_clone).await }); handlers.push(join_handler); } for handle in handlers { if let Ok(snapshot_result) = handle.await {} } Ok(()) }
async fn export_data( mut connection: Conn, table_id: TableId, table: Arc<Table>, sender: Sender<FlatMessage>, ) -> core::result::Result<(), MySqlConnectorError> { { info!("Exporting data from table '{}'", table_id); let export_start_time = Instant::now(); let mut stream: ResultSetStream<Row, TextProtocol> = connection.query_stream(format!("select * from {}", table_id)).await?; let mut rows_count = 0; while let Some(next) = stream.next().await { match next { Ok(row) => { let message = table.generate_snapshot_record(row); match sender.send(message).await { Ok(_) => {} Err(err) => error!("{}", err), } } Err(err) => { error!("Snapshotting of table {} failed : {:#?}", table_id, err); } } rows_count += 1; } drop(stream); info!( "Finished exporting {} records for table'{}';total duration {:#?}", rows_count, table_id, export_start_time.elapsed() ); } let _ = connection.disconnect().await?; Ok(()) }
The text was updated successfully, but these errors were encountered:
It's very strange that when I set the Semaphore to 1, which means that the connection is not used concurrently, no error occurs.
Sorry, something went wrong.
No branches or pull requests
This is the logic of my code.
The text was updated successfully, but these errors were encountered: