1515// specific language governing permissions and limitations 
1616// under the License. 
1717
18+ use  std:: collections:: HashMap ; 
1819use  std:: sync:: Arc ; 
1920
2021use  arrow_array:: builder:: { 
21-     BooleanBuilder ,  ListBuilder ,  PrimitiveBuilder ,  StringBuilder ,  StructBuilder , 
22+     BooleanBuilder ,  GenericListBuilder ,   ListBuilder ,  PrimitiveBuilder ,  StringBuilder ,  StructBuilder , 
2223} ; 
23- use  arrow_array:: types:: { Int32Type ,  Int64Type ,   Int8Type } ; 
24+ use  arrow_array:: types:: { Int32Type ,  Int64Type } ; 
2425use  arrow_array:: RecordBatch ; 
25- use  arrow_schema:: { DataType ,  Field ,  Fields ,   Schema } ; 
26+ use  arrow_schema:: { DataType ,  Field ,  Fields } ; 
2627use  futures:: { stream,  StreamExt } ; 
2728
29+ use  crate :: arrow:: schema_to_arrow_schema; 
2830use  crate :: scan:: ArrowRecordBatchStream ; 
31+ use  crate :: spec:: { FieldSummary ,  ListType ,  NestedField ,  PrimitiveType ,  StructType ,  Type } ; 
2932use  crate :: table:: Table ; 
3033use  crate :: Result ; 
3134
@@ -40,44 +43,111 @@ impl<'a> ManifestsTable<'a> {
4043        Self  {  table } 
4144    } 
4245
43-     fn  partition_summary_fields ( )  -> Vec < Field >  { 
44-         vec ! [ 
45-             Field :: new( "contains_null" ,  DataType :: Boolean ,  false ) , 
46-             Field :: new( "contains_nan" ,  DataType :: Boolean ,  true ) , 
47-             Field :: new( "lower_bound" ,  DataType :: Utf8 ,  true ) , 
48-             Field :: new( "upper_bound" ,  DataType :: Utf8 ,  true ) , 
49-         ] 
50-     } 
51- 
52-     /// Returns the schema of the manifests table. 
53- pub  fn  schema ( & self )  -> Schema  { 
54-         Schema :: new ( vec ! [ 
55-             Field :: new( "content" ,  DataType :: Int8 ,  false ) , 
56-             Field :: new( "path" ,  DataType :: Utf8 ,  false ) , 
57-             Field :: new( "length" ,  DataType :: Int64 ,  false ) , 
58-             Field :: new( "partition_spec_id" ,  DataType :: Int32 ,  false ) , 
59-             Field :: new( "added_snapshot_id" ,  DataType :: Int64 ,  false ) , 
60-             Field :: new( "added_data_files_count" ,  DataType :: Int32 ,  false ) , 
61-             Field :: new( "existing_data_files_count" ,  DataType :: Int32 ,  false ) , 
62-             Field :: new( "deleted_data_files_count" ,  DataType :: Int32 ,  false ) , 
63-             Field :: new( "added_delete_files_count" ,  DataType :: Int32 ,  false ) , 
64-             Field :: new( "existing_delete_files_count" ,  DataType :: Int32 ,  false ) , 
65-             Field :: new( "deleted_delete_files_count" ,  DataType :: Int32 ,  false ) , 
66-             Field :: new( 
46+     /// Returns the iceberg schema of the manifests table. 
47+ pub  fn  schema ( & self )  -> crate :: spec:: Schema  { 
48+         let  fields = vec ! [ 
49+             NestedField :: new( 14 ,  "content" ,  Type :: Primitive ( PrimitiveType :: Int ) ,  true ) , 
50+             NestedField :: new( 1 ,  "path" ,  Type :: Primitive ( PrimitiveType :: String ) ,  true ) , 
51+             NestedField :: new( 2 ,  "length" ,  Type :: Primitive ( PrimitiveType :: Long ) ,  true ) , 
52+             NestedField :: new( 
53+                 3 , 
54+                 "partition_spec_id" , 
55+                 Type :: Primitive ( PrimitiveType :: Int ) , 
56+                 true , 
57+             ) , 
58+             NestedField :: new( 
59+                 4 , 
60+                 "added_snapshot_id" , 
61+                 Type :: Primitive ( PrimitiveType :: Long ) , 
62+                 true , 
63+             ) , 
64+             NestedField :: new( 
65+                 5 , 
66+                 "added_data_files_count" , 
67+                 Type :: Primitive ( PrimitiveType :: Int ) , 
68+                 true , 
69+             ) , 
70+             NestedField :: new( 
71+                 6 , 
72+                 "existing_data_files_count" , 
73+                 Type :: Primitive ( PrimitiveType :: Int ) , 
74+                 true , 
75+             ) , 
76+             NestedField :: new( 
77+                 7 , 
78+                 "deleted_data_files_count" , 
79+                 Type :: Primitive ( PrimitiveType :: Int ) , 
80+                 true , 
81+             ) , 
82+             NestedField :: new( 
83+                 15 , 
84+                 "added_delete_files_count" , 
85+                 Type :: Primitive ( PrimitiveType :: Int ) , 
86+                 true , 
87+             ) , 
88+             NestedField :: new( 
89+                 16 , 
90+                 "existing_delete_files_count" , 
91+                 Type :: Primitive ( PrimitiveType :: Int ) , 
92+                 true , 
93+             ) , 
94+             NestedField :: new( 
95+                 17 , 
96+                 "deleted_delete_files_count" , 
97+                 Type :: Primitive ( PrimitiveType :: Int ) , 
98+                 true , 
99+             ) , 
100+             NestedField :: new( 
101+                 8 , 
67102                "partition_summaries" , 
68-                 DataType :: List ( Arc :: new( Field :: new_struct( 
69-                     "item" , 
70-                     Self :: partition_summary_fields( ) , 
71-                     false , 
72-                 ) ) ) , 
73-                 false , 
103+                 Type :: List ( ListType  { 
104+                     element_field:  Arc :: new( NestedField :: new( 
105+                         9 , 
106+                         "item" , 
107+                         Type :: Struct ( StructType :: new( vec![ 
108+                             Arc :: new( NestedField :: new( 
109+                                 10 , 
110+                                 "contains_null" , 
111+                                 Type :: Primitive ( PrimitiveType :: Boolean ) , 
112+                                 true , 
113+                             ) ) , 
114+                             Arc :: new( NestedField :: new( 
115+                                 11 , 
116+                                 "contains_nan" , 
117+                                 Type :: Primitive ( PrimitiveType :: Boolean ) , 
118+                                 false , 
119+                             ) ) , 
120+                             Arc :: new( NestedField :: new( 
121+                                 12 , 
122+                                 "lower_bound" , 
123+                                 Type :: Primitive ( PrimitiveType :: String ) , 
124+                                 false , 
125+                             ) ) , 
126+                             Arc :: new( NestedField :: new( 
127+                                 13 , 
128+                                 "upper_bound" , 
129+                                 Type :: Primitive ( PrimitiveType :: String ) , 
130+                                 false , 
131+                             ) ) , 
132+                         ] ) ) , 
133+                         true , 
134+                     ) ) , 
135+                 } ) , 
136+                 true , 
74137            ) , 
75-         ] ) 
138+         ] ; 
139+ 
140+         crate :: spec:: Schema :: builder ( ) 
141+             . with_fields ( fields. into_iter ( ) . map ( |f| f. into ( ) ) ) 
142+             . build ( ) 
143+             . unwrap ( ) 
76144    } 
77145
78146    /// Scans the manifests table. 
79147pub  async  fn  scan ( & self )  -> Result < ArrowRecordBatchStream >  { 
80-         let  mut  content = PrimitiveBuilder :: < Int8Type > :: new ( ) ; 
148+         let  schema = schema_to_arrow_schema ( & self . schema ( ) ) ?; 
149+ 
150+         let  mut  content = PrimitiveBuilder :: < Int32Type > :: new ( ) ; 
81151        let  mut  path = StringBuilder :: new ( ) ; 
82152        let  mut  length = PrimitiveBuilder :: < Int64Type > :: new ( ) ; 
83153        let  mut  partition_spec_id = PrimitiveBuilder :: < Int32Type > :: new ( ) ; 
@@ -88,22 +158,14 @@ impl<'a> ManifestsTable<'a> {
88158        let  mut  added_delete_files_count = PrimitiveBuilder :: < Int32Type > :: new ( ) ; 
89159        let  mut  existing_delete_files_count = PrimitiveBuilder :: < Int32Type > :: new ( ) ; 
90160        let  mut  deleted_delete_files_count = PrimitiveBuilder :: < Int32Type > :: new ( ) ; 
91-         let  mut  partition_summaries = ListBuilder :: new ( StructBuilder :: from_fields ( 
92-             Fields :: from ( Self :: partition_summary_fields ( ) ) , 
93-             0 , 
94-         ) ) 
95-         . with_field ( Arc :: new ( Field :: new_struct ( 
96-             "item" , 
97-             Self :: partition_summary_fields ( ) , 
98-             false , 
99-         ) ) ) ; 
161+         let  mut  partition_summaries = self . partition_summary_builder ( ) ?; 
100162
101163        if  let  Some ( snapshot)  = self . table . metadata ( ) . current_snapshot ( )  { 
102164            let  manifest_list = snapshot
103165                . load_manifest_list ( self . table . file_io ( ) ,  & self . table . metadata_ref ( ) ) 
104166                . await ?; 
105167            for  manifest in  manifest_list. entries ( )  { 
106-                 content. append_value ( manifest. content  as  i8 ) ; 
168+                 content. append_value ( manifest. content  as  i32 ) ; 
107169                path. append_value ( manifest. manifest_path . clone ( ) ) ; 
108170                length. append_value ( manifest. manifest_length ) ; 
109171                partition_spec_id. append_value ( manifest. partition_spec_id ) ; 
@@ -119,32 +181,11 @@ impl<'a> ManifestsTable<'a> {
119181                    . append_value ( manifest. existing_files_count . unwrap_or ( 0 )  as  i32 ) ; 
120182                deleted_delete_files_count
121183                    . append_value ( manifest. deleted_files_count . unwrap_or ( 0 )  as  i32 ) ; 
122- 
123-                 let  partition_summaries_builder = partition_summaries. values ( ) ; 
124-                 for  summary in  & manifest. partitions  { 
125-                     partition_summaries_builder
126-                         . field_builder :: < BooleanBuilder > ( 0 ) 
127-                         . unwrap ( ) 
128-                         . append_value ( summary. contains_null ) ; 
129-                     partition_summaries_builder
130-                         . field_builder :: < BooleanBuilder > ( 1 ) 
131-                         . unwrap ( ) 
132-                         . append_option ( summary. contains_nan ) ; 
133-                     partition_summaries_builder
134-                         . field_builder :: < StringBuilder > ( 2 ) 
135-                         . unwrap ( ) 
136-                         . append_option ( summary. lower_bound . as_ref ( ) . map ( |v| v. to_string ( ) ) ) ; 
137-                     partition_summaries_builder
138-                         . field_builder :: < StringBuilder > ( 3 ) 
139-                         . unwrap ( ) 
140-                         . append_option ( summary. upper_bound . as_ref ( ) . map ( |v| v. to_string ( ) ) ) ; 
141-                     partition_summaries_builder. append ( true ) ; 
142-                 } 
143-                 partition_summaries. append ( true ) ; 
184+                 self . append_partition_summaries ( & mut  partition_summaries,  & manifest. partitions ) ; 
144185            } 
145186        } 
146187
147-         let  batch = RecordBatch :: try_new ( Arc :: new ( self . schema ( ) ) ,  vec ! [ 
188+         let  batch = RecordBatch :: try_new ( Arc :: new ( schema) ,  vec ! [ 
148189            Arc :: new( content. finish( ) ) , 
149190            Arc :: new( path. finish( ) ) , 
150191            Arc :: new( length. finish( ) ) , 
@@ -158,9 +199,60 @@ impl<'a> ManifestsTable<'a> {
158199            Arc :: new( deleted_delete_files_count. finish( ) ) , 
159200            Arc :: new( partition_summaries. finish( ) ) , 
160201        ] ) ?; 
161- 
162202        Ok ( stream:: iter ( vec ! [ Ok ( batch) ] ) . boxed ( ) ) 
163203    } 
204+ 
205+     fn  partition_summary_builder ( & self )  -> Result < GenericListBuilder < i32 ,  StructBuilder > >  { 
206+         let  schema = schema_to_arrow_schema ( & self . schema ( ) ) ?; 
207+         let  partition_summary_fields =
208+             match  schema. field_with_name ( "partition_summaries" ) ?. data_type ( )  { 
209+                 DataType :: List ( list_type)  => match  list_type. data_type ( )  { 
210+                     DataType :: Struct ( fields)  => fields. to_vec ( ) , 
211+                     _ => unreachable ! ( ) , 
212+                 } , 
213+                 _ => unreachable ! ( ) , 
214+             } ; 
215+ 
216+         let  partition_summaries = ListBuilder :: new ( StructBuilder :: from_fields ( 
217+             Fields :: from ( partition_summary_fields. clone ( ) ) , 
218+             0 , 
219+         ) ) 
220+         . with_field ( Arc :: new ( 
221+             Field :: new_struct ( "item" ,  partition_summary_fields,  false ) . with_metadata ( 
222+                 HashMap :: from ( [ ( "PARQUET:field_id" . to_string ( ) ,  "9" . to_string ( ) ) ] ) , 
223+             ) , 
224+         ) ) ; 
225+ 
226+         Ok ( partition_summaries) 
227+     } 
228+ 
229+     fn  append_partition_summaries ( 
230+         & self , 
231+         builder :  & mut  GenericListBuilder < i32 ,  StructBuilder > , 
232+         partitions :  & [ FieldSummary ] , 
233+     )  { 
234+         let  partition_summaries_builder = builder. values ( ) ; 
235+         for  summary in  partitions { 
236+             partition_summaries_builder
237+                 . field_builder :: < BooleanBuilder > ( 0 ) 
238+                 . unwrap ( ) 
239+                 . append_value ( summary. contains_null ) ; 
240+             partition_summaries_builder
241+                 . field_builder :: < BooleanBuilder > ( 1 ) 
242+                 . unwrap ( ) 
243+                 . append_option ( summary. contains_nan ) ; 
244+             partition_summaries_builder
245+                 . field_builder :: < StringBuilder > ( 2 ) 
246+                 . unwrap ( ) 
247+                 . append_option ( summary. lower_bound . as_ref ( ) . map ( |v| v. to_string ( ) ) ) ; 
248+             partition_summaries_builder
249+                 . field_builder :: < StringBuilder > ( 3 ) 
250+                 . unwrap ( ) 
251+                 . append_option ( summary. upper_bound . as_ref ( ) . map ( |v| v. to_string ( ) ) ) ; 
252+             partition_summaries_builder. append ( true ) ; 
253+         } 
254+         builder. append ( true ) ; 
255+     } 
164256} 
165257
166258#[ cfg( test) ]  
@@ -175,25 +267,25 @@ mod tests {
175267        let  mut  fixture = TableTestFixture :: new ( ) ; 
176268        fixture. setup_manifest_files ( ) . await ; 
177269
178-         let  batch_stream  = fixture. table . inspect ( ) . manifests ( ) . scan ( ) . await . unwrap ( ) ; 
270+         let  record_batch  = fixture. table . inspect ( ) . manifests ( ) . scan ( ) . await . unwrap ( ) ; 
179271
180272        check_record_batches ( 
181-             batch_stream , 
273+             record_batch , 
182274            expect ! [ [ r#" 
183-                 Field { name: "content", data_type: Int8 , nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
184-                 Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
185-                 Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
186-                 Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
187-                 Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
188-                 Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
189-                 Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
190-                 Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
191-                 Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
192-                 Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
193-                 Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, 
194-                 Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }"# ] ] , 
275+                 Field { name: "content", data_type: Int32 , nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "14" } }, 
276+                 Field { name: "path", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1" } }, 
277+                 Field { name: "length", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2" } }, 
278+                 Field { name: "partition_spec_id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3" } }, 
279+                 Field { name: "added_snapshot_id", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4" } }, 
280+                 Field { name: "added_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "5" } }, 
281+                 Field { name: "existing_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "6" } }, 
282+                 Field { name: "deleted_data_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "7" } }, 
283+                 Field { name: "added_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "15" } }, 
284+                 Field { name: "existing_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "16" } }, 
285+                 Field { name: "deleted_delete_files_count", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "17" } }, 
286+                 Field { name: "partition_summaries", data_type: List(Field { name: "item", data_type: Struct([Field { name: "contains_null", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "10" } }, Field { name: "contains_nan", data_type: Boolean, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "11" } }, Field { name: "lower_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "12" } }, Field { name: "upper_bound", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "13" } }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "9" } }), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "8" } }"# ] ] , 
195287            expect ! [ [ r#" 
196-                 content: PrimitiveArray<Int8 > 
288+                 content: PrimitiveArray<Int32 > 
197289                [ 
198290                  0, 
199291                ], 
0 commit comments