From bcbe6840f20ba6519d2759f01cf8f2d51f2e39c6 Mon Sep 17 00:00:00 2001 From: "Adam Ginsburg (keflavich)" Date: Wed, 29 May 2024 16:31:06 -0400 Subject: [PATCH] WIP: nrao archive query - TAP only so far, based on ALMA (additional commit message was just debug notes and should be ignored) add back tapsql stuff remove alma stuff from nrao add nrao obscore thing Adapt region queries to work with NRAO TAP service add tapsql.py for nrao, updates to data columns supported by NRAO TAP fix imports implemented data retrieval code flush add VLA handling fix the wait-until-done step flex fixes --- astroquery/nrao/__init__.py | 44 ++++ astroquery/nrao/core.py | 455 ++++++++++++++++++++++++++++++++++++ astroquery/nrao/tapsql.py | 265 +++++++++++++++++++++ 3 files changed, 764 insertions(+) create mode 100644 astroquery/nrao/__init__.py create mode 100644 astroquery/nrao/core.py create mode 100644 astroquery/nrao/tapsql.py diff --git a/astroquery/nrao/__init__.py b/astroquery/nrao/__init__.py new file mode 100644 index 0000000000..fd6eb14fa7 --- /dev/null +++ b/astroquery/nrao/__init__.py @@ -0,0 +1,44 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst +""" +NRAO Archive service. +""" +from astropy import config as _config + + +# list the URLs here separately so they can be used in tests. +_url_list = ['https://data.nrao.edu' + ] + +tap_urls = ['https://data-query.nrao.edu/'] + +auth_urls = ['data.nrao.edu'] + + +class Conf(_config.ConfigNamespace): + """ + Configuration parameters for `astroquery.nrao`. + """ + + timeout = _config.ConfigItem(60, "Timeout in seconds.") + + archive_url = _config.ConfigItem( + _url_list, + 'The NRAO Archive mirror to use.') + + auth_url = _config.ConfigItem( + auth_urls, + 'NRAO Central Authentication Service URLs' + ) + + username = _config.ConfigItem( + "", + 'Optional default username for NRAO archive.') + + +conf = Conf() + +from .core import Nrao, NraoClass, NRAO_BANDS + +__all__ = ['Nrao', 'NraoClass', + 'Conf', 'conf', + ] diff --git a/astroquery/nrao/core.py b/astroquery/nrao/core.py new file mode 100644 index 0000000000..77dcf5c987 --- /dev/null +++ b/astroquery/nrao/core.py @@ -0,0 +1,455 @@ +# Licensed under a 3-clause BSD style license - see LICENSE.rst + +import os.path +import keyring +import numpy as np +import re +import tarfile +import string +import requests +import warnings +import json +import time + +from pkg_resources import resource_filename +from bs4 import BeautifulSoup +import pyvo +from urllib.parse import urljoin + +from astropy.table import Table, Column, vstack +from astroquery import log +from astropy.utils.console import ProgressBar +from astropy import units as u +from astropy.time import Time + +try: + from pyvo.dal.sia2 import SIA2_PARAMETERS_DESC, SIA2Service +except ImportError: + # Can be removed once min version of pyvo is 1.5 + from pyvo.dal.sia2 import SIA_PARAMETERS_DESC as SIA2_PARAMETERS_DESC + from pyvo.dal.sia2 import SIAService as SIA2Service + +from ..exceptions import LoginError +from ..utils import commons +from ..utils.process_asyncs import async_to_sync +from ..query import BaseQuery, QueryWithLogin, BaseVOQuery +from . import conf, auth_urls, tap_urls +from astroquery.exceptions import CorruptDataWarning +from ..alma.tapsql import (_gen_str_sql, _gen_numeric_sql, + _gen_band_list_sql, _gen_datetime_sql, _gen_pol_sql, _gen_pub_sql, + _gen_science_sql, _gen_spec_res_sql, ALMA_DATE_FORMAT) +from .tapsql import (_gen_pos_sql) + +__all__ = {'NraoClass',} + +__doctest_skip__ = ['NraoClass.*'] + +NRAO_BANDS = { + 'L': (1*u.GHz, 2*u.GHz), + 'S': (2*u.GHz, 4*u.GHz), + 'C': (4*u.GHz, 8*u.GHz), + 'X': (8*u.GHz, 12*u.GHz), + 'U': (12*u.GHz, 18*u.GHz), + 'K': (18*u.GHz, 26*u.GHz), + 'A': (26*u.GHz, 39*u.GHz), + 'Q': (39*u.GHz, 50*u.GHz), + 'W': (80*u.GHz, 115*u.GHz) +} + +TAP_SERVICE_PATH = 'tap' + +NRAO_FORM_KEYS = { + 'Position': { + 'Source name (astropy Resolver)': ['source_name_resolver', + 'SkyCoord.from_name', _gen_pos_sql], + 'Source name (NRAO)': ['source_name', 'target_name', _gen_str_sql], + 'RA Dec (Sexagesimal)': ['ra_dec', 's_ra, s_dec', _gen_pos_sql], + 'Galactic (Degrees)': ['galactic', 'gal_longitude, gal_latitude', + _gen_pos_sql], + 'Angular resolution (arcsec)': ['spatial_resolution', + 'spatial_resolution', _gen_numeric_sql], + 'Field of view (arcsec)': ['fov', 's_fov', _gen_numeric_sql], + 'Configuration': ['configuration', 'configuration', _gen_numeric_sql], + 'Maximum UV Distance (meters)': ['max_uv_dist', 'max_uv_dist', _gen_numeric_sql] + + + }, + 'Project': { + 'Project code': ['project_code', 'project_code', _gen_str_sql], + 'Telescope': ['instrument', 'instrument_name', _gen_str_sql], + 'Number of Antennas': ['n_ants', 'num_antennas', _gen_str_sql], + + }, + 'Time': { + 'Observation start': ['start_date', 't_min', _gen_datetime_sql], + 'Observation end': ['end_date', 't_max', _gen_datetime_sql], + 'Integration time (s)': ['integration_time', 't_exptime', + _gen_numeric_sql] + }, + 'Polarization': { + 'Polarisation type (Single, Dual, Full)': ['polarisation_type', + 'pol_states', _gen_pol_sql] + }, + 'Energy': { + 'Frequency (GHz)': ['frequency', 'center_frequencies', _gen_numeric_sql], + 'Bandwidth (Hz)': ['bandwidth', 'aggregate_bandwidth', _gen_numeric_sql], + 'Spectral resolution (KHz)': ['spectral_resolution', + 'em_resolution', _gen_spec_res_sql], + 'Band': ['band_list', 'band_list', _gen_band_list_sql] + }, + +} + +_OBSCORE_TO_NRAORESULT = { + 's_ra': 'RA', + 's_dec': 'Dec', +} + + +def _gen_sql(payload): + sql = 'select * from tap_schema.obscore' + where = '' + unused_payload = payload.copy() + if payload: + for constraint in payload: + for attrib_category in NRAO_FORM_KEYS.values(): + for attrib in attrib_category.values(): + if constraint in attrib: + # use the value and the second entry in attrib which + # is the new name of the column + val = payload[constraint] + if constraint == 'em_resolution': + # em_resolution does not require any transformation + attrib_where = _gen_numeric_sql(constraint, val) + else: + attrib_where = attrib[2](attrib[1], val) + if attrib_where: + if where: + where += ' AND ' + else: + where = ' WHERE ' + where += attrib_where + + # Delete this key to see what's left over afterward + # + # Use pop to avoid the slight possibility of trying to remove + # an already removed key + unused_payload.pop(constraint) + + if unused_payload: + # Left over (unused) constraints passed. Let the user know. + remaining = [f'{p} -> {unused_payload[p]}' for p in unused_payload] + raise TypeError(f'Unsupported arguments were passed:\n{remaining}') + + return sql + where + + +class NraoAuth(BaseVOQuery, BaseQuery): + pass + +class NraoClass(BaseQuery): + TIMEOUT = conf.timeout + archive_url = conf.archive_url + USERNAME = conf.username + + def __init__(self): + # sia service does not need disambiguation but tap does + super().__init__() + self._sia = None + self._tap = None + self._datalink = None + self._sia_url = None + self._tap_url = None + self._datalink_url = None + self._auth = NraoAuth() + + @property + def auth(self): + return self._auth + + @property + def datalink(self): + if not self._datalink: + self._datalink = pyvo.dal.adhoc.DatalinkService(self.datalink_url) + return self._datalink + + @property + def datalink_url(self): + if not self._datalink_url: + try: + self._datalink_url = urljoin(self._get_dataarchive_url(), DATALINK_SERVICE_PATH) + except requests.exceptions.HTTPError as err: + log.debug( + f"ERROR getting the NRAO Archive URL: {str(err)}") + raise err + return self._datalink_url + + @property + def sia(self): + if not self._sia: + self._sia = SIA2Service(baseurl=self.sia_url) + return self._sia + + @property + def sia_url(self): + if not self._sia_url: + try: + self._sia_url = urljoin(self._get_dataarchive_url(), SIA_SERVICE_PATH) + except requests.exceptions.HTTPError as err: + log.debug( + f"ERROR getting the NRAO Archive URL: {str(err)}") + raise err + return self._sia_url + + @property + def tap(self): + if not self._tap: + self._tap = pyvo.dal.tap.TAPService(baseurl=self.tap_url, session=self._session) + return self._tap + + @property + def tap_url(self): + if not self._tap_url: + try: + self._tap_url = urljoin(self._get_dataarchive_url(), TAP_SERVICE_PATH) + except requests.exceptions.HTTPError as err: + log.debug( + f"ERROR getting the NRAO Archive URL: {str(err)}") + raise err + return self._tap_url + + def query_tap(self, query, maxrec=None): + """ + Send query to the NRAO TAP. Results in pyvo.dal.TapResult format. + result.table in Astropy table format + + Parameters + ---------- + maxrec : int + maximum number of records to return + + """ + log.debug('TAP query: {}'.format(query)) + return self.tap.search(query, language='ADQL', maxrec=maxrec) + + def _get_dataarchive_url(self): + return tap_urls[0] + + def query_object_async(self, object_name, *, payload=None, **kwargs): + """ + Query the archive for a source name. + + Parameters + ---------- + object_name : str + The object name. Will be resolved by astropy.coord.SkyCoord + payload : dict + Dictionary of additional keywords. See `help`. + """ + if payload is not None: + payload['source_name_resolver'] = object_name + else: + payload = {'source_name_resolver': object_name} + return self.query_async(payload=payload, **kwargs) + + def query_region_async(self, coordinate, radius, *, + get_query_payload=False, + payload=None, **kwargs): + """ + Query the NRAO archive with a source name and radius + + Parameters + ---------- + coordinates : str / `astropy.coordinates` + the identifier or coordinates around which to query. + radius : str / `~astropy.units.Quantity`, optional + the radius of the region + payload : dict + Dictionary of additional keywords. See `help`. + """ + rad = radius + if not isinstance(radius, u.Quantity): + rad = radius*u.deg + obj_coord = commons.parse_coordinates(coordinate).icrs + ra_dec = '{}, {}'.format(obj_coord.to_string(), rad.to(u.deg).value) + if payload is None: + payload = {} + if 'ra_dec' in payload: + payload['ra_dec'] += ' | {}'.format(ra_dec) + else: + payload['ra_dec'] = ra_dec + + if get_query_payload: + return payload + + return self.query_async(payload=payload, **kwargs) + + def query_async(self, payload, *, get_query_payload=False, + maxrec=None, **kwargs): + """ + Perform a generic query with user-specified payload + + Parameters + ---------- + payload : dictionary + Please consult the `help` method + legacy_columns : bool + True to return the columns from the obsolete NRAO advanced query, + otherwise return the current columns based on ObsCore model. + get_query_payload : bool + Flag to indicate whether to simply return the payload. + maxrec : integer + Cap on the amount of records returned. Default is no limit. + + Returns + ------- + + Table with results. Columns are those in the NRAO ObsCore model + (see ``help_tap``) unless ``legacy_columns`` argument is set to True. + """ + + if payload is None: + payload = {} + for arg in kwargs: + value = kwargs[arg] + if arg in payload: + payload[arg] = '{} {}'.format(payload[arg], value) + else: + payload[arg] = value + print(payload) + query = _gen_sql(payload) + print(query) + if get_query_payload: + # Return the TAP query payload that goes out to the server rather + # than the unprocessed payload dict from the python side + return query + + result = self.query_tap(query, maxrec=maxrec) + + if result is not None: + result = result.to_table() + else: + # Should not happen + raise RuntimeError('BUG: Unexpected result None') + + return result + + + def _get_data(self, solr_id, email=None, workflow='runBasicMsWorkflow', + apply_flags=True + ): + """ + Defining this as a private function for now because it's using an + unverified API + + Parameters + ---------- + workflow : 'runBasicMsWorkflow', "runDownloadWorkflow" + """ + url = f'{self.archive_url}/portal/#/subscanViewer/{solr_id}' + + #self._session.headers['User-Agent'] = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36" + + resp = self._request('GET', url, cache=False) + resp.raise_for_status() + + eb_deets = self._request('GET', + f'{self.archive_url}/archive-service/restapi_get_full_exec_block_details', + params={'solr_id': solr_id}, + cache=False + ) + eb_deets.raise_for_status() + assert len(self._session.cookies) > 0 + + resp1b = self._request('GET', + f'{self.archive_url}/archive-service/restapi_spw_details_view', + params={'exec_block_id': solr_id.split(":")[-1]}, + cache=False + ) + resp1b.raise_for_status() + + # returned data is doubly json-encoded + jd = json.loads(eb_deets.json()) + locator = jd['curr_eb']['sci_prod_locator'] + project_code = jd['curr_eb']['project_code'] + + instrument = ('VLBA' if 'vlba' in solr_id.lower() else + 'VLA' if 'vla' in solr_id.lower() else + 'EVLA' if 'nrao' in solr_id.lower() else + 'GBT' if 'gbt' in solr_id.lower() else None) + if instrument is None: + raise ValueError("Invalid instrument") + + if instrument == 'VLBA': + downloadDataFormat = "VLBARaw" + elif instrument in ('VLA', 'EVLA'): + # there are other options! + downloadDataFormat = 'MS' + + post_data = { + "emailNotification": email, + "requestDescription": f"{instrument} Download Request", + "archive": "VLA", + "p_telescope": instrument, + "p_project": project_code, + "productLocator": locator, + "requestCommand": "startVlaPPIWorkflow", + "p_workflowEventName": workflow, + "p_downloadDataFormat": downloadDataFormat, + "p_intentsFileName": "intents_hifv.xml", + "p_proceduresFileName": "procedure_hifv.xml" + } + + if instrument in ('VLA', 'EVLA'): + post_data['p_applyTelescopeFlags'] = apply_flags + casareq = self._request('GET', + f'{self.archive_url}/archive-service/restapi_get_casa_version_list', + cache=False + ) + casareq.raise_for_status() + casavdata = json.loads(casareq.json()) + for casav in casavdata['casa_version_list']: + if 'recommended' in casav['version']: + post_data['p_casaHome'] = casav['path'] + + presp = self._request('POST', + f'{self.archive_url}/rh/submission', + data=post_data, + cache=False + ) + presp.raise_for_status() + + # DEBUG print(f"presp.url: {presp.url}") + # DEBUG print(f"cookies: {self._session.cookies}") + resp2 = self._request('GET', presp.url, cache=False) + resp2.raise_for_status() + + for row in resp2.text.split(): + if 'window.location.href=' in row: + subrespurl = row.split("'")[1] + + # DEBUG print(f"subrespurl: {subrespurl}") + # DEBUG print(f"cookies: {self._session.cookies}") + nextresp = self._request('GET', subrespurl, cache=False) + wait_url = nextresp.url + nextresp.raise_for_status() + + if f'{self.archive_url}/rh/requests/' not in wait_url: + raise ValueError(f"Got wrong URL from post request: {wait_url}") + + # to get the right format of response, you need to specify this: + # accept = {"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7"} + + while True: + time.sleep(1) + print(".", end='', flush=True) + resp = self._request('GET', wait_url + "/state", cache=False) + resp.raise_for_status() + if resp.text == 'COMPLETE': + break + + return wait_url + + + +Nrao = NraoClass() diff --git a/astroquery/nrao/tapsql.py b/astroquery/nrao/tapsql.py new file mode 100644 index 0000000000..5a0d4bfe98 --- /dev/null +++ b/astroquery/nrao/tapsql.py @@ -0,0 +1,265 @@ +""" +Utilities for generating ADQL for ALMA TAP service +""" +from datetime import datetime + +from astropy import units as u +import astropy.coordinates as coord +from astropy.time import Time + +ALMA_DATE_FORMAT = '%d-%m-%Y' + + +def _gen_pos_sql(field, value): + result = '' + if field == 'SkyCoord.from_name': + # resolve the source first + if value: + obj_coord = coord.SkyCoord.from_name(value) + frame = 'icrs' + ras = [str(obj_coord.icrs.ra.to(u.deg).value)] + decs = [str(obj_coord.icrs.dec.to(u.deg).value)] + radius = 10 * u.arcmin + else: + raise ValueError('Object name missing') + else: + if field == 's_ra, s_dec': + frame = 'icrs' + else: + frame = 'galactic' + radius = 10*u.arcmin + if ',' in value: + center_coord, rad = value.split(',') + try: + radius = float(rad.strip())*u.degree + except ValueError: + raise ValueError('Cannot parse radius in ' + value) + else: + center_coord = value.strip() + try: + ra, dec = center_coord.split(' ') + except ValueError: + raise ValueError('Cannot find ra/dec in ' + value) + ras = _val_parse(ra, val_type=str) + decs = _val_parse(dec, val_type=str) + + for ra in ras: + for dec in decs: + if result: + result += ' OR ' + if isinstance(ra, str) and isinstance(dec, str): + # circle + center = coord.SkyCoord(ra, dec, + unit=(u.deg, u.deg), + frame=frame) + + result += \ + "CONTAINS(POINT('ICRS',s_ra,s_dec),CIRCLE('ICRS',{},{},{}))=1".\ + format(center.icrs.ra.to(u.deg).value, + center.icrs.dec.to(u.deg).value, + radius.to(u.deg).value) + else: + raise ValueError('Cannot interpret ra({}), dec({}'. + format(ra, dec)) + if ' OR ' in result: + # use brackets for multiple ORs + return '(' + result + ')' + else: + return result + + +def _gen_numeric_sql(field, value): + result = '' + for interval in _val_parse(value, float): + if result: + result += ' OR ' + if isinstance(interval, tuple): + int_min, int_max = interval + if int_min is None: + if int_max is None: + # no constraints on bandwith + pass + else: + result += '{}<={}'.format(field, int_max) + elif int_max is None: + result += '{}>={}'.format(field, int_min) + else: + result += '({1}<={0} AND {0}<={2})'.format(field, int_min, + int_max) + else: + result += '{}={}'.format(field, interval) + if ' OR ' in result: + # use brakets for multiple ORs + return '(' + result + ')' + else: + return result + + +def _gen_str_sql(field, value): + result = '' + for interval in _val_parse(value, str): + if result: + result += ' OR ' + if '*' in interval: + # use LIKE + # escape wildcards if they exists in the value + interval = interval.replace('%', r'\%') # noqa + interval = interval.replace('_', r'\_') # noqa + # ADQL wild cards are % and _ + interval = interval.replace('*', '%') + interval = interval.replace('?', '_') + result += "{} LIKE '{}'".format(field, interval) + else: + result += "{}='{}'".format(field, interval) + if ' OR ' in result: + # use brackets for multiple ORs + return '(' + result + ')' + else: + return result + + +def _gen_datetime_sql(field, value): + result = '' + for interval in _val_parse(value, str): + if result: + result += ' OR ' + if isinstance(interval, tuple): + min_datetime, max_datetime = interval + if max_datetime is None: + result += "{}>={}".format( + field, Time(datetime.strptime(min_datetime, ALMA_DATE_FORMAT)).mjd) + elif min_datetime is None: + result += "{}<={}".format( + field, Time(datetime.strptime(max_datetime, ALMA_DATE_FORMAT)).mjd) + else: + result += "({1}<={0} AND {0}<={2})".format( + field, Time(datetime.strptime(min_datetime, ALMA_DATE_FORMAT)).mjd, + Time(datetime.strptime(max_datetime, ALMA_DATE_FORMAT)).mjd) + else: + # TODO is it just a value (midnight) or the entire day? + result += "{}={}".format( + field, Time(datetime.strptime(interval, ALMA_DATE_FORMAT)).mjd) + if ' OR ' in result: + # use brackets for multiple ORs + return '(' + result + ')' + else: + return result + + +def _gen_spec_res_sql(field, value): + # This needs special treatment because spectral_resolution in AQ is in + # KHz while corresponding em_resolution is in m + result = '' + for interval in _val_parse(value): + if result: + result += ' OR ' + if isinstance(interval, tuple): + min_val, max_val = interval + if max_val is None: + result += "{}<={}".format( + field, + min_val*u.kHz.to(u.m, equivalencies=u.spectral())) + elif min_val is None: + result += "{}>={}".format( + field, + max_val*u.kHz.to(u.m, equivalencies=u.spectral())) + else: + result += "({1}<={0} AND {0}<={2})".format( + field, + max_val*u.kHz.to(u.m, equivalencies=u.spectral()), + min_val*u.kHz.to(u.m, equivalencies=u.spectral())) + else: + result += "{}={}".format( + field, interval*u.kHz.to(u.m, equivalencies=u.spectral())) + if ' OR ' in result: + # use brackets for multiple ORs + return '(' + result + ')' + else: + return result + + +def _gen_pub_sql(field, value): + if value is True: + return "{}='Public'".format(field) + elif value is False: + return "{}='Proprietary'".format(field) + else: + return None + + +def _gen_science_sql(field, value): + if value is True: + return "{}='T'".format(field) + elif value is False: + return "{}='F'".format(field) + else: + return None + + +def _gen_band_list_sql(field, value): + # band list value is expected to be space separated list of bands + if isinstance(value, list): + val = value + else: + val = value.split(' ') + return _gen_str_sql(field, '|'.join( + ['*{}*'.format(_) for _ in val])) + + +def _gen_pol_sql(field, value): + # band list value is expected to be space separated list of bands + val = '' + states_map = {'Stokes I': '*I*', + 'Single': '/LL/', + 'Dual': '/LL/RR/', + 'Full': '/LL/LR/RL/RR/'} + for state in states_map: + if state in value: + if val: + val += '|' + val += states_map[state] + return _gen_str_sql(field, val) + + +def _val_parse(value, val_type=float): + # parses an ALMA query field and returns a list of values (of type + # val_type) or tuples representing parsed values or intervals. Open + # intervals have None at one of the ends + def _one_val_parse(value, val_type=float): + # parses the value and returns corresponding interval for + # sia to work with. E.g <2 => (None, 2) + if value.startswith('<'): + return (None, val_type(value[1:])) + elif value.startswith('>'): + return (val_type(value[1:]), None) + else: + return val_type(value) + result = [] + if isinstance(value, str): + try: + if value.startswith('!'): + start, end = _val_parse(value[2:-1].strip(), val_type=val_type)[0] + result.append((None, start)) + result.append((end, None)) + elif value.startswith('('): + result += _val_parse(value[1:-1], val_type=val_type) + elif '|' in value: + for vv in value.split('|'): + result += _val_parse(vv.strip(), val_type=val_type) + elif '..' in value: + start, end = value.split('..') + if not start or not end: + raise ValueError('start or end interval missing in {}'. + format(value)) + result.append((_one_val_parse(start.strip(), val_type=val_type), + _one_val_parse(end.strip(), val_type=val_type))) + else: + result.append(_one_val_parse(value, val_type=val_type)) + except Exception as e: + raise ValueError( + 'Error parsing {}. Details: {}'.format(value, str(e))) + elif isinstance(value, list): + result = value + else: + result.append(value) + return result