Skip to content

Commit

Permalink
feat: add stress flow
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-milan committed Oct 24, 2023
1 parent 3c3636b commit 5caa0df
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
Imports all flows for every project so we can register all of them.
"""
from pipelines.exemplo import * # noqa
from pipelines.stress import * # noqa
2 changes: 2 additions & 0 deletions pipelines/stress/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.stress.many_tasks.flows import * # noqa
Empty file.
61 changes: 61 additions & 0 deletions pipelines/stress/many_tasks/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
from prefect import Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants
from pipelines.stress.many_tasks.tasks import (
cast_to_float,
compare_results_with_delta,
divide_numbers,
log_variable,
multiply_numbers,
subtract_numbers,
sum_numbers,
)

with Flow(
name="Stress Test: Many Tasks",
state_handlers=[handler_inject_bd_credentials],
) as stress__many_tasks__main_flow:
# Parameters
a = Parameter("a")
b = Parameter("b")

# Convert them to floats
a_float = cast_to_float(a)
b_float = cast_to_float(b)

# Make c = (a + b) * (a - b)
sum_ab = sum_numbers(a_float, b_float)
sub_ab = subtract_numbers(a_float, b_float)
c = multiply_numbers(sum_ab, sub_ab)

# Get a^2 and b^2
a_squared = multiply_numbers(a_float, a_float)
b_squared = multiply_numbers(b_float, b_float)

# Make d = c / (a^2 - b^2)
d = divide_numbers(c, subtract_numbers(a_squared, b_squared))

# d must be close to 1
is_close_to_one = compare_results_with_delta(d, 1.0)

# Now we print out all the variables
log_variable("a", a)
log_variable("b", b)
log_variable("a_float", a_float)
log_variable("b_float", b_float)
log_variable("sum_ab", sum_ab)
log_variable("sub_ab", sub_ab)
log_variable("c", c)
log_variable("a_squared", a_squared)
log_variable("b_squared", b_squared)
log_variable("d", d)
log_variable("is_close_to_one", is_close_to_one)


# Storage and run configs
stress__many_tasks__main_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
stress__many_tasks__main_flow.run_config = KubernetesRun(image=constants.DOCKER_IMAGE.value)
47 changes: 47 additions & 0 deletions pipelines/stress/many_tasks/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
from time import sleep
from typing import Any

from prefect import task
from prefeitura_rio.pipelines_utils.logging import log


@task
def cast_to_float(number: Any) -> float:
sleep(0.25)
return float(number)


@task
def sum_numbers(a: float, b: float) -> float:
sleep(0.25)
return a + b


@task
def multiply_numbers(a: float, b: float) -> float:
sleep(0.25)
return a * b


@task
def divide_numbers(a: float, b: float) -> float:
sleep(0.25)
return a / b


@task
def subtract_numbers(a: float, b: float) -> float:
sleep(0.25)
return a - b


@task
def compare_results_with_delta(a: float, b: float, delta: float = 0.0001) -> bool:
sleep(0.25)
return abs(a - b) < delta


@task
def log_variable(name: str, value: Any) -> None:
log(f"{name}: {value}")

0 comments on commit 5caa0df

Please sign in to comment.