Skip to content

Commit 6227af5

Browse files
Merge pull request #34 from nextmv-io/merschformann/multi-file
Improves multi-file support
2 parents 6d594d1 + c4d0746 commit 6227af5

28 files changed

+768
-26
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
# multi-file Workflow Example
2+
3+
!!! tip
4+
5+
This example uses the [`echo` app](../tutorials/echo.md), make sure to complete
6+
that tutorial first.
7+
8+
This example showcases how to use **multi-file** applications _within_ a
9+
Nextpipe workflow. A multi-file application differs from a JSON-based
10+
application in that it accepts a directory of files as input and produces a
11+
directory of files as output. Note that the workflow itself is also a multi-file
12+
application, however, this is a user choice (i.e., you could also create a
13+
JSON-based workflow that uses multi-file sub-applications).
14+
15+
For demonstration purposes, we will use the simple [echo-multi application] as
16+
the sub-application, which echoes the input files as output files.
17+
18+
Find the workflow code below (mind the comments explaining each step):
19+
20+
```python
21+
import os
22+
import shutil
23+
24+
import nextmv
25+
import nextmv.cloud
26+
27+
from nextpipe import FlowSpec, app, log, needs, step
28+
29+
options = nextmv.Options(
30+
nextmv.Option("input", str, "inputs/", "Path to input file.", False),
31+
nextmv.Option("output", str, "outputs/", "Path to output file.", False),
32+
)
33+
34+
35+
# >>> Workflow definition
36+
class Flow(FlowSpec):
37+
# The first step receives the path to the input files directly (see main()) and
38+
# automatically zips the directory and passes it to the 'echo-multi' sub-app.
39+
@app(app_id="echo-multi")
40+
@step
41+
def solve1():
42+
"""Runs a multi-file model."""
43+
pass
44+
45+
# The second step receives the path to the output files from the first step. This path
46+
# will point to a temporary directory containing the output files from the first step.
47+
@needs(predecessors=[solve1])
48+
@step
49+
def transform(result_path: str):
50+
"""Transforms the result for the next step."""
51+
# Just list the content of the result directory.
52+
log(f"Contents of result directory {result_path}:")
53+
for file_name in os.listdir(result_path):
54+
full_file_name = os.path.join(result_path, file_name)
55+
if os.path.isfile(full_file_name):
56+
log(f"- {file_name}")
57+
58+
# Add a new file to the result for demonstration purposes.
59+
new_file_path = os.path.join(result_path, "additional_file.txt")
60+
with open(new_file_path, "w") as f:
61+
f.write("This is an additional file added in the transform step.\n")
62+
log(f"Added new file: {new_file_path}")
63+
64+
return result_path
65+
66+
# The third step receives the (modified) directory from the transform step and runs
67+
# another multi-file app on it.
68+
@app(
69+
app_id="echo-multi",
70+
# We specify the content type explicitly here. This is normally done via the app's
71+
# manifest, but we can do it explicitly like this too.
72+
run_configuration=nextmv.RunConfiguration(
73+
format=nextmv.Format(
74+
format_input=nextmv.FormatInput(input_type=nextmv.InputFormat.MULTI_FILE),
75+
format_output=nextmv.FormatOutput(output_type=nextmv.OutputFormat.MULTI_FILE),
76+
)
77+
),
78+
full_result=True,
79+
)
80+
@needs(predecessors=[transform])
81+
@step
82+
def solve2(result: nextmv.cloud.RunResult):
83+
"""Runs another multi-file model."""
84+
pass
85+
86+
# The final step receives the output from 'solve2' as a full result object (see
87+
# 'full_result=True' above). In this case, the path to the output files is available
88+
# via 'result.output'.
89+
@needs(predecessors=[solve2])
90+
@step
91+
def prepare_output(result: nextmv.cloud.RunResult):
92+
"""Transforms the result for the next step."""
93+
# Extract the path to the output files.
94+
result_path = result.output
95+
# Simply copy the files from the given directory to the expected output directory.
96+
os.makedirs(options.output, exist_ok=True)
97+
for file_name in os.listdir(result_path):
98+
full_file_name = os.path.join(result_path, file_name)
99+
if os.path.isfile(full_file_name):
100+
shutil.copy(full_file_name, options.output)
101+
102+
103+
def main():
104+
# Run workflow (simply provide the path to the multi-file input)
105+
flow = Flow("DecisionFlow", options.input)
106+
flow.run()
107+
# The last step of the flow already prepares the output in the requested directory,
108+
# so no need to do anything here anymore.
109+
110+
111+
if __name__ == "__main__":
112+
main()
113+
```
114+
115+
Run the example:
116+
117+
```bash
118+
$ python main.py
119+
[nextpipe] No application ID or run ID found, uplink is inactive.
120+
[nextpipe] Flow: Flow
121+
[nextpipe] nextpipe: v0.3.5
122+
[nextpipe] nextmv: 0.33.0
123+
[nextpipe] Flow graph steps:
124+
[nextpipe] Step:
125+
[nextpipe] Definition: Step(solve1, StepRun(echo-multi, , {}, False))
126+
[nextpipe] Docstring: Runs a multi-file model.
127+
[nextpipe] Step:
128+
[nextpipe] Definition: Step(transform, StepNeeds(solve1))
129+
[nextpipe] Docstring: Transforms the result for the next step.
130+
[nextpipe] Step:
131+
[nextpipe] Definition: Step(solve2, StepNeeds(transform), StepRun(echo-multi, , {}, True))
132+
[nextpipe] Docstring: Runs another multi-file model.
133+
[nextpipe] Step:
134+
[nextpipe] Definition: Step(prepare_output, StepNeeds(solve2))
135+
[nextpipe] Docstring: Transforms the result for the next step.
136+
[nextpipe] Mermaid diagram:
137+
[nextpipe] graph LR
138+
solve1(solve1)
139+
solve1 --> transform
140+
transform(transform)
141+
transform --> solve2
142+
solve2(solve2)
143+
solve2 --> prepare_output
144+
prepare_output(prepare_output)
145+
146+
[nextpipe] Mermaid URL: https://mermaid.ink/svg/Z3JhcGggTFIKICBzb2x2ZTEoc29sdmUxKQogIHNvbHZlMSAtLT4gdHJhbnNmb3JtCiAgdHJhbnNmb3JtKHRyYW5zZm9ybSkKICB0cmFuc2Zvcm0gLS0+IHNvbHZlMgogIHNvbHZlMihzb2x2ZTIpCiAgc29sdmUyIC0tPiBwcmVwYXJlX291dHB1dAogIHByZXBhcmVfb3V0cHV0KHByZXBhcmVfb3V0cHV0KQo=?theme=dark
147+
[nextpipe] Running node solve1_0
148+
[nextpipe] Started app step solve1_0 run, find it at https://cloud.nextmv.io/app/echo-multi/run/latest-a-JAvuFgDR?view=details
149+
/home/marius/.asdf/installs/python/3.13.7/lib/python3.13/shutil.py:1281: DeprecationWarning: Python 3.14 will, by default, filter extracted tar archives and reject files or modify their metadata. Use the filter argument to control this behavior.
150+
tarobj.extractall(extract_dir, filter=filter)
151+
[nextpipe] Running node transform_0
152+
[transform_0] Contents of result directory /tmp/nextpipe_output_igqsibzm:
153+
[transform_0] - input.xlsx
154+
[transform_0] - data.csv
155+
[transform_0] Added new file: /tmp/nextpipe_output_igqsibzm/additional_file.txt
156+
[nextpipe] Running node solve2_0
157+
[nextpipe] Started app step solve2_0 run, find it at https://cloud.nextmv.io/app/echo-multi/run/latest-HIwvuFgDg?view=details
158+
[nextpipe] Running node prepare_output_0
159+
```
160+
161+
Content of the output directory:
162+
163+
```bash
164+
tree outputs/
165+
outputs/
166+
├── additional_file.txt
167+
├── data.csv
168+
└── input.xlsx
169+
170+
1 directory, 3 files
171+
```
172+
173+
The resulting Mermaid diagram for this flow looks like this:
174+
175+
```mermaid
176+
graph LR
177+
solve1(solve1)
178+
solve1 --> transform
179+
transform(transform)
180+
transform --> solve2
181+
solve2(solve2)
182+
solve2 --> prepare_output
183+
prepare_output(prepare_output)
184+
```
185+
186+
[echo-multi application]: ../tutorials/echo-multi.md

