18
18
use anyhow:: { Context as _, Result } ;
19
19
use chrono:: { DateTime , Utc } ;
20
20
use futures:: TryStreamExt ;
21
+ use mz_ore:: future:: { timeout, TimeoutError } ;
21
22
use mz_tls_util:: make_tls;
22
23
use std:: fmt;
23
24
use std:: path:: PathBuf ;
@@ -26,9 +27,7 @@ use std::sync::Arc;
26
27
use std:: time:: Duration ;
27
28
use tokio:: io:: AsyncWriteExt ;
28
29
use tokio:: sync:: Mutex ;
29
- use tokio_postgres:: {
30
- Client as PgClient , Config as PgConfig , Connection , NoTls , Socket , Transaction ,
31
- } ;
30
+ use tokio_postgres:: { Client as PgClient , Config as PgConfig , Connection , Socket , Transaction } ;
32
31
use tokio_util:: io:: StreamReader ;
33
32
34
33
use k8s_openapi:: api:: core:: v1:: Service ;
@@ -229,7 +228,7 @@ pub enum RelationCategory {
229
228
/// For relations that belong in the `mz_introspection` schema.
230
229
/// These relations require a replica name to be specified.
231
230
Introspection ,
232
- /// For relations that are retained metric objects that we'd also like to get the SUBSCRIBE output for.
231
+ /// For relations that are retained metric objects that we'd like to get the SUBSCRIBE output for.
233
232
Retained ,
234
233
/// Other relations that we want to do a SELECT * FROM on.
235
234
Basic ,
@@ -372,10 +371,18 @@ static RELATIONS: &[Relation] = &[
372
371
category : RelationCategory :: Basic ,
373
372
} ,
374
373
// Sources/sinks
374
+ Relation {
375
+ name : "mz_source_statistics_with_history" ,
376
+ category : RelationCategory :: Basic ,
377
+ } ,
375
378
Relation {
376
379
name : "mz_source_statistics_with_history" ,
377
380
category : RelationCategory :: Retained ,
378
381
} ,
382
+ Relation {
383
+ name : "mz_sink_statistics" ,
384
+ category : RelationCategory :: Basic ,
385
+ } ,
379
386
Relation {
380
387
name : "mz_sink_statistics" ,
381
388
category : RelationCategory :: Retained ,
@@ -620,6 +627,11 @@ static PG_CONNECTION_TIMEOUT: Duration = Duration::from_secs(60);
620
627
/// sign that the operation won't work.
621
628
static PG_QUERY_TIMEOUT : Duration = Duration :: from_secs ( 20 ) ;
622
629
630
+ /// The amount of time we wait to collect data from the subscribe
631
+ /// query before cancelling the query. This is to prevent the query
632
+ /// from running indefinitely.
633
+ static SUBSCRIBE_SCRAPE_TIMEOUT : Duration = Duration :: from_secs ( 3 ) ;
634
+
623
635
/// The maximum number of errors we tolerate for a cluster replica.
624
636
/// If a cluster replica has more than this many errors, we skip it.
625
637
static MAX_CLUSTER_REPLICA_ERROR_COUNT : usize = 3 ;
@@ -681,41 +693,110 @@ pub async fn create_postgres_connection(
681
693
Ok ( ( pg_client, pg_conn, tls) )
682
694
}
683
695
696
+ pub async fn write_copy_stream (
697
+ transaction : & Transaction < ' _ > ,
698
+ copy_query : & str ,
699
+ file : & mut tokio:: fs:: File ,
700
+ relation_name : & str ,
701
+ ) -> Result < ( ) , anyhow:: Error > {
702
+ let copy_stream = transaction
703
+ . copy_out ( copy_query)
704
+ . await
705
+ . context ( format ! ( "Failed to COPY TO for {}" , relation_name) ) ?
706
+ . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e) ) ;
707
+ let copy_stream = std:: pin:: pin!( copy_stream) ;
708
+ let mut reader = StreamReader :: new ( copy_stream) ;
709
+ tokio:: io:: copy ( & mut reader, file) . await ?;
710
+ Ok :: < ( ) , anyhow:: Error > ( ( ) )
711
+ }
712
+
684
713
pub async fn copy_relation_to_csv (
685
714
transaction : & Transaction < ' _ > ,
686
715
file_path_name : PathBuf ,
687
- column_names : & mut Vec < String > ,
688
- relation_name : & str ,
716
+ column_names : & Vec < String > ,
717
+ relation : & Relation ,
718
+ tls : MakeTlsConnector ,
689
719
) -> Result < ( ) , anyhow:: Error > {
690
720
let mut file = tokio:: fs:: File :: create ( & file_path_name) . await ?;
691
721
692
722
file. write_all ( ( column_names. join ( "," ) + "\n " ) . as_bytes ( ) )
693
723
. await ?;
694
724
695
- // Stream data rows to CSV
696
- let copy_query = format ! (
697
- "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)" ,
698
- relation_name
699
- ) ;
725
+ match relation. category {
726
+ RelationCategory :: Retained => {
727
+ let copy_query = format ! (
728
+ "COPY (SUBSCRIBE TO (SELECT * FROM {})) TO STDOUT WITH (FORMAT CSV);" ,
729
+ relation. name
730
+ ) ;
731
+
732
+ let copy_fut = write_copy_stream ( transaction, & copy_query, & mut file, relation. name ) ;
733
+
734
+ // We use a timeout to cut the SUBSCRIBE query short since it's expected to run indefinitely.
735
+ // Alternatively, we could use a `DECLARE...FETCH ALL` for the same effect, but then we'd have
736
+ // to format the result as CSV ourselves, leading to more code. Another alternative is to
737
+ // specify an UPTO, but it gets finicky to get the UPTO frontier right since we can't rely on
738
+ // wallclock time.
739
+ let res = timeout ( SUBSCRIBE_SCRAPE_TIMEOUT , copy_fut) . await ;
740
+
741
+ match res {
742
+ Ok ( ( ) ) => Ok ( ( ) ) ,
743
+ Err ( TimeoutError :: DeadlineElapsed ) => {
744
+ transaction. cancel_token ( ) . cancel_query ( tls) . await ?;
745
+ Ok ( ( ) )
746
+ }
747
+ Err ( e) => Err ( e) ,
748
+ }
749
+ . map_err ( |e| anyhow:: anyhow!( e) ) ?;
750
+ }
751
+ _ => {
752
+ let copy_query = format ! (
753
+ "COPY (SELECT * FROM {}) TO STDOUT WITH (FORMAT CSV)" ,
754
+ relation. name
755
+ ) ;
756
+ write_copy_stream ( transaction, & copy_query, & mut file, relation. name ) . await ?;
757
+ }
758
+ } ;
759
+
760
+ info ! ( "Copied {} to {}" , relation. name, file_path_name. display( ) ) ;
761
+ Ok :: < ( ) , anyhow:: Error > ( ( ) )
762
+ }
700
763
701
- let copy_stream = transaction
702
- . copy_out ( & copy_query)
764
+ pub async fn query_column_names (
765
+ pg_client : & PgClient ,
766
+ relation : & Relation ,
767
+ ) -> Result < Vec < String > , anyhow:: Error > {
768
+ let relation_name = relation. name ;
769
+ // We query the column names to write the header row of the CSV file.
770
+ // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
771
+ let mut column_names = pg_client
772
+ . query ( & format ! ( "SHOW COLUMNS FROM {}" , & relation_name) , & [ ] )
703
773
. await
704
- . context ( format ! ( "Failed to COPY TO for {}" , relation_name) ) ?
705
- . map_err ( |e| std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , e) ) ;
706
- let copy_stream = std:: pin:: pin!( copy_stream) ;
707
- let mut reader = StreamReader :: new ( copy_stream) ;
708
- tokio:: io:: copy ( & mut reader, & mut file) . await ?;
774
+ . context ( format ! ( "Failed to get column names for {}" , relation_name) ) ?
775
+ . into_iter ( )
776
+ . map ( |row| match row. try_get :: < _ , String > ( "name" ) {
777
+ Ok ( name) => Some ( name) ,
778
+ Err ( _) => None ,
779
+ } )
780
+ . filter_map ( |row| row)
781
+ . collect :: < Vec < _ > > ( ) ;
709
782
710
- info ! ( "Copied {} to {}" , relation_name, file_path_name. display( ) ) ;
711
- Ok :: < ( ) , anyhow:: Error > ( ( ) )
783
+ match relation. category {
784
+ RelationCategory :: Retained => {
785
+ column_names. splice ( 0 ..0 , [ "mz_timestamp" . to_string ( ) , "mz_diff" . to_string ( ) ] ) ;
786
+ }
787
+ _ => ( ) ,
788
+ }
789
+
790
+ Ok ( column_names)
712
791
}
713
792
714
793
pub async fn query_relation (
715
794
transaction : & Transaction < ' _ > ,
716
795
start_time : DateTime < Utc > ,
717
796
relation : & Relation ,
797
+ column_names : & Vec < String > ,
718
798
cluster_replica : Option < & ClusterReplica > ,
799
+ tls : MakeTlsConnector ,
719
800
) -> Result < ( ) , anyhow:: Error > {
720
801
let relation_name = relation. name ;
721
802
let relation_category = & relation. category ;
@@ -747,61 +828,31 @@ pub async fn query_relation(
747
828
) ) ?;
748
829
}
749
830
750
- // We query the column names to write the header row of the CSV file.
751
- // TODO (SangJunBak): Use `WITH (HEADER TRUE)` once database-issues#2846 is implemented.
752
- let mut column_names = transaction
753
- . query ( & format ! ( "SHOW COLUMNS FROM {}" , & relation_name) , & [ ] )
754
- . await
755
- . context ( format ! ( "Failed to get column names for {}" , relation_name) ) ?
756
- . into_iter ( )
757
- . map ( |row| match row. try_get :: < _ , String > ( "name" ) {
758
- Ok ( name) => Some ( name) ,
759
- Err ( _) => None ,
760
- } )
761
- . filter_map ( |row| row)
762
- . collect :: < Vec < _ > > ( ) ;
763
-
764
831
match relation_category {
765
832
RelationCategory :: Basic => {
766
833
let file_path = format_file_path ( start_time, None ) ;
767
834
let file_path_name = file_path. join ( relation_name) . with_extension ( "csv" ) ;
768
835
tokio:: fs:: create_dir_all ( & file_path) . await ?;
769
836
770
- copy_relation_to_csv (
771
- transaction,
772
- file_path_name,
773
- & mut column_names,
774
- relation_name,
775
- )
776
- . await ?;
837
+ copy_relation_to_csv ( transaction, file_path_name, column_names, relation, tls) . await ?;
777
838
}
778
839
RelationCategory :: Introspection => {
779
840
let file_path = format_file_path ( start_time, cluster_replica) ;
780
841
tokio:: fs:: create_dir_all ( & file_path) . await ?;
781
842
782
843
let file_path_name = file_path. join ( relation_name) . with_extension ( "csv" ) ;
783
844
784
- copy_relation_to_csv (
785
- transaction,
786
- file_path_name,
787
- & mut column_names,
788
- relation_name,
789
- )
790
- . await ?;
845
+ copy_relation_to_csv ( transaction, file_path_name, column_names, relation, tls) . await ?;
791
846
}
792
- _ => {
847
+ RelationCategory :: Retained => {
848
+ // Copy the current state and retained subscribe state
793
849
let file_path = format_file_path ( start_time, None ) ;
794
- let file_path_name = file_path. join ( relation_name) . with_extension ( "csv" ) ;
850
+ let file_path_name = file_path
851
+ . join ( format ! ( "{}_subscribe" , relation_name) )
852
+ . with_extension ( "csv" ) ;
795
853
tokio:: fs:: create_dir_all ( & file_path) . await ?;
796
854
797
- copy_relation_to_csv (
798
- transaction,
799
- file_path_name,
800
- & mut column_names,
801
- relation_name,
802
- )
803
- . await ?;
804
- // TODO (debug_tool1): Dump the `FETCH ALL SUBSCRIBE` output too
855
+ copy_relation_to_csv ( transaction, file_path_name, column_names, relation, tls) . await ?;
805
856
}
806
857
}
807
858
Ok :: < ( ) , anyhow:: Error > ( ( ) )
@@ -861,8 +912,12 @@ impl<'n> SystemCatalogDumper<'n> {
861
912
cluster_replica : Option < & ClusterReplica > ,
862
913
) -> Result < ( ) , anyhow:: Error > {
863
914
info ! (
864
- "Copying relation {}{}" ,
915
+ "Copying relation {}{}{} " ,
865
916
relation. name,
917
+ match relation. category {
918
+ RelationCategory :: Retained => " (subscribe history)" ,
919
+ _ => "" ,
920
+ } ,
866
921
cluster_replica. map_or_else( || "" . to_string( ) , |replica| format!( " in {}" , replica) )
867
922
) ;
868
923
@@ -881,10 +936,23 @@ impl<'n> SystemCatalogDumper<'n> {
881
936
882
937
async move {
883
938
// TODO (debug_tool3): Use a transaction for the entire dump instead of per query.
884
- let mut pg_client_lock = pg_client. lock ( ) . await ;
885
- let transaction = pg_client_lock. transaction ( ) . await ?;
886
-
887
- match query_relation ( & transaction, start_time, & relation, cluster_replica) . await
939
+ let mut pg_client = pg_client. lock ( ) . await ;
940
+
941
+ // We cannot query the column names in the transaction because SUBSCRIBE queries
942
+ // cannot be executed with SELECT and SHOW queries in the same transaction.
943
+ let column_names = query_column_names ( & pg_client, & relation) . await ?;
944
+
945
+ let transaction = pg_client. transaction ( ) . await ?;
946
+
947
+ match query_relation (
948
+ & transaction,
949
+ start_time,
950
+ & relation,
951
+ & column_names,
952
+ cluster_replica,
953
+ self . pg_tls . clone ( ) ,
954
+ )
955
+ . await
888
956
{
889
957
Ok ( ( ) ) => Ok ( ( ) ) ,
890
958
Err ( err) => {
0 commit comments