Merge pull request #159 from neutrons/autoreducer_himem
Move autoreducer to 3.1 in docker containers and add systemtest for new HIMEM autoreducer queue
rosswhitfield authored Apr 19, 2024
2 parents 2dd77ec + 6ca5e85 commit 2fb877c
Showing 8 changed files with 335 additions and 12 deletions.
12 changes: 3 additions & 9 deletions Dockerfile.autoreducer
@@ -1,9 +1,9 @@
FROM registry.access.redhat.com/ubi9/ubi

# install various dependencies
RUN curl http://packages.sns.gov/distros/rhel/9/sns/sns.repo -o /etc/dnf.repos.d/sns.repo || echo "Cannot see packages.sns.gov"
RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
RUN dnf updateinfo
RUN dnf install -y procps-ng # pgrep is used for health check

# rpm needs the special groups to exist first
RUN groupadd snswheel
@@ -12,16 +12,10 @@ RUN useradd snsdata -G snswheel
# put the developer configuration file in place
RUN mkdir -p /etc/autoreduce/
RUN mkdir -p /opt/postprocessing/log/
RUN curl https://raw.githubusercontent.com/neutrons/post_processing_agent/v3.0/configuration/post_process_consumer.conf.development -o /etc/autoreduce/post_processing.conf
RUN dnf install -y jq
RUN contents="$(jq 'del(.processors)' /etc/autoreduce/post_processing.conf)" && \
echo -E "${contents}" > /etc/autoreduce/post_processing.conf

# This configuration allows it to run with docker-compose from https://github.com/neutrons/data_workflow
RUN sed -i 's/localhost/activemq/' /etc/autoreduce/post_processing.conf
COPY tests/configuration/post_process_consumer.conf /etc/autoreduce/post_processing.conf

# install postprocessing
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.0/postprocessing-3.0.0-1.el9.noarch.rpm
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.1/postprocessing-3.1.0-1.el9.noarch.rpm

# install the fake test data
ARG DATA_TARBALL=/tmp/SNSdata.tar.gz
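
Note: the deleted lines above downloaded the v3.0 development config at build time, stripped its "processors" key with jq, and pointed the broker at activemq with sed. The new image instead copies a checked-in config (tests/configuration/post_process_consumer.conf, shown below). For readers unfamiliar with jq, the removed del(.processors) step is roughly equivalent to this Python sketch (illustrative only, not part of the image):

```python
import json

# rough equivalent of the deleted step: jq 'del(.processors)'
path = "/etc/autoreduce/post_processing.conf"  # path used by the old Dockerfile
with open(path) as f:
    conf = json.load(f)
conf.pop("processors", None)  # del(.processors): drop the key if present
with open(path, "w") as f:
    json.dump(conf, f, indent=4)
```
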
39 changes: 39 additions & 0 deletions Dockerfile.autoreducer.himem
@@ -0,0 +1,39 @@
FROM registry.access.redhat.com/ubi9/ubi

# install various dependencies
RUN dnf install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-9.noarch.rpm
RUN dnf updateinfo
RUN dnf install -y procps-ng # pgrep is used for health check

# rpm needs the special groups to exist first
RUN groupadd snswheel
RUN useradd snsdata -G snswheel

# put the developer configuration file in place
RUN mkdir -p /etc/autoreduce/
RUN mkdir -p /opt/postprocessing/log/
COPY tests/configuration/post_process_consumer.himem.conf /etc/autoreduce/post_processing.conf

# install postprocessing
RUN dnf install -y https://github.com/neutrons/post_processing_agent/releases/download/v3.1/postprocessing-3.1.0-1.el9.noarch.rpm

# install the fake test data
ARG DATA_TARBALL=/tmp/SNSdata.tar.gz
COPY SNSdata.tar.gz ${DATA_TARBALL}
RUN ls ${DATA_TARBALL}
RUN mkdir /SNS
RUN cd /SNS && tar xzf ${DATA_TARBALL}

# add fake ONCat ingest scripts
RUN touch /opt/postprocessing/scripts/oncat_ingest.py && \
touch /opt/postprocessing/scripts/oncat_reduced_ingest.py

# create startup script
RUN echo "#!/bin/bash" > /usr/bin/run_postprocessing && \
echo "/opt/postprocessing/queueProcessor.py &" >> /usr/bin/run_postprocessing && \
echo "sleep 1" >> /usr/bin/run_postprocessing && \
echo "tail -F /opt/postprocessing/log/postprocessing.log" >> /usr/bin/run_postprocessing && \
chmod +x /usr/bin/run_postprocessing

# start the service
CMD run_postprocessing
14 changes: 12 additions & 2 deletions docker-compose.yml
@@ -115,15 +115,25 @@ services:
context: .
dockerfile: Dockerfile.autoreducer
network: host
ports:
- "8888:8888"
hostname: autoreducer
healthcheck:
test: ["CMD", "pgrep", "python"]
depends_on:
activemq:
condition: service_healthy

autoreducer_himem:
build:
context: .
dockerfile: Dockerfile.autoreducer.himem
network: host
hostname: autoreducer.himem
healthcheck:
test: ["CMD", "pgrep", "python"]
depends_on:
activemq:
condition: service_healthy

amq_test_gen:
restart: always
build:
28 changes: 28 additions & 0 deletions tests/configuration/post_process_consumer.conf
@@ -0,0 +1,28 @@
{
"failover_uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [["activemq", 61613]],
"amq_user": "icat",
"amq_pwd": "icat",
"sw_dir": "/opt/postprocessing",
"python_dir": "/opt/postprocessing/postprocessing",
"start_script": "python3",
"task_script": "PostProcessAdmin.py",
"task_script_queue_arg": "-q",
"task_script_data_arg": "-d",
"log_file": "/opt/postprocessing/log/postprocessing.log",
"postprocess_error": "POSTPROCESS.ERROR",
"reduction_started": "REDUCTION.STARTED",
"reduction_complete": "REDUCTION.COMPLETE",
"reduction_error": "REDUCTION.ERROR",
"reduction_disabled": "REDUCTION.DISABLED",
"heart_beat": "/topic/SNS.COMMON.STATUS.AUTOREDUCE.0",
"dev_output_dir": "",
"reduction_data_ready": "REDUCTION.DATA_READY",
"communication_only": 0,
"max_procs": 5,
"processors": ["oncat_processor.ONCatProcessor",
"oncat_reduced_processor.ONCatProcessor",
"create_reduction_script_processor.CreateReductionScriptProcessor",
"reduction_processor.ReductionProcessor"
]
}
24 changes: 24 additions & 0 deletions tests/configuration/post_process_consumer.himem.conf
@@ -0,0 +1,24 @@
{
"failover_uri": "failover:(tcp://localhost:61613)?randomize=false,startupMaxReconnectAttempts=100,initialReconnectDelay=1000,maxReconnectDelay=5000,maxReconnectAttempts=-1",
"brokers": [["activemq", 61613]],
"amq_user": "icat",
"amq_pwd": "icat",
"sw_dir": "/opt/postprocessing",
"python_dir": "/opt/postprocessing/postprocessing",
"start_script": "python3",
"task_script": "PostProcessAdmin.py",
"task_script_queue_arg": "-q",
"task_script_data_arg": "-d",
"log_file": "/opt/postprocessing/log/postprocessing.log",
"postprocess_error": "POSTPROCESS.ERROR",
"reduction_started": "REDUCTION.STARTED",
"reduction_complete": "REDUCTION.COMPLETE",
"reduction_error": "REDUCTION.ERROR",
"reduction_disabled": "REDUCTION.DISABLED",
"heart_beat": "/topic/SNS.COMMON.STATUS.AUTOREDUCE.0",
"dev_output_dir": "",
"reduction_data_ready": "REDUCTION.DATA_READY",
"communication_only": 0,
"max_procs": 5,
"processors": ["reduction_processor.ReductionProcessorHighMemory"]
}
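
The himem config is identical to the standard consumer config above except for its "processors" list: it registers only reduction_processor.ReductionProcessorHighMemory, so this node consumes only high-memory reduction tasks and leaves ONCat ingestion and reduction-script creation to the standard autoreducer. A quick illustrative check (assuming both files are read from the repository root):

```python
import json

# load both checked-in consumer configs and print any keys that differ
with open("tests/configuration/post_process_consumer.conf") as f:
    standard = json.load(f)
with open("tests/configuration/post_process_consumer.himem.conf") as f:
    himem = json.load(f)

for key in sorted(standard.keys() | himem.keys()):
    if standard.get(key) != himem.get(key):
        print(f"{key}:\n  standard: {standard.get(key)}\n  himem:    {himem.get(key)}")
# only "processors" should be printed -- everything else matches
```
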
Empty file.
2 changes: 1 addition & 1 deletion tests/test_DASMONPageView.py
@@ -35,7 +35,7 @@ def testVerifyDASMONPageView(self, dasmon_diagnostics):
tree = etree.parse(StringIO(dasmon_diagnostics.text), parser)
table_content = tree.xpath("//tr/td//text()")
# verify number of entries in the tables
expected_number_of_entries = 39
expected_number_of_entries = 43
assert len(table_content) == expected_number_of_entries
# -- DASMON diagnostics
status = table_content[1]
228 changes: 228 additions & 0 deletions tests/test_autoreducer_high_memory.py
@@ -0,0 +1,228 @@
"""This is to test that the reduction tasks go to the correct autoreducer node
depending on if it requires high memoery or not"""
import psycopg2
import requests
import time
from dotenv import dotenv_values


class TestAutoreducerQueues:
user = "InstrumentScientist"
pwd = "InstrumentScientist"
conn = None
instrument = "vulcan"
IPTS = "IPTS-1234"
run_number = 12345

def setup_class(cls):
config = {**dotenv_values(".env"), **dotenv_values(".env.ci")}
assert config
cls.conn = psycopg2.connect(
database=config["DATABASE_NAME"],
user=config["DATABASE_USER"],
password=config["DATABASE_PASS"],
port=config["DATABASE_PORT"],
host="localhost",
)
time.sleep(1)

def teardown_class(cls):
cls.conn.close()

def login(self, next, username, password):
# taken from test_RunPageView.py - consolidate as helper somewhere?
URL = "http://localhost/users/login?next="
client = requests.session()

# Retrieve the CSRF token first
client.get(URL) # sets the cookie
csrftoken = client.cookies["csrftoken"]

login_data = dict(username=username, password=password, csrfmiddlewaretoken=csrftoken)
return client.post(URL + next, data=login_data, timeout=None)

def create_test_data(self):
"""create the instrument, ipts and datarun if they don't already exist
returns the id for the created rundata"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()

cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()

if inst_id is None:
cursor.execute("INSERT INTO report_instrument (name) VALUES (%s);", (self.instrument,))
cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()
conn.commit()

cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
ipts_id = cursor.fetchone()
if ipts_id is None:
cursor.execute(
"INSERT INTO report_ipts (expt_name, created_on) VALUES (%s, %s);",
("IPTS-1234", "2020-05-20 13:02:52.281964;"),
)
cursor.execute("SELECT id FROM report_ipts WHERE expt_name = %s;", (self.IPTS,))
ipts_id = cursor.fetchone()
conn.commit()

cursor.execute(
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
(self.run_number, ipts_id[0], inst_id[0]),
)
run_id = cursor.fetchone()
if run_id is None:
cursor.execute(
"INSERT INTO report_datarun (run_number, ipts_id_id, instrument_id_id, file, created_on) "
"VALUES (%s, %s, %s, %s, %s);",
(
self.run_number,
ipts_id[0],
inst_id[0],
"/SNS/VULCAN/IPTS-1234/nexus/VULCAN_12345.nxs.h5",
"2020-05-20 13:02:52.281964;",
),
)
cursor.execute(
"SELECT id FROM report_datarun WHERE run_number = %s AND ipts_id_id = %s AND instrument_id_id = %s;",
(self.run_number, ipts_id[0], inst_id[0]),
)
run_id = cursor.fetchone()
conn.commit()

return run_id

def get_status_queue_id(self, cursor, queue_name):
"""return the if for the statusqueue for the provided name"""
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
queue_id = cursor.fetchone()

if queue_id is None:
cursor.execute(
"INSERT INTO report_statusqueue (name, is_workflow_input) VALUES (%s, %s);", (queue_name, False)
)
cursor.execute("SELECT id FROM report_statusqueue where name = %s;", (queue_name,))
queue_id = cursor.fetchone()

return queue_id[0]

def set_reduction_request_queue(self, queue_name):
"""create the task to send REDUCTION.REQUEST to the provided queue"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()

cursor.execute("SELECT id FROM report_instrument where name = %s;", (self.instrument,))
inst_id = cursor.fetchone()[0]

queue_id = self.get_status_queue_id(cursor, queue_name)
success_queue_id = self.get_status_queue_id(cursor, "REDUCTION.COMPLETE")
reduction_request_queue_id = self.get_status_queue_id(cursor, "REDUCTION.REQUEST")

cursor.execute(
"SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
(inst_id, reduction_request_queue_id),
)
task_id = cursor.fetchone()

if task_id is None:
cursor.execute(
"INSERT INTO report_task (instrument_id_id, input_queue_id_id) VALUES (%s, %s)",
(inst_id, reduction_request_queue_id),
)
cursor.execute(
"SELECT id FROM report_task where instrument_id_id = %s AND input_queue_id_id = %s;",
(inst_id, reduction_request_queue_id),
)
task_id = cursor.fetchone()
conn.commit()

task_id = task_id[0]

cursor.execute("DELETE FROM report_task_task_queue_ids WHERE task_id = %s", (task_id,))
cursor.execute("DELETE FROM report_task_success_queue_ids WHERE task_id = %s", (task_id,))

cursor.execute(
"INSERT INTO report_task_task_queue_ids (task_id, statusqueue_id) VALUES (%s, %s)", (task_id, queue_id)
)
cursor.execute(
"INSERT INTO report_task_success_queue_ids (task_id, statusqueue_id) VALUES (%s, %s)",
(task_id, success_queue_id),
)
conn.commit()

def clear_previous_runstatus(self, run_id):
"""remove all previous run statuses for the given run_id"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
# delete all information entries for runstatus
cursor.execute(
"DELETE FROM report_information WHERE run_status_id_id IN (SELECT id FROM report_runstatus "
"WHERE run_id_id = %s);",
run_id,
)
cursor.execute("DELETE FROM report_runstatus WHERE run_id_id = %s;", run_id)
conn.commit()

def get_autoreducer_hostname(self, run_id):
"""return the hostname that executed the task that is stored in the report information"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
queue_id = self.get_status_queue_id(cursor, "REDUCTION.STARTED")
cursor.execute("SELECT id FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
runstatus_id = cursor.fetchone()[0]
cursor.execute("SELECT description FROM report_information WHERE run_status_id_id = %s", (runstatus_id,))
description = cursor.fetchone()[0]
return description

def check_run_status_exist(self, run_id, queue_name):
"""return if the run status was created for the given run_id and queue_name"""
conn = TestAutoreducerQueues.conn
cursor = conn.cursor()
queue_id = self.get_status_queue_id(cursor, queue_name)
cursor.execute("SELECT * FROM report_runstatus WHERE run_id_id = %s AND queue_id_id = %s", (run_id, queue_id))
return cursor.fetchone() is not None

def test_normal_reduction_queue(self):
# switch to the REDUCTION.DATA_READY queue and check that the task goes to the correct node
run_id = self.create_test_data()
self.clear_previous_runstatus(run_id)

self.set_reduction_request_queue("REDUCTION.DATA_READY")

# login and send reduction request
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
assert response.status_code == 200
assert response.url.endswith("/report/vulcan/12345/")

# wait for database to get updated
time.sleep(1.0)

assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
assert self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
assert not self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")

assert self.get_autoreducer_hostname(run_id) == "autoreducer"

def test_himem_reduction_queue(self):
# switch to the REDUCTION.HIMEM.DATA_READY queue and check that the task goes to the correct node
run_id = self.create_test_data()
self.clear_previous_runstatus(run_id)

self.set_reduction_request_queue("REDUCTION.HIMEM.DATA_READY")
# login and send reduction request
response = self.login("/report/vulcan/12345/reduce/", self.user, self.pwd)
assert response.status_code == 200
assert response.url.endswith("/report/vulcan/12345/")

# wait for database to get updated
time.sleep(1.0)

assert self.check_run_status_exist(run_id, "REDUCTION.REQUEST")
assert self.check_run_status_exist(run_id, "REDUCTION.STARTED")
assert not self.check_run_status_exist(run_id, "REDUCTION.DATA_READY")
assert self.check_run_status_exist(run_id, "REDUCTION.HIMEM.DATA_READY")

assert self.get_autoreducer_hostname(run_id) == "autoreducer.himem"
