diff --git a/Dockerfile.autoreducer b/Dockerfile.autoreducer
index d3c0e80b..f8c2d693 100644
--- a/Dockerfile.autoreducer
+++ b/Dockerfile.autoreducer
@@ -1,9 +1,9 @@
 FROM registry.access.redhat.com/ubi9/ubi
 
 # install various dependencies
-RUN curl http://packages.sns.gov/distros/rhel/9/sns/sns.repo -o /etc/dnf.repos.d/sns.repo || echo "Cannot see packages.sns.gov"
 RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
 RUN dnf updateinfo
+RUN dnf install -y procps-ng # pgrep is used for health check
 
 # rpm needs to special groups first
 RUN groupadd snswheel
@@ -12,16 +12,10 @@ RUN useradd snsdata -G snswheel
 # put the developer configuration file in place
 RUN mkdir -p /etc/autoreduce/
 RUN mkdir -p /opt/postprocessing/log/
-RUN curl https://raw.githubusercontent.com/neutrons/post_processing_agent/v3.0/configuration/post_process_consumer.conf.development -o /etc/autoreduce/post_processing.conf
-RUN dnf install -y jq
-RUN contents="$(jq 'del(.processors)' /etc/autoreduce/post_processing.conf)" && \
-echo -E "${contents}" > /etc/autoreduce/post_processing.conf
-
-# This configuration allows it to run with docker-compose from https://github.com/neutrons/data_workflow
-RUN sed -i 's/localhost/activemq/' /etc/autoreduce/post_processing.conf
+COPY tests/configuration/post_process_consumer.conf /etc/autoreduce/post_processing.conf
 
 # install postprocessing
-RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.0/postprocessing-3.0.0-1.el9.noarch.rpm
+RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.1/postprocessing-3.1.0-1.el9.noarch.rpm
 
 # install the fake test data
 ARG DATA_TARBALL=/tmp/SNSdata.tar.gz
diff --git a/Dockerfile.autoreducer.himem b/Dockerfile.autoreducer.himem
new file mode 100644
index 00000000..16ed944b
--- /dev/null
+++ b/Dockerfile.autoreducer.himem
@@ -0,0 +1,39 @@
+FROM registry.access.redhat.com/ubi9/ubi
+
+# install various dependencies
+RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
+RUN dnf updateinfo
+RUN dnf install -y procps-ng # pgrep is used for health check
+
+# rpm needs to special groups first
+RUN groupadd snswheel
+RUN useradd snsdata -G snswheel
+
+# put the developer configuration file in place
+RUN mkdir -p /etc/autoreduce/
+RUN mkdir -p /opt/postprocessing/log/
+COPY tests/configuration/post_process_consumer.himem.conf /etc/autoreduce/post_processing.conf
+
+# install postprocessing
+RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.1/postprocessing-3.1.0-1.el9.noarch.rpm
+
+# install the fake test data
+ARG DATA_TARBALL=/tmp/SNSdata.tar.gz
+COPY SNSdata.tar.gz ${DATA_TARBALL}
+RUN ls ${DATA_TARBALL}
+RUN mkdir /SNS
+RUN cd /SNS && tar xzf ${DATA_TARBALL}
+
+# add fake ONCat ingest scripts
+RUN touch /opt/postprocessing/scripts/oncat_ingest.py && \
+    touch /opt/postprocessing/scripts/oncat_reduced_ingest.py
+
+# create startup script
+RUN echo "#!/bin/bash" > /usr/bin/run_postprocessing && \
+    echo "/opt/postprocessing/queueProcessor.py &" >> /usr/bin/run_postprocessing && \
+    echo "sleep 1" >> /usr/bin/run_postprocessing && \
+    echo "tail -F /opt/postprocessing/log/postprocessing.log" >> /usr/bin/run_postprocessing && \
+    chmod +x /usr/bin/run_postprocessing
+
+# start the service
+CMD run_postprocessing
diff --git a/docker-compose.yml b/docker-compose.yml
index 85220dcb..40640dea 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -115,8 +115,6 @@ services:
       context: .
       dockerfile: Dockerfile.autoreducer
       network: host
-    ports:
-      - "8888:8888"
     hostname: autoreducer
     healthcheck:
       test: ["CMD", "pgrep", "python"]
