32 changes: 32 additions & 0 deletions docker/docker-compose.yml
@@ -41,6 +41,38 @@ services:
awslogs-group: ${LOG_GROUP}
awslogs-stream: ${LOG_STREAM}

jobq-redis:
image: redis:8.2.3
hostname: jobq-redis
container_name: jobq-redis
ports:
- "6379:6379"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 1m30s
timeout: 10s
retries: 3
start_period: 40s
# NOTE: The dhi image doesn't work as easily due to hardened security
# image: dhi.io/redis:8.2.3
# environment:
# REDIS_PASSWORD: jobq-secret
# command:
# [
# "redis-server",
# "--bind", "0.0.0.0",
# "--protected-mode", "yes",
# "--requirepass", "jobq-secret"
# ]
# healthcheck:
# test: ["CMD", "redis-cli", "-a", "jobq-secret", "ping"]
# interval: 1m30s
# timeout: 10s
# retries: 3
# start_period: 40s
networks:
- gateway_hubmap

networks:
# This is the network created by gateway to enable communication between multiple docker-compose projects
gateway_hubmap:
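The new `jobq-redis` service is the Redis instance that backs the reindex job queue; its healthcheck is just `redis-cli ping`. The same round trip can be reproduced from Python as a quick sanity check. A minimal sketch, assuming the `redis` (redis-py) package is available and the defaults from `app.cfg.example` (host `jobq-redis`, port 6379, db 0):

```python
import redis

# Connect with the same host/port/db the search-api uses (see app.cfg.example);
# use host="localhost" when connecting from outside the compose network via the published port.
r = redis.Redis(host="jobq-redis", port=6379, db=0)
assert r.ping()  # equivalent to the compose healthcheck: `redis-cli ping` -> PONG
```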
4 changes: 4 additions & 0 deletions docker/search-api/start.sh
@@ -4,5 +4,9 @@
# 'daemon off;' is an nginx configuration directive
nginx -g 'daemon off;' &

# NOTE: Explicitly call the Python executable by its full path instead of just `python` or `python3.13`,
# because api-base-image v1.2.0 defines those names as shell aliases, which are not expanded in this script
/usr/local/bin/python3.13 /usr/src/app/src/jobq_workers.py &

# Start uwsgi and keep it running in foreground
/usr/local/python3.13/bin/uwsgi --ini /usr/src/app/src/uwsgi.ini
156 changes: 156 additions & 0 deletions src/hubmap_translator.py
@@ -27,6 +27,12 @@

logger = logging.getLogger(__name__)

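# Module-level Flask app and index config so that queue workers, which import this module outside of main.py,
# can build a Translator with the same settings (see reindex_entity_queued_wrapper() below)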
config = {}
app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), 'instance'),
instance_relative_config=True)
app.config.from_pyfile('app.cfg')
config['INDICES'] = safe_load((Path(__file__).absolute().parent / 'instance/search-config.yaml').read_text())

# This list contains fields that are added to the top-level at index runtime
entity_properties_list = [
'donor',
Expand Down Expand Up @@ -685,6 +691,144 @@ def _transform_and_write_entity_to_index_group(self, entity:dict, index_group:st
f" entity['uuid']={entity['uuid']},"
f" entity['entity_type']={entity['entity_type']}")

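# Enqueue a reindex job for the given entity, then lower-priority follow-up jobs for its related entities
# (collection/upload datasets, ancestors, descendants, revisions, collections, uploads); returns the job id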
def enqueue_reindex(self, entity_id, reindex_queue, priority):
try:
logger.info(f"Start executing translate() on entity_id: {entity_id}")
entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
logger.info(f"Enqueueing reindex for {entity['entity_type']} of uuid: {entity_id}")
subsequent_priority = max(priority, 2)

job_id = reindex_queue.enqueue(
task_func=reindex_entity_queued_wrapper,
entity_id=entity_id,
args=[entity_id, self.token],
priority=priority
)
collection_associations = []
upload_associations = []
if entity['entity_type'] in ['Collection', 'Epicollection']:
    collection = self.get_collection_doc(entity_id=entity_id)
    if 'datasets' in collection:
        logger.info(f"Enqueueing {len(collection['datasets'])} datasets for {entity['entity_type']} {entity_id}")
        for dataset in collection['datasets']:
            collection_associations.append(dataset['uuid'])
    if 'associated_publication' in collection and collection['associated_publication']:
        logger.info(f"Enqueueing associated_publication for {entity['entity_type']} {entity_id}")
        collection_associations.append(collection['associated_publication'])

    # Enqueue the collection's datasets and associated publication for reindexing
    for related_entity_id in collection_associations:
        reindex_queue.enqueue(
            task_func=reindex_entity_queued_wrapper,
            entity_id=related_entity_id,
            args=[related_entity_id, self.token],
            priority=subsequent_priority
        )
    logger.info(f"Finished executing enqueue_reindex() for {entity['entity_type']} of uuid: {entity_id}")
    return job_id

if entity['entity_type'] == 'Upload':
    if 'datasets' in entity:
        logger.info(f"Enqueueing {len(entity['datasets'])} datasets for Upload {entity_id}")
        for dataset in entity['datasets']:
            upload_associations.append(dataset['uuid'])
    # Enqueue the upload's datasets for reindexing
    for related_entity_id in upload_associations:
        reindex_queue.enqueue(
            task_func=reindex_entity_queued_wrapper,
            entity_id=related_entity_id,
            args=[related_entity_id, self.token],
            priority=subsequent_priority
        )
    logger.info(f"Finished executing enqueue_reindex() for Upload of uuid: {entity_id}")
    return job_id

logger.info(f"Calculating related entities for {entity_id}")

neo4j_ancestor_ids = self.call_entity_api(
entity_id=entity_id,
endpoint_base='ancestors',
endpoint_suffix=None,
url_property='uuid'
)

neo4j_descendant_ids = self.call_entity_api(
entity_id=entity_id,
endpoint_base='descendants',
endpoint_suffix=None,
url_property='uuid'
)

previous_revision_ids = []
next_revision_ids = []
neo4j_collection_ids = []
neo4j_upload_ids = []

if entity['entity_type'] in ['Dataset', 'Publication']:
previous_revision_ids = self.call_entity_api(
entity_id=entity_id,
endpoint_base='previous_revisions',
endpoint_suffix=None,
url_property='uuid'
)

next_revision_ids = self.call_entity_api(
entity_id=entity_id,
endpoint_base='next_revisions',
endpoint_suffix=None,
url_property='uuid'
)

neo4j_collection_ids = self.call_entity_api(
entity_id=entity_id,
endpoint_base='entities',
endpoint_suffix='collections',
url_property='uuid'
)

neo4j_upload_ids = self.call_entity_api(
entity_id=entity_id,
endpoint_base='entities',
endpoint_suffix='uploads',
url_property='uuid'
)

target_ids = set(
neo4j_ancestor_ids +
neo4j_descendant_ids +
previous_revision_ids +
next_revision_ids +
neo4j_collection_ids +
neo4j_upload_ids +
upload_associations +
collection_associations
)

logger.info(f"Enqueueing {len(target_ids)} related entities for {entity_id}")

for related_entity_id in target_ids:
reindex_queue.enqueue(
task_func=reindex_entity_queued_wrapper,
entity_id=related_entity_id,
args=[related_entity_id, self.token],
priority=subsequent_priority
)
logger.info(f"Finished executing translate() on {entity['entity_type']} of uuid: {entity_id}")
return job_id

except Exception:
msg = "Exception during executing translate()"
logger.exception(msg)
raise

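# Executed by a queue worker (via reindex_entity_queued_wrapper()) to reindex a single entity by uuid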
def reindex_entity_queued(self, entity_id):
try:
logger.info(f"Start executing reindex_entity_queued() on uuid: {entity_id}")
entity = self.call_entity_api(entity_id=entity_id, endpoint_base='documents')
logger.info(f"Reindexing {entity['entity_type']} of uuid: {entity_id}")

if entity['entity_type'] in ['Collection', 'Epicollection']:
self.translate_collection(entity_id, reindex=True)

elif entity['entity_type'] == 'Upload':
self.translate_upload(entity_id, reindex=True)

else:
self._call_indexer(entity=entity, delete_existing_doc_first=True)

logger.info(f"Finished executing reindex_entity_queued() on {entity['entity_type']} of uuid: {entity_id}")

except Exception as e:
msg = f"Exception during reindex_entity_queued() for uuid: {entity_id}"
logger.exception(msg)

raise

# Used by individual live reindex call
def translate(self, entity_id):
try:
@@ -2027,6 +2171,18 @@ def get_organ_types(self):
# Running full reindex script in command line
# This approach is different from the live /reindex-all PUT call
# It'll delete all the existing indices, recreate them, and then index everything


def reindex_entity_queued_wrapper(entity_id, token):
translator = Translator(
indices=config['INDICES'],
app_client_id=app.config['APP_CLIENT_ID'],
app_client_secret=app.config['APP_CLIENT_SECRET'],
token=token,
ontology_api_base_url=app.config['ONTOLOGY_API_BASE_URL']
)
translator.reindex_entity_queued(entity_id)

if __name__ == "__main__":
# Specify the absolute path of the instance folder and use the config file relative to the instance path
app = Flask(__name__, instance_path=os.path.join(os.path.abspath(os.path.dirname(__file__)), '../src/instance'),
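Taken together, `enqueue_reindex()` pushes work onto the queue and `reindex_entity_queued_wrapper()` is the function each worker runs. A minimal sketch of the calling side, assuming the `JobQueue` constructor arguments shown in `jobq_workers.py`, that lower priority numbers are handled first, and placeholder values for the uuid and token:

```python
from atlas_consortia_jobq import JobQueue
from hubmap_translator import Translator, app, config

# Same Redis that the jobq_workers.py processes poll (defaults from app.cfg.example)
reindex_queue = JobQueue(redis_host='jobq-redis', redis_port=6379, redis_db=0)

entity_id = '<entity-uuid>'            # placeholder
user_token = '<globus-groups-token>'   # placeholder, supplied by the caller

translator = Translator(
    indices=config['INDICES'],
    app_client_id=app.config['APP_CLIENT_ID'],
    app_client_secret=app.config['APP_CLIENT_SECRET'],
    token=user_token,
    ontology_api_base_url=app.config['ONTOLOGY_API_BASE_URL']
)

# Enqueue the entity itself; related entities are enqueued at priority >= 2 inside enqueue_reindex()
job_id = translator.enqueue_reindex(entity_id, reindex_queue, priority=1)
```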
11 changes: 10 additions & 1 deletion src/instance/app.cfg.example
@@ -1,3 +1,6 @@
# Set to False to use INFO logging level
DEBUG_MODE = True

# Globus app client ID and secret
APP_CLIENT_ID = ''
APP_CLIENT_SECRET = ''
@@ -29,4 +32,10 @@ PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX = {
}
}

DEBUG_MODE = False
# Reindex job queue settings
JOB_QUEUE_MODE = False
QUEUE_WORKERS = 32
REDIS_HOST = 'jobq-redis'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
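When running the worker or the API outside Docker (for example against a locally installed Redis), the same block might look like the sketch below; these values are illustrative, not taken from the repo:

```python
# Illustrative local-development settings (not from the repo)
JOB_QUEUE_MODE = True
QUEUE_WORKERS = 4
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
```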
17 changes: 17 additions & 0 deletions src/jobq_workers.py
@@ -0,0 +1,17 @@
import os
from flask import Flask
from atlas_consortia_jobq import JobQueue

if __name__ == '__main__':
script_dir = os.path.dirname(os.path.abspath(__file__))
app = Flask(__name__,
instance_path=os.path.join(script_dir, 'instance'),
instance_relative_config=True)
app.config.from_pyfile('app.cfg')
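# Connect to the Redis backend for the job queue (the jobq-redis service defined in docker/docker-compose.yml)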
queue = JobQueue(
redis_host=app.config.get('REDIS_HOST', 'localhost'),
redis_port=int(app.config.get('REDIS_PORT', 6379)),
redis_db=int(app.config.get('REDIS_DB', 0))
)
queue_workers = int(app.config.get('QUEUE_WORKERS', 4))
queue.start_workers(num_workers=queue_workers)
5 changes: 5 additions & 0 deletions src/main.py
@@ -32,6 +32,11 @@
config['PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX'] = app.config['PARAM_SEARCH_RECOGNIZED_ENTITIES_BY_INDEX']
config['ONTOLOGY_API_BASE_URL'] = app.config['ONTOLOGY_API_BASE_URL'].strip('/')
config['DEBUG_MODE'] = app.config['DEBUG_MODE']
config['JOB_QUEUE_MODE'] = app.config['JOB_QUEUE_MODE']
config['REDIS_HOST'] = app.config['REDIS_HOST']
config['REDIS_PORT'] = app.config['REDIS_PORT']
config['REDIS_DB'] = app.config['REDIS_DB']
config['REDIS_PASSWORD'] = app.config['REDIS_PASSWORD']

if not config['ONTOLOGY_API_BASE_URL']:
raise Exception(f"Unable retrieve ontology information using"
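The code that actually consumes `JOB_QUEUE_MODE` lives in the search-adaptor submodule and is not part of this diff; the sketch below shows one plausible wiring that reuses the `JobQueue` constructor arguments from `jobq_workers.py` (the `reindex()` helper is hypothetical):

```python
from atlas_consortia_jobq import JobQueue

# A sketch (not from this diff): build the queue from the config values loaded above,
# and fall back to the existing inline translate() path when the queue is disabled.
reindex_queue = None
if config['JOB_QUEUE_MODE']:
    reindex_queue = JobQueue(
        redis_host=config['REDIS_HOST'],
        redis_port=int(config['REDIS_PORT']),
        redis_db=int(config['REDIS_DB'])
    )

def reindex(translator, entity_id):
    if reindex_queue is not None:
        return translator.enqueue_reindex(entity_id, reindex_queue, priority=1)
    return translator.translate(entity_id)
```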
2 changes: 2 additions & 0 deletions src/requirements.txt
@@ -10,7 +10,9 @@ portal-visualization==0.4.20
# Use the branch name of commons from github for testing new changes made in commons from different branch
# Default is main branch specified in search-api's docker-compose.development.yml if not set
# git+https://github.com/hubmapconsortium/commons.git@${COMMONS_BRANCH}#egg=hubmap-commons

hubmap-commons==2.1.22
atlas-consortia-jobq @ git+https://github.com/x-atlas-consortia/jobq.git@dev-integrate

# The use of `-r` lets us specify the transitive requirements in one place
-r search-adaptor/src/requirements.txt
2 changes: 1 addition & 1 deletion src/search-adaptor
8 changes: 5 additions & 3 deletions src/uwsgi.ini
@@ -11,10 +11,12 @@ log-master=true

# Master with 8 worker processes (sized to the CPU count)
master = true
processes = 4
processes = 8

# Enable multithreading for search-api
enable-threads = true
# Enable multithreading within uWSGI
# Launch the application across multiple threads inside each process
enable-threads = True
threads = 8
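# 8 processes x 8 threads = up to 64 requests handled concurrently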

# Use http socket for integration with nginx running on the same machine
socket = localhost:5000