Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import Resources on pipeline registration #1229

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beagle/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,4 +468,8 @@

# Cache location, overridable through BEAGLE_APP_CACHE; falls back to /tmp.
APP_CACHE = os.getenv("BEAGLE_APP_CACHE", "/tmp")

# Reference-file registration settings. Both ids are None when the
# corresponding environment variable is not set.
REFERENCE_STORAGE_ID = os.getenv("BEAGLE_REFERENCE_STORAGE_ID")
REFERENCE_FILE_GROUP_ID = os.getenv("BEAGLE_REFERENCE_FILE_GROUP_ID")
# Path of the reference manifest; it is joined onto the cloned pipeline
# directory when reference files are imported.
APP_REFERENCE_FILES_PATH = os.getenv("BEAGLE_APP_REFERENCE_FILES_PATH", "./reference/reference.json")

# Contact address read from EVENTS_CONTACT_EMAIL; empty string when unset.
CONTACT_EMAIL = os.getenv("EVENTS_CONTACT_EMAIL", "")
1 change: 1 addition & 0 deletions beagle_etl/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def at_start(sender, **k):
"runner.tasks.terminate_job": {"queue": settings.BEAGLE_RUNNER_QUEUE},
"runner.tasks.complete_job": {"queue": settings.BEAGLE_RUNNER_QUEUE},
"runner.tasks.fail_job": {"queue": settings.BEAGLE_RUNNER_QUEUE},
"runner.tasks.register_reference_files": {"queue": settings.BEAGLE_RUNNER_QUEUE},
"notifier.tasks.send_notification": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
"file_system.tasks.populate_job_group_notifier_metadata": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
"beagle_etl.tasks.job_processor": {"queue": settings.BEAGLE_DEFAULT_QUEUE},
Expand Down
4 changes: 4 additions & 0 deletions beagle_etl/tests/jobs/test_metadb.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from deepdiff import DeepDiff
from django.test import TestCase
from django.conf import settings
from django.db.models import signals
from django.contrib.auth.models import User
from beagle_etl.models import SMILEMessage
from beagle_etl.models import JobGroup, JobGroupNotifier, Notifier
Expand All @@ -16,9 +17,12 @@
from file_system.models import Request, Sample, Patient, FileMetadata
from file_system.repository import FileRepository
from study.objects import StudyObject
from runner.tasks import register_pipeline_reference_files
from runner.models import Pipeline


