Skip to content

Commit e77edd7

Browse files
committed
MINIFICPP-2448 Add minifi.sh flowStatus command
1 parent a881933 commit e77edd7

27 files changed

+1255
-9
lines changed

OPS.md

+72-3
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,25 @@
1919
This readme defines operational commands for managing instances.
2020

2121
## Table of Contents
22-
2322
- [Description](#description)
24-
- [Managing](#managing-minifi)
25-
- [Commands](#commands)
23+
- [Managing MiNiFi](#managing-minifi)
24+
- [SSL](#ssl)
25+
- [Commands](#commands)
26+
- [Specifying connecting information](#specifying-connecting-information)
27+
- [Start Command](#start-command)
28+
- [Stop command](#stop-command)
29+
- [List connections command](#list-connections-command)
30+
- [List components command](#list-components-command)
31+
- [Clear connection command](#clear-connection-command)
32+
- [GetSize command](#getsize-command)
33+
- [Update flow](#update-flow)
34+
- [Get full connection command](#get-full-connection-command)
35+
- [Get manifest command](#get-manifest-command)
36+
- [Flowstatus command](#flowstatus-command)
37+
- [Processor](#processor)
38+
- [Connection](#connection)
39+
- [Instance](#instance)
40+
- [System Diagnostics](#system-diagnostics)
2641

2742
## Description
2843

@@ -109,3 +124,57 @@ Provides a list of full connections, if any.
109124
./minificontroller --manifest
110125

111126
Writes the agent manifest json to standard output
127+
128+
#### Flowstatus command
129+
./minificontroller --flowstatus "processor:TailFile:health,stats,bulletins"
130+
131+
The command returns the flow status for the specified query in JSON format.
132+
The query consists of the query type, the element identifier, and the query options. Each part is separated by the ':' colon character. Multiple query options are specified as a comma-separated list. In some query types the identifier is omitted, in this case only the query type and the query options are specified. Multiple queries can also be specified in a flowStatus command, in this case the queries are separated by the ';' semicolon character. For example: `./minificontroller --flowstatus "processor:TailFile:health,stats,bulletins;processor:LogAttribute:stats"`
133+
134+
Supported query types:
135+
136+
##### Processor
137+
138+
To query the processors, use the `processor` flag and specify the processor (by ID, name or "all") followed by one of the processor options. The processor options are below:
139+
140+
- health: The processor's run status, whether or not it has bulletins.
141+
- bulletins: A list of all the current bulletins (if there are any).
142+
- stats: The current stats of the processor.
143+
144+
An example query to get the health and stats of the "GenerateFlowFile" processor is below.
145+
146+
`./minificontroller --flowstatus "processor:GenerateFlowFile:health,stats"`
147+
148+
##### Connection
149+
150+
To query the connections, use the `connection` flag and specify the connection (by ID, name or "all") followed by one of the connection options. The connection options are below:
151+
152+
- health: The processor's run status, whether or not it has bulletins.
153+
154+
An example query to get the health and stats of the "Connection1" connection is below.
155+
156+
`./minificontroller --flowstatus "connection:Connection1:health"`
157+
158+
##### Instance
159+
160+
To query the status of the MiNiFi instance, use the `instance` flag followed by one of the instance options. The instance options are below.
161+
162+
- health: The instance reporting the aggregated state of the connections, and whether or not it has bulletins.
163+
- bulletins: A list of all the current bulletins (if there are any).
164+
- stats: The aggregated stats of all processors (bytes read, written, transferred, and flowfiles transferred).
165+
166+
An example query to get the all the statuses of the instance is below.
167+
168+
`./minificontroller --flowstatus "instance:health,stats,bulletins"`
169+
170+
##### System Diagnostics
171+
172+
To query the system diagnostics, use the `systemdiagnostics` flag followed by one of the system diagnostics options. The system diagnostics options are below.
173+
174+
- processorstats: The system processor stats. This includes the available processors and load average.
175+
- contentrepositoryusage: Disk usage stats on the partition or volume where the content repository is located.
176+
- flowfilerepositoryusage: Disk usage stats on the partition or volume where the flowfile repository is located.
177+
178+
An example query to get the processor stats, content repository usage and FlowFile repository usage from the system diagnostics is below.
179+
180+
`./minificontroller --flowStatus "systemdiagnostics:processorstats,contentrepositoryusage,flowfilerepositoryusage"`

README.md

+13
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,19 @@ MiNiFi can then be stopped by issuing:
503503

504504
$ ./bin/minifi.sh stop
505505

506+
### Query flow status
507+
508+
To query the status of the flow, you can use the following command on Unix systems:
509+
510+
$ ./bin/minifi.sh flowStatus [host:optional] [port:optional] "<query>"
511+
512+
On Windows systems, you can use the following command:
513+
514+
$ ./bin/flowstatus-minifi.bat [host:optional] [port:optional] "<query>"
515+
516+
The query can look like the following: "processor:TailFile:health,stats,bulletins". For more information on the query syntax and options, please see the [flow status documentation](OPS.md#Flowstatus-command).
517+
Note: The command requires minifi controller to be enabled in the minifi.properties file.
518+
506519
### Running as a docker container
507520
You can use the officially released image pulled from the [apache/nifi-minifi-cpp](https://hub.docker.com/r/apache/nifi-minifi-cpp) repository on dockerhub or you can use your locally built image.
508521
The container can be run with a specific configuration by mounting the locally edited configuration files to your docker container.

bin/flowstatus-minifi.bat

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
@echo off &setlocal enabledelayedexpansion
2+
rem
3+
rem Licensed to the Apache Software Foundation (ASF) under one or more
4+
rem contributor license agreements. See the NOTICE file distributed with
5+
rem this work for additional information regarding copyright ownership.
6+
rem The ASF licenses this file to You under the Apache License, Version 2.0
7+
rem (the "License"); you may not use this file except in compliance with
8+
rem the License. You may obtain a copy of the License at
9+
rem
10+
rem http://www.apache.org/licenses/LICENSE-2.0
11+
rem
12+
rem Unless required by applicable law or agreed to in writing, software
13+
rem distributed under the License is distributed on an "AS IS" BASIS,
14+
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
rem See the License for the specific language governing permissions and
16+
rem limitations under the License.
17+
rem
18+
19+
set "bin_dir=%~dp0"
20+
21+
if not exist "%bin_dir%\minificontroller.exe" (
22+
echo MiNiFi Controller is not installed
23+
exit /b
24+
)
25+
26+
if "%~1"=="" (
27+
echo MiNiFi flowStatus operation requires a flow status query parameter like "processor:TailFile:health,stats,bulletins"
28+
goto usage
29+
)
30+
31+
if "%~2"=="" (
32+
"%bin_dir%\minificontroller.exe" --flowstatus "%~1"
33+
exit /b
34+
)
35+
36+
if "%~3"=="" (
37+
"%bin_dir%\minificontroller.exe" --port "%~1" --flowstatus "%~2"
38+
exit /b
39+
)
40+
41+
"%bin_dir%\minificontroller.exe" --host "%~1" --port "%~2" --flowstatus "%~3"
42+
exit /b
43+
44+
:usage
45+
echo Usage: flowStatus.bat [^<host^>] [^<port^>] ^<flowstatus^>
46+
exit /b

bin/minifi.sh

+23-1
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,25 @@ status_service() {
183183
fi
184184
}
185185

186+
flowStatus() {
187+
if ! [ -f "${bin_dir}/minificontroller" ]; then
188+
echo "MiNiFi Controller is not installed"
189+
return
190+
fi
191+
if [ "$#" -lt 2 ]; then
192+
echo "MiNiFi flowStatus operation requires a flow status query parameter like \"processor:TailFile:health,stats,bulletins\""
193+
return
194+
fi
195+
196+
if [ "$#" -lt 3 ]; then
197+
exec "${bin_dir}/minificontroller" --flowstatus "$2"
198+
elif [ "$#" -lt 3 ]; then
199+
exec "${bin_dir}/minificontroller" --port "$2" --flowstatus "$3"
200+
else
201+
exec "${bin_dir}/minificontroller" --host "$2" --port "$3" --flowstatus "$4"
202+
fi
203+
}
204+
186205
case "$1" in
187206
start)
188207
start_service
@@ -207,7 +226,10 @@ case "$1" in
207226
uninstall "$@"
208227
echo "Service minifi uninstalled. Please remove the ${MINIFI_HOME} directory manually."
209228
;;
229+
flowStatus)
230+
flowStatus "$@"
231+
;;
210232
*)
211-
echo "Usage: minifi.sh {start|stop|run|restart|status|install|uninstall}"
233+
echo "Usage: minifi.sh {start|stop|run|restart|status|install|uninstall|flowStatus}"
212234
;;
213235
esac

controller/CMakeLists.txt

+3
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,6 @@ if (NOT WIN32)
5959
endif()
6060

6161
install(TARGETS minificontroller RUNTIME DESTINATION bin COMPONENT bin)
62+
if(WIN32)
63+
install(FILES "${CMAKE_SOURCE_DIR}/bin/flowstatus-minifi.bat" DESTINATION bin COMPONENT bin)
64+
endif()

controller/Controller.cpp

+20
Original file line numberDiff line numberDiff line change
@@ -266,4 +266,24 @@ nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData&
266266
return {};
267267
}
268268

269+
bool getFlowStatus(const utils::net::SocketData& socket_data, const std::string& status_query, std::ostream &out) {
270+
std::unique_ptr<io::BaseStream> connection_stream = std::make_unique<utils::net::AsioSocketConnection>(socket_data);
271+
if (connection_stream->initialize() < 0) {
272+
return false;
273+
}
274+
io::BufferStream buffer;
275+
auto op = static_cast<uint8_t>(c2::Operation::describe);
276+
buffer.write(&op, 1);
277+
buffer.write("flowstatus");
278+
buffer.write(status_query);
279+
if (io::isError(connection_stream->write(buffer.getBuffer()))) {
280+
return false;
281+
}
282+
connection_stream->read(op);
283+
std::string manifest;
284+
connection_stream->read(manifest, true);
285+
out << manifest << std::endl;
286+
return true;
287+
}
288+
269289
} // namespace org::apache::nifi::minifi::controller

controller/Controller.h

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ bool listComponents(const utils::net::SocketData& socket_data, std::ostream &out
3636
bool listConnections(const utils::net::SocketData& socket_data, std::ostream &out, bool show_header = true);
3737
bool printManifest(const utils::net::SocketData& socket_data, std::ostream &out);
3838
bool getJstacks(const utils::net::SocketData& socket_data, std::ostream &out);
39+
bool getFlowStatus(const utils::net::SocketData& socket_data, const std::string& status_query, std::ostream &out);
3940
nonstd::expected<void, std::string> getDebugBundle(const utils::net::SocketData& socket_data, const std::filesystem::path& target_dir);
4041

4142
} // namespace org::apache::nifi::minifi::controller

controller/MiNiFiController.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ int main(int argc, char **argv) {
147147
argument_parser.add_argument("--updateflow")
148148
.metavar("FLOW_CONFIG_PATH")
149149
.help("Updates the flow of the agent using the provided flow file");
150+
argument_parser.add_argument("--flowstatus")
151+
.metavar("FLOW_STATUS_QUERY")
152+
.help("Returns flow status for the provided query");
150153

151154
auto addFlagOption = [&](std::string_view name, const std::string& help) {
152155
argument_parser.add_argument(name)
@@ -271,6 +274,12 @@ int main(int argc, char **argv) {
271274
else
272275
std::cout << "Debug bundle written to " << std::filesystem::path(*debug_path) / "debug.tar.gz";
273276
}
277+
278+
if (const auto& status_query = argument_parser.present("--flowstatus")) {
279+
if (!minifi::controller::getFlowStatus(socket_data, *status_query, std::cout)) {
280+
std::cout << "Could not connect to remote host " << socket_data.host << ":" << socket_data.port << std::endl;
281+
}
282+
}
274283
} catch (const std::exception &exc) {
275284
// catch anything thrown within try block that derives from std::exception
276285
std::cerr << exc.what() << std::endl;

controller/tests/ControllerTests.cpp

+35
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,16 @@ class TestControllerSocketReporter : public c2::ControllerSocketReporter {
200200
std::string getAgentManifest() override {
201201
return "testAgentManifest";
202202
}
203+
204+
void setRoot(core::ProcessGroup* /*root*/) override {
205+
}
206+
207+
void setFlowStatusDependencies(core::BulletinStore* /*bulletin_store*/, const std::filesystem::path& /*flowile_repo_dir*/, const std::filesystem::path& /*flowile_repo_dir*/) override {
208+
}
209+
210+
std::string getFlowStatus(const std::vector<c2::FlowStatusRequest>& /*requests*/) override {
211+
return "";
212+
}
203213
};
204214

205215
class TestControllerServiceProvider : public core::controller::ControllerServiceProviderImpl {
@@ -570,4 +580,29 @@ TEST_CASE_METHOD(ControllerTestFixture, "Debug bundle retrieval fails if target
570580
REQUIRE(result.error() == "Object specified as the target directory already exists and it is not a directory");
571581
}
572582

583+
TEST_CASE_METHOD(ControllerTestFixture, "Test flow status getter", "[controllerTests]") {
584+
SECTION("With SSL from service provider") {
585+
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_SERVICE_PROVIDER);
586+
}
587+
588+
SECTION("With SSL from properties") {
589+
setConnectionType(ControllerTestFixture::ConnectionType::SSL_FROM_CONFIGURATION);
590+
}
591+
592+
SECTION("Without SSL") {
593+
setConnectionType(ControllerTestFixture::ConnectionType::UNSECURE);
594+
}
595+
596+
auto reporter = std::make_shared<minifi::c2::ControllerSocketMetricsPublisher>("ControllerSocketMetricsPublisher");
597+
auto response_node_loader = std::make_shared<minifi::state::response::ResponseNodeLoaderImpl>(configuration_, std::vector<std::shared_ptr<core::RepositoryMetricsSource>>{}, nullptr);
598+
reporter->initialize(configuration_, response_node_loader);
599+
initalizeControllerSocket(reporter);
600+
601+
std::stringstream flow_status_stream;
602+
minifi::controller::getFlowStatus(controller_socket_data_, "processor:TailFile:health", flow_status_stream);
603+
std::string expected_status = "{\"controllerServiceStatusList\":null,\"connectionStatusList\":null,\"remoteProcessGroupStatusList\":null,\"instanceStatus\":null,\"systemDiagnosticsStatus\":null,"
604+
"\"processorStatusList\":[],\"errorsGeneratingReport\":[\"Unable to get processorStatus: No processor with key 'TailFile' to report status on\"]}\n";
605+
REQUIRE(flow_status_stream.str() == expected_status);
606+
}
607+
573608
} // namespace org::apache::nifi::minifi::test

libminifi/include/FlowController.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
#include "core/state/MetricsPublisherStore.h"
6262
#include "RootProcessGroupWrapper.h"
6363
#include "c2/ControllerSocketProtocol.h"
64+
#include "core/BulletinStore.h"
6465

6566
namespace org::apache::nifi::minifi {
6667

@@ -74,7 +75,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi
7475
std::shared_ptr<Configure> configure, std::shared_ptr<core::FlowConfiguration> flow_configuration,
7576
std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store = nullptr,
7677
std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(), std::function<void()> request_restart = []{},
77-
utils::file::AssetManager* asset_manager = {});
78+
utils::file::AssetManager* asset_manager = {}, core::BulletinStore* bulletin_store = {});
7879

7980
~FlowController() override;
8081

libminifi/include/RootProcessGroupWrapper.h

+6
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "core/state/MetricsPublisherStore.h"
3333
#include "utils/gsl.h"
3434
#include "core/logging/Logger.h"
35+
#include "c2/ControllerSocketProtocol.h"
3536

3637
namespace org::apache::nifi::minifi {
3738

@@ -77,6 +78,10 @@ class RootProcessGroupWrapper {
7778
std::optional<std::vector<state::StateController*>> getAllProcessorControllers(
7879
const std::function<gsl::not_null<std::unique_ptr<state::ProcessorController>>(core::Processor&)>& controllerFactory);
7980

81+
void setControllerSocketProtocol(c2::ControllerSocketProtocol* controller_socket_protocol) {
82+
controller_socket_protocol_ = controller_socket_protocol;
83+
}
84+
8085
private:
8186
std::optional<std::chrono::milliseconds> loadShutdownTimeoutFromConfiguration();
8287

@@ -86,6 +91,7 @@ class RootProcessGroupWrapper {
8691
state::MetricsPublisherStore* metrics_publisher_store_{};
8792
std::unordered_map<utils::Identifier, std::unique_ptr<state::ProcessorController>> processor_to_controller_;
8893
std::chrono::milliseconds shutdown_check_interval_{1000};
94+
c2::ControllerSocketProtocol* controller_socket_protocol_{};
8995
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<RootProcessGroupWrapper>::getLogger();
9096
};
9197

libminifi/include/c2/ControllerSocketMetricsPublisher.h

+6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "core/state/MetricsPublisher.h"
2727
#include "core/logging/LoggerFactory.h"
2828
#include "c2/HeartbeatJsonSerializer.h"
29+
#include "FlowStatusBuilder.h"
2930

3031
namespace org::apache::nifi::minifi::c2 {
3132

@@ -42,10 +43,15 @@ class ControllerSocketMetricsPublisher : public state::MetricsPublisherImpl, pub
4243
std::unordered_set<std::string> getConnections() override;
4344
std::string getAgentManifest() override;
4445

46+
void setRoot(core::ProcessGroup* root) override;
47+
void setFlowStatusDependencies(core::BulletinStore* bulletin_store, const std::filesystem::path& flowile_repo_dir, const std::filesystem::path& content_repo_dir) override;
48+
std::string getFlowStatus(const std::vector<FlowStatusRequest>& requests) override;
49+
4550
protected:
4651
c2::HeartbeatJsonSerializer heartbeat_json_serializer_;
4752
std::mutex queue_metrics_node_mutex_;
4853
std::shared_ptr<state::response::ResponseNode> queue_metrics_node_;
54+
FlowStatusBuilder flow_status_builder_;
4955
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ControllerSocketMetricsPublisher>::getLogger();
5056
};
5157

libminifi/include/c2/ControllerSocketProtocol.h

+2
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class ControllerSocketProtocol {
4646
std::shared_ptr<Configure> configuration, const std::shared_ptr<ControllerSocketReporter>& controller_socket_reporter);
4747
~ControllerSocketProtocol();
4848
void initialize();
49+
void setRoot(core::ProcessGroup* root);
4950

5051
private:
5152
void handleStart(io::BaseStream &stream);
@@ -59,6 +60,7 @@ class ControllerSocketProtocol {
5960
void writeGetFullResponse(io::BaseStream &stream);
6061
void writeManifestResponse(io::BaseStream &stream);
6162
void writeJstackResponse(io::BaseStream &stream);
63+
void writeFlowStatusResponse(io::BaseStream &stream);
6264
void writeDebugBundleResponse(io::BaseStream &stream);
6365
void handleDescribe(io::BaseStream &stream);
6466
asio::awaitable<void> handleCommand(std::unique_ptr<io::BaseStream> stream);

0 commit comments

Comments
 (0)