# Source code for sbpy.data.obs

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
====================
sbpy data.Obs Module
====================

Class for storing and querying observations

created on July 3, 2019
"""

from astropy.time import Time

try:
    from astroquery.mpc import MPC
except ImportError:
    pass

from astropy.table import vstack, hstack

from .ephem import Ephem
from .core import QueryError
from ..bib import cite
from ..utils.decorators import requires

__all__ = ['Obs']


class Obs(Ephem):
    """Class for querying, storing, and manipulating observations."""

    @classmethod
    @requires("astroquery")
    @cite({'data source': 'https://minorplanetcenter.net/db_search',
           'software: astroquery': '2019AJ....157...98G'})
    def from_mpc(cls, targetid, id_type=None, **kwargs):
        """Load available observations for a target from the
        `Minor Planet Center <https://minorplanetcenter.net>`_ using
        `~astroquery.mpc.MPCClass.get_observations`.

        Parameters
        ----------
        targetid : str
            Target identifier, resolvable by the Minor Planet
            Ephemeris Service.
        id_type : str, optional
            ``'asteroid number'``, ``'asteroid designation'``,
            ``'comet number'``, ``'comet designation'``, or ``None``.
            ``None`` attempts to automatically identify the target id
            type using `~sbpy.data.Names`. Default: ``None``
        **kwargs : optional
            Additional keyword arguments are passed to
            `~astroquery.mpc.MPCClass.get_observations`.

        Returns
        -------
        `~Obs` object
            The resulting object will be populated with the same fields
            defined in `~astroquery.mpc.MPCClass.get_observations`.

        Raises
        ------
        QueryError
            If ``id_type`` is ``None`` and the target cannot be
            classified as asteroid or comet, or if
            `~astroquery.mpc.MPCClass.get_observations` raises a
            ``RuntimeError`` or ``ValueError``.

        Examples
        --------
        >>> from sbpy.data import Obs
        >>> obs = Obs.from_mpc('12893') # doctest: +REMOTE_DATA
        >>> obs[:3]  # doctest: +SKIP
        number   desig   discovery note1 ...         DEC         mag band observatory
                                         ...         deg         mag
        ------ --------- --------- ----- ... ------------------- --- ---- -----------
         12893 1998 QS55        --    -- ...  -15.78888888888889 0.0   --         413
         12893 1998 QS55        --    -- ... -15.788944444444445 0.0   --         413
         12893 1998 QS55         *     4 ...   5.526472222222222 0.0   --         809
        """
        # imported here rather than at module level, as in the original
        # (presumably to avoid a circular import -- TODO confirm)
        from ..data import Names

        if id_type is None:
            id_type = Names.asteroid_or_comet(targetid)
            if id_type == 'asteroid':
                ident = Names.parse_asteroid(targetid)
            elif id_type == 'comet':
                ident = Names.parse_comet(targetid)
            else:
                # without this guard ``ident`` would be unbound below and
                # the caller would see an opaque UnboundLocalError
                raise QueryError(
                    'cannot identify target type of {}; please provide '
                    'id_type explicitly.'.format(targetid))

            # refine 'asteroid'/'comet' into the '<type> number' or
            # '<type> designation' form listed for ``id_type`` above
            if 'number' in ident:
                id_type += ' number'
            elif 'designation' in ident:
                id_type += ' designation'

        try:
            results = MPC.get_observations(targetid, id_type=id_type,
                                           **kwargs)
        except (RuntimeError, ValueError) as e:
            # chain the original exception so its traceback is kept
            raise QueryError(
                ('Error raised by '
                 'astroquery.mpc.MPCClass.get_observations: '
                 '{}').format(e)) from e

        # replace the epoch column (converted to days) by a UTC
        # `~astropy.time.Time` column interpreted as Julian dates
        results['epoch'] = Time(results['epoch'].to('d').value,
                                scale='utc', format='jd')

        return cls.from_table(results)

    @requires("astroquery")
    @cite({'software: astroquery': '2019AJ....157...98G'})
    def supplement(self, service='jplhorizons', id_field='targetname',
                   epoch_field='epoch', location='500',
                   modify_fieldnames='obs', **kwargs):
        """Supplement observational data with ephemerides queried from
        the selected service.

        Parameters
        ----------
        service : str, optional
            Service from which to acquire data: ``'jplhorizons'``,
            ``'mpc'``, or ``'miriade'``, corresponding to the
            `JPL Horizons system <https://ssd.jpl.nasa.gov/horizons/>`_
            (using `~sbpy.data.Ephem.from_horizons`), the
            `Minor Planet Center ephemeris service
            <https://minorplanetcenter.net/iau/MPEph/MPEph.html>`_
            (using `~sbpy.data.Ephem.from_mpc`), and the
            `IMCCE Miriade service
            <http://vo.imcce.fr/webservices/miriade/>`_
            (using `~sbpy.data.Ephem.from_miriade`).
            Default: ``'jplhorizons'``
        id_field : str, optional
            Field name that corresponds to a suitable target identifier
            in this `~sbpy.data.Obs` object. Default: ``'targetname'``
        epoch_field : str, optional
            Field name that corresponds to a suitable epoch identifier
            in this `~sbpy.data.Obs` object. The corresponding column
            must be of type `~astropy.time.Time`. Default: ``'epoch'``
        location : str, optional
            Location of the observer for the data stored in this
            `~sbpy.data.Obs` object. Default: ``'500'`` (geocenter)
        modify_fieldnames : str, optional
            Defines whether field names in this `~sbpy.data.Obs` object
            (``'obs'``) or in the supplemental data to be queried
            (``'eph'``) will be modified by adding a suffix in case of
            field name collisions. Any other value leaves colliding
            names untouched by this method. Default: ``'obs'``
        **kwargs : optional
            Additional keyword arguments are passed to the corresponding
            ephemerides query service.

        Returns
        -------
        `~Obs` object
            The resulting object will contain all data from this
            `~sbpy.data.Obs` object as well as the queried ephemeris
            data.

        Raises
        ------
        QueryError
            If ``id_field`` cannot be used as target identifier field,
            if ``service`` is not known, or if this object holds no
            targets to query.

        Notes
        -----
        * Not all available services are equally suited for this kind of
          query: only the JPL Horizons system enables quick queries for
          a large number of epochs. Queries using the other services may
          take a long time depending on the number of epochs and
          targets.

        Examples
        --------
        >>> from sbpy.data import Obs
        >>> obs = Obs.from_mpc('2019 AA', id_type='asteroid designation') # doctest: +SKIP
        >>> data = obs.supplement(id_field='designation') # doctest: +SKIP
        >>> data.field_names  # doctest: +SKIP
        <TableColumns names=('number','desig','discovery','note1','note2','epoch','RA_obs','DEC_obs','mag','band','observatory','target','RA','DEC','delta','V','alpha','elong','RAcosD_rate','DEC_rate','delta_rate')>
        """
        # service name -> (Ephem query method name, epoch column that is
        # removed from the queried ephemerides before merging)
        query_services = {
            'jplhorizons': ('from_horizons', 'epoch'),
            'mpc': ('from_mpc', 'Date'),
            'miriade': ('from_miriade', 'epoch'),
        }

        try:
            targetids = set(self[id_field])
        except (TypeError, KeyError) as e:
            raise QueryError('cannot use field {} as id_field.'.format(
                id_field)) from e

        # validate the service up front instead of after per-target work
        if service not in query_services:
            raise QueryError('service {} not known.'.format(service))

        if not targetids:
            # nothing to query; stacking/merging empty lists would only
            # fail later with an unrelated error
            raise QueryError('no targets found in field {}.'.format(
                id_field))

        method_name, epoch_column = query_services[service]
        # resolved lazily so only the requested service must be available
        query = getattr(Ephem, method_name)

        obs_tables = []
        eph_tables = []
        for targetid in targetids:
            # rows of this object that belong to the current target
            mask = self[id_field] == targetid
            obs_tables.append(self.table[mask])

            eph = query(targetid, epochs=self[mask][epoch_field],
                        location=location, **kwargs)
            # NOTE(review): the final hstack assumes one ephemeris row
            # per requested epoch, in request order -- confirm per service
            eph.table.remove_column(epoch_column)
            eph_tables.append(eph.table)

        # stack once instead of re-stacking on every iteration; both
        # lists were filled in the same target order, so rows line up
        all_obs = vstack(obs_tables)
        all_eph = vstack(eph_tables)

        # identify field names that both obs and eph have in common
        fieldnames_intersect = set(all_eph.columns).intersection(
            all_obs.columns)
        for fieldname in fieldnames_intersect:
            if modify_fieldnames == 'obs':
                all_obs.rename_column(fieldname, fieldname+'_obs')
            elif modify_fieldnames == 'eph':
                all_eph.rename_column(fieldname, fieldname+'_eph')

        return Obs.from_table(hstack([all_obs, all_eph]),
                              meta=self.meta)