class TestNewRequest(TestCase):
signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [
"file_system.filegroup.json",
"file_system.filetype.json",
Expand Down
2 changes: 1 addition & 1 deletion notifier/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def notifier_start(job_group, request_id, operator=None, metadata={}):
except Notifier.DoesNotExist:
pass
job_group_notifier = JobGroupNotifier.objects.create(
job_group=job_group, request_id=request_id, notifier_type=notifier
job_group_id=job_group, request_id=request_id, notifier_type=notifier
)
eh = event_handler(job_group_notifier.id)
notifier_id = eh.start(request_id)
Expand Down
21 changes: 21 additions & 0 deletions runner/migrations/0058_pipeline_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated by Django 2.2.28 on 2023-05-30 14:43

from django.db import migrations, models
import runner.models


class Migration(migrations.Migration):
    """Add the indexed integer ``status`` field to the ``pipeline`` model."""

    dependencies = [
        ("runner", "0057_auto_20230424_0743"),
    ]

    operations = [
        migrations.AddField(
            model_name="pipeline",
            name="status",
            # Choice labels follow runner.models.PipelineStatus
            # (PREPARING = 0, READY = 1), the same enum used for the default.
            # Choices only affect validation/display, not the column type.
            field=models.IntegerField(
                choices=[(0, "PREPARING"), (1, "READY")], db_index=True, default=runner.models.PipelineStatus(0)
            ),
        ),
    ]
8 changes: 8 additions & 0 deletions runner/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,18 @@ class PipelineName(models.Model):
name = models.CharField(max_length=30, null=False, blank=False)


class PipelineStatus(IntEnum):
    """Registration state stored in ``Pipeline.status``."""

    # Default for a newly created pipeline.
    PREPARING = 0
    # Assigned by the register_reference_files task once the import finished.
    READY = 1


class Pipeline(BaseModel):
pipeline_type = models.IntegerField(
choices=[(pt.value, pt.name) for pt in ProtocolType], db_index=True, default=ProtocolType.CWL
)
# Registration state of the pipeline. The choices must be built from
# PipelineStatus (PREPARING/READY), not ProtocolType, so forms and the admin
# show status labels instead of protocol names.
# NOTE(review): the migration state has to list the same choices; adjust or
# regenerate it if `makemigrations --check` reports drift.
status = models.IntegerField(
    choices=[(ps.value, ps.name) for ps in PipelineStatus], db_index=True, default=PipelineStatus.PREPARING
)
pipeline_name = models.ForeignKey(PipelineName, null=True, blank=True, on_delete=models.SET_NULL)
name = models.CharField(max_length=100, editable=True)
github = models.CharField(max_length=300, editable=True)
Expand Down
28 changes: 28 additions & 0 deletions runner/pipeline/pipeline_resolver.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import json
import os
import git
import uuid
import shutil
import logging
from django.conf import settings
from file_system.repository import FileRepository
from runner.cache.github_cache import GithubCache
from runner.run.processors.file_processor import FileProcessor


class PipelineResolver(object):
Expand Down Expand Up @@ -41,6 +45,30 @@ def _cleanup(self, location):
else:
shutil.rmtree(location)

def import_reference_files(self):
    """Register the pipeline's reference files in the reference file group.

    Clones the pipeline repository, looks for the JSON manifest at
    ``settings.APP_REFERENCE_FILES_PATH`` inside the clone and creates a file
    object for every manifest entry whose ``location`` is not yet present in
    ``settings.REFERENCE_FILE_GROUP_ID``. Each entry is read through its
    ``location``, ``size`` and ``checksum`` keys. A missing manifest is only
    logged.

    The cloned directory is removed in all cases, including when parsing the
    manifest or registering a file raises; the exception still propagates.
    """
    dir_name = self._dir_name()
    pipeline_path = self._git_clone(dir_name)
    try:
        manifest_path = os.path.join(pipeline_path, settings.APP_REFERENCE_FILES_PATH)
        logging.info(f"Locating reference file in {manifest_path}")
        if not os.path.exists(manifest_path):
            logging.info(f"Pipeline doesn't have reference file in {manifest_path}")
            return
        with open(manifest_path, "r") as manifest:
            reference_files = json.load(manifest)
        for reference_file in reference_files:
            already_registered = FileRepository.filter(
                path=reference_file["location"], file_group=settings.REFERENCE_FILE_GROUP_ID
            ).first()
            if already_registered:
                continue
            logging.info(f"Registering {reference_file}")
            FileProcessor.create_file_obj(
                reference_file["location"],
                reference_file["size"],
                reference_file["checksum"],
                settings.REFERENCE_FILE_GROUP_ID,
            )
    finally:
        # Always drop the clone so a failed import does not leak the checkout.
        logging.info(f"Cleanup pipeline directory {dir_name}")
        self._cleanup(dir_name)

def load(self):
    """No-op in this class: performs no work and returns ``None``."""
    return None

Expand Down
24 changes: 23 additions & 1 deletion runner/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from celery import shared_task
from django.conf import settings
from django.db.models import Count
from runner.pipeline.pipeline_resolver import PipelineResolver
from runner.run.objects.run_object_factory import RunObjectFactory
from .models import Run, RunStatus, OperatorRun, TriggerAggregateConditionType, TriggerRunType, Pipeline
from .models import Run, RunStatus, OperatorRun, TriggerAggregateConditionType, TriggerRunType, Pipeline, PipelineStatus
from notifier.events import (
RunFinishedEvent,
OperatorRequestEvent,
Expand Down Expand Up @@ -40,6 +41,8 @@
from study.objects import StudyObject
from study.models import JobGroupWatcher, JobGroupWatcherConfig
from django.http import HttpResponse
from django.dispatch import receiver
from django.db.models.signals import post_save


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -817,6 +820,25 @@ def add_pipeline_to_cache(github, version):
GithubCache.add(github, version)


@shared_task
def register_reference_files(pipeline_id):
    """Import a pipeline's reference files, then mark the pipeline READY.

    :param pipeline_id: primary key of the ``Pipeline`` to process.

    A missing pipeline is logged and the task returns without raising. Any
    exception raised while importing propagates to the caller, in which case
    the status is not changed.
    """
    try:
        pipeline = Pipeline.objects.get(id=pipeline_id)
    except Pipeline.DoesNotExist:
        # Use the module logger so the record carries this module's name.
        logger.error(f"Pipeline with id:{pipeline_id} doesn't exist")
        return
    resolver = PipelineResolver(pipeline.github, pipeline.entrypoint, pipeline.version)
    resolver.import_reference_files()
    pipeline.status = PipelineStatus.READY
    pipeline.save()


@receiver(post_save, sender=Pipeline)
def register_pipeline_reference_files(sender, instance, created, **kwargs):
    """Queue reference-file registration for a newly created pipeline.

    Updates to an existing pipeline (``created`` is false) are ignored. The
    task is enqueued only after the surrounding transaction commits, so the
    worker cannot look the row up before it is visible and nothing is queued
    for a pipeline whose creation is rolled back. Outside a transaction the
    callback runs immediately.
    """
    if not created:
        return
    # Local import keeps this handler self-contained.
    from django.db import transaction

    # Resolve the id now; the callback may run after `instance` has changed.
    pipeline_id = str(instance.id)
    transaction.on_commit(lambda: register_reference_files.delay(pipeline_id))


class cmo_dmp_manifest:
"""
Description:
Expand Down
6 changes: 4 additions & 2 deletions runner/tests/operator/access/access_cnv/test_cnv_operator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os

from django.test import TestCase

from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from runner.operator.operator_factory import OperatorFactory
from runner.operator.access.v1_0_0.cnv import AccessLegacyCNVOperator
from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files


FIXTURES = [
Expand All @@ -31,7 +33,7 @@


class TestAccessCNVOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_legacy_cnv_operator(self):
Expand Down
6 changes: 4 additions & 2 deletions runner/tests/operator/access/access_msi/test_msi_operator.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os

from django.test import TestCase

from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from runner.operator.operator_factory import OperatorFactory
from runner.operator.access.v1_0_0.msi import AccessLegacyMSIOperator
from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files


FIXTURES = [
Expand All @@ -31,7 +33,7 @@


class TestAccessMSIOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_legacy_msi_operator(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from django.test import TestCase

from file_system.models import File
from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from runner.operator.operator_factory import OperatorFactory
from runner.operator.access.v1_0_0.snps_and_indels import AccessLegacySNVOperator
from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files

REQUEST_ID = "access_legacy_test_request"
TEST_RUN_ID = "bc23076e-f477-4578-943c-1fbf6f1fca44"
Expand All @@ -33,6 +36,7 @@


class TestAccessSNVOperator(TestCase):
signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_legacy_snv_operator(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from django.test import TestCase

from beagle.settings import ROOT_DIR
from django.db.models import signals
from beagle_etl.models import Operator
from runner.operator.operator_factory import OperatorFactory
from runner.operator.access.v1_0_0.structural_variants import AccessLegacySVOperator
from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files


FIXTURES = [
Expand Down Expand Up @@ -33,7 +36,7 @@


class TestAccessSVOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_legacy_sv_operator(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os

from django.test import TestCase

from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from file_system.models import File, FileMetadata
from runner.operator.operator_factory import OperatorFactory
from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files


FIXTURES = [
Expand All @@ -28,7 +30,7 @@


class TestAccessLegacyOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_legacy_operator(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os

from django.test import TestCase

from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from file_system.models import File, FileMetadata
from runner.operator.operator_factory import OperatorFactory
from runner.operator.access.v1_0_0.merge_fastqs import AccessLegacyFastqMergeOperator, construct_inputs

from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files

FIXTURES = [
"fixtures/tests/merge_fastqs/10151_F_13.file.json",
Expand All @@ -27,7 +27,7 @@


class TestAccessFastqMergeOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_legacy_fastq_merge_operator(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os

from django.test import TestCase

from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from file_system.models import File, FileMetadata
from runner.operator.operator_factory import OperatorFactory
from runner.operator.access.v1_0_0.merge_fastqs import AccessLegacyFastqMergeOperator, construct_inputs

from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files

FIXTURES = [
"fixtures/tests/merge_fastqs/10151_F_13.file.json",
Expand All @@ -27,7 +27,7 @@


class TestAccessNucleoOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_nucleo_operator(self):
Expand Down
5 changes: 4 additions & 1 deletion runner/tests/operator/access/v2_0_0/qc/test_qc_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from django.test import TestCase

from beagle import settings
from django.db.models import signals
from beagle.settings import ROOT_DIR
from beagle_etl.models import Operator
from file_system.models import File, FileMetadata
from runner.operator.operator_factory import OperatorFactory
from runner.models import Pipeline
from runner.tasks import register_pipeline_reference_files


FIXTURES = [
Expand All @@ -28,7 +31,7 @@


class TestAccessQCOperator(TestCase):

signals.post_save.disconnect(register_pipeline_reference_files, sender=Pipeline)
fixtures = [os.path.join(ROOT_DIR, f) for f in FIXTURES + COMMON_FIXTURES]

def test_access_qc_operator(self):
Expand Down
Loading