Skip to content

Commit 7284df9

Browse files
committed
apacheGH-39962: [C++] Small CSV reader refactoring
1 parent 8ffc214 commit 7284df9

File tree

1 file changed

+47
-99
lines changed

1 file changed

+47
-99
lines changed

cpp/src/arrow/csv/reader.cc

+47-99
Original file line numberDiff line numberDiff line change
@@ -445,16 +445,22 @@ class BlockParsingOperator {
445445
num_rows_seen_ += parser->total_num_rows();
446446
}
447447

448-
RETURN_NOT_OK(block.consume_bytes(parsed_size));
448+
if (block.consume_bytes) {
449+
RETURN_NOT_OK(block.consume_bytes(parsed_size));
450+
}
449451
return ParsedBlock{std::move(parser), block.block_index,
450452
static_cast<int64_t>(parsed_size) + block.bytes_skipped};
451453
}
452454

455+
int64_t num_rows_seen() const { return num_rows_seen_; }
456+
457+
int num_csv_cols() const { return num_csv_cols_; }
458+
453459
private:
454460
io::IOContext io_context_;
455-
ParseOptions parse_options_;
456-
int num_csv_cols_;
457-
bool count_rows_;
461+
const ParseOptions parse_options_;
462+
const int num_csv_cols_;
463+
const bool count_rows_;
458464
int64_t num_rows_seen_;
459465
};
460466

