2121use anyhow:: { Context as _, Result } ;
2222use chrono:: { DateTime , Utc } ;
2323use futures:: TryStreamExt ;
24+ use mz_ore:: future:: { timeout, TimeoutError } ;
2425use mz_tls_util:: make_tls;
2526use std:: fmt;
2627use std:: path:: PathBuf ;
@@ -230,7 +231,7 @@ pub enum RelationCategory {
230231 /// For relations that belong in the `mz_introspection` schema.
231232 /// These relations require a replica name to be specified.
232233 Introspection ,
233- /// For relations that are retained metric objects that we'd also like to get the SUBSCRIBE output for.
234+ /// For relations that are retained metric objects that we'd like to get the SUBSCRIBE output for.
234235 Retained ,
235236 /// Other relations that we want to do a SELECT * FROM on.
236237 Basic ,
@@ -373,10 +374,18 @@ static RELATIONS: &[Relation] = &[
373374 category : RelationCategory :: Basic ,
374375 } ,
375376 // Sources/sinks
377+ Relation {
378+ name : "mz_source_statistics_with_history" ,
379+ category : RelationCategory :: Basic ,
380+ } ,
376381 Relation {
377382 name : "mz_source_statistics_with_history" ,
378383 category : RelationCategory :: Retained ,
379384 } ,
385+ Relation {
386+ name : "mz_sink_statistics" ,
387+ category : RelationCategory :: Basic ,
388+ } ,
380389 Relation {
381390 name : "mz_sink_statistics" ,
382391 category : RelationCategory :: Retained ,
@@ -621,6 +630,11 @@ static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
// TODO (debug_tool3): Make this configurable.
static PG_QUERY_TIMEOUT: Duration = Duration::from_secs(20);

/// The amount of time we wait to collect data from the subscribe
/// query before cancelling the query. This is to prevent the query
/// from running indefinitely, since a `SUBSCRIBE` never completes on
/// its own.
static SUBSCRIBE_SCRAPE_TIMEOUT: Duration = Duration::from_secs(3);

/// The maximum number of errors we tolerate for a cluster replica.
/// If a cluster replica has more than this many errors, we skip it.
static MAX_CLUSTER_REPLICA_ERROR_COUNT: usize = 3;
@@ -682,43 +696,111 @@ pub async fn create_postgres_connection(
682696 Ok ( ( pg_client, pg_conn, tls) )
683697}
684698
685- pub async fn copy_relation_to_csv (
699+ pub async fn write_copy_stream (
686700 transaction : & Transaction < ' _ > ,
687- file_path_name : PathBuf ,
688- column_names : & Vec < String > ,
701+ copy_query : & str ,
702+ file : & mut tokio :: fs :: File ,
689703 relation_name : & str ,
690704) -> Result < ( ) , anyhow:: Error > {
691- let mut file = tokio:: fs:: File :: create ( & file_path_name) . await ?;
692- // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
693- file. write_all ( ( column_names. join ( "," ) + "\n " ) . as_bytes ( ) )
694- . await ?;
695-
696- // Stream data rows to CSV
697- let copy_query = format ! (
698- "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)" ,
699- relation_name
700- ) ;
701-
702705 let copy_stream = transaction
703- . copy_out ( & copy_query)
706+ . copy_out ( copy_query)
704707 . await
705708 . context ( format ! ( "Failed to COPY TO for {}" , relation_name) ) ?
706709 . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e) ) ;
707710 let copy_stream = std:: pin:: pin!( copy_stream) ;
708711 let mut reader = StreamReader :: new ( copy_stream) ;
709- tokio:: io:: copy ( & mut reader, & mut file) . await ?;
712+ tokio:: io:: copy ( & mut reader, file) . await ?;
710713 // Ensure the file is flushed to disk.
711714 file. sync_all ( ) . await ?;
712715
713- info ! ( "Copied {} to {}" , relation_name, file_path_name. display( ) ) ;
714716 Ok :: < ( ) , anyhow:: Error > ( ( ) )
715717}
716718
719+ pub async fn copy_relation_to_csv (
720+ transaction : & Transaction < ' _ > ,
721+ file_path_name : PathBuf ,
722+ column_names : & Vec < String > ,
723+ relation : & Relation ,
724+ tls : MakeTlsConnector ,
725+ ) -> Result < ( ) , anyhow:: Error > {
726+ let mut file = tokio:: fs:: File :: create ( & file_path_name) . await ?;
727+ // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
728+ file. write_all ( ( column_names. join ( "," ) + "\n " ) . as_bytes ( ) )
729+ . await ?;
730+
731+ match relation. category {
732+ RelationCategory :: Retained => {
733+ let copy_query = format ! (
734+ "COPY (SUBSCRIBE TO (SELECT * FROM {})) TO STDOUT WITH (FORMAT CSV);" ,
735+ relation. name
736+ ) ;
737+
738+ let copy_fut = write_copy_stream ( transaction, & copy_query, & mut file, relation. name ) ;
739+ // We use a timeout to cut the SUBSCRIBE query short since it's expected to run indefinitely.
740+ // Alternatively, we could use a `DECLARE...FETCH ALL` for the same effect, but then we'd have
741+ // to format the result as CSV ourselves, leading to more code. Another alternative is to
742+ // specify an UPTO, but it gets finicky to get the UPTO frontier right since we can't rely on
743+ // wallclock time.
744+ let res = timeout ( SUBSCRIBE_SCRAPE_TIMEOUT , copy_fut) . await ;
745+
746+ match res {
747+ Ok ( ( ) ) => Ok ( ( ) ) ,
748+ Err ( TimeoutError :: DeadlineElapsed ) => {
749+ transaction. cancel_token ( ) . cancel_query ( tls) . await ?;
750+ Ok ( ( ) )
751+ }
752+ Err ( e) => Err ( e) ,
753+ }
754+ . map_err ( |e| anyhow:: anyhow!( e) ) ?;
755+ }
756+ _ => {
757+ let copy_query = format ! (
758+ "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)" ,
759+ relation. name
760+ ) ;
761+ write_copy_stream ( transaction, & copy_query, & mut file, relation. name ) . await ?;
762+ }
763+ } ;
764+
765+ info ! ( "Copied {} to {}" , relation. name, file_path_name. display( ) ) ;
766+ Ok :: < ( ) , anyhow:: Error > ( ( ) )
767+ }
768+
769+ pub async fn query_column_names (
770+ pg_client : & PgClient ,
771+ relation : & Relation ,
772+ ) -> Result < Vec < String > , anyhow:: Error > {
773+ let relation_name = relation. name ;
774+ // We query the column names to write the header row of the CSV file.
775+ let mut column_names = pg_client
776+ . query ( & format ! ( "SHOW COLUMNS FROM {}" , & relation_name) , & [ ] )
777+ . await
778+ . context ( format ! ( "Failed to get column names for {}" , relation_name) ) ?
779+ . into_iter ( )
780+ . map ( |row| match row. try_get :: < _ , String > ( "name" ) {
781+ Ok ( name) => Some ( name) ,
782+ Err ( _) => None ,
783+ } )
784+ . filter_map ( |row| row)
785+ . collect :: < Vec < _ > > ( ) ;
786+
787+ match relation. category {
788+ RelationCategory :: Retained => {
789+ column_names. splice ( 0 ..0 , [ "mz_timestamp" . to_string ( ) , "mz_diff" . to_string ( ) ] ) ;
790+ }
791+ _ => ( ) ,
792+ }
793+
794+ Ok ( column_names)
795+ }
796+
717797pub async fn query_relation (
718798 transaction : & Transaction < ' _ > ,
719799 start_time : DateTime < Utc > ,
720800 relation : & Relation ,
801+ column_names : & Vec < String > ,
721802 cluster_replica : Option < & ClusterReplica > ,
803+ tls : MakeTlsConnector ,
722804) -> Result < ( ) , anyhow:: Error > {
723805 let relation_name = relation. name ;
724806 let relation_category = & relation. category ;
@@ -750,42 +832,31 @@ pub async fn query_relation(
750832 ) ) ?;
751833 }
752834
753- // We query the column names to write the header row of the CSV file.
754- let column_names = transaction
755- . query ( & format ! ( "SHOW COLUMNS FROM {}" , & relation_name) , & [ ] )
756- . await
757- . context ( format ! ( "Failed to get column names for {}" , relation_name) ) ?
758- . into_iter ( )
759- . map ( |row| match row. try_get :: < _ , String > ( "name" ) {
760- Ok ( name) => Some ( name) ,
761- Err ( _) => None ,
762- } )
763- . filter_map ( |row| row)
764- . collect :: < Vec < _ > > ( ) ;
765-
766835 match relation_category {
767836 RelationCategory :: Basic => {
768837 let file_path = format_file_path ( start_time, None ) ;
769838 let file_path_name = file_path. join ( relation_name) . with_extension ( "csv" ) ;
770839 tokio:: fs:: create_dir_all ( & file_path) . await ?;
771840
772- copy_relation_to_csv ( transaction, file_path_name, & column_names, relation_name ) . await ?;
841+ copy_relation_to_csv ( transaction, file_path_name, column_names, relation , tls ) . await ?;
773842 }
774843 RelationCategory :: Introspection => {
775844 let file_path = format_file_path ( start_time, cluster_replica) ;
776845 tokio:: fs:: create_dir_all ( & file_path) . await ?;
777846
778847 let file_path_name = file_path. join ( relation_name) . with_extension ( "csv" ) ;
779848
780- copy_relation_to_csv ( transaction, file_path_name, & column_names, relation_name ) . await ?;
849+ copy_relation_to_csv ( transaction, file_path_name, column_names, relation , tls ) . await ?;
781850 }
782- _ => {
851+ RelationCategory :: Retained => {
852+ // Copy the current state and retained subscribe state
783853 let file_path = format_file_path ( start_time, None ) ;
784- let file_path_name = file_path. join ( relation_name) . with_extension ( "csv" ) ;
854+ let file_path_name = file_path
855+ . join ( format ! ( "{}_subscribe" , relation_name) )
856+ . with_extension ( "csv" ) ;
785857 tokio:: fs:: create_dir_all ( & file_path) . await ?;
786858
787- copy_relation_to_csv ( transaction, file_path_name, & column_names, relation_name) . await ?;
788- // TODO (debug_tool1): Dump the `FETCH ALL SUBSCRIBE` output too
859+ copy_relation_to_csv ( transaction, file_path_name, column_names, relation, tls) . await ?;
789860 }
790861 }
791862 Ok :: < ( ) , anyhow:: Error > ( ( ) )
@@ -845,8 +916,12 @@ impl<'n> SystemCatalogDumper<'n> {
845916 cluster_replica : Option < & ClusterReplica > ,
846917 ) -> Result < ( ) , anyhow:: Error > {
847918 info ! (
848- "Copying relation {}{}" ,
919+ "Copying relation {}{}{} " ,
849920 relation. name,
921+ match relation. category {
922+ RelationCategory :: Retained => " (subscribe history)" ,
923+ _ => "" ,
924+ } ,
850925 cluster_replica. map_or_else( || "" . to_string( ) , |replica| format!( " in {}" , replica) )
851926 ) ;
852927
@@ -865,10 +940,23 @@ impl<'n> SystemCatalogDumper<'n> {
865940
866941 async move {
867942 // TODO (debug_tool3): Use a transaction for the entire dump instead of per query.
868- let mut pg_client_lock = pg_client. lock ( ) . await ;
869- let transaction = pg_client_lock. transaction ( ) . await ?;
870-
871- match query_relation ( & transaction, start_time, relation, cluster_replica) . await
943+ let mut pg_client = pg_client. lock ( ) . await ;
944+
945+ // We cannot query the column names in the transaction because SUBSCRIBE queries
946+ // cannot be executed with SELECT and SHOW queries in the same transaction.
947+ let column_names = query_column_names ( & pg_client, relation) . await ?;
948+
949+ let transaction = pg_client. transaction ( ) . await ?;
950+
951+ match query_relation (
952+ & transaction,
953+ start_time,
954+ relation,
955+ & column_names,
956+ cluster_replica,
957+ self . pg_tls . clone ( ) ,
958+ )
959+ . await
872960 {
873961 Ok ( ( ) ) => Ok ( ( ) ) ,
874962 Err ( err) => {
0 commit comments