@@ -124,6 +122,18 @@
       activemq:
         condition: service_healthy
 
+  autoreducer_himem:
+    build:
+      context: .
+      dockerfile: Dockerfile.autoreducer.himem
+      network: host
+    hostname: autoreducer.himem
+    healthcheck:
+      test: ["CMD", "pgrep", "python"]
+    depends_on:
+      activemq:
+        condition: service_healthy
+
   amq_test_gen:
     restart: always
     build:
diff --git a/tests/configuration/post_process_consumer.conf b/tests/configuration/post_process_consumer.conf
new file mode 100644
index 00000000..7875163d
--- /dev/null
+++ b/tests/configuration/post_process_consumer.conf
@@ -0,0 +1,28 @@
+{
+    "failover_uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
+    "brokers": [["activemq", 61613]],
+    "amq_user": "icat",
+    "amq_pwd": "icat",
+    "sw_dir": "/opt/postprocessing",
+    "python_dir": "/opt/postprocessing/postprocessing",
+    "start_script": "python3",
+    "task_script": "PostProcessAdmin.py",
+    "task_script_queue_arg": "-q",
+    "task_script_data_arg": "-d",
+    "log_file": "/opt/postprocessing/log/postprocessing.log",
+    "postprocess_error": "POSTPROCESS.ERROR",
+    "reduction_started": "REDUCTION.STARTED",
+    "reduction_complete": "REDUCTION.COMPLETE",
+    "reduction_error": "REDUCTION.ERROR",
+    "reduction_disabled": "REDUCTION.DISABLED",
+    "heart_beat": "/topic/SNS.COMMON.STATUS.AUTOREDUCE.0",
+    "dev_output_dir": "",
+    "reduction_data_ready": "REDUCTION.DATA_READY",
+    "communication_only": 0,
+    "max_procs": 5,
+    "processors": ["oncat_processor.ONCatProcessor",
+        "oncat_reduced_processor.ONCatProcessor",
+        "create_reduction_script_processor.CreateReductionScriptProcessor",
+        "reduction_processor.ReductionProcessor"
+    ]
+}
diff --git a/tests/configuration/post_process_consumer.himem.conf b/tests/configuration/post_process_consumer.himem.conf
new file mode 100644
index 00000000..6baa114f
--- /dev/null
+++ b/tests/configuration/post_process_consumer.himem.conf
@@ -0,0 +1,24 @@
+{
+    "failover_uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
+    "brokers": [["activemq", 61613]],
+    "amq_user": "icat",
+    "amq_pwd": "icat",
+    "sw_dir": "/opt/postprocessing",
+    "python_dir": "/opt/postprocessing/postprocessing",
+    "start_script": "python3",
+    "task_script": "PostProcessAdmin.py",
+    "task_script_queue_arg": "-q",
+    "task_script_data_arg": "-d",
+    "log_file": "/opt/postprocessing/log/postprocessing.log",
+    "postprocess_error": "POSTPROCESS.ERROR",
+    "reduction_started": "REDUCTION.STARTED",
+    "reduction_complete": "REDUCTION.COMPLETE",
+    "reduction_error": "REDUCTION.ERROR",
+    "reduction_disabled": "REDUCTION.DISABLED",
+    "heart_beat": "/topic/SNS.COMMON.STATUS.AUTOREDUCE.0",
+    "dev_output_dir": "",
+    "reduction_data_ready": "REDUCTION.DATA_READY",
+    "communication_only": 0,
+    "max_procs": 5,
+    "processors": ["reduction_processor.ReductionProcessorHighMemory"]
+}
diff --git a/tests/data/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5 b/tests/data/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/test_DASMONPageView.py b/tests/test_DASMONPageView.py
index aac57b43..8c86a895 100644
--- a/tests/test_DASMONPageView.py
+++ b/tests/test_DASMONPageView.py
@@ -35,7 +35,7 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
         tree = etree.parse(StringIO(dasmon_diagnostics.text), parser)
         table_content = tree.xpath("//tr/td//text()")
         # verify number of entries in the tables
-        expected_number_of_entries = 39
+        expected_number_of_entries = 43
         assert len(table_content) == expected_number_of_entries
         # -- DASMON diagnostics
         status = table_content[1]