@@ -570,7 +576,6 @@ class ReaderMixin {
570576
parse_options_(parse_options),
571577
convert_options_(convert_options),
572578
count_rows_(count_rows),
573-
num_rows_seen_(count_rows_ ? 1 : -1),
574579
input_(std::move(input)) {}
575580

576581
protected:
@@ -581,6 +586,7 @@ class ReaderMixin {
581586
const uint8_t* data = buf->data();
582587
const auto data_end = data + buf->size();
583588
DCHECK_GT(data_end - data, 0);
589+
int64_t num_rows_seen = 1;
584590

585591
if (read_options_.skip_rows) {
586592
// Skip initial rows (potentially invalid CSV data)
@@ -593,14 +599,14 @@ class ReaderMixin {
593599
"either file is too short or header is larger than block size");
594600
}
595601
if (count_rows_) {
596-
num_rows_seen_ += num_skipped_rows;
602+
num_rows_seen += num_skipped_rows;
597603
}
598604
}
599605

600606
if (read_options_.column_names.empty()) {
601607
// Parse one row (either to read column names or to know the number of columns)
602-
BlockParser parser(io_context_.pool(), parse_options_, num_csv_cols_,
603-
num_rows_seen_, 1);
608+
BlockParser parser(io_context_.pool(), parse_options_, /*num_cols=*/-1,
609+
/*first_row=*/num_rows_seen, /*max_num_rows=*/1);
604610
uint32_t parsed_size = 0;
605611
RETURN_NOT_OK(parser.Parse(
606612
std::string_view(reinterpret_cast<const char*>(data), data_end - data),
@@ -627,7 +633,7 @@ class ReaderMixin {
627633
// Skip parsed header row
628634
data += parsed_size;
629635
if (count_rows_) {
630-
++num_rows_seen_;
636+
++num_rows_seen;
631637
}
632638
}
633639
} else {
@@ -636,14 +642,17 @@ class ReaderMixin {
636642

637643
if (count_rows_) {
638644
// increase rows seen to skip past rows which will be skipped
639-
num_rows_seen_ += read_options_.skip_rows_after_names;
645+
num_rows_seen += read_options_.skip_rows_after_names;
640646
}
641647

642648
auto bytes_consumed = data - buf->data();
643649
*rest = SliceBuffer(buf, bytes_consumed);
644650

645-
num_csv_cols_ = static_cast<int32_t>(column_names_.size());
646-
DCHECK_GT(num_csv_cols_, 0);
651+
int32_t num_csv_cols = static_cast<int32_t>(column_names_.size());
652+
DCHECK_GT(num_csv_cols, 0);
653+
// Since we know the number of columns, we can instantiate the BlockParsingOperator
654+
parsing_operator_.emplace(io_context_, parse_options_, num_csv_cols,
655+
count_rows_ ? num_rows_seen : -1);
647656

648657
RETURN_NOT_OK(MakeConversionSchema());
649658
return bytes_consumed;
@@ -691,7 +700,7 @@ class ReaderMixin {
691700

692701
if (convert_options_.include_columns.empty()) {
693702
// Include all columns in CSV file order
694-
for (int32_t col_index = 0; col_index < num_csv_cols_; ++col_index) {
703+
for (int32_t col_index = 0; col_index < num_csv_cols(); ++col_index) {
695704
append_csv_column(column_names_[col_index], col_index);
696705
}
697706
} else {
@@ -719,66 +728,25 @@ class ReaderMixin {
719728
return Status::OK();
720729
}
721730

722-
struct ParseResult {
723-
std::shared_ptr<BlockParser> parser;
724-
int64_t parsed_bytes;
725-
};
726-
727-
Result<ParseResult> Parse(const std::shared_ptr<Buffer>& partial,
728-
const std::shared_ptr<Buffer>& completion,
729-
const std::shared_ptr<Buffer>& block, int64_t block_index,
730-
bool is_final) {
731-
static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
732-
auto parser = std::make_shared<BlockParser>(
733-
io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
734-
735-
std::shared_ptr<Buffer> straddling;
736-
std::vector<std::string_view> views;
737-
if (partial->size() != 0 || completion->size() != 0) {
738-
if (partial->size() == 0) {
739-
straddling = completion;
740-
} else if (completion->size() == 0) {
741-
straddling = partial;
742-
} else {
743-
ARROW_ASSIGN_OR_RAISE(
744-
straddling, ConcatenateBuffers({partial, completion}, io_context_.pool()));
745-
}
746-
views = {std::string_view(*straddling), std::string_view(*block)};
747-
} else {
748-
views = {std::string_view(*block)};
749-
}
750-
uint32_t parsed_size;
751-
if (is_final) {
752-
RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
753-
} else {
754-
RETURN_NOT_OK(parser->Parse(views, &parsed_size));
755-
}
756-
// See BlockParsingOperator for explanation.
757-
const int64_t bytes_before_buffer = partial->size() + completion->size();
758-
if (static_cast<int64_t>(parsed_size) < bytes_before_buffer) {
759-
return Status::Invalid(
760-
"CSV parser got out of sync with chunker. This can mean the data file "
761-
"contains cell values spanning multiple lines; please consider enabling "
762-
"the option 'newlines_in_values'.");
763-
}
731+
Result<ParsedBlock> Parse(const CSVBlock& block) {
732+
DCHECK(parsing_operator_.has_value());
733+
return (*parsing_operator_)(block);
734+
}
764735

765-
if (count_rows_) {
766-
num_rows_seen_ += parser->total_num_rows();
767-
}
768-
return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)};
736+
int num_csv_cols() const {
737+
DCHECK(parsing_operator_.has_value());
738+
return parsing_operator_->num_csv_cols();
769739
}
770740

771741
io::IOContext io_context_;
772-
ReadOptions read_options_;
773-
ParseOptions parse_options_;
774-
ConvertOptions convert_options_;
775-
776-
// Number of columns in the CSV file
777-
int32_t num_csv_cols_ = -1;
778-
// Whether num_rows_seen_ tracks the number of rows seen in the CSV being parsed
779-
bool count_rows_;
780-
// Number of rows seen in the csv. Not used if count_rows is false
781-
int64_t num_rows_seen_;
742+
const ReadOptions read_options_;
743+
const ParseOptions parse_options_;
744+
const ConvertOptions convert_options_;
745+
// Whether to track the number of rows seen in the CSV being parsed
746+
const bool count_rows_;
747+
748+
std::optional<BlockParsingOperator> parsing_operator_;
749+
782750
// Column names in the CSV file
783751
std::vector<std::string> column_names_;
784752
ConversionSchema conversion_schema_;
@@ -822,14 +790,10 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
822790
return Status::OK();
823791
}
824792

825-
Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
826-
const std::shared_ptr<Buffer>& completion,
827-
const std::shared_ptr<Buffer>& block,
828-
int64_t block_index, bool is_final) {
829-
ARROW_ASSIGN_OR_RAISE(auto result,
830-
Parse(partial, completion, block, block_index, is_final));
831-
RETURN_NOT_OK(ProcessData(result.parser, block_index));
832-
return result.parsed_bytes;
793+
Status ParseAndInsert(const CSVBlock& block) {
794+
ARROW_ASSIGN_OR_RAISE(auto result, Parse(block));
795+
RETURN_NOT_OK(ProcessData(result.parser, result.block_index));
796+
return Status::OK();
833797
}
834798

835799
// Trigger conversion of parsed block data
@@ -921,17 +885,14 @@ class StreamingReaderImpl : public ReaderMixin,
921885
ProcessHeader(first_buffer, &after_header));
922886
bytes_decoded_->fetch_add(header_bytes_consumed);
923887

924-
auto parser_op =
925-
BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
926888
ARROW_ASSIGN_OR_RAISE(
927889
auto decoder_op,
928890
BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_));
929891

930892
auto block_gen = SerialBlockReader::MakeAsyncIterator(
931893
std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header),
932894
read_options_.skip_rows_after_names);
933-
auto parsed_block_gen =
934-
MakeMappedGenerator(std::move(block_gen), std::move(parser_op));
895+
auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), *parsing_operator_);
935896
auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op));
936897

