from pathlib import Path
import json

# Currently requires a special branch of pyarrow with extra GeoArrow features:
# https://github.com/apache/arrow/compare/main...paleolimbot:arrow:parquet-geo-write-files-from-geoarrow
import pyarrow as pa
from pyarrow import parquet

# Imported for its side effect: registering the GeoArrow extension types with
# pyarrow so the WKB IPC streams read back with their geometry types intact.
import geoarrow.pyarrow as ga  # noqa: F401

here = Path(__file__).parent


def list_wkb_files():
    wkb_files = []
    with open(here / "manifest.json") as f:
        manifest = json.load(f)

    for group in manifest["groups"]:
        for file in group["files"]:
            if file["format"] == "arrows/wkb":
                name = Path(file["url"]).name
                local_path = here / group["name"] / "files" / name
                assert local_path.exists()
                wkb_files.append(local_path)

    return wkb_files
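
# For reference, a sketch of the manifest.json shape the function above
# assumes, inferred from its lookups (the group name and URL here are
# illustrative, not actual entries from the manifest):
#
# {
#   "groups": [
#     {
#       "name": "example",
#       "files": [
#         {"format": "arrows/wkb", "url": "https://.../example_wkb.arrows"}
#       ]
#     }
#   ]
# }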


def convert_arrow_wkb_to_parquet(src, dst, compression):
    # Maintain chunking from IPC into Parquet so that the statistics are
    # theoretically the same: write_batch() writes each IPC record batch as
    # its own Parquet row group by default.
    with (
        pa.ipc.open_stream(src) as reader,
        parquet.ParquetWriter(
            dst,
            reader.schema,
            # Don't embed the Arrow schema: the check below must be able to
            # recover the geometry type from the Parquet logical type alone.
            store_schema=False,
            compression=compression,
            write_geospatial_logical_types=True,
        ) as writer,
    ):
        print(f"Reading {src}")
        for batch in reader:
            writer.write_batch(batch)
    print(f"Wrote {dst}")
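

# A sanity-check sketch (hypothetical helper, not used below): if the chunk
# mapping described above holds, the Parquet row group lengths should match
# the record batch lengths of the source IPC stream.
def batch_and_row_group_lengths(src, dst):
    with pa.ipc.open_stream(src) as reader:
        batch_lengths = [len(batch) for batch in reader]
    with parquet.ParquetFile(dst) as f:
        metadata = f.metadata
        row_group_lengths = [
            metadata.row_group(i).num_rows for i in range(metadata.num_row_groups)
        ]
    return batch_lengths, row_group_lengths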


def check_parquet_file(src, dst):
    # Read in the original table for comparison
    with pa.ipc.open_stream(src) as reader:
        original_table = reader.read_all()

    print(f"Checking {dst}")
    # Uncomment to inspect the raw Parquet schema/metadata without the Arrow
    # extension type mapping:
    # with parquet.ParquetFile(dst, arrow_extensions_enabled=False) as f:
    #     print(f.schema)
    #     print(f.metadata.metadata)
    with parquet.ParquetFile(dst, arrow_extensions_enabled=True) as f:
        if f.schema_arrow != original_table.schema:
            print(f"Schema mismatch:\n{f.schema_arrow}\nvs\n{original_table.schema}")
            return False

        reread = f.read()
        if reread != original_table:
            print("Table mismatch")
            return False

    return True


def generate_parquet_testing_files(wkb_files, parquet_testing_path):
    successful_checks = 0
    written_files = 0
    for path in wkb_files:
        # Skip the big files, plus one CRS example that includes a
        # non-PROJJSON value on purpose (allowed in GeoArrow) and is rightly
        # rejected by Parquet.
        name = path.name.replace("_wkb.arrows", "")
        if (
            "microsoft-buildings" in name
            or ("ns-water" in name and name != "ns-water_water-point")
            or "wkt2" in name
        ):
            print(f"Skipping {name}")
            continue

        dst = parquet_testing_path / f"{name}.parquet"
        convert_arrow_wkb_to_parquet(path, dst, compression="none")
        written_files += 1
        # check_parquet_file() returns a bool, which counts as 0 or 1 here
        successful_checks += check_parquet_file(path, dst)

    if successful_checks != written_files:
        raise ValueError("Some checks failed when generating testing files")


def generate_geoarrow_data_parquet_files(wkb_files):
    successful_checks = 0
    written_files = 0
    for path in wkb_files:
        name = path.name.replace("_wkb.arrows", "")
        if "wkt2" in name:
            print(f"Skipping {name}")
            continue

        # Only the large datasets are worth compressing
        if name.startswith("ns-water") or name.startswith("microsoft"):
            compression = "zstd"
        else:
            compression = "none"

        dst = path.parent / f"{name}.parquet"
        convert_arrow_wkb_to_parquet(path, dst, compression=compression)
        written_files += 1
        successful_checks += check_parquet_file(path, dst)

    if successful_checks != written_files:
        raise ValueError("Some checks failed when generating data files")


if __name__ == "__main__":
    # Assumes a checkout of parquet-testing next to this script's directory
    parquet_testing_path = here.parent / "parquet-testing" / "data" / "geospatial"
    wkb_files = list_wkb_files()
    generate_parquet_testing_files(wkb_files, parquet_testing_path)
    generate_geoarrow_data_parquet_files(wkb_files)