# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
sbpy Photometry Module
created on June 23, 2017
"""
__all__ = ['DiskIntegratedPhaseFunc', 'LinearPhaseFunc',
'InvalidPhaseFunctionWarning']
from collections import OrderedDict
import numpy as np
from scipy.integrate import quad
from astropy.modeling import (Fittable1DModel, Parameter)
import astropy.units as u
from astropy import log
from ..data import (Phys, Obs, Ephem, dataclass_input,
quantity_to_dataclass)
from ..units import reflectance
from ..exceptions import SbpyWarning
[docs]class InvalidPhaseFunctionWarning(SbpyWarning):
pass
[docs]class DiskIntegratedPhaseFunc(Fittable1DModel):
"""Base class for disk-integrated phase function model
Examples
--------
Define a linear phase function with phase slope 0.04 mag/deg, and
study its properties:
>>> # Define a disk-integrated phase function model
>>> import numpy as np
>>> import astropy.units as u
>>> from astropy.modeling import Parameter
>>> from sbpy.calib import solar_fluxd
>>> from sbpy.photometry import DiskIntegratedPhaseFunc
>>>
>>> class LinearPhaseFunc(DiskIntegratedPhaseFunc):
...
... _unit = 'mag'
... H = Parameter()
... S = Parameter()
...
... @staticmethod
... def evaluate(a, H, S):
... return H + S * a
...
>>> linear_phasefunc = LinearPhaseFunc(5 * u.mag, 0.04 * u.mag/u.deg,
... radius = 300 * u.km, wfb = 'V')
>>> pha = np.linspace(0, 180, 200) * u.deg
>>> with solar_fluxd.set({'V': -26.77 * u.mag}):
... mag = linear_phasefunc.to_mag(pha)
... ref = linear_phasefunc.to_ref(pha)
... geomalb = linear_phasefunc.geomalb
... phaseint = linear_phasefunc.phaseint
... bondalb = linear_phasefunc.bondalb
>>> print('Geometric albedo is {0:.3}'.format(geomalb))
Geometric albedo is 0.0487
>>> print('Bond albedo is {0:.3}'.format(bondalb))
Bond albedo is 0.0179
>>> print('Phase integral is {0:.3}'.format(phaseint))
Phase integral is 0.367
Initialization with subclass of `~sbpy.data.DataClass`:
The subclassed models can either be initialized by model parameters, or by
subclass of `~sbpy.data.DataClass`. Below example uses the `HG` model
class.
>>> from sbpy.photometry import HG
>>> from sbpy.data import Phys, Orbit, Ephem
>>>
>>> # Initialize from physical parameters pulled from JPL SBDB
>>> phys = Phys.from_sbdb('Ceres') # doctest: +REMOTE_DATA
>>> print(phys['targetname','H','G']) # doctest: +SKIP
<QTable length=1>
targetname H G
mag
str17 float64 float64
----------------- ------- -------
1 Ceres (A801 AA) 3.31 0.12
>>> m = HG.from_phys(phys) # doctest: +REMOTE_DATA
INFO: Model initialized for 1 Ceres (A801 AA). [sbpy.photometry.core]
>>> print(m) # doctest: +SKIP
Model: HG
Inputs: ('x',)
Outputs: ('y',)
Model set size: 1
Parameters:
H G
mag
---- ----
3.31 0.12
>>> print(m.meta['targetname']) # doctest: +REMOTE_DATA
1 Ceres (A801 AA)
>>> print(m.radius) # doctest: +SKIP
469.7 km
>>>
>>> # Initialize from orbital elements pulled from JPL Horizons that also
>>> # contain the H and G parameters
>>> elem = Orbit.from_horizons('Ceres') # doctest: +REMOTE_DATA
>>> print(elem['targetname','H','G']) # doctest: +SKIP
<QTable length=1>
targetname H G
mag
str17 float64 float64
----------------- ------- -------
1 Ceres (A801 AA) 3.33 0.12
>>> m = HG.from_phys(elem) # doctest: +REMOTE_DATA
INFO: Model initialized for 1 Ceres (A801 AA). [sbpy.photometry.core]
>>>
>>> # Failed initialization due to the lack of field 'G'
>>> phys = Phys.from_sbdb('12893') # doctest: +REMOTE_DATA
>>> print('G' in phys.field_names) # doctest: +REMOTE_DATA
False
>>> m = HG(data=phys) # doctest: +SKIP
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
KeyError: 'field G not available.'
"""
# Some phase function models are defined in magnitude space, such as the
# IAU H, G system. Some phase function models are defined in reflectance
# space, such as the disk-integrated phase function of the Hapke model.
# _unit defines which unit the model is defined in.
_unit = None
# The default unit for model input when the model is dimensional
input_units = {'x': u.rad}
# Whether or not the model input is allowed to be dimensionless
input_units_allow_dimensionless = {'x': True}
@u.quantity_input(radius=u.km)
def __init__(self, *args, radius=None, wfb=None, **kwargs):
"""Initialize DiskIntegratedPhaseFunc
Parameters
----------
radius : astropy.units.Quantity, optional
Radius of object. Required if conversion between magnitude and
reflectance is involved.
wfb : `~astropy.units.Quantity`, `~synphot.SpectralElement`, string
Wavelengths, frequencies, or bandpasses. Bandpasses may
be a filter name (string). Required if conversion between
magnitude and reflectance is involved.
**kwargs : optional parameters accepted by
`astropy.modeling.Model.__init__()`
"""
super().__init__(*args, **kwargs)
self.radius = radius
self.wfb = wfb
def _check_unit(self):
if self._unit is None:
raise ValueError('the unit of phase function is unknown')
@property
def geomalb(self):
"""Geometric albedo"""
alb = np.pi*self.to_ref(0.*u.rad)
if hasattr(alb, 'unit') and (alb.unit == 1/u.sr):
alb = alb*u.sr
return alb
@property
def bondalb(self):
"""Bond albedo"""
return self.geomalb*self.phaseint
@property
def phaseint(self):
"""Phase integral"""
return self._phase_integral()
[docs] @classmethod
def from_phys(cls, phys, **kwargs):
"""Initialize an object from `~sbpy.data.Phys` object
Parameters
----------
phys : `~sbpy.data.Phys`
Contains the parameters needed to initialize the model class
object. If the required field is not found, then an `KeyError`
exception will be thrown.
**kwargs : optional parameters accepted by
`astropy.modeling.Model.__init__()`
Returns
-------
Object of `DiskIntegratedPhaseFunc` subclass
The phase function model object
Examples
--------
Initialization with `~sbpy.data.Phys`. This example uses the `HG`
model class.
>>> from sbpy.photometry import HG
>>> from sbpy.data import Phys
>>>
>>> # Initialize from physical parameters pulled from JPL SBDB
>>> phys = Phys.from_sbdb('Ceres') # doctest: +REMOTE_DATA
>>> print(phys['targetname','H','G']) # doctest: +SKIP
<QTable length=1>
targetname H G
mag
str17 float64 float64
----------------- ------- -------
1 Ceres (A801 AA) 3.31 0.12
>>> m = HG.from_phys(phys) # doctest: +REMOTE_DATA
INFO: Model initialized for 1 Ceres (A801 AA). [sbpy.photometry.core]
>>> print(m) # doctest: +SKIP
Model: HG
Inputs: ('x',)
Outputs: ('y',)
Model set size: 1
Parameters:
H G
mag
---- ----
3.31 0.12
>>> print(m.meta['targetname']) # doctest: +REMOTE_DATA
1 Ceres (A801 AA)
>>> print(m.radius) # doctest: +REMOTE_DATA
469.7 km
>>>
>>> # Failed initialization due to the lack of field 'G'
>>> phys = Phys.from_sbdb('12893') # doctest: +REMOTE_DATA
>>> print('G' in phys.field_names) # doctest: +REMOTE_DATA
False
>>> m = HG.from_phys(phys) # doctest: +SKIP
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
KeyError: 'field G not available.'
"""
par = {}
valid = np.ones(len(phys), dtype=bool)
for p in cls.param_names:
par[p] = phys[p]
valid = valid & np.isfinite(par[p])
if valid.any():
valid = list(valid).index(True)
for p in cls.param_names:
par[p] = par[p][valid]
meta = kwargs.pop('meta', OrderedDict())
if 'targetname' in phys.field_names:
meta.update({'targetname': phys['targetname'][valid]})
kwargs['meta'] = meta
for p in cls.param_names:
val = kwargs.pop(p, None)
try:
par['radius'] = phys['diameter'][valid]/2
except KeyError:
pass
if 'targetname' in meta.keys():
log.info("Model initialized for {}.".format(
meta['targetname']))
else:
log.info("Model initialized.")
kwargs.update(par)
else:
raise ValueError(
'no valid model parameters found in `data` keyword')
out = cls(**kwargs)
return out
[docs] def to_phys(self):
"""Wrap the model into a `sbpy.data.Phys` object
Returns
-------
`~sbpy.data.Phys` object
Examples
--------
>>> import astropy.units as u
>>> from sbpy.calib import solar_fluxd
>>> from sbpy.photometry import HG
>>> from sbpy.data import Phys
>>>
>>> # Initialize from physical parameters pulled from JPL SBDB
>>> phys = Phys.from_sbdb('Ceres') # doctest: +REMOTE_DATA
>>> print(phys['targetname','radius','H','G']) # doctest: +SKIP
<QTable length=1>
targetname radius H G
km mag
str17 float64 float64 float64
----------------- ------- ------- -------
1 Ceres (A801 AA) 469.7 3.31 0.12
>>> m = HG.from_phys(phys) # doctest: +REMOTE_DATA
INFO: Model initialized for 1 Ceres (A801 AA). [sbpy.photometry.core]
>>> m.wfb = 'V' # doctest: +REMOTE_DATA
>>> with solar_fluxd.set({'V': -26.77 * u.mag}):
... p = m.to_phys() # doctest: +REMOTE_DATA
>>> print(type(p)) # doctest: +REMOTE_DATA
<class 'sbpy.data.phys.Phys'>
>>> p.table.pprint(max_width=-1) # doctest: +SKIP
targetname diameter H G pv A
km mag
----------------- -------- ---- ---- ------------------- --------------------
1 Ceres (A801 AA) 939.4 3.31 0.12 0.07624470768627523 0.027779803126557152
""" # noqa: E501
cols = {}
if (self.meta is not None) and ('targetname' in self.meta.keys()):
val = self.meta['targetname']
if isinstance(val, str):
val = [val]
cols['targetname'] = val
if self.radius is not None:
cols['diameter'] = self.radius * 2
for p in self.param_names:
val = getattr(self, p)
if val.quantity is None:
cols[p] = val.value
else:
cols[p] = val.quantity
try:
cols['pv'] = self.geomalb
cols['A'] = self.bondalb
except ValueError:
pass
return Phys.from_dict(cols)
[docs] @classmethod
@dataclass_input(obs=Obs)
def from_obs(cls, obs, fitter, fields='mag', init=None, **kwargs):
"""Instantiate a photometric model class object from data
Parameters
----------
obs : `~sbpy.data.DataClass`, dict_like
If `~sbpy.data.DataClass` or dict_like, must contain
``'phaseangle'`` or the equivalent names (see
`~sbpy.data.DataClass`). If any distance (heliocentric and
geocentric) is provided, then they will be used to correct
magnitude to 1 au before fitting.
fitter : `~astropy.modeling.fitting.Fitter`
The fitter to be used for fitting.
fields : str or array_like of str
The field name or names in ``obs`` to be fitted. If an array_like
str, then multiple fields will be fitted one by one and a model
set will be returned. In this case, ``.meta['fields']`` of the
returned object contains the names of fields fitted.
init : numpy array, `~astropy.units.Quantity`, optional
The initial parameters for model fitting. Its first dimension has
the length of the model parameters, and its second dimension has
the length of ``n_model`` if multiple models are fitted.
**kwargs : optional parameters accepted by `fitter()`.
Note that the magnitude uncertainty can also be supplied to the fit
via `weights` keyword for all fitters provided by
`~astropy.modeling.fitting`.
Returns
-------
Object of `DiskIntegratedPhaseFunc` subclass
The best-fit model class object.
Examples
--------
>>> from sbpy.photometry import HG # doctest: +SKIP
>>> from sbpy.data import Misc # doctest: +SKIP
>>> from astropy.modeling.fitting import SLSQPLSQFitter
>>> fitter = SLSQPLSQFitter()
>>> obs = Misc.mpc_observations('Bennu') # doctest: +SKIP
>>> hg = HG() # doctest: +SKIP
>>> best_hg = hg.from_obs(obs, eph['mag'], fitter) # doctest: +SKIP
"""
pha = obs['alpha']
if isinstance(fields, (str, bytes)):
n_models = 1
else:
n_models = len(fields)
if init is not None:
init = np.asanyarray(init)
dist_corr = cls()._distance_module(obs)
if n_models == 1:
mag = obs[fields]
if isinstance(mag, u.Quantity):
dist_corr = u.Quantity(dist_corr).to(u.mag, u.logarithmic())
else:
dist_corr = -2.5 * np.log10(dist_corr)
mag0 = mag + dist_corr
if init is None:
m0 = cls()
else:
m0 = cls(*init)
return fitter(m0, pha, mag0, **kwargs)
else:
if init is not None:
sz1 = init.shape
sz2 = len(cls.param_names), n_models
if sz1 != sz2:
raise ValueError('`init` must have a shape of ({}, {}),'
' shape {} is given.'.format(sz2[0],
sz2[1], sz1))
par = np.zeros((len(cls.param_names), n_models))
for i in range(n_models):
mag = obs[fields[i]]
if isinstance(mag, u.Quantity):
dist_corr1 = u.Quantity(dist_corr).to(u.mag,
u.logarithmic())
else:
dist_corr1 = -2.5 * np.log10(dist_corr)
mag0 = mag + dist_corr1
if init is None:
m0 = cls()
else:
m0 = cls(*init[:, i])
m = fitter(m0, pha, mag0, **kwargs)
par[:, i] = m.parameters
pars_list = []
for i, p_name in enumerate(cls.param_names):
p = getattr(m, p_name)
if p.unit is None:
pars_list.append(par[i])
else:
pars_list.append(par[i]*p.unit)
model = cls(*pars_list, n_models=n_models)
if not isinstance(model.meta, dict):
model.meta = OrderedDict()
model.meta['fields'] = fields
return model
@dataclass_input(eph=Ephem)
def _distance_module(self, eph):
"""Return the correction magnitude or factor for heliocentric distance
and observer distance
Parameters
----------
eph : any type
If `~sbpy.data.Ephem` or dict_like, then the relevant fields, such
as 'rh' and 'delta' or the equivalent will be searched and, if
exist, used to calculate distance correction. If non-exist, then
no correction will be included for the corresponding field. If no
unit is provided via type `~astropy.units.Quantity`, then the
distance is assumed to be in unit of au. For any other data type,
a factor 1 or magnitude of 0 will be returned (implying no
correction).
Returns
-------
float or numpy array
Linear factors to be applied to flux to correct to heliocentric
distance and observer distance of both 1 au.
"""
module = 1.
try:
rh = eph['r']
if isinstance(rh, u.Quantity):
rh = rh.to('au').value
module = module * rh * rh
except (KeyError, TypeError):
pass
try:
delta = eph['delta']
if isinstance(delta, u.Quantity):
delta = delta.to('au').value
module = module * delta * delta
except (KeyError, TypeError):
pass
return np.asarray(module)
[docs] @quantity_to_dataclass(eph=(Ephem, 'alpha'))
def to_mag(self, eph, unit=None, append_results=False, **kwargs):
"""Calculate phase function in magnitude
Parameters
----------
eph : `~sbpy.data.Ephem`, numbers, iterables of numbers, or
`~astropy.units.Quantity`
If `~sbpy.data.Ephem` or dict_like, ephemerides of the object that
can include phase angle, heliocentric and geocentric distances via
keywords `phase`, `r` and `delta`. If float or array_like, then
the phase angle of object. If any distance (heliocentric and
geocentric) is not provided, then it will be assumed to be 1 au.
If no unit is provided via type `~astropy.units.Quantity`, then
radians is assumed for phase angle, and au is assumed for
distances.
unit : `astropy.units.mag`, `astropy.units.MagUnit`, optional
The unit of output magnitude. The corresponding solar magnitude
must be available either through `~sbpy.calib.sun` module or set
by `~sbpy.calib.solar_fluxd.set`.
append_results : bool, optional
Controls the return of this method.
**kwargs : optional parameters accepted by
`astropy.modeling.Model.__call__`
Returns
-------
`~astropy.units.Quantity`, array if ``append_results == False``
`~sbpy.data.Ephem` if ``append_results == True``
When ``append_results == False``: The calculated magnitude will be
returned.
When ``append_results == True``: If ``eph`` is a `~sbpy.data.Ephem`
object, then the calculated magnitude will be appended to ``eph`` as
a new column. Otherwise a new `~sbpy.data.Ephem` object is created to
contain the input ``eph`` and the calculated magnitude in two columns.
Examples
--------
>>> import numpy as np
>>> from astropy import units as u
>>> from sbpy.photometry import HG
>>> from sbpy.data import Ephem
>>> ceres_hg = HG(3.34 * u.mag, 0.12)
>>> # parameter `eph` as `~sbpy.data.Ephem` type
>>> eph = Ephem.from_dict({'alpha': np.linspace(
... 0, np.pi * 0.9, 200) * u.rad,
... 'r': np.repeat(2.7 * u.au, 200),
... 'delta': np.repeat(1.8 * u.au, 200)})
>>> mag1 = ceres_hg.to_mag(eph)
>>> # parameter `eph` as numpy array
>>> pha = np.linspace(0, 170, 200) * u.deg
>>> mag2 = ceres_hg.to_mag(pha)
"""
self._check_unit()
pha = eph['alpha']
if len(pha) == 1:
pha = pha[0]
out = self(pha, **kwargs)
if self._unit == 'ref':
if unit is None:
raise ValueError('Magnitude unit is not specified.')
if self.radius is None:
raise ValueError(
'Cannot calculate phase function in magnitude because the'
' size of object is unknown.')
if self.wfb is None:
raise ValueError('Wavelength/Frequency/Band is unknown.')
out = out.to(
unit,
reflectance(self.wfb, cross_section=np.pi * self.radius**2)
)
dist_corr = self._distance_module(eph)
dist_corr = u.Quantity(dist_corr).to(u.mag, u.logarithmic())
out = out - dist_corr
if append_results:
name = 'mag'
i = 1
while name in eph.field_names:
name = 'mag'+str(i)
i += 1
eph.apply(out, name=name)
return eph
else:
return out
[docs] @quantity_to_dataclass(eph=(Ephem, 'alpha'))
def to_ref(self, eph, normalized=None, append_results=False, **kwargs):
"""Calculate phase function in average bidirectional reflectance
Parameters
----------
eph : `~sbpy.data.Ephem`, numbers, iterables of numbers, or
`~astropy.units.Quantity`
If `~sbpy.data.Ephem` or dict_like, ephemerides of the object that
can include phase angle, heliocentric and geocentric distances via
keywords `phase`, `r` and `delta`. If float or array_like, then
the phase angle of object. If any distance (heliocentric and
geocentric) is not provided, then it will be assumed to be 1 au.
If no unit is provided via type `~astropy.units.Quantity`, then
radians is assumed for phase angle, and au is assumed for
distances.
normalized : number, `~astropy.units.Quantity`
The angle to which the reflectance is normalized.
append_results : bool
Controls the return of this method.
**kwargs : optional parameters accepted by
`astropy.modeling.Model.__call__`
Returns
-------
`~astropy.units.Quantity`, array if ``append_results == False``
`~sbpy.data.Ephem` if ``append_results == True``
When ``append_results == False``: The calculated reflectance will be
returned.
When ``append_results == True``: If ``eph`` is a `~sbpy.data.Ephem`
object, then the calculated reflectance will be appended to ``eph`` as
a new column. Otherwise a new `~sbpy.data.Ephem` object is created to
contain the input ``eph`` and the calculated reflectance in two
columns.
Examples
--------
>>> import numpy as np
>>> from astropy import units as u
>>> from sbpy.calib import solar_fluxd
>>> from sbpy.photometry import HG
>>> from sbpy.data import Ephem
>>> ceres_hg = HG(3.34 * u.mag, 0.12, radius = 480 * u.km, wfb= 'V')
>>> # parameter `eph` as `~sbpy.data.Ephem` type
>>> eph = Ephem.from_dict({'alpha': np.linspace(
... 0, np.pi * 0.9, 200) * u.rad,
... 'r': np.repeat(2.7 * u.au, 200),
... 'delta': np.repeat(1.8 * u.au, 200)})
>>> with solar_fluxd.set({'V': -26.77 * u.mag}):
... ref1 = ceres_hg.to_ref(eph)
... # parameter `eph` as numpy array
... pha = np.linspace(0, 170, 200) * u.deg
... ref2 = ceres_hg.to_ref(pha)
"""
self._check_unit()
pha = eph['alpha']
if len(pha) == 1:
pha = pha[0]
out = self(pha, **kwargs)
if normalized is not None:
norm = self(normalized, **kwargs)
if self._unit == 'ref':
if normalized is not None:
out /= norm
else:
if normalized is None:
if self.radius is None:
raise ValueError(
'Cannot calculate phase function in reflectance unit'
' because the size of object is unknown. Normalized'
' phase function can be calculated.')
if self.wfb is None:
raise ValueError('Wavelength/Frequency/Band is unknown.')
out = out.to(
'1/sr',
reflectance(self.wfb, cross_section=np.pi*self.radius**2)
)
else:
out = out - norm
out = out.to('', u.logarithmic())
if append_results:
name = 'ref'
i = 1
while name in eph.field_names:
name = 'ref'+str(i)
i += 1
eph.apply(out, name=name)
return eph
else:
return out
def _phase_integral(self, integrator=quad):
"""Calculate phase integral with numerical integration
Parameters
----------
integrator : function, optional
Numerical integrator, default is `~scipy.integrate.quad`.
If caller supplies a numerical integrator, it must has the same
return signature as `~scipy.integrator.quad`, i.e., a tuple of
(y, ...), where `y` is the result of numerical integration
Returns
-------
Float, phase integral
Examples
--------
>>> import astropy.units as u
>>> from sbpy.calib import solar_fluxd
>>> from sbpy.photometry import HG
>>> ceres_hg = HG(3.34 * u.mag, 0.12, radius = 480 * u.km, wfb = 'V')
>>> with solar_fluxd.set({'V': -26.77 * u.mag}):
... print('{0:.3}'.format(ceres_hg._phase_integral()))
0.364
"""
def integrand(x):
return 2*self.to_ref(x * u.rad, normalized=0. * u.rad) * \
np.sin(x * u.rad)
return integrator(integrand, 0, np.pi)[0]
[docs]class LinearPhaseFunc(DiskIntegratedPhaseFunc):
"""Linear phase function model
Examples
--------
>>> # Define a linear phase function model with absolute magnitude
>>> # H = 5 and slope = 0.04 mag/deg = 2.29 mag/rad
>>> import astropy.units as u
>>> from sbpy.calib import solar_fluxd
>>> from sbpy.photometry import LinearPhaseFunc
>>>
>>> linear_phasefunc = LinearPhaseFunc(5 * u.mag, 0.04 * u.mag/u.deg,
... radius = 300 * u.km, wfb = 'V')
>>> with solar_fluxd.set({'V': -26.77 * u.mag}):
... pha = np.linspace(0, 180, 200) * u.deg
... mag = linear_phasefunc.to_mag(pha)
... ref = linear_phasefunc.to_ref(pha)
... geomalb = linear_phasefunc.geomalb
... phaseint = linear_phasefunc.phaseint
... bondalb = linear_phasefunc.bondalb
>>> print('Geometric albedo is {0:.3}'.format(geomalb))
Geometric albedo is 0.0487
>>> print('Bond albedo is {0:.3}'.format(bondalb))
Bond albedo is 0.0179
>>> print('Phase integral is {0:.3}'.format(phaseint))
Phase integral is 0.367
"""
_unit = 'mag'
H = Parameter(description='Absolute magnitude')
S = Parameter(description='Linear slope (mag/deg)')
input_units = {'x': u.deg}
[docs] @staticmethod
def evaluate(a, H, S):
return H + S * a
[docs] @staticmethod
def fit_deriv(a, H, S):
if hasattr(a, '__iter__'):
ddh = np.ones_like(a)
else:
ddh = 1.
dds = a
return [ddh, dds]
def _parameter_units_for_data_units(self, inputs_unit, outputs_unit):
return OrderedDict([('H', outputs_unit['y']),
('S', outputs_unit['y']/inputs_unit['x'])])