937898
auto self = shared_from_this();
@@ -1035,11 +996,7 @@ class SerialTableReader : public BaseTableReader {
1035996
// EOF
1036997
break;
1037998
}
1038-
ARROW_ASSIGN_OR_RAISE(
1039-
int64_t parsed_bytes,
1040-
ParseAndInsert(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
1041-
maybe_block.block_index, maybe_block.is_final));
1042-
RETURN_NOT_OK(maybe_block.consume_bytes(parsed_bytes));
999+
RETURN_NOT_OK(ParseAndInsert(maybe_block));
10431000
}
10441001
// Finish conversion, create schema and table
10451002
RETURN_NOT_OK(task_group_->Finish());
@@ -1110,13 +1067,8 @@ class AsyncThreadedTableReader
11101067
DCHECK(!maybe_block.consume_bytes);
11111068

11121069
// Launch parse task
1113-
self->task_group_->Append([self, maybe_block] {
1114-
return self
1115-
->ParseAndInsert(maybe_block.partial, maybe_block.completion,
1116-
maybe_block.buffer, maybe_block.block_index,
1117-
maybe_block.is_final)
1118-
.status();
1119-
});
1070+
self->task_group_->Append(
1071+
[self, maybe_block] { return self->ParseAndInsert(maybe_block); });
11201072
return Status::OK();
11211073
};
11221074

@@ -1239,12 +1191,8 @@ class CSVRowCounter : public ReaderMixin,
12391191
// IterationEnd.
12401192
std::function<Result<std::optional<int64_t>>(const CSVBlock&)> count_cb =
12411193
[self](const CSVBlock& maybe_block) -> Result<std::optional<int64_t>> {
1242-
ARROW_ASSIGN_OR_RAISE(
1243-
auto parser,
1244-
self->Parse(maybe_block.partial, maybe_block.completion, maybe_block.buffer,
1245-
maybe_block.block_index, maybe_block.is_final));
1246-
RETURN_NOT_OK(maybe_block.consume_bytes(parser.parsed_bytes));
1247-
int32_t total_row_count = parser.parser->total_num_rows();
1194+
ARROW_ASSIGN_OR_RAISE(auto parsed_block, self->Parse(maybe_block));
1195+
int32_t total_row_count = parsed_block.parser->total_num_rows();
12481196
self->row_count_ += total_row_count;
12491197
return total_row_count;
12501198
};

0 commit comments

Comments
 (0)