# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""sbpy Spectroscopy Sources Module
Spectrophotometric classes that encapsulate synphot.SpectralSource and
synphot.Observation in order to generate sbpy spectra and photometry.
Requires synphot.
"""
__all__ = [
'BlackbodySource', 'Reddening'
]
import numpy as np
from abc import ABC
import astropy.units as u
from astropy.utils.data import download_file, _is_url
try:
import synphot
from synphot import SpectralElement, BaseUnitlessSpectrum
except ImportError:
synphot = None
class SpectralElement:
pass
from ..exceptions import SbpyException
__doctest_requires__ = {
'SpectralSource': ['synphot'],
'BlackbodySource': ['synphot'],
}
class SinglePointSpectrumError(SbpyException):
"""Single point provided, but multiple values expected."""
class SynphotRequired(SbpyException):
pass
class SpectralSource(ABC):
"""Abstract base class for SBPy spectral sources.
Requires `~synphot`.
Parameters
----------
source : `~synphot.SourceSpectrum`
The source spectrum.
description : string, optional
A brief description of the source spectrum.
Attributes
----------
wave - Wavelengths of the source spectrum.
fluxd - Source spectrum.
description - Brief description of the source spectrum.
meta - Meta data from ``source``, if any.
"""
def __init__(self, source, description=None):
if synphot is None:
raise SynphotRequired(
'synphot required for {}.'.format(self.__class__.__name__))
self._source = source
self._description = description
@classmethod
def from_array(cls, wave, fluxd, meta=None, **kwargs):
"""Create standard from arrays.
Parameters
----------
wave : `~astropy.units.Quantity`
The spectral wavelengths.
fluxd : `~astropy.units.Quantity`
The spectral flux densities.
meta : dict, optional
Meta data.
**kwargs
Passed to object initialization.
"""
if synphot is None:
raise ImportError(
'synphot required for {}.'.format(cls.__name__))
source = synphot.SourceSpectrum(
synphot.Empirical1D, points=wave, lookup_table=fluxd,
meta=meta)
return cls(source, **kwargs)
@classmethod
def from_file(cls, filename, wave_unit=None, flux_unit=None,
cache=True, **kwargs):
"""Load the source spectrum from a file.
NaNs are dropped.
Parameters
----------
filename : string
The name of the file. See
`~synphot.SourceSpectrum.from_file` for details.
wave_unit, flux_unit : str or `~astropy.units.Unit`, optional
Wavelength and flux units in the file.
cache : bool, optional
If ``True``, cache the contents of URLs.
**kwargs
Passed to object initialization.
"""
if synphot is None:
raise ImportError(
'synphot required for {}.'.format(cls.__name__))
if filename.lower().endswith(('.fits', '.fit', '.fz')):
read_spec = synphot.specio.read_fits_spec
else:
read_spec = synphot.specio.read_ascii_spec
# URL cache because synphot.SourceSpectrum.from_file does not
if _is_url(filename):
fn = download_file(filename, cache=True)
else:
fn = filename
spec = read_spec(fn, wave_unit=wave_unit, flux_unit=flux_unit)
i = np.isfinite(spec[1] * spec[2])
source = synphot.SourceSpectrum(
synphot.Empirical1D, points=spec[1][i], lookup_table=spec[2][i],
meta={'header': spec[0]})
return cls(source, **kwargs)
@property
def description(self):
"""Description of the source spectrum."""
return self._description
@property
def wave(self):
"""Wavelengths of the source spectrum."""
return self.source.waveset
@property
def fluxd(self):
"""The source spectrum."""
return self.source(self._source.waveset, flux_unit='W / (m2 um)')
@property
def source(self):
return self._source
@property
def meta(self):
self._source.meta
def __call__(self, wave_or_freq, unit=None):
"""Evaluate/interpolate the source spectrum.
Parameters
----------
wave_or_freq : `~astropy.units.Quantity`
Requested wavelengths or frequencies of the resulting
spectrum.
unit : string, `~astropy.units.Unit`, optional
Spectral units of the output (flux density). If ``None``,
the default depends on ``wave_or_freq``: W/(m2 μm) for
wavelengths, Jy for frequencies.
Returns
-------
fluxd : `~astropy.units.Quantity`
The spectrum evaluated/interpolated to the requested
wavelengths or frequencies.
"""
from .. import units as sbu # avoid circular dependency
if unit is not None:
unit = u.Unit(unit)
elif wave_or_freq.unit.is_equivalent('m'):
unit = u.Unit('W/(m2 um)')
else:
unit = u.Jy
if unit.is_equivalent(sbu.VEGA):
fluxd = self.source(wave_or_freq, 'W/(m2 um)').to(
unit, sbu.spectral_density_vega(wave_or_freq))
else:
fluxd = self.source(wave_or_freq, unit)
return fluxd
def observe(self, wfb, unit=None, interpolate=False, **kwargs):
"""Observe source as through filters or spectrometer.
Calls ``observe_bandpass``, ``observe_spectrum``, or
``self()``, as appropriate.
Parameters
----------
wfb : `~astropy.units.Quantity`, `~synphot.SpectralElement`
Wavelengths, frequencies, or bandpasses. May also be a
list of ``SpectralElement``s.
unit : string, `~astropy.units.Unit`, optional
Spectral flux density units for the output. If ``None``,
the default depends on ``wfb``: W/(m2 μm) for wavelengths
or bandpasses, Jy for frequencies.
interpolate : bool, optional
For wavelengths/frequencies, set to ``True`` for
interpolation instead of rebinning. Use this when the
spectral resolution of the source is close to that of the
requested wavelengths.
**kwargs
Additional keyword arguments for
`~synphot.observation.Observation`, e.g., ``force``.
Returns
-------
fluxd : `~astropy.units.Quantity`
The spectrum rebinned.
Raises
------
SinglePointSpectrumError - If requested wavelengths or
frequencies has only one value.
Notes
-----
Method for spectra adapted from AstroBetter post by Jessica Lu:
https://www.astrobetter.com/blog/2013/08/12/python-tip-re-sampling-spectra-with-pysynphot/
"""
if isinstance(wfb, (list, tuple, SpectralElement)):
lambda_eff, fluxd = self.observe_bandpass(
wfb, unit=unit, **kwargs)
elif isinstance(wfb, u.Quantity):
if interpolate:
fluxd = self(wfb, unit=unit)
else:
fluxd = self.observe_spectrum(wfb, unit=unit, **kwargs)
else:
raise TypeError('Unsupported type for `wfb` type: {}'
.format(type(wfb)))
return fluxd
def observe_bandpass(self, bp, unit=None, **kwargs):
"""Observe through a bandpass.
Parameters
----------
bp : `~synphot.SpectralElement`, list, or tuple
Bandpass.
unit : string, `~astropy.units.Unit`, optional
Spectral flux density units for the output. The default
is W/(m2 μm).
**kwargs
Additional keyword arguments for
`~synphot.observation.Observation`, e.g., ``force``.
Returns
-------
lambda_eff : `~astropy.units.Quantity`
Effective wavelength(s) of the observation(s).
fluxd : `~astropy.units.Quantity`
The spectrum rebinned.
"""
from .. import units as sbu # avoid circular dependency
# promote single bandpasses to a list, but preserve number of
# dimensions
if isinstance(bp, (SpectralElement, str)):
ndim = 0
bp = [bp]
else:
ndim = np.ndim(bp)
if unit is None:
unit = u.Unit('W/(m2 um)')
else:
unit = u.Unit(unit)
fluxd = np.ones(len(bp)) * unit
for i in range(len(bp)):
obs = synphot.Observation(self.source, bp[i], **kwargs)
lambda_eff = obs.effective_wavelength()
lambda_pivot = obs.bandpass.pivot()
_fluxd = obs.effstim('W/(m2 um)')
if unit.is_equivalent(sbu.VEGAmag):
fluxd[i] = _fluxd.to(unit, sbu.spectral_density_vega(bp[i]))
else:
fluxd[i] = _fluxd.to(unit, u.spectral_density(lambda_pivot))
if np.ndim(fluxd) != ndim:
fluxd = fluxd.squeeze()
return lambda_eff, fluxd
def observe_spectrum(self, wave_or_freq, unit=None, **kwargs):
"""Observe source as through a spectrometer.
.. Important:: This method works best when the requested
spectral resolution is lower than the spectral resolution
of the internal data. If the requested
wavelengths/frequencies are exactly the same as the
internal spectrum, then the internal spectrum will be
returned without binning. This special case does not work
for subsets of the wavelengths.
Parameters
----------
wave_or_freq : `~astropy.units.Quantity`
Wavelengths or frequencies of the spectrum. Spectral bins
will be centered at these values. The length must be
larger than 1.
unit : string, `~astropy.units.Unit`, optional
Spectral flux density units for the output. If ``None``,
the default is W/(m2 μm) for wavelengths, Jy for
frequencies.
**kwargs
Additional keyword arguments for
`~synphot.observation.Observation`, e.g., ``force``.
Returns
-------
fluxd : `~astropy.units.Quantity`
The spectrum rebinned.
Raises
------
SinglePointSpectrumError - If requested wavelengths or
frequencies has only one value.
Notes
-----
Method for spectra adapted from AstroBetter post by Jessica Lu:
https://www.astrobetter.com/blog/2013/08/12/python-tip-re-sampling-spectra-with-pysynphot/
"""
from .. import units as sbu # avoid circular dependency
if np.size(wave_or_freq) == 1:
raise SinglePointSpectrumError(
'Multiple wavelengths or frequencies required for '
'observe_spectrum. Instead consider interpolation '
'with {}().'
.format(self.__class__.__name__))
if unit is None:
if wave_or_freq.unit.is_equivalent('m'):
unit = u.Unit('W/(m2 um)')
else:
unit = u.Jy
else:
unit = u.Unit(unit)
specele = synphot.SpectralElement(synphot.ConstFlux1D, amplitude=1)
# Specele is defined over all wavelengths, but most spectral
# standards are not. force='taper' will affect retrieving
# flux densities at the edges of the spectrum, but is
# preferred to avoid wild extrapolation.
kwargs['force'] = kwargs.get('force', 'taper')
obs = synphot.Observation(
self.source, specele, binset=wave_or_freq, **kwargs)
if unit.is_equivalent(sbu.VEGAmag):
fluxd = obs.sample_binned(flux_unit='W/(m2 um)').to(
unit, sbu.spectral_density_vega(wave_or_freq))
else:
fluxd = obs.sample_binned(flux_unit=unit)
return fluxd
def color_index(self, wfb, unit):
"""Color index (magnitudes) and effective wavelengths.
Parameters
----------
wfb : `~astropy.units.Quantity` or tuple of `~synphot.SectralElement`
Two wavelengths, frequencies, or bandpasses.
unit : string or `~astropy.units.MagUnit`
Units for the calculation, e.g., ``astropy.units.ABmag`` or
``sbpy.units.VEGAmag``.
Returns
-------
eff_wave : `~astropy.units.Quantity`
Effective wavelengths for each ``wfb``.
ci : `~astropy.units.Quantity`
Color index, ``m_0 - m_1``, where 0 and 1 are element
indexes for ``wfb``.
"""
eff_wave = []
m = np.zeros(2) * u.Unit(unit)
for i in range(2):
if isinstance(wfb[i], u.Quantity):
if wfb[i].unit.is_equivalent(u.Hz):
eff_wave.append(wfb[i].to(u.um, u.spectral()))
else:
eff_wave.append(wfb[i])
m[i] = self(eff_wave[i], unit=unit)
elif isinstance(wfb[i], (list, tuple, SpectralElement)):
w, m[i] = self.observe_bandpass(wfb[i], unit=unit)
eff_wave.append(w)
else:
raise TypeError('Unsupported type for `wfb` type: {}'
.format(type(wfb[i])))
ci = m[0] - m[1]
return u.Quantity(eff_wave), ci
def redden(self, S):
"""Redden the spectrum.
Parameters
----------
S : `~SpectralGradient`
The spectral gradient to redden.
Returns
-------
spec : `~SpectralSource`
Reddened spectrum
"""
from copy import deepcopy
r = Reddening(S)
red_spec = deepcopy(self)
red_spec._source = red_spec.source * r
if red_spec.description is not None:
red_spec._description = '{} reddened by {} at {}'.format(
red_spec.description, S, S.wave0)
return red_spec
[docs]class BlackbodySource(SpectralSource):
"""Blackbody sphere.
Spectral flux densities are calculated from ``pi * B(T)``, where
``B`` is the Planck function.
Parameters
----------
T : `~astropy.units.Quantity`, required
Temperature in Kelvin.
"""
def __init__(self, T=None):
super().__init__(None, description='πB(T)')
if T is None:
raise TypeError('T is required.')
self._T = u.Quantity(T, u.K)
self._source = synphot.SourceSpectrum(
synphot.BlackBody1D, temperature=self._T.value) * np.pi
def __repr__(self):
return '<BlackbodySource: T={}>'.format(self._T)
@property
def T(self):
return self._T
[docs]class Reddening(BaseUnitlessSpectrum):
"""Class to handle simple linear reddening.
Parameters
----------
S : `~SpectralGradient`
The spectral gradient to redden.
"""
@u.quantity_input(S=u.percent / u.um)
def __init__(self, S):
if getattr(S, 'wave0', None) is None:
raise ValueError("Normalization wavelength in `S` (.wave0) is "
"required by not available.")
wv = [1, 2] * S.wave0
df = (S.wave0 * S).to('').value
super().__init__(
synphot.Empirical1D, points=wv, lookup_table=[1, 1+df],
fill_value=None)