Skip to content

Commit

Permalink
Merge pull request #2 from masci/load-pipelines-at-startup
Browse files Browse the repository at this point in the history
Load pipelines at startup
  • Loading branch information
masci authored Feb 12, 2024
2 parents 4c89fc8 + 351efdc commit 0f7b7fa
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 69 deletions.
9 changes: 6 additions & 3 deletions src/hayhooks/cli/run/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import click
import uvicorn
import os


@click.command()
@click.argument("host", default="localhost")
@click.argument('port', default=1416)
def run(host, port):
@click.option('--host', default="localhost")
@click.option('--port', default=1416)
@click.option('--pipelines-dir', default="pipelines.d")
def run(host, port, pipelines_dir):
os.environ["PIPELINES_DIR"] = pipelines_dir
uvicorn.run("hayhooks.server:app", host=host, port=port)
26 changes: 25 additions & 1 deletion src/hayhooks/server/app.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
from fastapi import FastAPI
import os
import glob
from pathlib import Path
from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition
import logging

app = FastAPI()
logger = logging.getLogger("uvicorn.info")


def create_app():
app = FastAPI()

# Deploy all pipelines in the pipelines directory
pipelines_dir = os.environ.get("PIPELINES_DIR")
for pipeline_file_path in glob.glob(f"{pipelines_dir}/*.yml"):
name = Path(pipeline_file_path).stem
with open(pipeline_file_path, "r") as pipeline_file:
source_code = pipeline_file.read()

pipeline_defintion = PipelineDefinition(name=name, source_code=source_code)
deployed_pipeline = deploy_pipeline_def(app, pipeline_defintion)
logger.info(f"Deployed pipeline: {deployed_pipeline['name']}")
return app


app = create_app()


@app.get("/")
Expand Down
67 changes: 2 additions & 65 deletions src/hayhooks/server/handlers/deploy.py
Original file line number Diff line number Diff line change
@@ -1,70 +1,7 @@
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, create_model

from hayhooks.server import app
from hayhooks.server.pipelines import registry


class PipelineDefinition(BaseModel):
name: str
source_code: str
from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition


@app.post("/deploy")
async def deploy(pipeline_def: PipelineDefinition):
try:
pipe = registry.add(pipeline_def.name, pipeline_def.source_code)
except ValueError as e:
raise HTTPException(status_code=409, detail=f"{e}") from e

request_model = {}
for component_name, inputs in pipe.inputs().items():
# Inputs have this form:
# {
# 'first_addition': { <-- Component Name
# 'value': {'type': <class 'int'>, 'is_mandatory': True}, <-- Input
# 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input
# },
# 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}},
# }
component_model = {}
for name, typedef in inputs.items():
component_model[name] = (typedef["type"], typedef.get("default_value", ...))
request_model[component_name] = (create_model('ComponentParams', **component_model), ...)

PipelineRunRequest = create_model(f'{pipeline_def.name.capitalize()}RunRequest', **request_model)

response_model = {}
for component_name, outputs in pipe.outputs().items():
# Outputs have this form:
# {
# 'second_addition': { <-- Component Name
# 'result': {'type': "<class 'int'>"} <-- Output
# },
# }
component_model = {}
for name, typedef in outputs.items():
component_model[name] = (typedef["type"], ...)
response_model[component_name] = (create_model('ComponentParams', **component_model), ...)

PipelineRunResponse = create_model(f'{pipeline_def.name.capitalize()}RunResponse', **response_model)

# There's no way in FastAPI to define the type of the request body other than annotating
# the endpoint handler. We have to ignore the type here to make FastAPI happy while
# silencing static type checkers (that would have good reasons to trigger!).
async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # type: ignore
output = pipe.run(data=pipeline_run_req.dict())
return JSONResponse(PipelineRunResponse(**output).model_dump(), status_code=200)

app.add_api_route(
path=f"/{pipeline_def.name}",
endpoint=pipeline_run,
methods=["POST"],
name=pipeline_def.name,
response_model=PipelineRunResponse,
)
app.openapi_schema = None
app.setup()

return {"name": pipeline_def.name}
return await deploy_pipeline_def(app, pipeline_def)
3 changes: 3 additions & 0 deletions src/hayhooks/server/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from hayhooks.server.utils import deploy_utils

__all__ = ["deploy_utils"]
68 changes: 68 additions & 0 deletions src/hayhooks/server/utils/deploy_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, create_model

from hayhooks.server.pipelines import registry


class PipelineDefinition(BaseModel):
name: str
source_code: str


def deploy_pipeline_def(app, pipeline_def: PipelineDefinition):
try:
pipe = registry.add(pipeline_def.name, pipeline_def.source_code)
except ValueError as e:
raise HTTPException(status_code=409, detail=f"{e}") from e

request_model = {}
for component_name, inputs in pipe.inputs().items():
# Inputs have this form:
# {
# 'first_addition': { <-- Component Name
# 'value': {'type': <class 'int'>, 'is_mandatory': True}, <-- Input
# 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input
# },
# 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}},
# }
component_model = {}
for name, typedef in inputs.items():
component_model[name] = (typedef["type"], typedef.get("default_value", ...))
request_model[component_name] = (create_model('ComponentParams', **component_model), ...)

PipelineRunRequest = create_model(f'{pipeline_def.name.capitalize()}RunRequest', **request_model)

response_model = {}
for component_name, outputs in pipe.outputs().items():
# Outputs have this form:
# {
# 'second_addition': { <-- Component Name
# 'result': {'type': "<class 'int'>"} <-- Output
# },
# }
component_model = {}
for name, typedef in outputs.items():
component_model[name] = (typedef["type"], ...)
response_model[component_name] = (create_model('ComponentParams', **component_model), ...)

PipelineRunResponse = create_model(f'{pipeline_def.name.capitalize()}RunResponse', **response_model)

# There's no way in FastAPI to define the type of the request body other than annotating
# the endpoint handler. We have to ignore the type here to make FastAPI happy while
# silencing static type checkers (that would have good reasons to trigger!).
async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # type: ignore
output = pipe.run(data=pipeline_run_req.dict())
return JSONResponse(PipelineRunResponse(**output).model_dump(), status_code=200)

app.add_api_route(
path=f"/{pipeline_def.name}",
endpoint=pipeline_run,
methods=["POST"],
name=pipeline_def.name,
response_model=PipelineRunResponse,
)
app.openapi_schema = None
app.setup()

return {"name": pipeline_def.name}

0 comments on commit 0f7b7fa

Please sign in to comment.