Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate GitRepositoriesImportObjects to prefect #4945

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions backend/infrahub/git/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,13 @@ class GitRepositoryMerge(BaseModel):
source_branch: str = Field(..., description="The source branch")
destination_branch: str = Field(..., description="The source branch")
default_branch: str = Field(..., description="The default branch in Git")


class GitRepositoryImportObjects(BaseModel):
"""Re run import job against an existing commit."""

repository_id: str = Field(..., description="The unique ID of the Repository")
repository_name: str = Field(..., description="The name of the repository")
repository_kind: str = Field(..., description="The type of repository")
commit: str = Field(..., description="Specific commit to pull")
infrahub_branch_name: str = Field(..., description="Infrahub branch on which to sync the remote repository")
19 changes: 19 additions & 0 deletions backend/infrahub/git/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .models import (
GitRepositoryAdd,
GitRepositoryAddReadOnly,
GitRepositoryImportObjects,
GitRepositoryMerge,
GitRepositoryPullReadOnly,
RequestArtifactDefinitionGenerate,
Expand Down Expand Up @@ -499,3 +500,21 @@ async def setup_commit_automation() -> None:
else:
await client.create_automation(automation=automation)
run_log.info(f"{AUTOMATION_NAME} Created")


@flow(name="git-repository-import-object", flow_run_name="Import objects from git repository")
async def import_objects_from_git_repository(model: GitRepositoryImportObjects) -> None:
service = services.service
await add_branch_tag(model.infrahub_branch_name)
async with service.git_report(
related_node=model.repository_id,
title=f"Processing repository ({model.repository_name})",
) as git_report:
repo = await get_initialized_repo(
repository_id=model.repository_id,
name=model.repository_name,
service=services.service,
repository_kind=model.repository_kind,
)
repo.task_report = git_report
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
14 changes: 11 additions & 3 deletions backend/infrahub/graphql/mutations/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
from infrahub.core.protocols import CoreGenericRepository, CoreReadOnlyRepository, CoreRepository
from infrahub.core.schema import NodeSchema
from infrahub.exceptions import ValidationError
from infrahub.git.models import GitRepositoryAdd, GitRepositoryAddReadOnly, GitRepositoryPullReadOnly
from infrahub.git.models import (
GitRepositoryAdd,
GitRepositoryAddReadOnly,
GitRepositoryImportObjects,
GitRepositoryPullReadOnly,
)
from infrahub.graphql.types.common import IdentifierInput
from infrahub.log import get_logger
from infrahub.message_bus import messages
from infrahub.message_bus.messages.git_repository_connectivity import GitRepositoryConnectivityResponse
from infrahub.workflows.catalogue import (
GIT_REPOSITORIES_IMPORT_OBJECTS,
GIT_REPOSITORIES_PULL_READ_ONLY,
GIT_REPOSITORY_ADD,
GIT_REPOSITORY_ADD_READ_ONLY,
Expand Down Expand Up @@ -228,15 +234,17 @@ async def mutate(
branch=branch,
)

message = messages.GitRepositoryImportObjects(
model = GitRepositoryImportObjects(
repository_id=repository_id,
repository_name=str(repo.name.value),
repository_kind=repo.get_kind(),
commit=str(repo.commit.value),
infrahub_branch_name=branch.name,
)
if context.service:
await context.service.send(message=message)
await context.service.workflow.submit_workflow(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to return the task id for this mutation moving forward.
i'll be useful to provide a better experience in the frontend as well at some point

workflow=GIT_REPOSITORIES_IMPORT_OBJECTS, parameters={"model": model}
)
return {"ok": True}


Expand Down
2 changes: 0 additions & 2 deletions backend/infrahub/message_bus/messages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from .git_diff_namesonly import GitDiffNamesOnly, GitDiffNamesOnlyResponse
from .git_file_get import GitFileGet, GitFileGetResponse
from .git_repository_connectivity import GitRepositoryConnectivity
from .git_repository_importobjects import GitRepositoryImportObjects
from .proposed_change.request_proposedchange_refreshartifacts import RequestProposedChangeRefreshArtifacts
from .proposed_change.request_proposedchange_repositorychecks import RequestProposedChangeRepositoryChecks
from .proposed_change.request_proposedchange_rungenerators import RequestProposedChangeRunGenerators
Expand Down Expand Up @@ -53,7 +52,6 @@
"git.diff.names_only": GitDiffNamesOnly,
"git.file.get": GitFileGet,
"git.repository.connectivity": GitRepositoryConnectivity,
"git.repository.import_objects": GitRepositoryImportObjects,
"schema.migration.path": SchemaMigrationPath,
"schema.validator.path": SchemaValidatorPath,
"refresh.git.fetch": RefreshGitFetch,
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"git.diff.names_only": git.diff.names_only,
"git.file.get": git.file.get,
"git.repository.connectivity": git.repository.connectivity,
"git.repository.import_objects": git.repository.import_objects,
"refresh.git.fetch": git.repository.fetch,
"refresh.registry.branches": refresh.registry.branches,
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
Expand Down
16 changes: 0 additions & 16 deletions backend/infrahub/message_bus/operations/git/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,6 @@ async def connectivity(message: messages.GitRepositoryConnectivity, service: Inf
await service.reply(message=response, initiator=message)


@flow(name="git-repository-import-object")
async def import_objects(message: messages.GitRepositoryImportObjects, service: InfrahubServices) -> None:
async with service.git_report(
related_node=message.repository_id,
title=f"Processing repository ({message.repository_name})",
) as git_report:
repo = await get_initialized_repo(
repository_id=message.repository_id,
name=message.repository_name,
service=service,
repository_kind=message.repository_kind,
)
repo.task_report = git_report
await repo.import_objects_from_files(infrahub_branch_name=message.infrahub_branch_name, commit=message.commit)


@flow(name="refresh-git-fetch", flow_run_name="Fetch git repository {message.repository_name} on " + WORKER_IDENTITY)
async def fetch(message: messages.RefreshGitFetch, service: InfrahubServices) -> None:
if message.meta and message.meta.initiator_id == WORKER_IDENTITY:
Expand Down
9 changes: 9 additions & 0 deletions backend/infrahub/workflows/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@
function="setup_commit_automation",
)

GIT_REPOSITORIES_IMPORT_OBJECTS = WorkflowDefinition(
name="git-repository-import-object",
type=WorkflowType.INTERNAL,
module="infrahub.git.tasks",
function="import_objects_from_git_repository",
tags=[WorkflowTag.DATABASE_CHANGE],
)


worker_pools = [INFRAHUB_WORKER_POOL]

Expand Down Expand Up @@ -318,6 +326,7 @@
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
AUTOMATION_SCHEMA_UPDATED,
GIT_REPOSITORIES_IMPORT_OBJECTS,
]

automation_setup_workflows = [AUTOMATION_GIT_UPDATED, AUTOMATION_SCHEMA_UPDATED]
75 changes: 46 additions & 29 deletions backend/tests/unit/graphql/mutations/test_repository.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from unittest.mock import call, patch

import pytest

Expand All @@ -9,11 +10,14 @@
from infrahub.core.initialization import create_branch
from infrahub.core.manager import NodeManager
from infrahub.core.node import Node
from infrahub.git.models import GitRepositoryImportObjects
from infrahub.graphql.mutations.repository import cleanup_payload
from infrahub.message_bus import messages
from infrahub.services import InfrahubServices
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.workflows.catalogue import GIT_REPOSITORIES_IMPORT_OBJECTS
from tests.adapters.message_bus import BusRecorder
from tests.helpers.graphql import graphql_mutation
from tests.helpers.utils import init_global_service

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand All @@ -25,35 +29,48 @@ async def test_trigger_repository_import(
):
repository_model = registry.schema.get_node_schema(name=InfrahubKind.REPOSITORY, branch=default_branch)
recorder = BusRecorder()
service = InfrahubServices(database=db, message_bus=recorder)

RUN_REIMPORT = """
mutation InfrahubRepositoryProcess($id: String!) {
InfrahubRepositoryProcess(data: {id: $id}) {
ok
service = InfrahubServices(database=db, message_bus=recorder, workflow=WorkflowLocalExecution())

with init_global_service(service), patch(
"infrahub.services.adapters.workflow.local.WorkflowLocalExecution.submit_workflow"
) as mock_submit_workflow:
RUN_REIMPORT = """
mutation InfrahubRepositoryProcess($id: String!) {
InfrahubRepositoryProcess(data: {id: $id}) {
ok
}
}
}
"""

repo = await Node.init(schema=repository_model, db=db, branch=default_branch)
commit_id = "d85571671cf51f561fb0695d8657747f9ce84057"
await repo.new(db=db, name="test-edge-demo", location="/tmp/edge", commit=commit_id)
await repo.save(db=db)
result = await graphql_mutation(
query=RUN_REIMPORT,
db=db,
variables={"id": repo.id},
service=service,
)

assert not result.errors
assert result.data

assert len(recorder.messages) == 1
message = recorder.messages[0]
assert isinstance(message, messages.GitRepositoryImportObjects)
assert message.repository_id == repo.id
assert message.commit == commit_id
"""

repo = await Node.init(schema=repository_model, db=db, branch=default_branch)
commit_id = "d85571671cf51f561fb0695d8657747f9ce84057"
await repo.new(db=db, name="test-edge-demo", location="/tmp/edge", commit=commit_id)
await repo.save(db=db)
result = await graphql_mutation(
query=RUN_REIMPORT,
db=db,
variables={"id": repo.id},
service=service,
)

assert not result.errors
assert result.data

expected_calls = [
call(
workflow=GIT_REPOSITORIES_IMPORT_OBJECTS,
parameters={
"model": GitRepositoryImportObjects(
repository_id=repo.id,
repository_name=str(repo.name.value),
repository_kind=repo.get_kind(),
commit=commit_id,
infrahub_branch_name=default_branch.name,
)
},
),
]
mock_submit_workflow.assert_has_calls(expected_calls)


async def test_repository_update(db: InfrahubDatabase, register_core_models_schema: None, default_branch: Branch):
Expand Down
37 changes: 0 additions & 37 deletions docs/docs/reference/message-bus-events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -371,24 +371,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
| **repository_name** | The name of the repository | string | None |
| **repository_location** | The location of repository | string | None |
<!-- vale on -->
<!-- vale off -->
#### Event git.repository.import_objects
<!-- vale on -->

**Description**: Re run import job against an existing commit.

**Priority**: 3

<!-- vale off -->
| Key | Description | Type | Default Value |
|-----|-------------|------|---------------|
| **meta** | Meta properties for the message | N/A | None |
| **repository_id** | The unique ID of the Repository | string | None |
| **repository_name** | The name of the repository | string | None |
| **repository_kind** | The type of repository | string | None |
| **commit** | Specific commit to pull | string | None |
| **infrahub_branch_name** | Infrahub branch on which to sync the remote repository | string | None |
<!-- vale on -->


<!-- vale off -->
Expand Down Expand Up @@ -1130,25 +1112,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
| **repository_name** | The name of the repository | string | None |
| **repository_location** | The location of repository | string | None |
<!-- vale on -->
<!-- vale off -->
#### Event git.repository.import_objects
<!-- vale on -->

**Description**: Re run import job against an existing commit.

**Priority**: 3


<!-- vale off -->
| Key | Description | Type | Default Value |
|-----|-------------|------|---------------|
| **meta** | Meta properties for the message | N/A | None |
| **repository_id** | The unique ID of the Repository | string | None |
| **repository_name** | The name of the repository | string | None |
| **repository_kind** | The type of repository | string | None |
| **commit** | Specific commit to pull | string | None |
| **infrahub_branch_name** | Infrahub branch on which to sync the remote repository | string | None |
<!-- vale on -->


<!-- vale off -->
Expand Down
Loading