Skip to content

Commit

Permalink
Migrate GitRepositoriesImportObjects to prefect
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasG0 committed Nov 15, 2024
1 parent f00a6ae commit fc5f3f7
Show file tree
Hide file tree
Showing 10 changed files with 89 additions and 103 deletions.
10 changes: 10 additions & 0 deletions backend/infrahub/git/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,13 @@ class GitRepositoryMerge(BaseModel):
source_branch: str = Field(..., description="The source branch")
destination_branch: str = Field(..., description="The source branch")
default_branch: str = Field(..., description="The default branch in Git")


class GitRepositoryImportObjects(BaseModel):
"""Re run import job against an existing commit."""

repository_id: str = Field(..., description="The unique ID of the Repository")
repository_name: str = Field(..., description="The name of the repository")
repository_kind: str = Field(..., description="The type of repository")
commit: str = Field(..., description="Specific commit to pull")
infrahub_branch_name: str = Field(..., description="Infrahub branch on which to sync the remote repository")
13 changes: 13 additions & 0 deletions backend/infrahub/git/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from .models import (
GitRepositoryAdd,
GitRepositoryAddReadOnly,
GitRepositoryImportObjects,
GitRepositoryMerge,
GitRepositoryPullReadOnly,
RequestArtifactDefinitionGenerate,
Expand Down Expand Up @@ -499,3 +500,15 @@ async def setup_commit_automation() -> None:
else:
await client.create_automation(automation=automation)
run_log.info(f"{AUTOMATION_NAME} Created")


@flow(name="git-repository-import-object", flow_run_name="Import objects from git repository")
async def import_objects_from_git_repository(model: GitRepositoryImportObjects) -> None:
await add_branch_tag(model.infrahub_branch_name)
repo = await get_initialized_repo(
repository_id=model.repository_id,
name=model.repository_name,
service=services.service,
repository_kind=model.repository_kind,
)
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
14 changes: 11 additions & 3 deletions backend/infrahub/graphql/mutations/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@
from infrahub.core.protocols import CoreGenericRepository, CoreReadOnlyRepository, CoreRepository
from infrahub.core.schema import NodeSchema
from infrahub.exceptions import ValidationError
from infrahub.git.models import GitRepositoryAdd, GitRepositoryAddReadOnly, GitRepositoryPullReadOnly
from infrahub.git.models import (
GitRepositoryAdd,
GitRepositoryAddReadOnly,
GitRepositoryImportObjects,
GitRepositoryPullReadOnly,
)
from infrahub.graphql.types.common import IdentifierInput
from infrahub.log import get_logger
from infrahub.message_bus import messages
from infrahub.message_bus.messages.git_repository_connectivity import GitRepositoryConnectivityResponse
from infrahub.workflows.catalogue import (
GIT_REPOSITORIES_IMPORT_OBJECTS,
GIT_REPOSITORIES_PULL_READ_ONLY,
GIT_REPOSITORY_ADD,
GIT_REPOSITORY_ADD_READ_ONLY,
Expand Down Expand Up @@ -228,15 +234,17 @@ async def mutate(
branch=branch,
)

message = messages.GitRepositoryImportObjects(
model = GitRepositoryImportObjects(
repository_id=repository_id,
repository_name=str(repo.name.value),
repository_kind=repo.get_kind(),
commit=str(repo.commit.value),
infrahub_branch_name=branch.name,
)
if context.service:
await context.service.send(message=message)
await context.service.workflow.submit_workflow(
workflow=GIT_REPOSITORIES_IMPORT_OBJECTS, parameters={"model": model}
)
return {"ok": True}


Expand Down
2 changes: 0 additions & 2 deletions backend/infrahub/message_bus/messages/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from .git_diff_namesonly import GitDiffNamesOnly, GitDiffNamesOnlyResponse
from .git_file_get import GitFileGet, GitFileGetResponse
from .git_repository_connectivity import GitRepositoryConnectivity
from .git_repository_importobjects import GitRepositoryImportObjects
from .proposed_change.request_proposedchange_refreshartifacts import RequestProposedChangeRefreshArtifacts
from .proposed_change.request_proposedchange_repositorychecks import RequestProposedChangeRepositoryChecks
from .proposed_change.request_proposedchange_rungenerators import RequestProposedChangeRunGenerators
Expand Down Expand Up @@ -53,7 +52,6 @@
"git.diff.names_only": GitDiffNamesOnly,
"git.file.get": GitFileGet,
"git.repository.connectivity": GitRepositoryConnectivity,
"git.repository.import_objects": GitRepositoryImportObjects,
"schema.migration.path": SchemaMigrationPath,
"schema.validator.path": SchemaValidatorPath,
"refresh.git.fetch": RefreshGitFetch,
Expand Down

This file was deleted.

1 change: 0 additions & 1 deletion backend/infrahub/message_bus/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
"git.diff.names_only": git.diff.names_only,
"git.file.get": git.file.get,
"git.repository.connectivity": git.repository.connectivity,
"git.repository.import_objects": git.repository.import_objects,
"refresh.git.fetch": git.repository.fetch,
"refresh.registry.branches": refresh.registry.branches,
"refresh.registry.rebased_branch": refresh.registry.rebased_branch,
Expand Down
16 changes: 0 additions & 16 deletions backend/infrahub/message_bus/operations/git/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,6 @@ async def connectivity(message: messages.GitRepositoryConnectivity, service: Inf
await service.reply(message=response, initiator=message)


@flow(name="git-repository-import-object")
async def import_objects(message: messages.GitRepositoryImportObjects, service: InfrahubServices) -> None:
async with service.git_report(
related_node=message.repository_id,
title=f"Processing repository ({message.repository_name})",
) as git_report:
repo = await get_initialized_repo(
repository_id=message.repository_id,
name=message.repository_name,
service=service,
repository_kind=message.repository_kind,
)
repo.task_report = git_report
await repo.import_objects_from_files(infrahub_branch_name=message.infrahub_branch_name, commit=message.commit)


@flow(name="refresh-git-fetch", flow_run_name="Fetch git repository {message.repository_name} on " + WORKER_IDENTITY)
async def fetch(message: messages.RefreshGitFetch, service: InfrahubServices) -> None:
if message.meta and message.meta.initiator_id == WORKER_IDENTITY:
Expand Down
9 changes: 9 additions & 0 deletions backend/infrahub/workflows/catalogue.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@
function="setup_commit_automation",
)

GIT_REPOSITORIES_IMPORT_OBJECTS = WorkflowDefinition(
name="git-repository-import-object",
type=WorkflowType.INTERNAL,
module="infrahub.git.tasks",
function="import_objects_from_git_repository",
tags=[WorkflowTag.DATABASE_CHANGE],
)


worker_pools = [INFRAHUB_WORKER_POOL]

Expand Down Expand Up @@ -318,6 +326,7 @@
UPDATE_COMPUTED_ATTRIBUTE_TRANSFORM,
REQUEST_PROPOSED_CHANGE_DATA_INTEGRITY,
AUTOMATION_SCHEMA_UPDATED,
GIT_REPOSITORIES_IMPORT_OBJECTS,
]

automation_setup_workflows = [AUTOMATION_GIT_UPDATED, AUTOMATION_SCHEMA_UPDATED]
75 changes: 46 additions & 29 deletions backend/tests/unit/graphql/mutations/test_repository.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from unittest.mock import call, patch

import pytest

Expand All @@ -9,11 +10,14 @@
from infrahub.core.initialization import create_branch
from infrahub.core.manager import NodeManager
from infrahub.core.node import Node
from infrahub.git.models import GitRepositoryImportObjects
from infrahub.graphql.mutations.repository import cleanup_payload
from infrahub.message_bus import messages
from infrahub.services import InfrahubServices
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.workflows.catalogue import GIT_REPOSITORIES_IMPORT_OBJECTS
from tests.adapters.message_bus import BusRecorder
from tests.helpers.graphql import graphql_mutation
from tests.helpers.utils import init_global_service

if TYPE_CHECKING:
from infrahub.core.branch import Branch
Expand All @@ -25,35 +29,48 @@ async def test_trigger_repository_import(
):
repository_model = registry.schema.get_node_schema(name=InfrahubKind.REPOSITORY, branch=default_branch)
recorder = BusRecorder()
service = InfrahubServices(database=db, message_bus=recorder)

RUN_REIMPORT = """
mutation InfrahubRepositoryProcess($id: String!) {
InfrahubRepositoryProcess(data: {id: $id}) {
ok
service = InfrahubServices(database=db, message_bus=recorder, workflow=WorkflowLocalExecution())

with init_global_service(service), patch(
"infrahub.services.adapters.workflow.local.WorkflowLocalExecution.submit_workflow"
) as mock_submit_workflow:
RUN_REIMPORT = """
mutation InfrahubRepositoryProcess($id: String!) {
InfrahubRepositoryProcess(data: {id: $id}) {
ok
}
}
}
"""

repo = await Node.init(schema=repository_model, db=db, branch=default_branch)
commit_id = "d85571671cf51f561fb0695d8657747f9ce84057"
await repo.new(db=db, name="test-edge-demo", location="/tmp/edge", commit=commit_id)
await repo.save(db=db)
result = await graphql_mutation(
query=RUN_REIMPORT,
db=db,
variables={"id": repo.id},
service=service,
)

assert not result.errors
assert result.data

assert len(recorder.messages) == 1
message = recorder.messages[0]
assert isinstance(message, messages.GitRepositoryImportObjects)
assert message.repository_id == repo.id
assert message.commit == commit_id
"""

repo = await Node.init(schema=repository_model, db=db, branch=default_branch)
commit_id = "d85571671cf51f561fb0695d8657747f9ce84057"
await repo.new(db=db, name="test-edge-demo", location="/tmp/edge", commit=commit_id)
await repo.save(db=db)
result = await graphql_mutation(
query=RUN_REIMPORT,
db=db,
variables={"id": repo.id},
service=service,
)

assert not result.errors
assert result.data

expected_calls = [
call(
workflow=GIT_REPOSITORIES_IMPORT_OBJECTS,
parameters={
"model": GitRepositoryImportObjects(
repository_id=repo.id,
repository_name=str(repo.name.value),
repository_kind=repo.get_kind(),
commit=commit_id,
infrahub_branch_name=default_branch.name,
)
},
),
]
mock_submit_workflow.assert_has_calls(expected_calls)


async def test_repository_update(db: InfrahubDatabase, register_core_models_schema: None, default_branch: Branch):
Expand Down
37 changes: 0 additions & 37 deletions docs/docs/reference/message-bus-events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -371,24 +371,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
| **repository_name** | The name of the repository | string | None |
| **repository_location** | The location of repository | string | None |
<!-- vale on -->
<!-- vale off -->
#### Event git.repository.import_objects
<!-- vale on -->

**Description**: Re run import job against an existing commit.

**Priority**: 3

<!-- vale off -->
| Key | Description | Type | Default Value |
|-----|-------------|------|---------------|
| **meta** | Meta properties for the message | N/A | None |
| **repository_id** | The unique ID of the Repository | string | None |
| **repository_name** | The name of the repository | string | None |
| **repository_kind** | The type of repository | string | None |
| **commit** | Specific commit to pull | string | None |
| **infrahub_branch_name** | Infrahub branch on which to sync the remote repository | string | None |
<!-- vale on -->


<!-- vale off -->
Expand Down Expand Up @@ -1130,25 +1112,6 @@ For more detailed explanations on how to use these events within Infrahub, see t
| **repository_name** | The name of the repository | string | None |
| **repository_location** | The location of repository | string | None |
<!-- vale on -->
<!-- vale off -->
#### Event git.repository.import_objects
<!-- vale on -->

**Description**: Re run import job against an existing commit.

**Priority**: 3


<!-- vale off -->
| Key | Description | Type | Default Value |
|-----|-------------|------|---------------|
| **meta** | Meta properties for the message | N/A | None |
| **repository_id** | The unique ID of the Repository | string | None |
| **repository_name** | The name of the repository | string | None |
| **repository_kind** | The type of repository | string | None |
| **commit** | Specific commit to pull | string | None |
| **infrahub_branch_name** | Infrahub branch on which to sync the remote repository | string | None |
<!-- vale on -->


<!-- vale off -->
Expand Down

0 comments on commit fc5f3f7

Please sign in to comment.