
Commit dfe37c7

Merge pull request #1258 from nasa/issue_1217_polarization
issue 1255: issue 1217: fix(DIST-S1): restrict number of granules in baseline
2 parents 636d69a + b89982d commit dfe37c7

7 files changed: +117, -26 lines


data_subscriber/catalog.py

Lines changed: 4 additions & 1 deletion
@@ -142,7 +142,10 @@ def mark_download_job_id(self, batch_id, job_id):
             index=self.ES_INDEX_PATTERNS,
             body={
                 "script": {
-                    "source": f"ctx._source.download_job_id = '{job_id}'",
+                    "source": f"ctx._source.download_job_id = params['job_id']",
+                    "params": {
+                        "job_id": str(job_id)
+                    },
                     "lang": "painless"
                 },
                 "query": {

data_subscriber/cmr.py

Lines changed: 9 additions & 3 deletions
@@ -353,7 +353,7 @@ def response_jsons_to_cmr_granules(collection, response_jsons, convert_results=T
                 provider_datetime = provider_date["Date"]
                 break
         production_datetime = item["umm"].get("DataGranule").get("ProductionDateTime")
-        granules.append({
+        granule = {
             "granule_id": item["umm"].get("GranuleUR"),
             "revision_id": item.get("meta").get("revision-id"),
             "provider": item.get("meta").get("provider-id"),
@@ -378,8 +378,14 @@ def response_jsons_to_cmr_granules(collection, response_jsons, convert_results=T
                 attr.get("Values")[0]
                 for attr in item["umm"].get("AdditionalAttributes")
                 if attr.get("Name") == collection_identifier_map[collection]
-            ) if collection in collection_identifier_map else None
-        })
+            ) if collection in collection_identifier_map else None,
+        }
+        if collection == Collection.RTC_S1_V1:
+            for attr in item["umm"].get("AdditionalAttributes"):
+                if attr.get("Name") == "POLARIZATION":
+                    polarization = attr.get("Values")  # e.g. ["VV", "VH"]
+                    granule.update({"polarization": polarization})
+        granules.append(granule)
 
     return granules
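
Note: the added block reads the RTC granule's polarization out of the UMM-G AdditionalAttributes and stores it on the granule record. A self-contained sketch of that lookup against a hand-built CMR item (the granule name and attribute values are fabricated for illustration):

    # Stand-in for one CMR search result; not a real CMR response.
    item = {
        "umm": {
            "GranuleUR": "OPERA_L2_RTC-S1_T168-359429-IW1_20250101T000000Z",
            "AdditionalAttributes": [
                {"Name": "BURST_ID", "Values": ["T168-359429-IW1"]},
                {"Name": "POLARIZATION", "Values": ["VV", "VH"]},
            ],
        }
    }

    granule = {"granule_id": item["umm"].get("GranuleUR")}
    for attr in item["umm"].get("AdditionalAttributes", []):
        if attr.get("Name") == "POLARIZATION":
            # Values is a list such as ["VV", "VH"] or ["HH", "HV"]
            granule["polarization"] = attr.get("Values")

    print(granule["polarization"])  # ['VV', 'VH']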

data_subscriber/rtc_for_dist/rtc_for_dist_catalog.py

Lines changed: 4 additions & 0 deletions
@@ -19,5 +19,9 @@ def form_document(self, filename: str, granule: dict, job_id: str, query_dt: dat
 
         # Add http_urls and s3_urls to the document
         m["filtered_urls"] = granule.get("filtered_urls", [])
+        if granule.get("polarization"):
+            m["polarization"] = granule["polarization"]
+
+        m["@timestamp"] = datetime.now()  # needed for opensearch
 
         return m
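
Note: form_document only copies the polarization into the catalog document when the CMR record actually carried one. A tiny sketch of that conditional copy, with placeholder values standing in for the real granule:

    from datetime import datetime

    granule = {"filtered_urls": ["s3://bucket/example_VV.tif"], "polarization": ["VV", "VH"]}

    m = {}  # stand-in for the document form_document has built so far
    m["filtered_urls"] = granule.get("filtered_urls", [])
    if granule.get("polarization"):
        m["polarization"] = granule["polarization"]
    m["@timestamp"] = datetime.now()  # needed for opensearch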

data_subscriber/rtc_for_dist/rtc_for_dist_query.py

Lines changed: 62 additions & 18 deletions
@@ -1,25 +1,26 @@
-from collections import defaultdict
-from datetime import datetime, timedelta
-import dateutil
-from copy import deepcopy
 import asyncio
+import functools
 import json
+import operator
+import re
+from collections import Counter, defaultdict
+from copy import deepcopy
+from datetime import datetime, timedelta
 
-from util.job_submitter import try_submit_mozart_job
-
-from data_subscriber.es_conn_util import get_document_timestamp_min_max
+import dateutil
 
 from data_subscriber.cmr import CMR_TIME_FORMAT, async_query_cmr
-from data_subscriber.url import determine_acquisition_cycle, rtc_for_dist_unique_id
 from data_subscriber.cslc_utils import save_blocked_download_job, parse_r2_product_file_name
-from data_subscriber.query import BaseQuery, get_query_timerange, DateTimeRange
 from data_subscriber.dist_s1_utils import (localize_dist_burst_db, process_dist_burst_db, compute_dist_s1_triggering,
                                            extend_rtc_for_dist_records, build_rtc_native_ids, rtc_granules_by_acq_index,
                                            basic_decorate_granule, add_unique_rtc_granules, get_unique_rtc_id_for_dist,
-                                           parse_k_parameter, decorate_granule, PENDING_TYPE_RTC_FOR_DIST_DOWNLOAD)
+                                           parse_k_parameter, PENDING_TYPE_RTC_FOR_DIST_DOWNLOAD)
+from data_subscriber.es_conn_util import get_document_timestamp_min_max
+from data_subscriber.query import BaseQuery, DateTimeRange
 from data_subscriber.rtc_for_dist.dist_dependency import DistDependency, CMR_RTC_CACHE_INDEX
-
+from rtc_utils import rtc_granule_regex
 from tools.populate_cmr_rtc_cache import populate_cmr_rtc_cache, parse_rtc_granule_metadata
+from util.job_submitter import try_submit_mozart_job
 
 DIST_K_MULT_FACTOR = 2  # TODO: This should be a setting in probably settings.yaml; must be an integer
 EARLIEST_POSSIBLE_RTC_DATE = "2016-01-01T00:00:00Z"
@@ -298,7 +299,7 @@ def retrieve_baseline_granules(self, product_id, downloads, args, k_offsets_and_
         self.logger.info(f"Retrieving K-1 granules {start_date=} {end_date=} for {product_id=}")
         self.logger.debug(new_args)
 
-        # Step 1 of 2: This will return dict of acquisition_cycle -> set of granules for only onse that match the burst pattern
+        # Step 1 of 3: This will return dict of acquisition_cycle -> set of granules for only onse that match the burst pattern
         granules = asyncio.run(async_query_cmr(new_args, self.token, self.cmr, self.settings, query_timerange, datetime.now(), verbose=verbose))
         for granule in granules:
             basic_decorate_granule(granule)
@@ -307,29 +308,72 @@ def retrieve_baseline_granules(self, product_id, downloads, args, k_offsets_and_
         granules = self.unique_latest_granules(granules)
         granules_map = rtc_granules_by_acq_index(granules)
 
-        # Step 2 of 2 ...Sort that by acquisition_cycle in decreasing order and then pick the first k-1 frames
+        # Step 2 of 3 ...Sort that by acquisition_cycle in decreasing order and then pick the first k-1 frames
         acq_day_indices = sorted(granules_map.keys(), reverse=True)
+        possible_k_granules = []
         for acq_day_index in acq_day_indices:
             granules = granules_map[acq_day_index]
-            k_granules.extend(granules)
+            possible_k_granules.extend(granules)
             k_satisfied += 1
             self.logger.info(f"{product_id=} {acq_day_index=} satisfies. {k_satisfied=} {k_offset=} {k_count=} {len(granules)=}")
             if k_satisfied == k_count:
                 break
 
         counter += 1
 
+        # Step 3 of 3: Only copy over k_count per burst_id from possible_k_granules to k_granules
+        burst_id_to_granules_map = defaultdict(list)
+        for granule in possible_k_granules:
+            match_product_id = re.match(rtc_granule_regex, granule["granule_id"])
+            burst_id = match_product_id.group("burst_id")
+            if len(burst_id_to_granules_map[burst_id]) >= k_count:
+                continue  # skip any extra baseline granules per burst_id, capping the number to k_count, per k_offset
+
+            burst_id_to_granules_map[burst_id].append(granule)
+        burst_id_to_granules_map = dict(burst_id_to_granules_map)
+
+        possible_k_granules = functools.reduce(operator.add, burst_id_to_granules_map.values(), [])
+        k_granules.extend(possible_k_granules)
+
+        self.logger.info(f"{len(k_granules)=}")
         return k_granules
 
     def download_job_submission_handler(self, total_granules, query_timerange):
 
         def add_filtered_urls(granule, filtered_urls: list):
             if granule.get("filtered_urls"):
+                polarizations = []
                 for filter_url in granule.get("filtered_urls"):
-                    # Get rid of .h and mask.tif files that aren't used
-                    # NOTE: If we want to enable https downloads in the download worker, we need to change this
-                    if "s3://" in filter_url and (filter_url[-6:] in ["VV.tif", "VH.tif", "HH.tif", "HV.tif"]):
-                        filtered_urls.append(filter_url)
+                    if filter_url.endswith("VV.tif"):
+                        polarizations.append("VVVH")
+                    if filter_url.endswith("HH.tif"):
+                        polarizations.append("HHHV")
+
+                most_common_polarization = Counter(polarizations).most_common(1)
+
+                if most_common_polarization and most_common_polarization[0][0] == "VVVH":
+                    for filter_url in granule.get("filtered_urls"):
+                        # NOTE: If we want to enable https downloads in the download worker, we need to change this
+                        if not filter_url.startswith("s3://"):
+                            continue
+
+                        if any(filter_url.endswith(s) for s in ["VV.tif", "VH.tif"]):
+                            filtered_urls.append(filter_url)
+                elif most_common_polarization and most_common_polarization[0][0] == "HHHV":
+                    for filter_url in granule.get("filtered_urls"):
+                        # NOTE: If we want to enable https downloads in the download worker, we need to change this
+                        if not filter_url.startswith("s3://"):
+                            continue
+
+                        if any(filter_url.endswith(s) for s in ["HH.tif", "HV.tif"]):
+                            filtered_urls.append(filter_url)
+                else:
+                    self.logger.error(f"Unexpected polarization {most_common_polarization=}. Falling back to regular filtering.")
+                    for filter_url in granule.get("filtered_urls"):
+                        # Get rid of .h and mask.tif files that aren't used
+                        # NOTE: If we want to enable https downloads in the download worker, we need to change this
+                        if "s3://" in filter_url and (filter_url[-6:] in ["VV.tif", "VH.tif", "HH.tif", "HV.tif"]):
+                            filtered_urls.append(filter_url)
 
         batch_id_to_urls_map = defaultdict(list)
         batch_id_to_baseline_urls = defaultdict(list)
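
Note: Step 3 of 3 caps the baseline at k_count granules per burst_id, so one heavily represented burst cannot use up the whole K-1 baseline. A standalone sketch of that grouping and capping, using a simplified burst-id regex and fabricated granule ids rather than the project's rtc_granule_regex:

    import functools
    import operator
    import re
    from collections import defaultdict

    # Simplified stand-in for rtc_granule_regex; only the burst_id group matters here.
    burst_regex = r"OPERA_L2_RTC-S1_(?P<burst_id>T\d{3}-\d{6}-IW\d)_.*"

    possible_k_granules = [
        {"granule_id": "OPERA_L2_RTC-S1_T168-359429-IW1_20250101T000000Z"},
        {"granule_id": "OPERA_L2_RTC-S1_T168-359429-IW1_20250113T000000Z"},
        {"granule_id": "OPERA_L2_RTC-S1_T168-359429-IW1_20250125T000000Z"},
        {"granule_id": "OPERA_L2_RTC-S1_T168-359430-IW2_20250101T000000Z"},
    ]
    k_count = 2

    burst_id_to_granules_map = defaultdict(list)
    for granule in possible_k_granules:
        burst_id = re.match(burst_regex, granule["granule_id"]).group("burst_id")
        if len(burst_id_to_granules_map[burst_id]) >= k_count:
            continue  # cap the baseline at k_count granules per burst_id
        burst_id_to_granules_map[burst_id].append(granule)

    # Flatten back to a single list: two granules kept for the first burst, one for the second.
    k_granules = functools.reduce(operator.add, burst_id_to_granules_map.values(), [])
    assert len(k_granules) == 3

Note: in download_job_submission_handler, add_filtered_urls now votes on the granule's dominant polarization from its URL suffixes (via Counter.most_common) and then keeps only the matching co-/cross-pol GeoTIFFs on s3. A reduced sketch of that selection over fabricated URLs:

    from collections import Counter

    filtered_urls_in = [
        "s3://bucket/OPERA_L2_RTC-S1_T168-359429-IW1_VV.tif",
        "s3://bucket/OPERA_L2_RTC-S1_T168-359429-IW1_VH.tif",
        "s3://bucket/OPERA_L2_RTC-S1_T168-359429-IW1_mask.tif",
    ]

    polarizations = []
    for url in filtered_urls_in:
        if url.endswith("VV.tif"):
            polarizations.append("VVVH")  # dual-pol VV/VH product
        if url.endswith("HH.tif"):
            polarizations.append("HHHV")  # dual-pol HH/HV product

    most_common = Counter(polarizations).most_common(1)

    if most_common and most_common[0][0] == "VVVH":
        suffixes = ("VV.tif", "VH.tif")
    elif most_common and most_common[0][0] == "HHHV":
        suffixes = ("HH.tif", "HV.tif")
    else:
        suffixes = ("VV.tif", "VH.tif", "HH.tif", "HV.tif")  # fall back to keeping any polarization

    filtered_urls = [url for url in filtered_urls_in
                     if url.startswith("s3://") and url.endswith(suffixes)]
    # mask.tif is dropped; only the VV/VH GeoTIFFs remain.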

docker/job-spec.json.rtc_for_dist_query

Lines changed: 2 additions & 2 deletions
@@ -1,8 +1,8 @@
 {
   "command":"/home/ops/verdi/ops/opera-pcm/data_subscriber/rtc_for_dist/rtc_for_dist_query.sh",
   "disk_usage":"1GB",
-  "soft_time_limit": 3600,
-  "time_limit": 3660,
+  "soft_time_limit": 9939,
+  "time_limit": 9999,
   "imported_worker_files": {
     "$HOST_VERDI_HOME/.netrc": "/home/ops/.netrc",
     "$HOST_VERDI_HOME/.aws": "/home/ops/.aws",

opera_commons/es_connection.py

Lines changed: 35 additions & 2 deletions
@@ -1,12 +1,13 @@
 #!/usr/bin/env python
-from typing import Union
+from typing import Union, Any
+from urllib.parse import urlparse, unquote
 
 from hysds.celery import app
 from hysds_commons.elasticsearch_utils import ElasticsearchUtility
 from pcm_commons.query.ancillary_utility import AncillaryUtility
 from opera_commons.constants import product_metadata
 
-from .logger import logger as default_logger
+from opera_commons.logger import logger as default_logger
 
 
 GRQ_ES = None
@@ -79,6 +80,7 @@ def get_mozart_es(logger):
     es_cluster_mode = app.conf['ES_CLUSTER_MODE']
     if es_cluster_mode:
         hosts = [app.conf.JOBS_ES_URL, app.conf.GRQ_ES_URL, app.conf.METRICS_ES_URL]
+        hosts = _normalize_hosts(hosts)
     else:
         hosts = [app.conf.JOBS_ES_URL]
 
@@ -96,3 +98,34 @@ def get_mozart_es(logger):
         retry_on_timeout=True,
     )
     return MOZART_ES
+
+
+def _normalize_hosts(hosts: Any) -> Any:
+    out = []
+    # normalize hosts to dicts
+    for host in hosts:
+        if "://" not in host:
+            host = f"//{host}"  # type: ignore
+
+        parsed_url = urlparse(host)
+        h = {"host": parsed_url.hostname}
+
+        if parsed_url.port:
+            h["port"] = parsed_url.port
+        else:
+            h["port"] = 9200
+
+        if parsed_url.scheme == "https":
+            h["port"] = parsed_url.port or 443
+            h["use_ssl"] = True
+
+        if parsed_url.username or parsed_url.password:
+            h["http_auth"] = "{}:{}".format(
+                unquote(parsed_url.username),
+                unquote(parsed_url.password),
+            )
+
+        if parsed_url.path and parsed_url.path != "/":
+            h["url_prefix"] = parsed_url.path
+
+        out.append(h)
+    return out
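
Note: _normalize_hosts turns the configured ES URL strings into the host dictionaries that the Elasticsearch client accepts. A quick illustration with made-up URLs (the real values come from app.conf.JOBS_ES_URL and friends), showing the approximate output:

    # Hypothetical inputs purely for illustration.
    hosts = _normalize_hosts([
        "https://user:p%40ss@jobs-es.example.com/es",
        "grq-es.example.com",
    ])
    # Roughly:
    # [{'host': 'jobs-es.example.com', 'port': 443, 'use_ssl': True,
    #   'http_auth': 'user:p@ss', 'url_prefix': '/es'},
    #  {'host': 'grq-es.example.com', 'port': 9200}]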

tools/populate_cmr_rtc_cache.py

Lines changed: 1 addition & 0 deletions
@@ -138,6 +138,7 @@ def populate_cmr_rtc_cache(granules: List[Dict[str, Any]], es_conn) -> None:
 
         # Prepare document for indexing
         doc = {
+            "@timestamp": datetime.now(),
             "granule_id": granule["granule_id"],
             "burst_id": granule["burst_id"],
             "acquisition_timestamp": granule["acquisition_timestamp"],
