Skip to content

Commit fcbed65

Browse files
committed
env_id was missing from original schema so we have to redo this :(
1 parent 1cd0621 commit fcbed65

File tree

2 files changed

+66
-52
lines changed

2 files changed

+66
-52
lines changed

large-scale-viz/convert_messages.ipynb

Lines changed: 8 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -162,39 +162,15 @@
162162
},
163163
{
164164
"cell_type": "code",
165-
"execution_count": 3,
165+
"execution_count": null,
166166
"id": "4a2922b3-dc6b-41a0-b300-ca72c2208cba",
167167
"metadata": {},
168168
"outputs": [
169169
{
170170
"name": "stderr",
171171
"output_type": "stream",
172172
"text": [
173-
"1854it [42:48:58, 83.14s/it] \n"
174-
]
175-
},
176-
{
177-
"ename": "error",
178-
"evalue": "Error -3 while decompressing data: invalid code lengths set",
179-
"output_type": "error",
180-
"traceback": [
181-
"\u001b[31m---------------------------------------------------------------------------\u001b[39m",
182-
"\u001b[31merror\u001b[39m Traceback (most recent call last)",
183-
"\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[3]\u001b[39m\u001b[32m, line 54\u001b[39m\n\u001b[32m 44\u001b[39m new_df = pd.DataFrame({\n\u001b[32m 45\u001b[39m \u001b[38;5;66;03m# .reset_index(drop=True) is the key fix here!\u001b[39;00m\n\u001b[32m 46\u001b[39m \u001b[33m'\u001b[39m\u001b[33mtimestamp\u001b[39m\u001b[33m'\u001b[39m: pd.to_datetime(chunk[\u001b[33m'\u001b[39m\u001b[33mtimestamp\u001b[39m\u001b[33m'\u001b[39m], \u001b[38;5;28mformat\u001b[39m=\u001b[33m'\u001b[39m\u001b[33mISO8601\u001b[39m\u001b[33m'\u001b[39m).reset_index(drop=\u001b[38;5;28;01mTrue\u001b[39;00m),\n\u001b[32m (...)\u001b[39m\u001b[32m 50\u001b[39m \u001b[33m'\u001b[39m\u001b[33mcoords\u001b[39m\u001b[33m'\u001b[39m: coords \n\u001b[32m 51\u001b[39m })\n\u001b[32m 52\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m new_df\n\u001b[32m---> \u001b[39m\u001b[32m54\u001b[39m \u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mi\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mchunk\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mtqdm\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43menumerate\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mreader\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 56\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mtry\u001b[39;49;00m\u001b[43m:\u001b[49m\n\u001b[32m 57\u001b[39m \u001b[43m \u001b[49m\u001b[43mfinal_chunk\u001b[49m\u001b[43m \u001b[49m\u001b[43m=\u001b[49m\u001b[43m \u001b[49m\u001b[43mprocess_chunk_optimized\u001b[49m\u001b[43m(\u001b[49m\u001b[43mchunk\u001b[49m\u001b[43m)\u001b[49m\n",
184-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/tqdm/std.py:1181\u001b[39m, in \u001b[36mtqdm.__iter__\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 1178\u001b[39m time = \u001b[38;5;28mself\u001b[39m._time\n\u001b[32m 1180\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1181\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mobj\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43miterable\u001b[49m\u001b[43m:\u001b[49m\n\u001b[32m 1182\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;28;43;01myield\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mobj\u001b[49m\n\u001b[32m 1183\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;66;43;03m# Update and possibly print the progressbar.\u001b[39;49;00m\n\u001b[32m 1184\u001b[39m \u001b[43m \u001b[49m\u001b[38;5;66;43;03m# Note: does not call self.update(1) for speed optimisation.\u001b[39;49;00m\n",
185-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/readers.py:1843\u001b[39m, in \u001b[36mTextFileReader.__next__\u001b[39m\u001b[34m(self)\u001b[39m\n\u001b[32m 1841\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34m__next__\u001b[39m(\u001b[38;5;28mself\u001b[39m) -> DataFrame:\n\u001b[32m 1842\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m-> \u001b[39m\u001b[32m1843\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mget_chunk\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1844\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mStopIteration\u001b[39;00m:\n\u001b[32m 1845\u001b[39m \u001b[38;5;28mself\u001b[39m.close()\n",
186-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/readers.py:1985\u001b[39m, in \u001b[36mTextFileReader.get_chunk\u001b[39m\u001b[34m(self, size)\u001b[39m\n\u001b[32m 1983\u001b[39m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mStopIteration\u001b[39;00m\n\u001b[32m 1984\u001b[39m size = \u001b[38;5;28mmin\u001b[39m(size, \u001b[38;5;28mself\u001b[39m.nrows - \u001b[38;5;28mself\u001b[39m._currow)\n\u001b[32m-> \u001b[39m\u001b[32m1985\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43mnrows\u001b[49m\u001b[43m=\u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n",
187-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/readers.py:1923\u001b[39m, in \u001b[36mTextFileReader.read\u001b[39m\u001b[34m(self, nrows)\u001b[39m\n\u001b[32m 1916\u001b[39m nrows = validate_integer(\u001b[33m\"\u001b[39m\u001b[33mnrows\u001b[39m\u001b[33m\"\u001b[39m, nrows)\n\u001b[32m 1917\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m 1918\u001b[39m \u001b[38;5;66;03m# error: \"ParserBase\" has no attribute \"read\"\u001b[39;00m\n\u001b[32m 1919\u001b[39m (\n\u001b[32m 1920\u001b[39m index,\n\u001b[32m 1921\u001b[39m columns,\n\u001b[32m 1922\u001b[39m col_dict,\n\u001b[32m-> \u001b[39m\u001b[32m1923\u001b[39m ) = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_engine\u001b[49m\u001b[43m.\u001b[49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;66;43;03m# type: ignore[attr-defined]\u001b[39;49;00m\n\u001b[32m 1924\u001b[39m \u001b[43m \u001b[49m\u001b[43mnrows\u001b[49m\n\u001b[32m 1925\u001b[39m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 1926\u001b[39m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mException\u001b[39;00m:\n\u001b[32m 1927\u001b[39m \u001b[38;5;28mself\u001b[39m.close()\n",
188-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/site-packages/pandas/io/parsers/c_parser_wrapper.py:234\u001b[39m, in \u001b[36mCParserWrapper.read\u001b[39m\u001b[34m(self, nrows)\u001b[39m\n\u001b[32m 232\u001b[39m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[32m 233\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m.low_memory:\n\u001b[32m--> \u001b[39m\u001b[32m234\u001b[39m chunks = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_reader\u001b[49m\u001b[43m.\u001b[49m\u001b[43mread_low_memory\u001b[49m\u001b[43m(\u001b[49m\u001b[43mnrows\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 235\u001b[39m \u001b[38;5;66;03m# destructive to chunks\u001b[39;00m\n\u001b[32m 236\u001b[39m data = _concatenate_chunks(chunks)\n",
189-
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:850\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader.read_low_memory\u001b[39m\u001b[34m()\u001b[39m\n",
190-
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:905\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader._read_rows\u001b[39m\u001b[34m()\u001b[39m\n",
191-
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:874\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader._tokenize_rows\u001b[39m\u001b[34m()\u001b[39m\n",
192-
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:891\u001b[39m, in \u001b[36mpandas._libs.parsers.TextReader._check_tokenize_status\u001b[39m\u001b[34m()\u001b[39m\n",
193-
"\u001b[36mFile \u001b[39m\u001b[32mpandas/_libs/parsers.pyx:2053\u001b[39m, in \u001b[36mpandas._libs.parsers.raise_parser_error\u001b[39m\u001b[34m()\u001b[39m\n",
194-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/gzip.py:360\u001b[39m, in \u001b[36mGzipFile.read1\u001b[39m\u001b[34m(self, size)\u001b[39m\n\u001b[32m 358\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m size < \u001b[32m0\u001b[39m:\n\u001b[32m 359\u001b[39m size = io.DEFAULT_BUFFER_SIZE\n\u001b[32m--> \u001b[39m\u001b[32m360\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_buffer\u001b[49m\u001b[43m.\u001b[49m\u001b[43mread1\u001b[49m\u001b[43m(\u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n",
195-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/compression/_common/_streams.py:68\u001b[39m, in \u001b[36mDecompressReader.readinto\u001b[39m\u001b[34m(self, b)\u001b[39m\n\u001b[32m 66\u001b[39m \u001b[38;5;28;01mdef\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34mreadinto\u001b[39m(\u001b[38;5;28mself\u001b[39m, b):\n\u001b[32m 67\u001b[39m \u001b[38;5;28;01mwith\u001b[39;00m \u001b[38;5;28mmemoryview\u001b[39m(b) \u001b[38;5;28;01mas\u001b[39;00m view, view.cast(\u001b[33m\"\u001b[39m\u001b[33mB\u001b[39m\u001b[33m\"\u001b[39m) \u001b[38;5;28;01mas\u001b[39;00m byte_view:\n\u001b[32m---> \u001b[39m\u001b[32m68\u001b[39m data = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43mread\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mlen\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mbyte_view\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 69\u001b[39m byte_view[:\u001b[38;5;28mlen\u001b[39m(data)] = data\n\u001b[32m 70\u001b[39m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28mlen\u001b[39m(data)\n",
196-
"\u001b[36mFile \u001b[39m\u001b[32m~/miniconda3/envs/science/lib/python3.14/gzip.py:580\u001b[39m, in \u001b[36m_GzipReader.read\u001b[39m\u001b[34m(self, size)\u001b[39m\n\u001b[32m 578\u001b[39m uncompress = \u001b[38;5;28mself\u001b[39m._decompressor.decompress(buf, size)\n\u001b[32m 579\u001b[39m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[32m--> \u001b[39m\u001b[32m580\u001b[39m uncompress = \u001b[38;5;28;43mself\u001b[39;49m\u001b[43m.\u001b[49m\u001b[43m_decompressor\u001b[49m\u001b[43m.\u001b[49m\u001b[43mdecompress\u001b[49m\u001b[43m(\u001b[49m\u001b[33;43mb\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[33;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msize\u001b[49m\u001b[43m)\u001b[49m\n\u001b[32m 582\u001b[39m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mself\u001b[39m._decompressor.unused_data != \u001b[33mb\u001b[39m\u001b[33m\"\u001b[39m\u001b[33m\"\u001b[39m:\n\u001b[32m 583\u001b[39m \u001b[38;5;66;03m# Prepend the already read bytes to the fileobj so they can\u001b[39;00m\n\u001b[32m 584\u001b[39m \u001b[38;5;66;03m# be seen by _read_eof() and _read_gzip_header()\u001b[39;00m\n\u001b[32m 585\u001b[39m \u001b[38;5;28mself\u001b[39m._fp.prepend(\u001b[38;5;28mself\u001b[39m._decompressor.unused_data)\n",
197-
"\u001b[31merror\u001b[39m: Error -3 while decompressing data: invalid code lengths set"
173+
"0it [00:00, ?it/s]"
198174
]
199175
}
200176
],
@@ -207,8 +183,8 @@
207183
"import pyarrow.parquet as pq\n",
208184
"from tqdm import tqdm\n",
209185
"\n",
210-
"file_path = \"/home/pwhiddy/messages_backup_24_2025.csv.gz\"\n",
211-
"output_dir = \"/home/pwhiddy/messages_parquet_zstd\"\n",
186+
"file_path = \"/media/pwhiddy/DATA/linux_dat/poke_map_viz_logs/messages.csv.gz\" #\"/home/pwhiddy/messages_backup_24_2025.csv.gz\"\n",
187+
"output_dir = \"/home/pwhiddy/messages_1_15_2026_parquet_zstd_v2_env_id/data\"\n",
212188
"os.makedirs(output_dir, exist_ok=True)\n",
213189
"\n",
214190
"# 1. Initialize the CSV reader (iterator)\n",
@@ -223,12 +199,14 @@
223199
" \n",
224200
" # Extract and sanitize\n",
225201
" user = str(m.get('user', '')).strip()\n",
202+
" env_id = str(m.get('env_id', '')).strip()\n",
226203
" color = str(m.get('color', '')).strip()\n",
227204
" extra = str(m.get('extra', '')).strip()\n",
228205
" \n",
229206
" # Convert empty strings to None for Arrow/Parquet safety\n",
230207
" return (\n",
231208
" user if user != '' else None,\n",
209+
" env_id if env_id != '' else None,\n",
232210
" color if color != '' else None,\n",
233211
" extra if extra != '' else None,\n",
234212
" d.get('coords', [])\n",
@@ -239,13 +217,14 @@
239217
" return (None, None, None, [])\n",
240218
"\n",
241219
" parsed = [parse(m) for m in chunk['message']]\n",
242-
" users, colors, extras, coords = zip(*parsed)\n",
220+
" users, env_ids, colors, extras, coords = zip(*parsed)\n",
243221
"\n",
244222
" # BUILD THE DATAFRAME\n",
245223
" new_df = pd.DataFrame({\n",
246224
" # .reset_index(drop=True) is the key fix here!\n",
247225
" 'timestamp': pd.to_datetime(chunk['timestamp'], format='ISO8601').reset_index(drop=True),\n",
248226
" 'user': pd.Series(users, dtype='category'),\n",
227+
" 'env_id': pd.Series(env_ids, dtype='category'),\n",
249228
" 'color': pd.Series(colors, dtype='category'),\n",
250229
" 'extra': pd.Series(extras),\n",
251230
" 'coords': coords \n",

large-scale-viz/explore_parquet_data.ipynb

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"cells": [
33
{
44
"cell_type": "code",
5-
"execution_count": null,
5+
"execution_count": 1,
66
"id": "60b3ebc2-9c36-4af5-a38d-e7866bbb27d0",
77
"metadata": {},
88
"outputs": [],
@@ -12,7 +12,7 @@
1212
},
1313
{
1414
"cell_type": "code",
15-
"execution_count": null,
15+
"execution_count": 2,
1616
"id": "324d9f76-841c-4011-a1a9-17f9d0d0e6e3",
1717
"metadata": {},
1818
"outputs": [],
@@ -36,21 +36,52 @@
3636
},
3737
{
3838
"cell_type": "code",
39-
"execution_count": null,
39+
"execution_count": 3,
4040
"id": "f98d7d53-571a-420b-9c45-6fe4a5e63e1a",
4141
"metadata": {},
42-
"outputs": [],
42+
"outputs": [
43+
{
44+
"data": {
45+
"text/plain": [
46+
"Schema([('timestamp', Datetime(time_unit='ns', time_zone=None)),\n",
47+
" ('user', Categorical),\n",
48+
" ('color', Categorical),\n",
49+
" ('extra', String),\n",
50+
" ('coords', List(List(Int64)))])"
51+
]
52+
},
53+
"execution_count": 3,
54+
"metadata": {},
55+
"output_type": "execute_result"
56+
}
57+
],
4358
"source": [
4459
"import polars as pl\n",
4560
"pl.scan_parquet(dataset_path).collect_schema()"
4661
]
4762
},
4863
{
4964
"cell_type": "code",
50-
"execution_count": null,
65+
"execution_count": 4,
5166
"id": "b7b25df9-18dc-49f2-8621-83e897db7e5f",
5267
"metadata": {},
53-
"outputs": [],
68+
"outputs": [
69+
{
70+
"data": {
71+
"text/html": [
72+
"<i>naive plan: (run <b>LazyFrame.explain(optimized=True)</b> to see the optimized plan)</i>\n",
73+
" <p></p>\n",
74+
" <div>SELECT [col(\"user\")]<p></p> Parquet SCAN [/home/pwhiddy/messages_24_2025_parquet_zstd/dataset/part_000000.parquet, ... 1851 other sources]<p></p> PROJECT */5 COLUMNS<p></p> ESTIMATED ROWS: 740800000</div>"
75+
],
76+
"text/plain": [
77+
"<LazyFrame at 0x71BFF86BDA90>"
78+
]
79+
},
80+
"execution_count": 4,
81+
"metadata": {},
82+
"output_type": "execute_result"
83+
}
84+
],
5485
"source": [
5586
"pl.scan_parquet(dataset_path).select(pl.col(\"user\"))"
5687
]
@@ -60,7 +91,22 @@
6091
"execution_count": null,
6192
"id": "8a39a473-d053-4fd2-b6d0-ddd43dc7763f",
6293
"metadata": {},
63-
"outputs": [],
94+
"outputs": [
95+
{
96+
"data": {
97+
"application/vnd.jupyter.widget-view+json": {
98+
"model_id": "b305d5f2f5044c83b7f527f29ce7112f",
99+
"version_major": 2,
100+
"version_minor": 0
101+
},
102+
"text/plain": [
103+
" 0%| | 0/211 [00:00<?, ?it/s]"
104+
]
105+
},
106+
"metadata": {},
107+
"output_type": "display_data"
108+
}
109+
],
64110
"source": [
65111
"import polars as pl\n",
66112
"from tqdm.notebook import tqdm\n",
@@ -88,22 +134,7 @@
88134
"execution_count": null,
89135
"id": "6716d283-ac10-4707-8281-53c746515568",
90136
"metadata": {},
91-
"outputs": [
92-
{
93-
"data": {
94-
"application/vnd.jupyter.widget-view+json": {
95-
"model_id": "e596b13cc68d4f9cb0499de0d6a4c22f",
96-
"version_major": 2,
97-
"version_minor": 0
98-
},
99-
"text/plain": [
100-
" 0%| | 0/211 [00:00<?, ?it/s]"
101-
]
102-
},
103-
"metadata": {},
104-
"output_type": "display_data"
105-
}
106-
],
137+
"outputs": [],
107138
"source": [
108139
"import polars as pl\n",
109140
"from tqdm.notebook import tqdm\n",
@@ -115,6 +146,10 @@
115146
"total_rows = lf.select(pl.len()).collect()['len'][0]\n",
116147
"\n",
117148
"def pretty_print_counts(user_counts):\n",
149+
" total_users = len(user_counts)\n",
150+
" total_coords = sum(user_counts.values())\n",
151+
" avg = total_coords / total_users\n",
152+
" print(f\"total users: {total_users} total coords: {total_coords} avg coords per user: {avg}\")\n",
118153
" print({user: human_format(count) for user,count in user_counts.items()})\n",
119154
"\n",
120155
"pbar = tqdm(range(total_rows//batch_size))\n",

0 commit comments

Comments
 (0)