Skip to content

Commit 6ccdafa

Browse files
committed
an attempt at a jippitied faster parser
1 parent 9c22400 commit 6ccdafa

File tree

3 files changed

+187
-8
lines changed

3 files changed

+187
-8
lines changed

large-scale-viz/Cargo.lock

Lines changed: 30 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

large-scale-viz/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,19 @@ csv = "1.3.0"
1111
flate2 = "1.0.30"
1212
image = "0.25.2"
1313
indicatif = "0.17.8"
14-
serde = {version = "1.0.204", features = ["derive"]}
14+
jemallocator = "0.5.4"
15+
memchr = "2.7.6"
16+
rayon = "1.11.0"
17+
serde = { version = "1.0.204", features = ["derive", "rc"] }
1518
serde_json = "1.0.120"
1619
tiff = "0.9.1"
1720

1821
[[bin]]
1922
name = "extract_users"
2023

24+
[[bin]]
25+
name = "coords_counter_fast_new"
26+
2127
#[[bin]]
2228
#name = "extract_users_fast"
2329

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
use std::io::{self, Read};
2+
use rayon::prelude::*;
3+
use serde::Deserialize;
4+
use std::cell::RefCell;
5+
use std::borrow::Cow;
6+
use indicatif::{ProgressBar, ProgressStyle, HumanCount};
7+
8+
#[global_allocator]
9+
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
10+
11+
#[derive(Deserialize, Debug)]
12+
struct Row<'a> {
13+
#[serde(borrow)]
14+
metadata: Metadata<'a>,
15+
coords: Vec<[i32; 3]>,
16+
}
17+
18+
#[derive(Deserialize, Debug)]
19+
struct Metadata<'a> {
20+
#[serde(borrow)]
21+
user: Cow<'a, str>,
22+
#[serde(borrow)]
23+
color: Cow<'a, str>,
24+
#[serde(borrow)]
25+
extra: Cow<'a, str>,
26+
}
27+
28+
fn main() -> io::Result<()> {
29+
// 1. Setup Progress Bar with a wide template
30+
let pb = ProgressBar::new_spinner();
31+
pb.set_style(ProgressStyle::default_spinner()
32+
// Template: [Time] Bytes (Speed) | Custom Message
33+
.template("{spinner:.green} [{elapsed_precise}] {bytes} ({bytes_per_sec}) | {msg}")
34+
.unwrap());
35+
36+
let stdin = io::stdin();
37+
let mut handle = stdin.lock();
38+
39+
let cap = 1024 * 1024;
40+
let mut buf = vec![0u8; cap];
41+
let mut left_over = Vec::with_capacity(4096);
42+
43+
let mut total_coords: u64 = 0;
44+
let mut magikarp_count: u64 = 0;
45+
let mut loop_idx: u64 = 0;
46+
47+
loop {
48+
let start_fill = left_over.len();
49+
if buf.len() < start_fill + cap {
50+
buf.resize(start_fill + cap, 0);
51+
}
52+
buf[0..start_fill].copy_from_slice(&left_over);
53+
54+
let n = handle.read(&mut buf[start_fill..])?;
55+
if n == 0 {
56+
if !left_over.is_empty() {
57+
let (c, m) = process_chunk(&left_over);
58+
total_coords += c;
59+
magikarp_count += m;
60+
}
61+
break;
62+
}
63+
64+
// --- UPDATE PROGRESS BAR ---
65+
pb.inc(n as u64);
66+
67+
// Update the text message every 128 chunks (approx 30 times/sec at 3.5GB/s)
68+
// This prevents terminal flickering and locking overhead.
69+
if loop_idx % 512 == 0 {
70+
pb.set_message(format!(
71+
"Coords: {} | Magikarps: {}",
72+
HumanCount(total_coords),
73+
HumanCount(magikarp_count)
74+
));
75+
}
76+
loop_idx += 1;
77+
78+
let valid_data = &buf[0..start_fill + n];
79+
80+
let split_idx = match memchr::memrchr(b'\n', valid_data) {
81+
Some(idx) => idx + 1,
82+
None => {
83+
left_over.extend_from_slice(valid_data);
84+
continue;
85+
}
86+
};
87+
88+
let (chunk, rest) = valid_data.split_at(split_idx);
89+
left_over.clear();
90+
left_over.extend_from_slice(rest);
91+
92+
let (batch_coords, batch_magikarp) = process_chunk(chunk);
93+
94+
total_coords += batch_coords;
95+
magikarp_count += batch_magikarp;
96+
}
97+
98+
pb.finish_with_message(format!(
99+
"Done! Total Coords: {} | Total Magikarps: {}",
100+
HumanCount(total_coords),
101+
HumanCount(magikarp_count)
102+
));
103+
104+
Ok(())
105+
}
106+
107+
fn process_chunk(chunk: &[u8]) -> (u64, u64) {
108+
chunk
109+
.par_split(|&b| b == b'\n')
110+
.filter(|line| !line.is_empty())
111+
.map(|line| parse_row(line))
112+
.reduce(|| (0, 0), |a, b| (a.0 + b.0, a.1 + b.1))
113+
}
114+
115+
thread_local! {
    /// Per-thread byte buffer, created with 8 KiB of capacity, that `parse_row`
    /// clears and refills on every call so un-escaping a record does not
    /// allocate a fresh vector each time.
    static SCRATCH: RefCell<Vec<u8>> = RefCell::new(Vec::with_capacity(8 * 1024));
}
118+
119+
fn parse_row(line: &[u8]) -> (u64, u64) {
120+
let comma_pos = match memchr::memchr(b',', line) {
121+
Some(p) => p,
122+
None => return (0, 0),
123+
};
124+
125+
if line.len() < comma_pos + 3 { return (0, 0); }
126+
127+
let raw_json_slice = &line[comma_pos + 2 .. line.len() - 1];
128+
129+
SCRATCH.with(|cell| {
130+
let mut scratch = cell.borrow_mut();
131+
scratch.clear();
132+
133+
let mut i = 0;
134+
let len = raw_json_slice.len();
135+
while i < len {
136+
let b = raw_json_slice[i];
137+
scratch.push(b);
138+
if b == b'"' { i += 1; }
139+
i += 1;
140+
}
141+
142+
match serde_json::from_slice::<Row>(&scratch) {
143+
Ok(row) => {
144+
let is_magikarp = if row.metadata.extra.contains("Magikarp") { 1 } else { 0 };
145+
(row.coords.len() as u64, is_magikarp)
146+
},
147+
Err(_) => (0, 0),
148+
}
149+
})
150+
}

0 commit comments

Comments
 (0)