# -*- coding: utf-8 -*-
"""Skyclc IPY user interface."""
import warnings
from pathlib import Path
from datetime import datetime as dt
import yaml
import numpy as np
from astropy import units as u
from astropy.io import fits
from astar_utils import get_logger
from .core import AlmanacQuery, SkyModel
__all__ = ["SkyCalc"]
# TODO: this isn't used, but something VERY similar is done in core......
observatory_dict = {
"lasilla": "2400",
"paranal": "2640",
"3060m": "3060",
"armazones": "3060",
"5000m": "5000",
"highanddry": "5000",
}
PATH_HERE = Path(__file__).parent
logger = get_logger(__name__)
[docs]class SkyCalc:
"""Main UI class."""
def __init__(self, ipt_str=None):
if ipt_str is None:
ipt_str = PATH_HERE / "params.yaml"
params = load_yaml(ipt_str)
self.defaults = {pp: params[pp][0] for pp in params}
self.values = {pp: params[pp][0] for pp in params}
self.data_type = {pp: params[pp][1] for pp in params}
self.check_type = {pp: params[pp][2] for pp in params}
self.allowed = {pp: params[pp][3] for pp in params}
self.comments = {pp: params[pp][4] for pp in params}
self.last_skycalc_response = None
[docs] def validate_params(self):
"""Check allowed range for parameters."""
invalid_keys = []
for key in self.values:
if self.check_type[key] == "no_check" or self.defaults[key] is None:
continue
if self.check_type[key] in {"range", "nearest"}:
if (
self.values[key] < self.allowed[key][0]
or self.values[key] > self.allowed[key][-1]
):
invalid_keys.append(key)
if self.check_type[key] == "nearest":
nearest = np.argmin(np.abs(self.allowed[key]
- self.values[key]))
self.values[key] = self.allowed[key][nearest]
elif self.check_type[key] in {"choice", "flag"}:
if self.values[key] not in self.allowed[key]:
invalid_keys.append(key)
elif self.check_type[key] == "greater_than":
if self.values[key] < self.allowed[key]:
invalid_keys.append(key)
if invalid_keys:
logger.warning("The following entries are invalid:")
for key in invalid_keys:
logger.warning("'%s' : %s : %s", key,
self.values[key], self.comments[key])
return not invalid_keys
[docs] def get_almanac_data(
self,
ra,
dec,
date=None,
mjd=None,
observatory=None,
update_values=False,
):
"""Query ESO Almanac with given parameters."""
if date is None and mjd is None:
raise ValueError("Either date or mjd must be set")
result = get_almanac_data(
ra=ra, dec=dec, date=date, mjd=mjd, observatory=observatory
)
if update_values:
self.values.update(result)
return result
[docs] def get_sky_spectrum(self, return_type="table", filename=None):
"""
Retrieve a fits.HDUList object from the SkyCalc server.
The HDUList can be returned in a variety of formats.
As of v0.1.3 the HDUList is no longer saved to disk.
Rather it is stored in the attribute <SkyCalc>.last_skycalc_response.
Parameters
----------
return_type : str
["table", "array", "synphot", "fits"]
filename : str, optional
Default None. If not None, the returned fits.HDUList object is
saved to disk under this path.
Returns
-------
Based on the return type, the method returns either:
- "table": tbl_return (astropy.Table)
- "array": wave, trans, flux (3x np.ndarray)
- "synphot": trans, flux (SpectralElement, SourceSpectrum,)
- "fits": hdu (HDUList)
"""
from astropy import table
if not self.validate_params():
raise ValueError(
"Object contains invalid parameters. Not calling ESO")
skm = SkyModel()
skm(**self.values)
self.last_skycalc_response = skm.data
if filename is not None:
skm.write(filename)
tbl = table.Table.read(skm.data)
if tbl["lam"].unit is None:
tbl["lam"].unit = u.um
# Set formatting to reasonable precision
tbl["lam"].format = "%.2f"
tbl["trans"].format = "%.5f"
tbl["flux"].format = "%.3f"
for colname in tbl.colnames:
if "flux" in colname:
# Somehow, astropy doesn't quite parse the unit correctly.
# Still, we shouldn't blindly overwrite it, so at least check.
funit = tbl[colname].unit
if str(funit) not in ("ph/s/m2/micron/arcsec2", "None"):
raise ValueError(f"Unexpected flux unit: {funit}")
tbl[colname].unit = u.Unit("ph s-1 m-2 um-1 arcsec-2")
date_created = dt.now().strftime("%Y-%m-%dT%H:%M:%S")
meta_data = {
"DESCRIPT": "Sky transmission and emission curves",
"SOURCE": "ESO Skycalc utility",
"AUTHOR": "ESO Skycalc utility",
"STATUS": "Tested - Generated from ESO Skycalc utility",
"DATE_CRE": date_created,
"ETYPE": "TERCurve",
"EDIM": 1,
}
params = {k: (self.values[k], self.comments[k]) for k in self.keys}
meta_data.update(params)
if "tab" in return_type:
tbl.meta.update(meta_data)
if "ext" in return_type:
tbl_return = tbl
else:
tbl_small = table.Table()
tbl_small.add_columns([tbl["lam"], tbl["trans"], tbl["flux"]])
tbl_return = tbl_small
return tbl_return
if "arr" in return_type:
wave = tbl["lam"].data * tbl["lam"].unit
trans = tbl["trans"].data
flux = tbl["flux"].data * tbl["flux"].unit
return wave, trans, flux
if "syn" in return_type:
import synphot as sp
trans = sp.SpectralElement(
sp.Empirical1D,
points=tbl["lam"].data * tbl["lam"].unit,
lookup_table=tbl["trans"].data,
)
funit = u.Unit("ph s-1 m-2 um-1")
flux = sp.SourceSpectrum(
sp.Empirical1D,
points=tbl["lam"].data * tbl["lam"].unit,
lookup_table=tbl["flux"].data * funit,
)
logger.warning(
"Synphot doesn't accept surface brightnesses \n"
"The resulting spectrum should be multiplied by arcsec-2"
)
return trans, flux
if "fit" in return_type:
hdu0 = fits.PrimaryHDU()
for key, meta_data_value in meta_data.items():
hdu0.header[key] = meta_data_value
hdu1 = fits.table_to_hdu(tbl)
hdu = fits.HDUList([hdu0, hdu1])
return hdu
[docs] def update(self, kwargs):
self.values.update(kwargs)
def __setitem__(self, key, value):
if key not in self.keys:
raise KeyError(f"Key {key} is not defined. Only predefined keys "
"can be set. See SkyCalc.keys for a list of those.")
self.values[key] = value
def __getitem__(self, item):
return self.values[item]
@property
def keys(self):
return list(self.defaults.keys())
def load_yaml(ipt_str):
# TODO: why not just load, what's all of this?
if ".yaml" in str(ipt_str).lower():
if not ipt_str.exists():
raise ValueError(f"{ipt_str} not found")
with ipt_str.open("r", encoding="utf-8") as fd:
fd = "\n".join(fd.readlines())
opts_dict = yaml.load(fd, Loader=yaml.FullLoader)
else:
opts_dict = yaml.load(ipt_str, Loader=yaml.FullLoader)
return opts_dict
def get_almanac_data(
ra,
dec,
date=None,
mjd=None,
return_full_dict=False,
observatory=None,
):
if date is not None and mjd is not None:
warnings.warn("Both date and mjd are set. Using date",
UserWarning, stacklevel=2)
skycalc_params = SkyCalc()
skycalc_params.values.update(
{"ra": ra, "dec": dec, "date": date, "mjd": mjd})
if observatory is not None:
skycalc_params.values["observatory"] = observatory
skycalc_params.validate_params()
response = AlmanacQuery(skycalc_params.values)()
if return_full_dict:
skycalc_params.values.update(response)
return skycalc_params.values
return response