Pause test workflow when changes are deployed to production #842

Open
panoptikum opened this issue Sep 7, 2023 · 3 comments

Comments

@panoptikum

Hello,

I am currently thinking about how to implement a process that automatically switches the job attribute "PAUSE_STATUS" to PAUSED when I push changes to production, whether per tag or by some other method. Is this something dbx supports? I could not find anything...
Right now we usually set up new pipelines in the test environment first, then we ship them to production. When I make changes, we reactivate that test workflow, but it does not get switched off again once the changes move to production as well.
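
The closest thing I can imagine is flipping the schedule's pause status myself via the Jobs API right after a production deploy. A rough sketch of what I mean, using the databricks-sdk package (the job name "my_test_workflow" is just a placeholder):

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import PauseStatus

w = WorkspaceClient()

# "my_test_workflow" is a placeholder for the test job's name
job = next(w.jobs.list(name="my_test_workflow"), None)

if job is not None:
    settings = w.jobs.get(job_id=job.job_id).settings
    if settings.schedule and settings.schedule.pause_status != PauseStatus.PAUSED:
        settings.schedule.pause_status = PauseStatus.PAUSED
        w.jobs.update(job_id=job.job_id, new_settings=settings)

But I would prefer something built into dbx's deployment flow rather than a separate script like this.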

Any input on this matter is much appreciated.

Best
Felix

@NodeJSmith

@panoptikum I am using the Python Databricks SDK for something similar to what you are describing. When testing changes to a pipeline, I pause the current production job ("production" meaning live in Databricks; this could be in dev, stage, or prod) and cancel any running job runs. I then deploy the changed pipeline as an ad hoc workflow, run it, and unpause the original job when it's done. Below is a copy/paste from my deployment logic; hopefully it gives you some idea of how to accomplish what you want.

import sys
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta
from typing import Generator

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import PauseStatus, Run, RunOutput, RunResultState

# KnownArgs, ArgsToDataClass, create_deployment, dbx_deploy, get_package_name,
# and echo are helpers from our own deployment tooling, not shown here.


def dbx_adhoc_run():
    args: KnownArgs = ArgsToDataClass(KnownArgs).dclass_instance
    print("")  # separate next printed line from args

    non_default_args = args.get_non_default_args()

    if non_default_args:
        args.print_non_default_args(non_default_args)
    else:
        echo("Skipping adhoc run because all args are default")
        return

    args.adhoc = True

    create_deployment(args)

    # this will deploy with a different name ("adhoc" added to name)
    dbx_deploy(args=args)

    w = WorkspaceClient()

    with pause_current_job(w, args):
        with one_time_job_run(w, args) as run:
            print("")
            echo(f"Created new run: {run.run_page_url}")

            try:
                _ = w.jobs.wait_get_run_job_terminated_or_skipped(
                    run_id=run.run_id, timeout=timedelta(hours=args.timeout_hours)
                )

                run_status = w.jobs.get_run(run_id=run.run_id)

                # collect each task's output so logs can be echoed below
                task_output_dict: dict[str, RunOutput] = {}
                for task in run_status.tasks:
                    task_output = w.jobs.get_run_output(run_id=task.run_id)
                    task_output_dict[task.task_key] = task_output

                echo(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")
                for task_key, task_output in task_output_dict.items():
                    echo(f"Task logs: {task_key}\n")
                    echo(task_output.logs)
            except KeyboardInterrupt:
                print("")  # skip the line that has the ^C
                echo("Keyboard interrupt, canceling run")
                w.jobs.cancel_run_and_wait(run_id=run.run_id)
                raise

            if run_status.state.result_state != RunResultState.SUCCESS:
                for task_key, task_output in task_output_dict.items():
                    if task_output.error:
                        echo(f"Error in task: {task_key}\n")
                        echo(f"Error: {task_output.error}")
                        echo(f"Error Trace: {task_output.error_trace}")

                raise Exception(f"Run ID: {run.run_id} finished in state: {run_status.state.result_state}")


@contextmanager
def one_time_job_run(w: WorkspaceClient, args: KnownArgs) -> Generator[Run, None, None]:
    exc = None
    one_time_job = None
    try:
        one_time_job = get_job(w, get_package_name(args))

        # run_now returns a waiter; use its run_id to fetch the full Run
        run_wait = w.jobs.run_now(job_id=one_time_job.job_id)

        run = w.jobs.get_run(run_id=run_wait.run_id)

        yield run
    except BaseException as e:
        exc = e
    finally:
        if one_time_job is not None:
            echo("Deleting one time job")
            w.jobs.delete(job_id=one_time_job.job_id)
        if isinstance(exc, KeyboardInterrupt):
            # returning from the generator suppresses the exception in the caller
            return
        if exc is not None:
            raise exc


@contextmanager
def pause_current_job(w: WorkspaceClient, args: KnownArgs):
    exc = None
    paused = False
    try:
        # get the standard job name so we can pause the schedule and cancel any running jobs
        job_name = get_package_name(args.catalog)
        standard_job = get_job(w, job_name)
        job_runs = list(w.jobs.list_runs(job_id=standard_job.job_id, active_only=True)) or []
        for job_run in job_runs:
            echo(f"Canceling existing run: {job_run.run_id}")
            w.jobs.cancel_run_and_wait(run_id=job_run.run_id)

        orig_settings = standard_job.settings

        # nothing to pause if the job has no schedule or is already paused
        if not orig_settings.schedule or orig_settings.schedule.pause_status == PauseStatus.PAUSED:
            yield
            return

        echo("Pausing schedule")
        copied_settings = deepcopy(orig_settings)
        copied_settings.schedule.pause_status = PauseStatus.PAUSED

        w.jobs.update(job_id=standard_job.job_id, new_settings=copied_settings)
        paused = True

        yield
    except BaseException as e:
        exc = e
    finally:
        if paused:
            echo("Unpausing schedule")
            # restore the original settings, which still have the schedule unpaused
            w.jobs.update(job_id=standard_job.job_id, new_settings=orig_settings)
        if isinstance(exc, KeyboardInterrupt):
            # returning from the generator suppresses the exception in the caller
            return
        if exc is not None:
            raise exc


def get_job(w: WorkspaceClient, job_name: str):
    job_list = list(w.jobs.list(name=job_name))
    if not job_list:
        raise ValueError(f"No job found with name: {job_name}")

    job_id = job_list[0].job_id
    existing_job = w.jobs.get(job_id=job_id)
    return existing_job
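
The two context managers do the heavy lifting here: pause_current_job restores the original schedule in its finally block even if the ad hoc run fails or is interrupted, and one_time_job_run makes sure the throwaway job is deleted either way. The KeyboardInterrupt handling exists so that Ctrl-C cancels the run cleanly instead of leaving the schedule paused.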


@panoptikum
Author

Hi @NodeJSmith ,

Thank you for sharing your code. I'll look into it and see how I can adapt it to my needs.

Best,
Felix
