Skip to content

Commit 834b7fb

Browse files
committed
starting to process the reformatted data a bit\!
1 parent 2d5c18f commit 834b7fb

File tree

3 files changed

+406
-5
lines changed

3 files changed

+406
-5
lines changed
Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,39 @@
162162
},
163163
{
164164
"cell_type": "code",
165-
"execution_count": null,
165+
"execution_count": 3,
166166
"id": "4a2922b3-dc6b-41a0-b300-ca72c2208cba",
167167
"metadata": {},
168168
"outputs": [
169169
{
170170
"name": "stderr",
171171
"output_type": "stream",
172172
"text": [
173-
"2it [03:18, 99.47s/it]"
173+
"1854it [42:48:58, 83.14s/it] \n"
174+
]
175+
},
176+
{
177+
"ename": "error",
178+
"evalue": "Error -3 while decompressing data: invalid code lengths set",
179+
"output_type": "error",
180+
"traceback": [
181+
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
182+
"\u001b[31merror\u001b[39m Traceback (most recent call last)",
183+
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[3]\u001b[39m\u001b[32m, line 54\u001b[39m\n\u001b[32m 44\u001b[39m new_df = pd.DataFrame({\n\u001b[32m 45\u001b[39m \u001b[38;5;66;03m# .reset_index(drop=True) is the key fix here!\u001b[39;00m\n\u001b[32m 46\u001b[39m \u001b[33m'\u001b[39m\u001b[33mtimestamp\u001b[39m\u001b[33m'\u001b[39m: pd.to_datetime(chunk[\u001b[33m'\u001b[39m\u001b[33mtimestamp\u001b[39m\u001b[33m'\u001b[39m], \u001b[38;5;28mformat\u001b[39m=\u001b[33m'\u001b[39m\u001b[33mISO8601\u001b[39m\u001b[33m'\u001b[39m).reset_index(drop=\u001b[38;5;28;01mTrue\u001b[39;00m),\n\u001b[32m (...)\u001b[39m\u001b[32m 50\u001b[39m \u001b[33m'\u001b[39m\u001b[33mcoords\u001b[39m\u001b[33m'\u001b[39m: coords \n\u001b[32m 51\u001b[39m })\n\u001b[32m 52\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m new_df\n\u001b[32m---> \u001b[39m\u001b[32m54\u001b[39m \u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mi\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mchunk\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mtqdm\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43menumerate\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mreader\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 56\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mtry\u001b[39;49;00m\u001b[43m:\u001b[49m\n\u001b[32m 57\u001b[39m \u001b[43m \u001b[49m\u001b[43mfinal_chunk\u001b[49m\u001b[43m \u001b[49m\u001b[43m=\u001b[49m\u001b[43m \u001b[49m\u001b[43mprocess_chunk_optimized\u001b[49m\u001b[43m(\u001b[49m\u001b[43mchunk\u001b[49m\u001b[43m)\u001b[49m\n",
184+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/tqdm/std.py:1181\u001b[39m, in \u001b[36mtqdm.__iter__\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 1178\u001b[39m time = \u001b[38;5;28mself\u001b[39m._time\n\u001b[32m 1180\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1181\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mobj\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43miterable\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 1182\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01myield\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mobj\u001b[49m\n\u001b[32m 1183\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;66;43;03m# Update and possibly print the progressbar.\u001b[39;49;00m\n\u001b[32m 1184\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;66;43;03m# Note: does not call self.update(1) for speed optimisation.\u001b[39;49;00m\n",
185+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/readers.py:1843\u001b[39m, in \u001b[36mTextFileReader.__next__\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 1841\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m__next__\u001b[39m(\u001b[38;5;28mself\u001b[39m) -> DataFrame:\n\u001b[32m 1842\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1843\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mget_chunk\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1844\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mStopIteration\u001b[39;00m:\n\u001b[32m 1845\u001b[39m \u001b[38;5;28mself\u001b[39m.close()\n",
186+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/readers.py:1985\u001b[39m, in \u001b[36mTextFileReader.get_chunk\u001b[39m\u001b[34m(self, size)\u001b[39m\n\u001b[32m 1983\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mStopIteration\u001b[39;00m\n\u001b[32m 1984\u001b[39m size = \u001b[38;5;28mmin\u001b[39m(size, \u001b[38;5;28mself\u001b[39m.nrows - \u001b[38;5;28mself\u001b[39m._currow)\n\u001b[32m-> \u001b[39m\u001b[32m1985\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43mnrows\u001b[49m\u001b[43m=\u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n",
187+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/readers.py:1923\u001b[39m, in \u001b[36mTextFileReader.read\u001b[39m\u001b[34m(self, nrows)\u001b[39m\n\u001b[32m 1916\u001b[39m nrows = validate_integer(\u001b[33m\"\u001b[39m\u001b[33mnrows\u001b[39m\u001b[33m\"\u001b[39m, nrows)\n\u001b[32m 1917\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m 1918\u001b[39m \u001b[38;5;66;03m# error: \"ParserBase\" has no attribute \"read\"\u001b[39;00m\n\u001b[32m 1919\u001b[39m (\n\u001b[32m 1920\u001b[39m index,\n\u001b[32m 1921\u001b[39m columns,\n\u001b[32m 1922\u001b[39m col_dict,\n\u001b[32m-> \u001b[39m\u001b[32m1923\u001b[39m ) = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_engine\u001b[49m\u001b[43m.\u001b[49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;66;43;03m# type: ignore[attr-defined]\u001b[39;49;00m\n\u001b[32m 1924\u001b[39m \u001b[43m \u001b[49m\u001b[43mnrows\u001b[49m\n\u001b[32m 1925\u001b[39m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1926\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[32m 1927\u001b[39m \u001b[38;5;28mself\u001b[39m.close()\n",
188+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/c_parser_wrapper.py:234\u001b[39m, in \u001b[36mCParserWrapper.read\u001b[39m\u001b[34m(self, nrows)\u001b[39m\n\u001b[32m 232\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m 233\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m.low_memory:\n\u001b[32m--> \u001b[39m\u001b[32m234\u001b[39m chunks = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_reader\u001b[49m\u001b[43m.\u001b[49m\u001b[43mread_low_memory\u001b[49m\u001b[43m(\u001b[49m\u001b[43mnrows\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 235\u001b[39m \u001b[38;5;66;03m# destructive to chunks\u001b[39;00m\n\u001b[32m 236\u001b[39m data = _concatenate_chunks(chunks)\n",
189+
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:850\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader.read_low_memory\u001b[39m\u001b[34m()\u001b[39m\n",
190+
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:905\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader._read_rows\u001b[39m\u001b[34m()\u001b[39m\n",
191+
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:874\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader._tokenize_rows\u001b[39m\u001b[34m()\u001b[39m\n",
192+
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:891\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader._check_tokenize_status\u001b[39m\u001b[34m()\u001b[39m\n",
193+
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:2053\u001b[39m, in \u001b[36mpandas._libs.parsers.raise_parser_error\u001b[39m\u001b[34m()\u001b[39m\n",
194+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/gzip.py:360\u001b[39m, in \u001b[36mGzipFile.read1\u001b[39m\u001b[34m(self, size)\u001b[39m\n\u001b[32m 358\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m size < \u001b[32m0\u001b[39m:\n\u001b[32m 359\u001b[39m size = io.DEFAULT_BUFFER_SIZE\n\u001b[32m--> \u001b[39m\u001b[32m360\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_buffer\u001b[49m\u001b[43m.\u001b[49m\u001b[43mread1\u001b[49m\u001b[43m(\u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n",
195+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/compression/_common/_streams.py:68\u001b[39m, in \u001b[36mDecompressReader.readinto\u001b[39m\u001b[34m(self, b)\u001b[39m\n\u001b[32m 66\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mreadinto\u001b[39m(\u001b[38;5;28mself\u001b[39m, b):\n\u001b[32m 67\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m \u001b[38;5;28mmemoryview\u001b[39m(b) \u001b[38;5;28;01mas\u001b[39;00m view, view.cast(\u001b[33m\"\u001b[39m\u001b[33mB\u001b[39m\u001b[33m\"\u001b[39m) \u001b[38;5;28;01mas\u001b[39;00m byte_view:\n\u001b[32m---> \u001b[39m\u001b[32m68\u001b[39m data = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mlen\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mbyte_view\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 69\u001b[39m byte_view[:\u001b[38;5;28mlen\u001b[39m(data)] = data\n\u001b[32m 70\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(data)\n",
196+
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/gzip.py:580\u001b[39m, in \u001b[36m_GzipReader.read\u001b[39m\u001b[34m(self, size)\u001b[39m\n\u001b[32m 578\u001b[39m uncompress = \u001b[38;5;28mself\u001b[39m._decompressor.decompress(buf, size)\n\u001b[32m 579\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m580\u001b[39m uncompress = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_decompressor\u001b[49m\u001b[43m.\u001b[49m\u001b[43mdecompress\u001b[49m\u001b[43m(\u001b[49m\u001b[33;43mb\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 582\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._decompressor.unused_data != \u001b[33mb\u001b[39m\u001b[33m\"\u001b[39m\u001b[33m\"\u001b[39m:\n\u001b[32m 583\u001b[39m \u001b[38;5;66;03m# Prepend the already read bytes to the fileobj so they can\u001b[39;00m\n\u001b[32m 584\u001b[39m \u001b[38;5;66;03m# be seen by _read_eof() and _read_gzip_header()\u001b[39;00m\n\u001b[32m 585\u001b[39m \u001b[38;5;28mself\u001b[39m._fp.prepend(\u001b[38;5;28mself\u001b[39m._decompressor.unused_data)\n",
197+
"\u001b[31merror\u001b[39m: Error -3 while decompressing data: invalid code lengths set"
174198
]
175199
}
176200
],
@@ -198,9 +222,9 @@
198222
" m = d.get('metadata', {})\n",
199223
" \n",
200224
" # Extract and sanitize\n",
201-
" user = m.get('user', '').strip()\n",
202-
" color = m.get('color', '').strip()\n",
203-
" extra = m.get('extra', '').strip()\n",
225+
" user = str(m.get('user', '')).strip()\n",
226+
" color = str(m.get('color', '')).strip()\n",
227+
" extra = str(m.get('extra', '')).strip()\n",
204228
" \n",
205229
" # Convert empty strings to None for Arrow/Parquet safety\n",
206230
" return (\n",
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
{
2+
"cells": [
3+
{
4+
"cell_type": "code",
5+
"execution_count": null,
6+
"id": "60b3ebc2-9c36-4af5-a38d-e7866bbb27d0",
7+
"metadata": {},
8+
"outputs": [],
9+
"source": [
10+
"dataset_path = \"/home/pwhiddy/messages_24_2025_parquet_zstd/dataset\""
11+
]
12+
},
13+
{
14+
"cell_type": "code",
15+
"execution_count": null,
16+
"id": "324d9f76-841c-4011-a1a9-17f9d0d0e6e3",
17+
"metadata": {},
18+
"outputs": [],
19+
"source": [
20+
"def human_format(num, decimals=3):\n",
21+
" \"\"\"\n",
22+
" Format a number into a human-readable string with K, M, B, T suffixes.\n",
23+
" \n",
24+
" Examples:\n",
25+
" 1432 -> 1.432K\n",
26+
" 4232000 -> 4.232M\n",
27+
" 7235000000 -> 7.235B\n",
28+
" \"\"\"\n",
29+
" magnitude = 0\n",
30+
" suffixes = ['', 'K', 'M', 'B', 'T', 'Q'] # Extend if needed\n",
31+
" while abs(num) >= 1000 and magnitude < len(suffixes) - 1:\n",
32+
" magnitude += 1\n",
33+
" num /= 1000.0\n",
34+
" return f\"{num:.{decimals}f}{suffixes[magnitude]}\""
35+
]
36+
},
37+
{
38+
"cell_type": "code",
39+
"execution_count": null,
40+
"id": "f98d7d53-571a-420b-9c45-6fe4a5e63e1a",
41+
"metadata": {},
42+
"outputs": [],
43+
"source": [
44+
"import polars as pl\n",
45+
"pl.scan_parquet(dataset_path).collect_schema()"
46+
]
47+
},
48+
{
49+
"cell_type": "code",
50+
"execution_count": null,
51+
"id": "b7b25df9-18dc-49f2-8621-83e897db7e5f",
52+
"metadata": {},
53+
"outputs": [],
54+
"source": [
55+
"pl.scan_parquet(dataset_path).select(pl.col(\"user\"))"
56+
]
57+
},
58+
{
59+
"cell_type": "code",
60+
"execution_count": null,
61+
"id": "8a39a473-d053-4fd2-b6d0-ddd43dc7763f",
62+
"metadata": {},
63+
"outputs": [],
64+
"source": [
65+
"import polars as pl\n",
66+
"from tqdm.notebook import tqdm\n",
67+
"\n",
68+
"total = 0\n",
69+
"batch_size = 3500000\n",
70+
"\n",
71+
"lf = pl.scan_parquet(dataset_path)\n",
72+
"total_rows = lf.select(pl.len()).collect()['len'][0]\n",
73+
"\n",
74+
"pbar = tqdm(range(total_rows//batch_size))\n",
75+
"for row_idx in pbar:\n",
76+
" offset = row_idx * batch_size\n",
77+
" batch_count = lf.slice(offset, batch_size).select(\n",
78+
" pl.col(\"coords\").list.len().sum()\n",
79+
" ).collect()[0, 0]\n",
80+
" pbar.set_postfix_str(f\"coord count: {human_format(total)}\")\n",
81+
" total += batch_count\n",
82+
"\n",
83+
"print(total)"
84+
]
85+
},
86+
{
87+
"cell_type": "code",
88+
"execution_count": null,
89+
"id": "6716d283-ac10-4707-8281-53c746515568",
90+
"metadata": {},
91+
"outputs": [
92+
{
93+
"data": {
94+
"application/vnd.jupyter.widget-view+json": {
95+
"model_id": "e596b13cc68d4f9cb0499de0d6a4c22f",
96+
"version_major": 2,
97+
"version_minor": 0
98+
},
99+
"text/plain": [
100+
" 0%| | 0/211 [00:00<?, ?it/s]"
101+
]
102+
},
103+
"metadata": {},
104+
"output_type": "display_data"
105+
}
106+
],
107+
"source": [
108+
"import polars as pl\n",
109+
"from tqdm.notebook import tqdm\n",
110+
"\n",
111+
"user_counts = {}\n",
112+
"batch_size = 3500000\n",
113+
"\n",
114+
"lf = pl.scan_parquet(dataset_path)\n",
115+
"total_rows = lf.select(pl.len()).collect()['len'][0]\n",
116+
"\n",
117+
"def pretty_print_counts(user_counts):\n",
118+
" print({user: human_format(count) for user,count in user_counts.items()})\n",
119+
"\n",
120+
"pbar = tqdm(range(total_rows//batch_size))\n",
121+
"for row_idx in pbar:\n",
122+
" offset = row_idx * batch_size\n",
123+
" batch_counts = lf.slice(offset, batch_size).group_by('user').agg(\n",
124+
" pl.col(\"coords\").list.len().sum()\n",
125+
" ).collect()\n",
126+
" py_counts = dict(zip(batch_counts[:, 0], batch_counts[:, 1]))\n",
127+
" for user, count in py_counts.items():\n",
128+
" if user not in user_counts.keys():\n",
129+
" user_counts[user] = count\n",
130+
" else:\n",
131+
" user_counts[user] += count\n",
132+
" if row_idx % 4 == 0:\n",
133+
" pretty_print_counts(user_counts)\n",
134+
" #total += batch_count\n",
135+
"print(\"done!\")\n",
136+
"pretty_print_counts(user_counts)"
137+
]
138+
},
139+
{
140+
"cell_type": "code",
141+
"execution_count": null,
142+
"id": "b3f9575a-c108-4678-93f5-dbebb3ebb362",
143+
"metadata": {},
144+
"outputs": [],
145+
"source": []
146+
},
147+
{
148+
"cell_type": "code",
149+
"execution_count": null,
150+
"id": "0905dc31-3976-4262-9723-f1249d661094",
151+
"metadata": {},
152+
"outputs": [],
153+
"source": []
154+
}
155+
],
156+
"metadata": {
157+
"kernelspec": {
158+
"display_name": "Python 3.14",
159+
"language": "python",
160+
"name": "python314"
161+
},
162+
"language_info": {
163+
"codemirror_mode": {
164+
"name": "ipython",
165+
"version": 3
166+
},
167+
"file_extension": ".py",
168+
"mimetype": "text/x-python",
169+
"name": "python",
170+
"nbconvert_exporter": "python",
171+
"pygments_lexer": "ipython3",
172+
"version": "3.14.2"
173+
}
174+
},
175+
"nbformat": 4,
176+
"nbformat_minor": 5
177+
}

large-scale-viz/map_explore.ipynb

Lines changed: 200 additions & 0 deletions
Large diffs are not rendered by default.

0 commit comments

Comments
 (0)