-
Describe the bug
Cannot apply partitioning using partition_cols in DataFrame.write_parquet.
Column to partition: id, datatype: Utf8
Python version: 3.11.9

To Reproduce
import daft
import random

n_rows = 200_000
random_numbers = [random.randint(1, n_rows) for _ in range(n_rows)]
df = {
    'id': [str(i) for i in random_numbers]
}
for i in range(11):
    df[f'col{i}'] = [random.randint(1, n_rows) for _ in range(n_rows)]
df = daft.from_pydict(df)

# df as described above
df.write_parquet(
    root_dir='path/to/test',  # please also check uploading to AWS S3 if possible, e.g., 's3://bucket/keys'
    write_mode="append",
    partition_cols=["id"],
)

Expected behavior
I would expect the partitioning to complete in about a second for 200_000 rows.

Component(s)
Python Runner

Additional context
No response
Replies: 6 comments
-
Hi @GZ82, thanks for opening this issue! Looks like you are partitioning on the id column, which has a very large number of distinct values, so this write produces a correspondingly large number of files. Writing 200_000 files to local disk, even if the files are small, is probably going to be quite slow as we will be bottlenecked by file system operations, disk I/O, etc. What we can do on our end is implement a mechanism to cap the number of open files at a time, which could help. Writing to S3 could be faster, and if that is what you need, then you could benefit from running this workload distributed, via the ray_runner. Just to make sure, is your intent to partition this data into a separate directory for each unique id?
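A minimal sketch of the distributed variant suggested above, assuming Ray is installed and a Ray runner can be selected via daft.context.set_runner_ray(); the S3 bucket/key, region, and credential setup are placeholders and not part of the original report:

import daft
from daft.io import IOConfig, S3Config

# Switch to the Ray runner so the many per-partition writes are spread across workers.
daft.context.set_runner_ray()  # pass address="..." to attach to a remote Ray cluster

# Placeholder region; in practice credentials/region usually come from the environment.
io_config = IOConfig(s3=S3Config(region_name="us-west-2"))

df = daft.from_pydict({
    "id": ["1", "2", "2", "3"],
    "col0": [10, 20, 30, 40],
})

df.write_parquet(
    "s3://bucket/keys",          # placeholder path from the report's comment
    write_mode="append",
    partition_cols=["id"],
    io_config=io_config,
)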
-
Hi @colin-ho, thanks very much for your quick reply! What I want to know is whether Daft has any feature/function that can make use of the partitioning to skip data when reading and filtering.
-
Yes, there's a couple of data skipping optimizations that Daft can do with parquet.

Firstly, parquet itself contains metadata such as min/max stats per column. When you do a read -> filter operation, such as daft.read_parquet(file).where('id' > 100), Daft will analyze the parquet metadata to determine which row groups / files match or don't match the filter, and only read the ones that do. By default, Daft will write statistics to parquet files.

Secondly, Daft can infer a hive style partitioning scheme for reads, e.g. daft.read_parquet(file, hive_partitioning=True), and skip partitions entirely. Hive partitioning is essentially where the directories are named by the partition value. Let's say we wrote parquet with partition_cols=["id"]: the output is laid out like root_dir/id=<value>/, so a filter on id can skip non-matching directories without touching their files.

The hive partitioning approach works well when there's a lot of data per partition. If you are filtering on something like a unique key, then it won't work so well because there will be many directories and files, and parquet statistics will likely do a better job.
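A slightly fuller sketch of the two read paths described above, assuming the hive-partitioned layout produced by the repro (path/to/test/id=<value>/...); the glob path and the filter value are placeholders:

import daft
from daft import col

# 1) Statistics-based skipping: the filter is pushed into the scan, and row groups /
#    files whose min/max stats for "id" cannot match are never read.
df_stats = (
    daft.read_parquet("path/to/test/**/*.parquet")
    .where(col("id") == "12345")   # id is Utf8 in the repro, so compare against a string
)

# 2) Hive partition pruning: directories named id=<value> are read back as a partition
#    column, and directories that cannot match the filter are skipped entirely.
df_hive = (
    daft.read_parquet("path/to/test/**/*.parquet", hive_partitioning=True)
    .where(col("id") == "12345")
)

df_stats.show()
df_hive.show()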
-
Hi @colin-ho, thanks again for your detailed reply! I will think about how to partition my data, and I'm glad to know Daft will write statistics to parquet files. My last question before closing the issue (which is not a real one): will any other metadata be written to the parquet file in addition to statistics?
-
Yes! Parquet metadata also stores information such as the encodings, compression scheme, byte offsets of the columns, etc. You can find a general list of what metadata can be stored in a parquet file here: https://parquet.apache.org/docs/file-format/metadata/

The byte offsets of the columns, for example, are useful when you read only a few columns, e.g. daft.read_parquet(file).select('id', 'col0'): Daft only needs to fetch the byte ranges of the selected column chunks instead of the whole file.
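This footer metadata is easy to inspect directly. A small sketch using pyarrow (not mentioned in the thread, just a common way to peek at a parquet footer); the file path is a placeholder for one of the files written by the repro:

import pyarrow.parquet as pq

# Open one of the written files and read only its footer metadata.
meta = pq.ParquetFile("path/to/test/id=1/some-file.parquet").metadata

print(meta)  # number of row groups, columns, rows, format version, ...

# Per-column-chunk details for the first row group: statistics (min/max, null count),
# encodings, compression codec, and where the column's bytes live in the file.
col_chunk = meta.row_group(0).column(0)
print(col_chunk.statistics)
print(col_chunk.encodings)
print(col_chunk.compression)
print(col_chunk.data_page_offset, col_chunk.total_compressed_size)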
-
Also, I'm going to convert this issue to a discussion.