Partition Writer Support Part 1: add partition splitter #1040
| This PR may conflict with #1014, but I'm not sure whether #1014 can be reviewed and merged soon. I'm OK with merging either one first, and I will fix the conflict afterwards. cc @liurenjie1024 @jonathanc-n @Fokko @Xuanwo @sdd | 
    | @ZENOTME I should be able to take a look at this tomorrow | 
Small nits; I don't know if they are even worth changing if this is just temporary code.
| // # TODO | ||
| // Remove this after partition writer supported. | ||
| #[allow(dead_code)] | ||
| pub struct RecordBatchPartitionsplitter { | 
| pub struct RecordBatchPartitionsplitter { | |
| pub struct RecordBatchPartitionSplitter { | 
And the same rename in the rest of the code.
| partition_batches.push(( | ||
| row, | ||
| filter_record_batch(batch, &filter_array) | ||
| .expect("We should guarantee the filter array is valid"), | 
Prefer to propagate the error instead of using expect.
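A minimal sketch of what propagating the error could look like, using only the standard library. `SplitError`, `filter_values`, and `split_with_masks` are hypothetical stand-ins (not names from this PR); `filter_values` plays the role of `filter_record_batch`, and the point is only that `?` hands the failure to the caller where `expect` would panic.

```rust
use std::fmt;

/// Hypothetical error type standing in for the crate's own error.
#[derive(Debug, PartialEq)]
pub struct SplitError(pub String);

impl fmt::Display for SplitError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "split error: {}", self.0)
    }
}

impl std::error::Error for SplitError {}

/// Stand-in for `filter_record_batch`: keeps the values whose mask entry is true.
/// Fails when the mask and the values differ in length.
pub fn filter_values(values: &[i32], mask: &[bool]) -> Result<Vec<i32>, SplitError> {
    if values.len() != mask.len() {
        return Err(SplitError(format!(
            "mask has {} entries but batch has {} rows",
            mask.len(),
            values.len()
        )));
    }
    Ok(values
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(v, _)| *v)
        .collect())
}

/// Builds one filtered output per mask and propagates the first failure.
pub fn split_with_masks(
    values: &[i32],
    masks: &[Vec<bool>],
) -> Result<Vec<Vec<i32>>, SplitError> {
    let mut out = Vec::with_capacity(masks.len());
    for mask in masks {
        // `?` returns the error to the caller instead of panicking.
        out.push(filter_values(values, mask)?);
    }
    Ok(out)
}
```

With this shape, an invalid filter array surfaces as an `Err` that the writer can report, even if the invariant is expected to hold in practice.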
    | This PR is ready to review. cc @liurenjie1024 @Fokko @Xuanwo @sdd | 
| Hi @jonathanc-n & @ZENOTME | 
| @ranjanankur314 I will try my best to give it another review today. At the end of the day we will need a committer's review | 
    | Hi, I think this PR is ready to review. cc @liurenjie1024 @Xuanwo @Fokko @kevinjqliu @sdd | 
Thanks @ZENOTME for this pr!
| } | ||
|  | ||
| /// Split the record batch into multiple record batches according to provided partition columns. | ||
| pub fn split_by_partition( | 
Why does this have to be pub?
| &self, | ||
| batch: &RecordBatch, | ||
| partition_columns: &[ArrayRef], | ||
| ) -> Result<Vec<(OwnedRow, RecordBatch)>> { | 
Should we use Struct to replace OwnedRow? I assume the partitioned writer needs Struct?
Good catch! I noticed that our Struct supports Hash, so we don't need OwnedRow. I have removed it and now use Struct directly.
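To illustrate why a hashable partition value makes the intermediate row type unnecessary, here is a rough sketch using only the standard library. `PartitionKey` is a hypothetical stand-in for Struct (the real type is richer), and `group_rows` is an illustrative helper, not code from this PR.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a partition value; deriving `Hash` and `Eq`
/// is what allows it to be used as a map key directly.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PartitionKey(pub Vec<Option<i64>>);

/// Groups row indices by partition key, using the key itself as the map key.
pub fn group_rows(keys: &[PartitionKey]) -> HashMap<PartitionKey, Vec<usize>> {
    let mut groups: HashMap<PartitionKey, Vec<usize>> = HashMap::new();
    for (row, key) in keys.iter().enumerate() {
        // Rows with equal keys land in the same bucket.
        groups.entry(key.clone()).or_default().push(row);
    }
    groups
}
```

The row indices collected per key could then drive a boolean filter for each partition, with no conversion to and from a separate row encoding.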
| } | ||
|  | ||
| /// Convert row back to iceberg value. | ||
| pub fn convert_row(&self, rows: Vec<OwnedRow>) -> Result<Vec<Struct>> { | 
Do we really need this function? Why not just convert the columnar batch of partition values into Vec<Struct>?
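A rough sketch of the direct columnar-to-row conversion suggested here, using plain vectors in place of Arrow arrays. `Row` and `columns_to_rows` are hypothetical names, and `Option<i64>` stands in for a nullable partition value; the real conversion would go through the crate's own Arrow-to-Iceberg value mapping.

```rust
/// Hypothetical row type: one optional value per partition field.
pub type Row = Vec<Option<i64>>;

/// Transposes equally long columns into rows, one row per record.
pub fn columns_to_rows(columns: &[Vec<Option<i64>>]) -> Result<Vec<Row>, String> {
    let num_rows = columns.first().map_or(0, |c| c.len());
    // All partition columns must describe the same number of records.
    if columns.iter().any(|c| c.len() != num_rows) {
        return Err("partition columns differ in length".to_string());
    }
    Ok((0..num_rows)
        .map(|i| columns.iter().map(|c| c[i]).collect())
        .collect())
}
```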
Thanks @ZENOTME for this pr! Generally LGTM, just one nit.
| let partition_type = self.partition_spec.partition_type(&self.schema)?; | ||
| let partition_arrow_type = type_to_arrow_type(&Type::Struct(partition_type.clone()))?; | 
Do we need to do this conversion for each batch?
Nice catch
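One way to avoid repeating the conversion for each batch is to derive the type once when the splitter is constructed and keep it as a field. The sketch below shows that pattern with standard-library types only; `DerivedType`, `derive_type`, `Splitter`, and `process_batch` are hypothetical stand-ins and do not reflect the actual fix in this PR.

```rust
/// Hypothetical stand-in for a derived type description that is costly to build.
#[derive(Debug, Clone, PartialEq)]
pub struct DerivedType(pub Vec<String>);

/// Stand-in for the schema-to-partition-type conversion.
pub fn derive_type(field_names: &[&str]) -> Result<DerivedType, String> {
    if field_names.is_empty() {
        return Err("no partition fields".to_string());
    }
    Ok(DerivedType(
        field_names.iter().map(|n| n.to_string()).collect(),
    ))
}

pub struct Splitter {
    // Derived once in `new` and reused for every batch.
    derived: DerivedType,
}

impl Splitter {
    /// Performs the fallible conversion up front, so errors surface at construction.
    pub fn new(field_names: &[&str]) -> Result<Self, String> {
        Ok(Self {
            derived: derive_type(field_names)?,
        })
    }

    /// Per-batch work reads the cached value instead of re-deriving it.
    pub fn process_batch(&self, rows: &[i64]) -> String {
        format!(
            "{} rows, {} partition fields",
            rows.len(),
            self.derived.0.len()
        )
    }
}
```

A side effect of this design is that a bad partition spec fails when the splitter is built, not on the first batch.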
Thanks @ZENOTME for this pr!
Which issue does this PR close?
This PR is part 1 of the work to close #342.
What changes are included in this PR?
The partition writer support will be split into three PRs:
This PR is the first part.
Are these changes tested?