Source code for sbpy.data.obs

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
====================
sbpy data.Obs Module
====================

Class for storing and querying observations

created on July 3, 2019
"""

from astropy.time import Time
from astroquery.mpc import MPC
from astropy.table import vstack, hstack

from .ephem import Ephem
from .core import QueryError
from ..bib import cite

__all__ = ['Obs']


[docs]class Obs(Ephem): """Class for querying, storing, and manipulating observations """
[docs] @classmethod @cite({'data source': 'https://minorplanetcenter.net/db_search'}) @cite({'software: astroquery': '2019AJ....157...98G'}) def from_mpc(cls, targetid, id_type=None, **kwargs): """Load available observations for a target from the `Minor Planet Center <https://minorplanetcenter.net>`_ using `~astroquery.mpc.MPCClass.get_observations`. Parameters ---------- targetid : str Target identifier, resolvable by the Minor Planet Ephemeris Service. id_type : str, optional ``'asteroid number'``, ``'asteroid designation'``, ``'comet number'``, ``'comet designation'``, or ``None``. ``None`` attempts to automatically identify the target id type using `~sbpy.data.Names`. Default: ``None`` **kwargs : optional Additional keyword arguments are passed to `~astroquery.mpc.MPC.query_object` Returns ------- `~Obs` object The resulting object will be populated with the same fields defined in `~astroquery.mpc.MPCClass.get_observations`. Examples -------- >>> from sbpy.data import Obs >>> obs = Obs.from_mpc('12893') # doctest: +REMOTE_DATA >>> orb[:3] # doctest: +SKIP number desig discovery note1 ... DEC mag band observatory ... deg mag ------ --------- --------- ----- ... ------------------- --- ---- ----------- 12893 1998 QS55 -- -- ... -15.78888888888889 0.0 -- 413 12893 1998 QS55 -- -- ... -15.788944444444445 0.0 -- 413 12893 1998 QS55 * 4 ... 5.526472222222222 0.0 -- 809 """ from ..data import Names if id_type is None: id_type = Names.asteroid_or_comet(targetid) if id_type == 'asteroid': ident = Names.parse_asteroid(targetid) elif id_type == 'comet': ident = Names.parse_comet(targetid) if 'number' in ident: id_type += ' number' elif 'designation' in ident: id_type += ' designation' try: results = MPC.get_observations(targetid, id_type=id_type, **kwargs) except (RuntimeError, ValueError) as e: raise QueryError( ('Error raised by ' 'astroquery.mpc.MPCClass.get_observations: ' '{}').format(e)) results['epoch'] = Time(results['epoch'].to('d').value, scale='utc', format='jd') return cls.from_table(results)
[docs] @cite({'software: astroquery': '2019AJ....157...98G'}) def supplement(self, service='jplhorizons', id_field='targetname', epoch_field='epoch', location='500', modify_fieldnames='obs', **kwargs): """Supplement observational data with ephemerides queried from the selected service. Parameters ---------- service : str, optional Service from which to acquire data: ``'jplhorizons'``, ``'mpc'``, or ``'miriade'``, corresponding to the `JPL Horizons system <https://ssd.jpl.nasa.gov/horizons/>`_ (using `~sbpy.data.Ephem.from_horizons`), the `Minor Planet Center ephemeris service <https://minorplanetcenter.net/iau/MPEph/MPEph.html>`_ (using `~sbpy.data.Ephem.from_mpc`), and the `IMCCE Miriade service <http://vo.imcce.fr/webservices/miriade/>`_ (using `~sbpy.data.from_miriade`). Default: ``'jplhorizons'`` id_field : str, optional Field name that corresponds to a suitable target identifier in this `~sbpy.data.Obs` object. Default: ``'targetname'`` epoch_field : str, optional Field name that corresponds to a suitable epoch identifier in this `~sbpy.data.Obs` object. The corresponding column must be of type `~astropy.time.Time`. Default: ``'epoch'`` location : str, optional Location of the observer for the data stored in this `~sbpy.data.Obs` object. Default: ``'500'`` (geocenter) modify_fieldnames : str, optional Defines whether field names in this `~sbpy.data.Obs` object (``'obs'``) or in the supplemental data to be queried (``'eph'``) will be modified by adding a suffix in case of field name collisions. Default: ``'obs'`` **kwargs : optional Additional keyword arguments are passed to the corresponding ephemerides query service. Returns ------- `~Obs` object The resulting object will contain all data from this `~sbpy.data.Obs` object as well as the queried ephemeris data. Notes ----- * Not all available service are equally suited for this kind of query: only the JPL Horizons system enables quick queries for a large number of epochs. Queries using the other services may take a long time depending on the number of epochs and targets. Examples -------- >>> from sbpy.data import Obs >>> obs = Obs.from_mpc('2019 AA', id_type='asteroid designation') # doctest: +SKIP >>> data = obs.supplement(id_field='designation') # doctest: +SKIP >>> data.field_names # doctest: +SKIP <TableColumns names=('number','desig','discovery','note1','note2','epoch','RA_obs','DEC_obs','mag','band','observatory','target','RA','DEC','delta','V','alpha','elong','RAcosD_rate','DEC_rate','delta_rate')> """ try: targetids = set(self[id_field]) except (TypeError, KeyError): raise QueryError('cannot use field {} as id_field.'.format( id_field)) all_obs = None all_eph = None for targetid in targetids: if all_obs is None: all_obs = self.table[self[id_field] == targetid] else: all_obs = vstack([all_obs, self.table[self[id_field] == targetid]]) if service == 'jplhorizons': eph = Ephem.from_horizons( targetid, epochs=self[self[id_field] == targetid][epoch_field], location=location, **kwargs) eph.table.remove_column('epoch') elif service == 'mpc': eph = Ephem.from_mpc( targetid, epochs=self[self[id_field] == targetid][epoch_field], location=location, **kwargs) eph.table.remove_column('Date') elif service == 'miriade': eph = Ephem.from_miriade( targetid, epochs=self[self[id_field] == targetid][epoch_field], location=location, **kwargs) eph.table.remove_column('epoch') else: raise QueryError('service {} not known.'.format(service)) if all_eph is None: all_eph = eph.table else: all_eph = vstack([all_eph, eph.table]) # identify field names that both obs and eph have in common fieldnames_intersect = set(all_eph.columns).intersection( all_obs.columns) for fieldname in fieldnames_intersect: if modify_fieldnames == 'obs': all_obs.rename_column(fieldname, fieldname+'_obs') elif modify_fieldnames == 'eph': all_eph.rename_column(fieldname, fieldname+'_eph') return Obs.from_table(hstack([all_obs, all_eph]), meta=self.meta)