Skip to content

Commit ac30777

Browse files
committed
run extraction and mega rendering
1 parent 05b3d9a commit ac30777

File tree

7 files changed

+777
-170
lines changed

7 files changed

+777
-170
lines changed

large-scale-viz/high_perf_character_render/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

large-scale-viz/high_perf_character_render/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,6 @@ log = "0.4"
3535

3636
# Async
3737
futures-intrusive = "0.5"
38+
39+
# Compression
40+
zstd = "0.13"

large-scale-viz/high_perf_character_render/src/animation/interpolator.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,14 +52,29 @@ impl AnimationInterpolator {
5252
let current_frame = &sequence.frames[state.current_frame_index];
5353
let next_frame = &sequence.frames[state.next_frame_index];
5454

55-
// Convert coordinates to pixel positions
55+
// Convert coordinates to pixel positions FIRST
5656
let current_pos = self.coordinate_mapper.convert_coords(&current_frame.coords);
5757
let next_pos = self.coordinate_mapper.convert_coords(&next_frame.coords);
5858

59-
// Linear interpolation
59+
// Check pixel distance - only interpolate if moving <= 16 pixels (1 tile)
60+
let pixel_dx = (next_pos[0] - current_pos[0]).abs();
61+
let pixel_dy = (next_pos[1] - current_pos[1]).abs();
62+
let pixel_distance = pixel_dx.max(pixel_dy);
63+
64+
// Only interpolate if moving contiguously (1 tile = 16 pixels)
65+
let should_interpolate = pixel_distance <= 16.0;
66+
67+
// If jumping > 16 pixels, don't interpolate - just show at current position
68+
let interpolation_t = if should_interpolate {
69+
state.interpolation_t
70+
} else {
71+
0.0
72+
};
73+
74+
// Linear interpolation (or no interpolation if jumping)
6075
let position = [
61-
current_pos[0] + (next_pos[0] - current_pos[0]) * state.interpolation_t,
62-
current_pos[1] + (next_pos[1] - current_pos[1]) * state.interpolation_t,
76+
current_pos[0] + (next_pos[0] - current_pos[0]) * interpolation_t,
77+
current_pos[1] + (next_pos[1] - current_pos[1]) * interpolation_t,
6378
];
6479

6580
// Determine direction based on movement
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
use anyhow::{Context, Result};
2+
use clap::Parser;
3+
use sprite_video_renderer::data::{ParquetFilter, ParquetReader};
4+
use std::fs::{File, OpenOptions};
5+
use std::io::{BufRead, BufReader, BufWriter, Write};
6+
use std::path::PathBuf;
7+
use chrono::Duration;
8+
9+
#[derive(Parser, Debug)]
10+
#[command(author, version, about = "Extract compact runs from parquet files", long_about = None)]
11+
struct Args {
12+
/// Directory containing parquet files
13+
#[arg(long)]
14+
parquet_dir: PathBuf,
15+
16+
/// Output file for compact runs
17+
#[arg(long, default_value = "compact_runs.bin")]
18+
output: PathBuf,
19+
20+
/// Progress file to track processed files
21+
#[arg(long, default_value = "compact_runs.progress")]
22+
progress_file: PathBuf,
23+
24+
/// Minimum run duration in seconds
25+
#[arg(long, default_value = "60")]
26+
min_duration_secs: i64,
27+
28+
/// Maximum coordinates per run
29+
#[arg(long, default_value = "2000")]
30+
max_coords_per_run: usize,
31+
}
32+
33+
/// One coordinate sample of a run in its compact form: three `u16` fields
/// (`x`, `y`, `map_id`) laid out in declaration order with no padding, so the
/// struct occupies exactly 6 bytes.
///
/// Because the struct is `packed`, its fields may be unaligned: copy a field
/// into a local before use rather than taking a reference to it.
#[derive(Clone, Copy, Debug)]
#[repr(C, packed)]
struct CompactCoord {
    x: u16,
    y: u16,
    map_id: u16,
}
40+
41+
fn main() -> Result<()> {
42+
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
43+
44+
let args = Args::parse();
45+
46+
log::info!("=== Extracting compact runs from parquet files ===");
47+
log::info!("Parquet directory: {:?}", args.parquet_dir);
48+
log::info!("Output file: {:?}", args.output);
49+
log::info!("Max coords per run: {}", args.max_coords_per_run);
50+
51+
// Find all parquet files
52+
let mut parquet_files: Vec<PathBuf> = std::fs::read_dir(&args.parquet_dir)
53+
.context("Failed to read parquet directory")?
54+
.filter_map(|entry| {
55+
let entry = entry.ok()?;
56+
let path = entry.path();
57+
if path.is_file() && path.extension()? == "parquet" {
58+
Some(path)
59+
} else {
60+
None
61+
}
62+
})
63+
.collect();
64+
65+
parquet_files.sort();
66+
log::info!("Found {} parquet files", parquet_files.len());
67+
68+
// Load progress - which files have been processed
69+
let mut processed_files = std::collections::HashSet::new();
70+
if args.progress_file.exists() {
71+
let file = File::open(&args.progress_file)?;
72+
let reader = BufReader::new(file);
73+
for line in reader.lines() {
74+
processed_files.insert(line?);
75+
}
76+
log::info!("Loaded progress: {} files already processed", processed_files.len());
77+
}
78+
79+
// Open output file in append mode
80+
let mut output_file = BufWriter::new(
81+
OpenOptions::new()
82+
.create(true)
83+
.append(true)
84+
.open(&args.output)?
85+
);
86+
87+
// Open progress file in append mode
88+
let mut progress_writer = BufWriter::new(
89+
OpenOptions::new()
90+
.create(true)
91+
.append(true)
92+
.open(&args.progress_file)?
93+
);
94+
95+
let mut total_runs_extracted = 0;
96+
let reset_maps = vec![0i64, 37, 40];
97+
let gap_threshold = Duration::minutes(2);
98+
let min_duration = Duration::seconds(args.min_duration_secs);
99+
100+
// Process each parquet file
101+
for (file_idx, parquet_path) in parquet_files.iter().enumerate() {
102+
let file_name = parquet_path.file_name().unwrap().to_string_lossy().to_string();
103+
104+
// Skip if already processed
105+
if processed_files.contains(&file_name) {
106+
log::debug!("Skipping already processed: {}", file_name);
107+
continue;
108+
}
109+
110+
log::info!("Processing [{}/{}]: {}", file_idx + 1, parquet_files.len(), file_name);
111+
112+
// Read parquet file
113+
let reader = ParquetReader::new(ParquetFilter::default());
114+
let mut frames = match reader.read_file(parquet_path) {
115+
Ok(f) => f,
116+
Err(e) => {
117+
log::error!("Failed to read {}: {}", file_name, e);
118+
continue;
119+
}
120+
};
121+
122+
if frames.is_empty() {
123+
log::warn!("No frames in {}", file_name);
124+
writeln!(progress_writer, "{}", file_name)?;
125+
progress_writer.flush()?;
126+
continue;
127+
}
128+
129+
// Sort frames
130+
frames.sort_by(|a, b| {
131+
(&a.user, &a.env_id, a.timestamp, a.path_index)
132+
.cmp(&(&b.user, &b.env_id, b.timestamp, b.path_index))
133+
});
134+
135+
// Extract runs
136+
let mut file_runs_count = 0;
137+
let mut i = 0;
138+
139+
while i < frames.len() {
140+
let run_user = &frames[i].user;
141+
let run_env_id = &frames[i].env_id;
142+
let run_sprite_id = frames[i].sprite_id;
143+
144+
let user_env_start = i;
145+
146+
// Find all frames for this user+env_id
147+
while i < frames.len() && &frames[i].user == run_user && &frames[i].env_id == run_env_id {
148+
i += 1;
149+
}
150+
151+
let user_env_end = i;
152+
153+
// Split into runs
154+
let mut run_start = user_env_start;
155+
156+
for j in (user_env_start + 1)..user_env_end {
157+
let time_gap = frames[j].timestamp - frames[j-1].timestamp;
158+
let curr_map = frames[j].coords[2];
159+
let prev_map = frames[j-1].coords[2];
160+
161+
let should_split = time_gap >= gap_threshold
162+
|| (reset_maps.contains(&curr_map) && !reset_maps.contains(&prev_map));
163+
164+
if should_split {
165+
let duration = frames[j-1].timestamp - frames[run_start].timestamp;
166+
167+
if duration >= min_duration {
168+
// Write this run
169+
write_compact_run(
170+
&mut output_file,
171+
run_sprite_id,
172+
&frames[run_start..j],
173+
args.max_coords_per_run,
174+
)?;
175+
file_runs_count += 1;
176+
total_runs_extracted += 1;
177+
}
178+
179+
run_start = j;
180+
}
181+
}
182+
183+
// Final run
184+
if run_start < user_env_end {
185+
let duration = frames[user_env_end - 1].timestamp - frames[run_start].timestamp;
186+
187+
if duration >= min_duration {
188+
write_compact_run(
189+
&mut output_file,
190+
run_sprite_id,
191+
&frames[run_start..user_env_end],
192+
args.max_coords_per_run,
193+
)?;
194+
file_runs_count += 1;
195+
total_runs_extracted += 1;
196+
}
197+
}
198+
}
199+
200+
log::info!(" Extracted {} runs from {}", file_runs_count, file_name);
201+
202+
// Mark file as processed
203+
writeln!(progress_writer, "{}", file_name)?;
204+
progress_writer.flush()?;
205+
206+
// Periodic flush of output
207+
output_file.flush()?;
208+
}
209+
210+
output_file.flush()?;
211+
progress_writer.flush()?;
212+
213+
log::info!("=== Extraction complete ===");
214+
log::info!("Total runs extracted: {}", total_runs_extracted);
215+
log::info!("Output written to: {:?}", args.output);
216+
217+
// Compress the output file
218+
log::info!("Compressing output...");
219+
compress_file(&args.output)?;
220+
log::info!("Compressed to: {:?}.zst", args.output);
221+
222+
Ok(())
223+
}
224+
225+
fn write_compact_run<W: Write>(
226+
writer: &mut W,
227+
sprite_id: u8,
228+
frames: &[sprite_video_renderer::data::SpriteFrame],
229+
max_coords: usize,
230+
) -> Result<()> {
231+
let coord_count = frames.len().min(max_coords) as u16;
232+
233+
// Write sprite_id
234+
writer.write_all(&[sprite_id])?;
235+
236+
// Write coord_count
237+
writer.write_all(&coord_count.to_le_bytes())?;
238+
239+
// Write coords
240+
for frame in frames.iter().take(max_coords) {
241+
let compact = CompactCoord {
242+
x: frame.coords[0] as u16,
243+
y: frame.coords[1] as u16,
244+
map_id: frame.coords[2] as u16,
245+
};
246+
247+
let bytes = unsafe {
248+
std::slice::from_raw_parts(
249+
&compact as *const CompactCoord as *const u8,
250+
std::mem::size_of::<CompactCoord>(),
251+
)
252+
};
253+
254+
writer.write_all(bytes)?;
255+
}
256+
257+
Ok(())
258+
}
259+
260+
fn compress_file(path: &PathBuf) -> Result<()> {
261+
let input = File::open(path)?;
262+
let mut reader = BufReader::new(input);
263+
264+
let output_path = path.with_extension("bin.zst");
265+
let output = File::create(&output_path)?;
266+
let mut encoder = zstd::Encoder::new(output, 3)?; // Compression level 3 (fast)
267+
268+
std::io::copy(&mut reader, &mut encoder)?;
269+
encoder.finish()?;
270+
271+
log::info!("Original size: {} MB", path.metadata()?.len() / 1_000_000);
272+
log::info!("Compressed size: {} MB", output_path.metadata()?.len() / 1_000_000);
273+
274+
Ok(())
275+
}

0 commit comments

Comments
 (0)