diff --git a/tests/test_autoreducer_high_memory.py b/tests/test_autoreducer_high_memory.py
new file mode 100644
index 00000000..f9bc8d13
--- /dev/null
+++ b/tests/test_autoreducer_high_memory.py
@@ -0,0 +1,228 @@
+"""This is to test that the reduction tasks go to the correct autoreducer node
+depending on whether it requires high memory or not"""
+import psycopg2
+import requests
+import time
+from dotenv import dotenv_values
+
+
+class TestAutoreducerQueues:
+    user = "InstrumentScientist"
+    pwd = "InstrumentScientist"
+    conn = None
+    instrument = "vulcan"
+    IPTS = "IPTS-1234"
+    run_number = 12345
+
+    def setup_class(cls):
+        config = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
+        assert config
+        cls.conn = psycopg2.connect(
+            database=config["DATABASE_NAME"],
+            user=config["DATABASE_USER"],
+            password=config["DATABASE_PASS"],
+            port=config["DATABASE_PORT"],
+            host="localhost",
+        )
+        time.sleep(1)
+
+    def teardown_class(cls):
+        cls.conn.close()
+
+    def login(self, next, username, password):
+        # taken from test_RunPageView.py - consolidate as helper somewhere?
+        URL = "http://localhost/users/login?next="
+        client = requests.session()
+
+        # Retrieve the CSRF token first
+        client.get(URL)  # sets the cookie
+        csrftoken = client.cookies["csrftoken"]
+
+        login_data = dict(username=username, password=password, csrfmiddlewaretoken=csrftoken)
+        return client.post(URL + next, data=login_data, timeout=None)
+
+    def create_test_data(self):
+        """create the instrument, ipts and datarun if they don't already exist
+
+        returns the id for the created rundata"""
+        conn = TestAutoreducerQueues.conn
+        cursor = conn.cursor()
+
+        cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
+        inst_id = cursor.fetchone()
+
+        if inst_id is None:
+            cursor.execute("INSERT INTO report_instrument (name) VALUES (%s);", (self.instrument,))
+            cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
+            inst_id = cursor.fetchone()
+            conn.commit()
+
+        cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
+        ipts_id = cursor.fetchone()
+        if ipts_id is None:
+            cursor.execute(
+                "INSERT INTO report_ipts (expt_name, created_on) VALUES (%s, %s);",
+                ("IPTS-1234", "2020-05-20 13:02:52.281964;"),
+            )
+            cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
+            ipts_id = cursor.fetchone()
+            conn.commit()
+
+        cursor.execute(
+            "SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
+            (self.run_number, ipts_id[0], inst_id[0]),
+        )
+        run_id = cursor.fetchone()
+        if run_id is None:
+            cursor.execute(
+                "INSERT INTO report_datarun (run_number, ipts_id_id, instrument_id_id, file, created_on) "
+                "VALUES (%s, %s, %s, %s, %s);",
+                (
+                    self.run_number,
+                    ipts_id[0],
+                    inst_id[0],
+                    "/SNS/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5",
+                    "2020-05-20 13:02:52.281964;",
+                ),
+            )
+            cursor.execute(
+                "SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
+                (self.run_number, ipts_id[0], inst_id[0]),
+            )
+            run_id = cursor.fetchone()
+            conn.commit()
+
+        return run_id
+
+    def get_status_queue_id(self, cursor, queue_name):
+        """return the id for the statusqueue for the provided name"""
+        cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
+        queue_id = cursor.fetchone()
+
+        if queue_id is None:
+            cursor.execute(
+                "INSERT INTO report_statusqueue (name, is_workflow_input) VALUES (%s, %s);", (queue_name, False)
+            )
+            cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
+            queue_id = cursor.fetchone()
+
+        return queue_id[0]
+
+    def set_reduction_request_queue(self, queue_name):
+        """create the task to send REDUCTION.REQUEST to the provided queue"""
+        conn = TestAutoreducerQueues.conn
+        cursor = conn.cursor()
+
+        cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
+        inst_id = cursor.fetchone()[0]
+
+        queue_id = self.get_status_queue_id(cursor, queue_name)
+        success_queue_id = self.get_status_queue_id(cursor, "REDUCTION.COMPLETE")
+        reduction_request_queue_id = self.get_status_queue_id(cursor, "REDUCTION.REQUEST")
+
+        cursor.execute(
+            "SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
+            (inst_id, reduction_request_queue_id),
+        )
+        task_id = cursor.fetchone()
+
+        if task_id is None:
+            cursor.execute(
+                "INSERT INTO report_task (instrument_id_id, input_queue_id_id) VALUES (%s, %s)",
+                (inst_id, reduction_request_queue_id),
+            )
+            cursor.execute(
+                "SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
+                (inst_id, reduction_request_queue_id),
+            )
+            task_id = cursor.fetchone()
+            conn.commit()
+
+        task_id = task_id[0]
+
+        cursor.execute("DELETE FROM report_task_task_queue_ids WHERE task_id = %s", (task_id,))
+        cursor.execute("DELETE FROM report_task_success_queue_ids WHERE task_id = %s", (task_id,))
+
+        cursor.execute(
+            "INSERT INTO report_task_task_queue_ids (task_id, statusqueue_id) VALUES (%s, %s)", (task_id, queue_id)
+        )
+        cursor.execute(
+            "INSERT INTO report_task_success_queue_ids (task_id, statusqueue_id) VALUES (%s, %s)",
+            (task_id, success_queue_id),
+        )
+        conn.commit()
+
+    def clear_previous_runstatus(self, run_id):
+        """remove all previous run statuses for the given run_id"""
+        conn = TestAutoreducerQueues.conn
+        cursor = conn.cursor()
+        # delete all information entries for runstatus
+        cursor.execute(
+            "DELETE FROM report_information WHERE run_status_id_id IN (SELECT id FROM report_runstatus "
+            "WHERE run_id_id = %s);",
+            run_id,
+        )
+        cursor.execute("DELETE FROM report_runstatus WHERE run_id_id = %s;", run_id)
+        conn.commit()
+
+    def get_autoreducer_hostname(self, run_id):
+        """return the hostname that executed the task that is stored in the report information"""
+        conn = TestAutoreducerQueues.conn
+        cursor = conn.cursor()
+        queue_id = self.get_status_queue_id(cursor, "REDUCTION.STARTED")
+        cursor.execute("SELECT id FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
+        runstatus_id = cursor.fetchone()[0]
+        cursor.execute("SELECT description FROM report_information WHERE run_status_id_id = %s", (runstatus_id,))
+        description = cursor.fetchone()[0]
+        return description
+
+    def check_run_status_exist(self, run_id, queue_name):
+        """return whether the run status was created for the given run_id and queue_name"""
+        conn = TestAutoreducerQueues.conn
+        cursor = conn.cursor()
+        queue_id = self.get_status_queue_id(cursor, queue_name)
+        cursor.execute("SELECT * FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
+        return cursor.fetchone() is not None
+
+    def test_normal_reduction_queue(self):
+        # switch to the REDUCTION.DATA_READY queue and check that the task goes to the correct node
+        run_id = self.create_test_data()
+        self.clear_previous_runstatus(run_id)
+
+        self.set_reduction_request_queue("REDUCTION.DATA_READY")
+
+        # login and send reduction request
+        response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
+        assert response.status_code == 200
+        assert response.url.endswith("/report/vulcan/12345/")
+
+        # wait for database to get updated
+        time.sleep(1.0)
+
+        assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
+        assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
+        assert self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
+        assert not self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")
+
+        assert self.get_autoreducer_hostname(run_id) == "autoreducer"
+
+    def test_himem_reduction_queue(self):
+        # switch to the REDUCTION.HIMEM.DATA_READY queue and check that the task goes to the correct node
+        run_id = self.create_test_data()
+        self.clear_previous_runstatus(run_id)
+
+        self.set_reduction_request_queue("REDUCTION.HIMEM.DATA_READY")
+        # login and send reduction request
+        response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
+        assert response.status_code == 200
+        assert response.url.endswith("/report/vulcan/12345/")
+
+        # wait for database to get updated
+        time.sleep(1.0)
+
+        assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
+        assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
+        assert not self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
+        assert self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")
+
+        assert self.get_autoreducer_hostname(run_id) == "autoreducer.himem"