Skip to content

Commit

Permalink
apacheGH-39962: [C++] Small CSV reader refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
pitrou committed Feb 6, 2024
1 parent a6e577d commit f8b9e84
Showing 1 changed file with 47 additions and 99 deletions.
146 changes: 47 additions & 99 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -445,16 +445,22 @@ class BlockParsingOperator {
num_rows_seen_ += parser->total_num_rows();
}

RETURN_NOT_OK(block.consume_bytes(parsed_size));
if (block.consume_bytes) {
RETURN_NOT_OK(block.consume_bytes(parsed_size));
}
return ParsedBlock{std::move(parser), block.block_index,
static_cast<int64_t>(parsed_size) + block.bytes_skipped};
}

int64_t num_rows_seen() const { return num_rows_seen_; }

int num_csv_cols() const { return num_csv_cols_; }

private:
io::IOContext io_context_;
ParseOptions parse_options_;
int num_csv_cols_;
bool count_rows_;
const ParseOptions parse_options_;
const int num_csv_cols_;
const bool count_rows_;
int64_t num_rows_seen_;
};

Expand Down Expand Up @@ -570,7 +576,6 @@ class ReaderMixin {
parse_options_(parse_options),
convert_options_(convert_options),
count_rows_(count_rows),
num_rows_seen_(count_rows_ ? 1 : -1),
input_(std::move(input)) {}

protected:
Expand All @@ -581,6 +586,7 @@ class ReaderMixin {
const uint8_t* data = buf->data();
const auto data_end = data + buf->size();
DCHECK_GT(data_end - data, 0);
int64_t num_rows_seen = 1;

if (read_options_.skip_rows) {
// Skip initial rows (potentially invalid CSV data)
Expand All @@ -593,14 +599,14 @@ class ReaderMixin {
"either file is too short or header is larger than block size");
}
if (count_rows_) {
num_rows_seen_ += num_skipped_rows;
num_rows_seen += num_skipped_rows;
}
}

if (read_options_.column_names.empty()) {
// Parse one row (either to read column names or to know the number of columns)
BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_,
num_rows_seen_, 1);
BlockParser parser(io_context_.pool(), parse_options_, /*num_cols=*/-1,
/*first_row=*/num_rows_seen, /*max_num_rows=*/1);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser.Parse(
std::string_view(reinterpret_cast<const char*>(data), data_end - data),
Expand All @@ -627,7 +633,7 @@ class ReaderMixin {
// Skip parsed header row
data += parsed_size;
if (count_rows_) {
++num_rows_seen_;
++num_rows_seen;
}
}
} else {
Expand All @@ -636,14 +642,17 @@ class ReaderMixin {

if (count_rows_) {
// increase rows seen to skip past rows which will be skipped
num_rows_seen_ += read_options_.skip_rows_after_names;
num_rows_seen += read_options_.skip_rows_after_names;
}

auto bytes_consumed = data - buf->data();
*rest = SliceBuffer(buf, bytes_consumed);

num_csv_cols_ = static_cast<int32_t>(column_names_.size());
DCHECK_GT(num_csv_cols_, 0);
int32_t num_csv_cols = static_cast<int32_t>(column_names_.size());
DCHECK_GT(num_csv_cols, 0);
// Since we know the number of columns, we can instantiate the BlockParsingOperator
parsing_operator_.emplace(io_context_, parse_options_, num_csv_cols,
count_rows_ ? num_rows_seen : -1);

RETURN_NOT_OK(MakeConversionSchema());
return bytes_consumed;
Expand Down Expand Up @@ -691,7 +700,7 @@ class ReaderMixin {

if (convert_options_.include_columns.empty()) {
// Include all columns in CSV file order
for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
for (int32_t col_index = 0; col_index < num_csv_cols(); ++col_index) {
append_csv_column(column_names_[col_index], col_index);
}
} else {
Expand Down Expand Up @@ -719,66 +728,25 @@ class ReaderMixin {
return Status::OK();
}

struct ParseResult {
std::shared_ptr<BlockParser> parser;
int64_t parsed_bytes;
};

Result<ParseResult> Parse(const std::shared_ptr<Buffer>& partial,
const std::shared_ptr<Buffer>& completion,
const std::shared_ptr<Buffer>& block, int64_t block_index,
bool is_final) {
static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
auto parser = std::make_shared<BlockParser>(
io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);

std::shared_ptr<Buffer> straddling;
std::vector<std::string_view> views;
if (partial->size() != 0 || completion->size() != 0) {
if (partial->size() == 0) {
straddling = completion;
} else if (completion->size() == 0) {
straddling = partial;
} else {
ARROW_ASSIGN_OR_RAISE(
straddling, ConcatenateBuffers({partial, completion}, io_context_.pool()));
}
views = {std::string_view(*straddling), std::string_view(*block)};
} else {
views = {std::string_view(*block)};
}
uint32_t parsed_size;
if (is_final) {
RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
} else {
RETURN_NOT_OK(parser->Parse(views, &parsed_size));
}
// See BlockParsingOperator for explanation.
const int64_t bytes_before_buffer = partial->size() + completion->size();
if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
return Status::Invalid(
"CSV parser got out of sync with chunker. This can mean the data file "
"contains cell values spanning multiple lines; please consider enabling "
"the option 'newlines_in_values'.");
}
Result<ParsedBlock> Parse(const CSVBlock& block) {
DCHECK(parsing_operator_.has_value());
return (*parsing_operator_)(block);
}

if (count_rows_) {
num_rows_seen_ += parser->total_num_rows();
}
return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
int num_csv_cols() const {
DCHECK(parsing_operator_.has_value());
return parsing_operator_->num_csv_cols();
}

io::IOContext io_context_;
ReadOptions read_options_;
ParseOptions parse_options_;
ConvertOptions convert_options_;

// Number of columns in the CSV file
int32_t num_csv_cols_ = -1;
// Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
bool count_rows_;
// Number of rows seen in the csv. Not used if count_rows is false
int64_t num_rows_seen_;
const ReadOptions read_options_;
const ParseOptions parse_options_;
const ConvertOptions convert_options_;
// Whether to track the number of rows seen in the CSV being parsed
const bool count_rows_;

std::optional<BlockParsingOperator> parsing_operator_;

// Column names in the CSV file
std::vector<std::string> column_names_;
ConversionSchema conversion_schema_;
Expand Down Expand Up @@ -822,14 +790,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
return Status::OK();
}

Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
const std::shared_ptr<Buffer>& completion,
const std::shared_ptr<Buffer>& block,
int64_t block_index, bool is_final) {
ARROW_ASSIGN_OR_RAISE(auto result,
Parse(partial, completion, block, block_index, is_final));
RETURN_NOT_OK(ProcessData(result.parser, block_index));
return result.parsed_bytes;
Status ParseAndInsert(const CSVBlock& block) {
ARROW_ASSIGN_OR_RAISE(auto result, Parse(block));
RETURN_NOT_OK(ProcessData(result.parser, result.block_index));
return Status::OK();
}

// Trigger conversion of parsed block data
Expand Down Expand Up @@ -921,17 +885,14 @@ class StreamingReaderImpl : public ReaderMixin,
ProcessHeader(first_buffer, &after_header));
bytes_decoded_->fetch_add(header_bytes_consumed);

auto parser_op =
BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
ARROW_ASSIGN_OR_RAISE(
auto decoder_op,
BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_));

auto block_gen = SerialBlockReader::MakeAsyncIterator(
std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
read_options_.skip_rows_after_names);
auto parsed_block_gen =
MakeMappedGenerator(std::move(block_gen), std::move(parser_op));
auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), *parsing_operator_);
auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));

