From ffb8ccf3263753113fdc38bb2b1bb409d86af8af Mon Sep 17 00:00:00 2001 From: anakin87 Date: Fri, 9 Feb 2024 12:56:32 +0100 Subject: [PATCH 1/2] change run arguments to options --- src/hayhooks/cli/run/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hayhooks/cli/run/__init__.py b/src/hayhooks/cli/run/__init__.py index 1467c42..c6d8606 100644 --- a/src/hayhooks/cli/run/__init__.py +++ b/src/hayhooks/cli/run/__init__.py @@ -3,7 +3,7 @@ @click.command() -@click.argument("host", default="localhost") -@click.argument('port', default=1416) +@click.option('-h', '--host', default="localhost") +@click.option('-p', '--port', default=1416) def run(host, port): uvicorn.run("hayhooks.server:app", host=host, port=port) From 351efdca388bd1818ae827dd65f0b012af5b3c72 Mon Sep 17 00:00:00 2001 From: anakin87 Date: Fri, 9 Feb 2024 17:51:16 +0100 Subject: [PATCH 2/2] deploy pipelines at startup --- src/hayhooks/cli/run/__init__.py | 9 ++- src/hayhooks/server/app.py | 26 ++++++++- src/hayhooks/server/handlers/deploy.py | 67 +--------------------- src/hayhooks/server/utils/__init__.py | 3 + src/hayhooks/server/utils/deploy_utils.py | 68 +++++++++++++++++++++++ 5 files changed, 104 insertions(+), 69 deletions(-) create mode 100644 src/hayhooks/server/utils/__init__.py create mode 100644 src/hayhooks/server/utils/deploy_utils.py diff --git a/src/hayhooks/cli/run/__init__.py b/src/hayhooks/cli/run/__init__.py index c6d8606..daff6b6 100644 --- a/src/hayhooks/cli/run/__init__.py +++ b/src/hayhooks/cli/run/__init__.py @@ -1,9 +1,12 @@ import click import uvicorn +import os @click.command() -@click.option('-h', '--host', default="localhost") -@click.option('-p', '--port', default=1416) -def run(host, port): +@click.option('--host', default="localhost") +@click.option('--port', default=1416) +@click.option('--pipelines-dir', default="pipelines.d") +def run(host, port, pipelines_dir): + os.environ["PIPELINES_DIR"] = pipelines_dir uvicorn.run("hayhooks.server:app", host=host, port=port) diff --git a/src/hayhooks/server/app.py b/src/hayhooks/server/app.py index dba7d21..927cd69 100644 --- a/src/hayhooks/server/app.py +++ b/src/hayhooks/server/app.py @@ -1,6 +1,30 @@ from fastapi import FastAPI +import os +import glob +from pathlib import Path +from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition +import logging -app = FastAPI() +logger = logging.getLogger("uvicorn.info") + + +def create_app(): + app = FastAPI() + + # Deploy all pipelines in the pipelines directory + pipelines_dir = os.environ.get("PIPELINES_DIR") + for pipeline_file_path in glob.glob(f"{pipelines_dir}/*.yml"): + name = Path(pipeline_file_path).stem + with open(pipeline_file_path, "r") as pipeline_file: + source_code = pipeline_file.read() + + pipeline_defintion = PipelineDefinition(name=name, source_code=source_code) + deployed_pipeline = deploy_pipeline_def(app, pipeline_defintion) + logger.info(f"Deployed pipeline: {deployed_pipeline['name']}") + return app + + +app = create_app() @app.get("/") diff --git a/src/hayhooks/server/handlers/deploy.py b/src/hayhooks/server/handlers/deploy.py index 28be798..144ae17 100644 --- a/src/hayhooks/server/handlers/deploy.py +++ b/src/hayhooks/server/handlers/deploy.py @@ -1,70 +1,7 @@ -from fastapi import HTTPException -from fastapi.responses import JSONResponse -from pydantic import BaseModel, create_model - from hayhooks.server import app -from hayhooks.server.pipelines import registry - - -class PipelineDefinition(BaseModel): - name: str - source_code: str +from hayhooks.server.utils.deploy_utils import deploy_pipeline_def, PipelineDefinition @app.post("/deploy") async def deploy(pipeline_def: PipelineDefinition): - try: - pipe = registry.add(pipeline_def.name, pipeline_def.source_code) - except ValueError as e: - raise HTTPException(status_code=409, detail=f"{e}") from e - - request_model = {} - for component_name, inputs in pipe.inputs().items(): - # Inputs have this form: - # { - # 'first_addition': { <-- Component Name - # 'value': {'type': , 'is_mandatory': True}, <-- Input - # 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input - # }, - # 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}}, - # } - component_model = {} - for name, typedef in inputs.items(): - component_model[name] = (typedef["type"], typedef.get("default_value", ...)) - request_model[component_name] = (create_model('ComponentParams', **component_model), ...) - - PipelineRunRequest = create_model(f'{pipeline_def.name.capitalize()}RunRequest', **request_model) - - response_model = {} - for component_name, outputs in pipe.outputs().items(): - # Outputs have this form: - # { - # 'second_addition': { <-- Component Name - # 'result': {'type': ""} <-- Output - # }, - # } - component_model = {} - for name, typedef in outputs.items(): - component_model[name] = (typedef["type"], ...) - response_model[component_name] = (create_model('ComponentParams', **component_model), ...) - - PipelineRunResponse = create_model(f'{pipeline_def.name.capitalize()}RunResponse', **response_model) - - # There's no way in FastAPI to define the type of the request body other than annotating - # the endpoint handler. We have to ignore the type here to make FastAPI happy while - # silencing static type checkers (that would have good reasons to trigger!). - async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # type: ignore - output = pipe.run(data=pipeline_run_req.dict()) - return JSONResponse(PipelineRunResponse(**output).model_dump(), status_code=200) - - app.add_api_route( - path=f"/{pipeline_def.name}", - endpoint=pipeline_run, - methods=["POST"], - name=pipeline_def.name, - response_model=PipelineRunResponse, - ) - app.openapi_schema = None - app.setup() - - return {"name": pipeline_def.name} + return await deploy_pipeline_def(app, pipeline_def) diff --git a/src/hayhooks/server/utils/__init__.py b/src/hayhooks/server/utils/__init__.py new file mode 100644 index 0000000..cb2a5d8 --- /dev/null +++ b/src/hayhooks/server/utils/__init__.py @@ -0,0 +1,3 @@ +from hayhooks.server.utils import deploy_utils + +__all__ = ["deploy_utils"] diff --git a/src/hayhooks/server/utils/deploy_utils.py b/src/hayhooks/server/utils/deploy_utils.py new file mode 100644 index 0000000..9bea6cf --- /dev/null +++ b/src/hayhooks/server/utils/deploy_utils.py @@ -0,0 +1,68 @@ +from fastapi import HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel, create_model + +from hayhooks.server.pipelines import registry + + +class PipelineDefinition(BaseModel): + name: str + source_code: str + + +def deploy_pipeline_def(app, pipeline_def: PipelineDefinition): + try: + pipe = registry.add(pipeline_def.name, pipeline_def.source_code) + except ValueError as e: + raise HTTPException(status_code=409, detail=f"{e}") from e + + request_model = {} + for component_name, inputs in pipe.inputs().items(): + # Inputs have this form: + # { + # 'first_addition': { <-- Component Name + # 'value': {'type': , 'is_mandatory': True}, <-- Input + # 'add': {'type': typing.Optional[int], 'is_mandatory': False, 'default_value': None}, <-- Input + # }, + # 'second_addition': {'add': {'type': typing.Optional[int], 'is_mandatory': False}}, + # } + component_model = {} + for name, typedef in inputs.items(): + component_model[name] = (typedef["type"], typedef.get("default_value", ...)) + request_model[component_name] = (create_model('ComponentParams', **component_model), ...) + + PipelineRunRequest = create_model(f'{pipeline_def.name.capitalize()}RunRequest', **request_model) + + response_model = {} + for component_name, outputs in pipe.outputs().items(): + # Outputs have this form: + # { + # 'second_addition': { <-- Component Name + # 'result': {'type': ""} <-- Output + # }, + # } + component_model = {} + for name, typedef in outputs.items(): + component_model[name] = (typedef["type"], ...) + response_model[component_name] = (create_model('ComponentParams', **component_model), ...) + + PipelineRunResponse = create_model(f'{pipeline_def.name.capitalize()}RunResponse', **response_model) + + # There's no way in FastAPI to define the type of the request body other than annotating + # the endpoint handler. We have to ignore the type here to make FastAPI happy while + # silencing static type checkers (that would have good reasons to trigger!). + async def pipeline_run(pipeline_run_req: PipelineRunRequest) -> JSONResponse: # type: ignore + output = pipe.run(data=pipeline_run_req.dict()) + return JSONResponse(PipelineRunResponse(**output).model_dump(), status_code=200) + + app.add_api_route( + path=f"/{pipeline_def.name}", + endpoint=pipeline_run, + methods=["POST"], + name=pipeline_def.name, + response_model=PipelineRunResponse, + ) + app.openapi_schema = None + app.setup() + + return {"name": pipeline_def.name}