docs/tutorials/echo-multi.md

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
# The `echo-multi` app
2+
3+
Several examples assume you have a Nextmv application called `echo-multi`. This
4+
is just a simple application created for demonstration purposes. It takes the
5+
input files and echoes them as output files.
6+
7+
Let's get set up with the `echo-multi` application. Before starting:
8+
9+
1. [Sign up][signup] for a Nextmv account.
10+
2. Get your API key. Go to [Team > API Key][api-key].
11+
12+
Make sure that you have your API key set as an environment variable:
13+
14+
```bash
15+
export NEXTMV_API_KEY="<YOUR-API-KEY>"
16+
```
17+
18+
Now that you have a valid Nextmv account and API key, let's create the
19+
`echo-multi` Nextmv app (start in an empty directory).
20+
21+
1. Create a folder `inputs/` and add some sample input files to it. For example,
22+
you can create two text files `input.csv` and `input.txt` with some sample
23+
content.
24+
1. In a new directory, create a file called `main.py` with the code for the
25+
basic app that echoes the input.
26+
27+
```python
28+
import glob
29+
30+
import os
31+
import time
32+
33+
import nextmv
34+
35+
def main():
36+
options = nextmv.Options(
37+
nextmv.Option("input", str, "inputs/", "Path to input file.", False),
38+
nextmv.Option("output", str, "outputs/", "Path to output file.", False),
39+
nextmv.Option("duration", float, 1.0, "Runtime duration (in seconds).", False),
40+
)
41+
42+
# Read and prepare the input data.
43+
input_data = read_input(options.input)
44+
45+
# Log information about the input files.
46+
nextmv.log(f"Size of input files (count: {len(input_data)}):")
47+
for file_path, content in input_data.items():
48+
nextmv.log(f" {file_path}: {len(content)} bytes")
49+
50+
# Sleep for the specified duration.
51+
nextmv.log(f"Sleeping for {options.duration} seconds...")
52+
time.sleep(options.duration)
53+
nextmv.log("Woke up from sleep.")
54+
55+
# Write the output.
56+
write_output(options.output, input_data)
57+
58+
def read_input(input_path: str) -> dict[str, bytes]:
59+
"""Reads the input files to memory."""
60+
input_files = glob.glob(os.path.join(input_path, "**/*"), recursive=True)
61+
content = {}
62+
for file_path in input_files:
63+
if os.path.isfile(file_path):
64+
with open(file_path, "rb") as file:
65+
nextmv.log(f"Reading file: {file_path}")
66+
content[file_path] = file.read()
67+
return content
68+
69+
def write_output(output_path: str, content: dict[str, bytes]) -> None:
70+
"""Writes the given output files."""
71+
if not os.path.exists(output_path):
72+
os.makedirs(output_path)
73+
74+
for file_path, data in content.items():
75+
output_file_path = os.path.join(output_path, os.path.basename(file_path))
76+
with open(output_file_path, "wb") as file:
77+
nextmv.log(f"Writing file: {output_file_path}")
78+
file.write(data)
79+
80+
if __name__ == "__main__":
81+
main()
82+
```
83+
84+
Note that the application uses the [`nextmv`][nextmv-docs] library. This
85+
library is a dependency of `nextpipe` and should be installed automatically
86+
when you install `nextpipe`.
87+
88+
You may run the app locally to test it:
89+
90+
```bash
91+
python main.py
92+
```
93+
94+
1. Create a `requirements.txt` file with the following requirements for running
95+
the app:
96+
97+
```requirements.txt
98+
nextmv
99+
```
100+
101+
1. Create an `app.yaml` file (the app manifest) with the following instructions:
102+
103+
```yaml
104+
type: python
105+
runtime: ghcr.io/nextmv-io/runtime/python:3.11
106+
files:
107+
- main.py
108+
python:
109+
pip-requirements: requirements.txt
110+
```
111+
112+
1. Push the application to your Nextmv account. Create a `push.py` script in
113+
the same directory with the following code:
114+
115+
```python
116+
import os
117+
118+
from nextmv import cloud
119+
120+
client = cloud.Client(api_key=os.getenv("NEXTMV_API_KEY"))
121+
app = cloud.Application.new(client=client, name="echo-multi", id="echo-multi", description="Sample echo multi-file app.", exist_ok=True)
122+
app.push(verbose=True)
123+
```
124+
125+
1. Execute the `push.py` script to push the app to your Nextmv account:
126+
127+
```bash
128+
$ python push.py
129+
💽 Starting build for Nextmv application.
130+
🐍 Bundling Python dependencies.
131+
📋 Copied files listed in "app.yaml" manifest.
132+
📦 Packaged application (588 files, 5.39 MiB).
133+
🌟 Pushing to application: "echo-multi".
134+
💥️ Successfully pushed to application: "echo-multi".
135+
{
136+
"app_id": "echo-multi",
137+
"endpoint": "https://api.cloud.nextmv.io",
138+
"instance_url": "v1/applications/echo-multi/runs?instance_id=devint"
139+
}
140+
```
141+
142+
Alternatively, you can use the [Nextmv CLI][nextmv-cli] to create and push the app:
143+
144+
```bash
145+
nextmv app create -a echo-multi -n echo-multi -d "Sample echo multi-file app."
146+
nextmv app push -a echo-multi
147+
```
148+
149+
Now you are ready to run the examples.
150+
151+
[signup]: https://cloud.nextmv.io
152+
[api-key]: https://cloud.nextmv.io/team/api-keys
153+
[nextmv-docs]: https://nextmv-py.readthedocs.io/en/latest/nextmv/
154+
[nextmv-cli]: https://docs.nextmv.io/docs/using-nextmv/reference/cli

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ nav:
1212
- The echo app: tutorials/echo.md
1313
- Examples:
1414
- Basic chained workflow: examples/basic-chained-workflow.md
15+
- Multi-file workflow: examples/multifile-workflow.md
1516
- Fanout workflow: examples/fanout-workflow.md
1617
- Ensemble workflow: examples/ensemble-workflow.md
1718
- Complex workflow: examples/complex-workflow.md

0 commit comments

Comments
 (0)