Skip to content

Commit 001d12c

Browse files
committed
Rebase fix
1 parent 7949b98 commit 001d12c

File tree

11 files changed

+108
-101
lines changed

11 files changed

+108
-101
lines changed

libminifi/include/core/ProcessSession.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ class ProcessSessionImpl : public ReferenceContainerImpl, public virtual Process
110110
void importFrom(io::InputStream&& stream, const std::shared_ptr<core::FlowFile> &flow) override;
111111

112112
void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) override;
113+
void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) override;
114+
void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) override;
113115

114116
bool exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) override;
115117

libminifi/src/core/ProcessSession.cpp

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ void ProcessSessionImpl::write(core::FlowFile &flow, const io::OutputStreamCallb
267267
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
268268
provenance_report_->modifyContent(flow, details, duration);
269269
if (metrics_) {
270-
metrics_->bytes_written += stream->size();
270+
metrics_->bytesWritten() += stream->size();
271271
}
272272
} catch (const std::exception& exception) {
273273
logger_->log_debug("Caught Exception during process session write, type: {}, what: {}", typeid(exception).name(), exception.what());
@@ -319,7 +319,7 @@ void ProcessSessionImpl::append(const std::shared_ptr<core::FlowFile> &flow, con
319319
}
320320
flow->setSize(flow_file_size + (stream->size() - stream_size_before_callback));
321321
if (metrics_) {
322-
metrics_->bytes_written += stream->size() - stream_size_before_callback;
322+
metrics_->bytesWritten() += stream->size() - stream_size_before_callback;
323323
}
324324

325325
std::stringstream details;
@@ -379,7 +379,7 @@ int64_t ProcessSessionImpl::read(const core::FlowFile& flow_file, const io::Inpu
379379
throw Exception(FILE_OPERATION_EXCEPTION, "Failed to process flowfile content");
380380
}
381381
if (metrics_) {
382-
metrics_->bytes_read += ret;
382+
metrics_->bytesRead() += ret;
383383
}
384384
return ret;
385385
} catch (const std::exception& exception) {
@@ -428,8 +428,8 @@ int64_t ProcessSessionImpl::readWrite(const std::shared_ptr<core::FlowFile> &flo
428428
flow->setOffset(0);
429429
flow->setResourceClaim(output_claim);
430430
if (metrics_) {
431-
metrics_->bytes_written += read_write_result->bytes_written;
432-
metrics_->bytes_read += read_write_result->bytes_read;
431+
metrics_->bytesWritten() += read_write_result->bytes_written;
432+
metrics_->bytesRead() += read_write_result->bytes_read;
433433
}
434434

435435
return read_write_result->bytes_written;
@@ -498,7 +498,7 @@ void ProcessSessionImpl::importFrom(io::InputStream &stream, const std::shared_p
498498

499499
content_stream->close();
500500
if (metrics_) {
501-
metrics_->bytes_written += content_stream->size();
501+
metrics_->bytesWritten() += content_stream->size();
502502
}
503503
std::stringstream details;
504504
details << process_context_->getProcessorNode()->getName() << " modify flow record content " << flow->getUUIDStr();
@@ -562,7 +562,7 @@ void ProcessSessionImpl::import(const std::string& source, const std::shared_ptr
562562

563563
stream->close();
564564
if (metrics_) {
565-
metrics_->bytes_written += stream->size();
565+
metrics_->bytesWritten() += stream->size();
566566
}
567567
input.close();
568568
if (!keepSource) {
@@ -667,7 +667,7 @@ void ProcessSessionImpl::import(const std::string& source, std::vector<std::shar
667667
flowFile->getOffset(), flowFile->getSize(), flowFile->getResourceClaim()->getContentFullPath(), flowFile->getUUIDStr());
668668
stream->close();
669669
if (metrics_) {
670-
metrics_->bytes_written += stream->size();
670+
metrics_->bytesWritten() += stream->size();
671671
}
672672
std::string details = process_context_->getProcessorNode()->getName() + " modify flow record content " + flowFile->getUUIDStr();
673673
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
@@ -953,7 +953,7 @@ void ProcessSessionImpl::commit() {
953953
if (metrics_) {
954954
auto time_delta = std::chrono::steady_clock::now() - commit_start_time;
955955
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(time_delta));
956-
metrics_->processing_nanos += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
956+
metrics_->processingNanos() += std::chrono::duration_cast<std::chrono::nanoseconds>(time_delta).count();
957957
}
958958
} catch (const std::exception& exception) {
959959
logger_->log_debug("Caught Exception during process session commit, type: {}, what: {}", typeid(exception).name(), exception.what());
@@ -1165,8 +1165,8 @@ std::shared_ptr<core::FlowFile> ProcessSessionImpl::get() {
11651165
ret->setAttribute(SpecialFlowAttribute::FLOW_ID, flow_version->getFlowId());
11661166
}
11671167
if (metrics_) {
1168-
metrics_->incoming_bytes += ret->getSize();
1169-
++metrics_->incoming_flow_files;
1168+
metrics_->incomingBytes() += ret->getSize();
1169+
++metrics_->incomingFlowFiles();
11701170
}
11711171
return ret;
11721172
}

libminifi/src/core/state/nodes/FlowInformation.cpp

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() {
8181
.children = {
8282
{.name = "id", .value = std::string{processor->getUUIDStr()}},
8383
{.name = "groupId", .value = processor->getProcessGroupUUIDStr()},
84-
{.name = "bytesRead", .value = metrics->bytes_read.load()},
85-
{.name = "bytesWritten", .value = metrics->bytes_written.load()},
86-
{.name = "flowFilesIn", .value = metrics->incoming_flow_files.load()},
87-
{.name = "flowFilesOut", .value = metrics->transferred_flow_files.load()},
88-
{.name = "bytesIn", .value = metrics->incoming_bytes.load()},
89-
{.name = "bytesOut", .value = metrics->transferred_bytes.load()},
90-
{.name = "invocations", .value = metrics->invocations.load()},
91-
{.name = "processingNanos", .value = metrics->processing_nanos.load()},
84+
{.name = "bytesRead", .value = metrics->bytesRead().load()},
85+
{.name = "bytesWritten", .value = metrics->bytesWritten().load()},
86+
{.name = "flowFilesIn", .value = metrics->incomingFlowFiles().load()},
87+
{.name = "flowFilesOut", .value = metrics->transferredFlowFiles().load()},
88+
{.name = "bytesIn", .value = metrics->incomingBytes().load()},
89+
{.name = "bytesOut", .value = metrics->transferredBytes().load()},
90+
{.name = "invocations", .value = metrics->invocations().load()},
91+
{.name = "processingNanos", .value = metrics->processingNanos().load()},
9292
{.name = "activeThreadCount", .value = -1},
9393
{.name = "terminatedThreadCount", .value = -1},
9494
{.name = "runStatus", .value = (processor->isRunning() ? "Running" : "Stopped")}
@@ -115,21 +115,21 @@ std::vector<PublishedMetric> FlowInformation::calculateMetrics() {
115115
continue;
116116
}
117117
auto processor_metrics = processor->getMetrics();
118-
metrics.push_back({"bytes_read", gsl::narrow<double>(processor_metrics->bytes_read.load()),
118+
metrics.push_back({"bytes_read", gsl::narrow<double>(processor_metrics->bytesRead().load()),
119119
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
120-
metrics.push_back({"bytes_written", gsl::narrow<double>(processor_metrics->bytes_written.load()),
120+
metrics.push_back({"bytes_written", gsl::narrow<double>(processor_metrics->bytesWritten().load()),
121121
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
122-
metrics.push_back({"flow_files_in", gsl::narrow<double>(processor_metrics->incoming_flow_files.load()),
122+
metrics.push_back({"flow_files_in", gsl::narrow<double>(processor_metrics->incomingFlowFiles().load()),
123123
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
124-
metrics.push_back({"flow_files_out", gsl::narrow<double>(processor_metrics->transferred_flow_files.load()),
124+
metrics.push_back({"flow_files_out", gsl::narrow<double>(processor_metrics->transferredFlowFiles().load()),
125125
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
126-
metrics.push_back({"bytes_in", gsl::narrow<double>(processor_metrics->incoming_bytes.load()),
126+
metrics.push_back({"bytes_in", gsl::narrow<double>(processor_metrics->incomingBytes().load()),
127127
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
128-
metrics.push_back({"bytes_out", gsl::narrow<double>(processor_metrics->transferred_bytes.load()),
128+
metrics.push_back({"bytes_out", gsl::narrow<double>(processor_metrics->transferredBytes().load()),
129129
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
130-
metrics.push_back({"invocations", gsl::narrow<double>(processor_metrics->invocations.load()),
130+
metrics.push_back({"invocations", gsl::narrow<double>(processor_metrics->invocations().load()),
131131
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
132-
metrics.push_back({"processing_nanos", gsl::narrow<double>(processor_metrics->processing_nanos.load()),
132+
metrics.push_back({"processing_nanos", gsl::narrow<double>(processor_metrics->processingNanos().load()),
133133
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});
134134
metrics.push_back({"is_running", (processor->isRunning() ? 1.0 : 0.0),
135135
{{"processor_uuid", processor->getUUIDStr()}, {"processor_name", processor->getName()}, {"metric_class", "FlowInformation"}}});

libminifi/test/unit/MetricsTests.cpp

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -287,12 +287,12 @@ TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
287287
REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
288288
}
289289

290-
class DuplicateContentProcessor : public minifi::core::Processor {
291-
using minifi::core::Processor::Processor;
290+
class DuplicateContentProcessor : public minifi::core::ProcessorImpl {
291+
using minifi::core::ProcessorImpl::ProcessorImpl;
292292

293293
public:
294-
DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : Processor(name, uuid) {}
295-
explicit DuplicateContentProcessor(std::string_view name) : Processor(name) {}
294+
DuplicateContentProcessor(std::string_view name, const minifi::utils::Identifier& uuid) : ProcessorImpl(name, uuid) {}
295+
explicit DuplicateContentProcessor(std::string_view name) : ProcessorImpl(name) {}
296296
static constexpr const char* Description = "A processor that creates two more of the same flow file.";
297297
static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
298298
static constexpr auto Success = core::RelationshipDefinition{"success", "Newly created FlowFiles"};
@@ -333,29 +333,29 @@ TEST_CASE("Test processor metrics change after trigger", "[ProcessorMetrics]") {
333333
minifi::test::SingleProcessorTestController test_controller(std::make_unique<DuplicateContentProcessor>("DuplicateContentProcessor"));
334334
test_controller.trigger({minifi::test::InputFlowFileData{"log line 1", {}}});
335335
auto metrics = test_controller.getProcessor()->getMetrics();
336-
CHECK(metrics->invocations == 1);
337-
CHECK(metrics->incoming_flow_files == 1);
338-
CHECK(metrics->transferred_flow_files == 2);
336+
CHECK(metrics->invocations() == 1);
337+
CHECK(metrics->incomingFlowFiles() == 1);
338+
CHECK(metrics->transferredFlowFiles() == 2);
339339
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 1);
340340
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 1);
341-
CHECK(metrics->incoming_bytes == 10);
342-
CHECK(metrics->transferred_bytes == 30);
343-
CHECK(metrics->bytes_read == 10);
344-
CHECK(metrics->bytes_written == 20);
345-
auto old_nanos = metrics->processing_nanos.load();
346-
CHECK(metrics->processing_nanos > 0);
341+
CHECK(metrics->incomingBytes() == 10);
342+
CHECK(metrics->transferredBytes() == 30);
343+
CHECK(metrics->bytesRead() == 10);
344+
CHECK(metrics->bytesWritten() == 20);
345+
auto old_nanos = metrics->processingNanos().load();
346+
CHECK(metrics->processingNanos() > 0);
347347

348348
test_controller.trigger({minifi::test::InputFlowFileData{"new log line 2", {}}});
349-
CHECK(metrics->invocations == 2);
350-
CHECK(metrics->incoming_flow_files == 2);
351-
CHECK(metrics->transferred_flow_files == 4);
349+
CHECK(metrics->invocations() == 2);
350+
CHECK(metrics->incomingFlowFiles() == 2);
351+
CHECK(metrics->transferredFlowFiles() == 4);
352352
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("success") == 2);
353353
CHECK(metrics->getTransferredFlowFilesToRelationshipCount("original") == 2);
354-
CHECK(metrics->incoming_bytes == 24);
355-
CHECK(metrics->transferred_bytes == 72);
356-
CHECK(metrics->bytes_read == 24);
357-
CHECK(metrics->bytes_written == 48);
358-
CHECK(metrics->processing_nanos > old_nanos);
354+
CHECK(metrics->incomingBytes() == 24);
355+
CHECK(metrics->transferredBytes() == 72);
356+
CHECK(metrics->bytesRead() == 24);
357+
CHECK(metrics->bytesWritten() == 48);
358+
CHECK(metrics->processingNanos() > old_nanos);
359359
}
360360

361361

minifi-api/include/minifi-cpp/core/ProcessSession.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ class ProcessSession : public virtual ReferenceContainer {
9797

9898
// import from the data source.
9999
virtual void import(const std::string& source, const std::shared_ptr<core::FlowFile> &flow, bool keepSource = true, uint64_t offset = 0) = 0;
100+
virtual void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, uint64_t offset, char inputDelimiter) = 0;
101+
virtual void import(const std::string& source, std::vector<std::shared_ptr<FlowFile>> &flows, bool keepSource, uint64_t offset, char inputDelimiter) = 0;
100102

101103
/**
102104
* Exports the data stream to a file

minifi-api/include/minifi-cpp/core/Processor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone
8383
virtual void validateAnnotations() const = 0;
8484
virtual annotation::Input getInputRequirement() const = 0;
8585
virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0;
86+
virtual std::string getProcessGroupUUIDStr() const = 0;
87+
virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0;
8688

8789
virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0;
8890
virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0;

minifi-api/include/minifi-cpp/core/ProcessorMetrics.h

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,23 +34,24 @@ class ProcessorMetrics : public virtual state::response::ResponseNode {
3434
virtual std::chrono::milliseconds getAverageSessionCommitRuntime() const = 0;
3535
virtual std::chrono::milliseconds getLastSessionCommitRuntime() const = 0;
3636
virtual void addLastSessionCommitRuntime(std::chrono::milliseconds runtime) = 0;
37+
virtual std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const = 0;
3738

3839
virtual std::atomic<size_t>& invocations() = 0;
3940
virtual const std::atomic<size_t>& invocations() const = 0;
40-
virtual std::atomic<size_t>& incoming_flow_files() = 0;
41-
virtual const std::atomic<size_t>& incoming_flow_files() const = 0;
42-
virtual std::atomic<size_t>& transferred_flow_files() = 0;
43-
virtual const std::atomic<size_t>& transferred_flow_files() const = 0;
44-
virtual std::atomic<uint64_t>& incoming_bytes() = 0;
45-
virtual const std::atomic<uint64_t>& incoming_bytes() const = 0;
46-
virtual std::atomic<uint64_t>& transferred_bytes() = 0;
47-
virtual const std::atomic<uint64_t>& transferred_bytes() const = 0;
48-
virtual std::atomic<uint64_t>& bytes_read() = 0;
49-
virtual const std::atomic<uint64_t>& bytes_read() const = 0;
50-
virtual std::atomic<uint64_t>& bytes_written() = 0;
51-
virtual const std::atomic<uint64_t>& bytes_written() const = 0;
52-
virtual std::atomic<uint64_t>& processing_nanos() = 0;
53-
virtual const std::atomic<uint64_t>& processing_nanos() const = 0;
41+
virtual std::atomic<size_t>& incomingFlowFiles() = 0;
42+
virtual const std::atomic<size_t>& incomingFlowFiles() const = 0;
43+
virtual std::atomic<size_t>& transferredFlowFiles() = 0;
44+
virtual const std::atomic<size_t>& transferredFlowFiles() const = 0;
45+
virtual std::atomic<uint64_t>& incomingBytes() = 0;
46+
virtual const std::atomic<uint64_t>& incomingBytes() const = 0;
47+
virtual std::atomic<uint64_t>& transferredBytes() = 0;
48+
virtual const std::atomic<uint64_t>& transferredBytes() const = 0;
49+
virtual std::atomic<uint64_t>& bytesRead() = 0;
50+
virtual const std::atomic<uint64_t>& bytesRead() const = 0;
51+
virtual std::atomic<uint64_t>& bytesWritten() = 0;
52+
virtual const std::atomic<uint64_t>& bytesWritten() const = 0;
53+
virtual std::atomic<uint64_t>& processingNanos() = 0;
54+
virtual const std::atomic<uint64_t>& processingNanos() const = 0;
5455
};
5556

5657
} // namespace org::apache::nifi::minifi::core

utils/include/core/Processor.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,11 +167,11 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C
167167
active_tasks_ = 0;
168168
}
169169

170-
std::string getProcessGroupUUIDStr() const {
170+
std::string getProcessGroupUUIDStr() const override {
171171
return process_group_uuid_;
172172
}
173173

174-
void setProcessGroupUUIDStr(const std::string &uuid) {
174+
void setProcessGroupUUIDStr(const std::string &uuid) override {
175175
process_group_uuid_ = uuid;
176176
}
177177

0 commit comments

Comments
 (0)