auto self = shared_from_this();
Expand Down Expand Up @@ -1035,11 +996,7 @@ class SerialTableReader : public BaseTableReader {
// EOF
break;
}
ARROW_ASSIGN_OR_RAISE(
int64_t parsed_bytes,
ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
maybe_block.block_index, maybe_block.is_final));
RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes));
RETURN_NOT_OK(ParseAndInsert(maybe_block));
}
// Finish conversion, create schema and table
RETURN_NOT_OK(task_group_->Finish());
Expand Down Expand Up @@ -1110,13 +1067,8 @@ class AsyncThreadedTableReader
DCHECK(!maybe_block.consume_bytes);

// Launch parse task
self->task_group_->Append([self, maybe_block] {
return self
->ParseAndInsert(maybe_block.partial, maybe_block.completion,
maybe_block.buffer, maybe_block.block_index,
maybe_block.is_final)
.status();
});
self->task_group_->Append(
[self, maybe_block] { return self->ParseAndInsert(maybe_block); });
return Status::OK();
};

Expand Down Expand Up @@ -1239,12 +1191,8 @@ class CSVRowCounter : public ReaderMixin,
// IterationEnd.
std::function<Result<std::optional<int64_t>>(const CSVBlock&)> count_cb =
[self](const CSVBlock& maybe_block) -> Result<std::optional<int64_t>> {
ARROW_ASSIGN_OR_RAISE(
auto parser,
self->Parse(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
maybe_block.block_index, maybe_block.is_final));
RETURN_NOT_OK(maybe_block.consume_bytes(parser.parsed_bytes));
int32_t total_row_count = parser.parser->total_num_rows();
ARROW_ASSIGN_OR_RAISE(auto parsed_block, self->Parse(maybe_block));
int32_t total_row_count = parsed_block.parser->total_num_rows();
self->row_count_ += total_row_count;
return total_row_count;
};
Expand Down

0 comments on commit f8b9e84

Please sign in to comment.