Skip to content

feat(bin): flush output to stdout before db conn shutdown #262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 7, 2025
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions sqllogictest-bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod engines;

use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::io::{stdout, Read, Seek, SeekFrom, Write};
use std::io::{self, stdout, Read, Seek, SeekFrom, Stdout, Write};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -430,17 +430,20 @@ async fn run_parallel(
let labels = labels.to_vec();
let cancel = cancel.clone();
async move {
let (buf, res) = AbortOnDropHandle::new(tokio::spawn(async move {
let mut buf = vec![];
let res = connect_and_run_test_file(
&mut buf, filename, &engine, config, &labels, cancel,
let res = AbortOnDropHandle::new(tokio::spawn(async move {
connect_and_run_test_file(
Vec::new(),
filename,
&engine,
config,
&labels,
cancel,
)
.await;
(buf, res)
.await
}))
.await
.unwrap();
(db_name, file, res, buf)
(db_name, file, res)
}
})
.buffer_unordered(jobs);
Expand All @@ -453,10 +456,7 @@ async fn run_parallel(
let mut connection_refused = false;
let start = Instant::now();

while let Some((db_name, file, res, buf)) = stream.next().await {
stdout().write_all(&buf)?;
stdout().flush()?;

while let Some((db_name, file, res)) = stream.next().await {
let test_case_name = file.to_test_case_name();
let case = res.to_junit(&test_case_name, junit.as_deref().unwrap_or_default());
test_suite.add_test_case(case);
Expand Down Expand Up @@ -541,7 +541,7 @@ async fn run_serial(
let test_case_name = file.to_string_lossy().to_test_case_name();

let res = connect_and_run_test_file(
&mut stdout(),
stdout(),
file,
engine,
config.clone(),
Expand Down Expand Up @@ -592,7 +592,7 @@ async fn update_test_files(
let mut runner = Runner::new(|| engines::connect(engine, &config));
runner.set_var(well_known::DATABASE.to_owned(), config.db.clone());

if let Err(e) = update_test_file(&mut std::io::stdout(), &mut runner, &file, format).await {
if let Err(e) = update_test_file(&mut io::stdout(), &mut runner, &file, format).await {
{
println!("{}\n\n{:?}", style("[FAILED]").red().bold(), e);
println!();
Expand All @@ -605,7 +605,7 @@ async fn update_test_files(
Ok(())
}

async fn flush(out: &mut impl std::io::Write) -> std::io::Result<()> {
async fn flush(out: &mut impl io::Write) -> io::Result<()> {
tokio::task::block_in_place(|| out.flush())
}

Expand Down Expand Up @@ -666,8 +666,29 @@ impl RunResult {
}
}

trait Output: Write {
fn finish(&mut self) -> io::Result<()>;
}

/// In serial mode, we directly write to stdout.
impl Output for Stdout {
fn finish(&mut self) -> io::Result<()> {
self.flush()
}
}

/// In parallel mode, we write to a buffer and flush it to stdout at the end
/// to avoid interleaving output from different parallelism.
impl Output for Vec<u8> {
fn finish(&mut self) -> io::Result<()> {
let mut stdout = stdout();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not guarantee the exclusiveness of the handler?

The std doc says,

Each handle returned is a reference to a shared global buffer whose access is synchronized via a mutex. If you need more explicit control over locking, see the Stdout::lock method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. However, it appears that write_all is not interruptable and we only call it once, so we still won't observe interleaving output. Nonetheless, lock seems always better.

stdout.write_all(self)?;
stdout.flush()
}
}

async fn connect_and_run_test_file(
out: &mut impl std::io::Write,
mut out: impl Output,
filename: PathBuf,
engine: &EngineConfig,
config: DBConfig,
Expand Down Expand Up @@ -716,7 +737,7 @@ async fn connect_and_run_test_file(
.unwrap();
RunResult::Cancelled
}
result = run_test_file(out, &mut runner, filename) => {
result = run_test_file(&mut out, &mut runner, filename) => {
if let Err(err) = &result {
writeln!(
out,
Expand All @@ -730,14 +751,15 @@ async fn connect_and_run_test_file(
}
};

out.finish().unwrap();
runner.shutdown_async().await;

result
}

/// Different from [`Runner::run_file_async`], we re-implement it here to print some progress
/// information.
async fn run_test_file<T: std::io::Write, M: MakeConnection>(
async fn run_test_file<T: io::Write, M: MakeConnection>(
out: &mut T,
runner: &mut Runner<M::Conn, M>,
filename: impl AsRef<Path>,
Expand Down Expand Up @@ -806,7 +828,7 @@ async fn run_test_file<T: std::io::Write, M: MakeConnection>(
Ok(duration)
}

fn finish_test_file<T: std::io::Write>(
fn finish_test_file<T: io::Write>(
out: &mut T,
time_stack: &mut Vec<Instant>,
did_pop: &mut bool,
Expand Down Expand Up @@ -842,7 +864,7 @@ fn finish_test_file<T: std::io::Write>(

/// Different from [`sqllogictest::update_test_file`], we re-implement it here to print some
/// progress information.
async fn update_test_file<T: std::io::Write, M: MakeConnection>(
async fn update_test_file<T: io::Write, M: MakeConnection>(
out: &mut T,
runner: &mut Runner<M::Conn, M>,
filename: impl AsRef<Path>,
Expand All @@ -862,7 +884,7 @@ async fn update_test_file<T: std::io::Write, M: MakeConnection>(

begin_times.push(Instant::now());

fn create_outfile(filename: impl AsRef<Path>) -> std::io::Result<(PathBuf, File)> {
fn create_outfile(filename: impl AsRef<Path>) -> io::Result<(PathBuf, File)> {
let filename = filename.as_ref();
let outfilename = filename.file_name().unwrap().to_str().unwrap().to_owned() + ".temp";
let outfilename = filename.parent().unwrap().join(outfilename);
Expand All @@ -880,7 +902,7 @@ async fn update_test_file<T: std::io::Write, M: MakeConnection>(
filename: &String,
outfilename: &PathBuf,
outfile: &mut File,
) -> std::io::Result<()> {
) -> io::Result<()> {
// check whether outfile ends with multiple newlines, which happens if
// - the last record is statement/query
// - the original file ends with multiple newlines
Expand Down
Loading