from os import path
from datetime import datetime
import logging
from socket import timeout
from urllib import parse
import numpy as np
import astropy.io.fits as fits
from astropy.time import Time
from astropy.coordinates import Angle, name_resolve, SkyCoord, AltAz
from astropy import units as u
from astropy.table import Table
from ccdproc import ImageFileCollection
try:
from .feder import Feder
feder = Feder()
except ImportError:
feder = None
pass
from .fitskeyword import FITSKeyword
logger = logging.getLogger(__name__)
#__all__ = ['patch_headers', 'add_object_info', 'add_ra_dec_from_object_name']
[docs]def IRAF_image_type(image_type):
"""
Convert MaximDL default image type names to IRAF
Parameters
----------
image_type : str
Value of the FITS header keyword IMAGETYP; acceptable values are
below in Notes.
Returns
-------
str
IRAF image type (one of 'BIAS', 'DARK', 'FLAT' or 'LIGHT')
Notes
-----
The MaximDL default is, e.g. 'Bias Frame', which IRAF calls
'BIAS'. Can safely be called with an IRAF-style image_type.
"""
return image_type.split()[0].upper()
def _lst_from_obstime(obstime):
try:
LST = obstime.sidereal_time('apparent',
longitude=feder.site.lon)
except IndexError:
# We are outside the range of the IERS table installed with astropy,
# so get a newer one.
from astropy.utils import iers
from astropy.utils.data import download_file
iers_a = iers.IERS_A.open(download_file(iers.IERS_A_URL,
cache=True))
obstime.delta_ut1_utc = obstime.get_delta_ut1_utc(iers_a)
LST = obstime.sidereal_time('apparent',
longitude=feder.site.lon)
return LST
[docs]def add_time_info(header, history=False):
"""
Add JD, MJD, LST to FITS header
Parameters
----------
header : astropy..io.fits.Header
FITS header to be modified.
history : bool
If `True`, write history for each keyword changed.
"""
dateobs = Time(header['date-obs'], scale='utc')
feder.JD_OBS.value = dateobs.jd
feder.MJD_OBS.value = dateobs.mjd
LST_tmp = _lst_from_obstime(dateobs)
feder.LST.value = LST_tmp.to_string(unit=u.hour, sep=':', precision=4,
pad=True)
for keyword in feder.keywords_for_all_files:
keyword.add_to_header(header, history=history)
logger.info(keyword.history_comment())
[docs]def add_object_pos_airmass(header, history=False):
"""
Add object information, such as RA/Dec and airmass.
Parameters
----------
header : astropy..io.fits.Header
FITS header to be modified.
history : bool
If `True`, write history for each keyword changed.
Notes
-----
Has side effect of setting feder site JD to JD-OBS, which means it
also assume JD.value has been set.
"""
# not sure why coverage is not picking up both branches, but it is not, so
# marking it no cover
if feder.JD_OBS.value is None: # pragma: no cover
raise ValueError('Need to set JD_OBS.value '
'before calling.')
try:
feder.RA.set_value_from_header(header)
except ValueError:
raise ValueError("No RA is present.")
feder.DEC.set_value_from_header(header)
feder.RA.value = feder.RA.value.replace(' ', ':')
feder.DEC.value = feder.DEC.value.replace(' ', ':')
obj_coord2 = SkyCoord(feder.RA.value, feder.DEC.value,
unit=(u.hour, u.degree), frame='fk5')
obstime = Time(feder.MJD_OBS.value, format='mjd')
alt_az = obj_coord2.transform_to(AltAz(obstime=obstime,
location=feder.site))
feder.ALT_OBJ.value = round(alt_az.alt.degree, 5)
feder.AZ_OBJ.value = round(alt_az.az.degree, 5)
feder.AIRMASS.value = round(1 / np.cos(np.pi / 2 - alt_az.alt.radian), 3)
# TODO: replace the LST calculation
LST = _lst_from_obstime(obstime)
HA = LST.hour - obj_coord2.ra.hour
HA = Angle(HA, unit=u.hour)
feder.HA.value = HA.to_string(unit=u.hour, sep=':')
for keyword in feder.keywords_for_light_files:
if keyword.value is not None:
keyword.add_to_header(header, history=history)
logger.info(keyword.history_comment())
[docs]def get_software_name(header, file_name=None, use_observatory=None):
"""
Determine the name of the software that created FITIS header
Parameters
----------
header : astropy.io.fits Header
Header from a FITS extension/hdu
file_name : str, optional
Name of the file containing this header; used to add information to
error/warning messages.
use_observatory : msumastro.Feder instance, optional
Object that contains names of FITS keywords that might be present and
contain name of the software that made this header. The default value
is the instance defined at the beginning of this module
Returns
-------
msumastro.feder.Software object
"""
fits_file = file_name or ''
observatory = use_observatory or feder
known_software_keywords = observatory.software_FITS_keywords
software_name_in_header = FITSKeyword(name=known_software_keywords[0],
synonyms=known_software_keywords)
software_name_in_header.set_value_from_header(header)
try:
software = observatory.software[software_name_in_header.value]
except KeyError:
raise KeyError('Software named {0} not recognized in header '
'for file {1}'.format(software_name_in_header.value,
fits_file))
return software
[docs]def purge_bad_keywords(header, history=False, force=False, file_name=None):
"""
Remove keywords from FITS header that may be incorrect
Parameters
----------
header : astropy.io.fits.Header
Header from which the bad keywords (as defined by the software that
recorded the image) should be purged.
history : bool
If `True` write detailed history for each keyword removed.
force : bool
If `True`, force keywords to be purged even if the FITS header
indicates it has already been purged.
file_name : str, optional
Name of file containing the header; if provided it is used to generate
more informative log messages.
"""
software = get_software_name(header, file_name=file_name)
try:
purged = header[software.purged_flag_keyword]
except KeyError:
purged = False
if purged and not force:
warn_msg = 'Not removing headers from {0} again, '
warn_msg += 'set force=True to force removal.'
logger.warn(warn_msg.format(file_name))
return
for keyword in software.bad_keywords:
try:
comment = ('Deleted keyword ' + keyword +
' with value ' + str(header[keyword]))
del header[keyword]
except KeyError:
continue
logger.info(comment)
if history:
header.add_history(comment)
header[software.purged_flag_keyword] = (True,
'Have bad keywords been removed?')
[docs]def change_imagetype_to_IRAF(header, history=True):
"""
Change IMAGETYP to default used by IRAF
Parameters
----------
header : astropy.io.fits.Header
Header object in which image type is to be changed.
history : bool, optional
If `True`, add history of keyword modification to `header`.
"""
imagetype = 'imagetyp' # FITS keyword name is truncated at 8 chars
current_type = header[imagetype]
IRAF_type = IRAF_image_type(current_type)
if current_type != IRAF_type:
header[imagetype] = IRAF_type
comment = 'Changed {0} from {1} to {2}'.format(imagetype.upper(),
current_type,
IRAF_type)
logger.info(comment)
if history:
header.add_history(comment)
[docs]def add_image_unit(header, history=True):
"""
Add unit of image to header.
Parameters
----------
header : astropy.io.fits.Header
Header object in which image type is to be changed.
history : bool, optional
If `True`, add history of keyword modification to `header`.
"""
instrument_key = 'instrume'
instrument = feder.instruments[header[instrument_key]]
if instrument.image_unit is not None:
unit_string = instrument.image_unit.to_string()
comment = 'Set image data unit to {}'.format(unit_string)
header['BUNIT'] = unit_string
print('UNIT STRING IS =========> {}'.format(unit_string))
logger.info(comment)
if history:
header.add_history(comment)
[docs]def list_name_is_url(name):
may_be_url = parse.urlparse(name)
return (may_be_url.scheme and may_be_url.netloc)
[docs]def read_object_list(directory=None, input_list=None,
skip_consistency_check=False, check_radius=20.0,
skip_lookup_from_object_name=False):
"""
Read a list of objects from a text file.
Parameters
----------
directory : str
Directory containing the file. Default is the current directory, ``.``
input_list : str, optional
Name of the file or URL of file. Default value is ``obsinfo.txt``. If
the name is a URL the directory argument is ignored.
skip_consistency_check : bool optional
If ``True``, skip checking whether objects on the list have unique
coordinates given `check_radius`.
check_radius : float, optional
Match radius, in arcminutes. Objects on the list must be separated by
an angular distance greater than this for the list to be
self-consistent.
skip_lookup_from_object_name : bool, optional
Set to ``True`` to skip lookup of coordinates from Simbad if RA/Dec
are not in the object file.
Notes
-----
There are two file formats; one contains just a list of objects, the
other has an RA and Dec for each object.
In both types any lines that start with ``#`` are ignored and treated
as comments.
**File with list of objects only:**
+ Object coordinates are determined by lookup with
`Simbad <http://simbad.u-strasbg.fr/simbad/>`_. You should make sure
the object names you use are known to simbad.
+ The first non-comment line MUST be the word ``object`` and only
the word ``object``. It is case sensitive; ``Object`` or ``OBJECT``
will not work.
+ Remaining line(s) are name(s) of object(s), one per line. Case does
**not** matter for object name.
+ Example::
# my list is below
object
m101
sz lyn
# the next object is after this comment
RR LYR
**File with object name and position:**
+ RA and Dec **must be J2000**.
+ RA **must be given in hours**, though it can be either sexagesimal
(e.g. ``19:25:27.9``) or decimal (e.g. ``19.423861``).
+ Dec **must be given in degrees**, though it can be either sexagesimal
(e.g. ``42:47:3.69``) or decimal (e.g. ``42.7843583``)
+ The first non-comment line MUST be these words: ``object,RA,Dec``.
These are column headings for your file. It is **not** case
sensitive; for example, using ``DEC`` instead of ``Dec`` will work.
+ Each remaining line should be an object name, object RA and Dec.
Case does **not** matter for object name.
+ Example::
# my list with RA and Dec
# RA and Dec are assumed to be J2000
# RA MUST BE IN HOURS
# DEC MUST BE IN DEGREES
object,RA,Dec
m101,14:03:12.583, +54:20:55.50
# note that the leading sign for the Dec is optional if Dec is
# positive
sz lyn,08:09:35.748, 44:28:17.61
# You can mix sexagesimal and decimal RA/Dec.
RR Lyr, 19.423861,42.7843583
"""
def normalize_column_name(key, table):
"""
Find any column in table whose name matches, aside from case, key
and change the name of the column to key
"""
contains_col = lambda key, names: key.lower() in [name.lower() for name
in names]
if not contains_col(key, table.columns):
raise KeyError('Keyword {0} not found in table'.format(key))
for column in table.columns:
if ((key.lower() == column.lower()) and (key != column)):
table.rename_column(column, key)
break
if directory is None:
directory = '.'
list_name = input_list if input_list is not None else 'obsinfo.txt'
if not list_name_is_url(list_name):
full_name = path.join(directory, list_name)
else:
full_name = list_name
objects = Table.read(full_name,
format='ascii',
comment='#',
delimiter=',')
try:
normalize_column_name('object', objects)
except KeyError as e:
logger.debug('%s', e)
raise RuntimeError(
'No column named object found in file {}'.format(list_name))
try:
normalize_column_name('RA', objects)
normalize_column_name('Dec', objects)
RA = objects['RA']
Dec = objects['Dec']
except KeyError:
RA = None
Dec = None
ra_dec = None
object_names = objects['object']
if (RA is not None) and (Dec is not None):
default_angle_units = (u.hour, u.degree)
ra_dec = SkyCoord(RA, Dec, unit=default_angle_units, frame='fk5')
else:
if skip_lookup_from_object_name:
ra_dec = None
else:
try:
ra_dec = [SkyCoord.from_name(obj) for obj in object_names]
except (name_resolve.NameResolveError, timeout) as e:
logger.error('Unable to do lookup of object positions')
logger.error(e)
raise name_resolve.NameResolveError('Unable to do lookup of '
'object positions')
ra = []
dec = []
for a_coord in ra_dec:
ra.append(a_coord.ra.radian)
dec.append(a_coord.dec.radian)
ra_dec = SkyCoord(ra, dec, unit=(u.radian, u.radian), frame='fk5')
if skip_consistency_check and skip_lookup_from_object_name:
return object_names, ra_dec
if skip_consistency_check or not ra_dec:
return object_names, ra_dec
# sanity check object list--the objects should not be so close together
# that any pair is within match radius of each other.
logger.debug('Testing object list for self-consistency')
# need 2nd neighbor below or objects will match themselves
try:
matches, d2d, d3d = ra_dec.match_to_catalog_sky(ra_dec,
nthneighbor=2)
bad_object_list = (d2d.arcmin < check_radius).any()
except IndexError:
# There was only one item in the table...so no object can
# be duplicated, so make a fake distance that doesn't match
bad_object_list = False
if bad_object_list:
err_msg = ('Object list {} in directory {} contains at least one '
'pair of objects that '
'are closer together than '
'match radius {} arcmin').format(input_list,
directory,
check_radius)
logger.error(err_msg)
raise RuntimeError(err_msg)
logger.debug('Passed: object list is self-consistent')
return object_names, ra_dec
[docs]def history(function, mode=None, time=None):
"""
Construct nicely formatted start/end markers in FITS history.
Parameters
----------
function : func
Function calling `history`
mode : str, 'begin' or 'end'
A different string is produced for the beginning and the end. Default
is 'begin'.
time : datetime
If not set, defaults to current date/time.
"""
mode = mode or 'begin'
if mode == 'begin':
marker = '+'
elif mode == 'end':
marker = '-'
else:
raise ValueError('mode must be "begin" or "end"')
if time is None:
time = datetime.now()
marker *= 5
return "%s %s %s history on %s %s" % (marker, mode.upper(),
function.__name__, time,
marker)
[docs]def add_object_info(directory=None,
object_list=None,
object_list_dir=None,
match_radius=20.0, new_file_ext=None,
save_location=None,
overwrite=False, detailed_history=True):
"""
Add object information to FITS files that contain pointing information
given a list of objects.
Parameters
----------
directory : str
Directory containing the FITS files to be fixed. Default is the
current directory, ``.``.
object_list : str, optional
Name of file containing list of objects. Default is set by
:func:`read_object_list` which also explains the format of this file.
object_list_dir : str, optional
Directory in which the `object_list` is contained. Default is
`directory`.
match_radius : float, optional
Maximum distance, in arcmin, between the RA/Dec of the image and a
particular object for the image to be considered an image of that
object.
new_file_ext : str, optional
Name added to the FITS files with updated header information. It is
added to the base name of the input file, between the old file name
and the `.fit` or `.fits` extension. Default is 'new'.
save_location : str, optional
Directory to which the patched files should be written, if not `dir`.
overwrite : bool, optional
Set to `True` to replace the original files.
"""
directory = directory or '.'
if new_file_ext is None:
new_file_ext = 'new'
images = ImageFileCollection(directory,
keywords=['imagetyp', 'ra',
'dec', 'object'])
im_table = images.summary
object_dir = directory if object_list_dir is None else object_list_dir
logger.debug('About to read object list')
try:
object_names, ra_dec = read_object_list(object_dir,
input_list=object_list)
except IOError:
warn_msg = 'No object list in directory {0}, skipping.'
logger.warn(warn_msg.format(directory))
return
except name_resolve.NameResolveError:
logger.error('Unable to add objects--name resolve error')
return
object_names = np.array(object_names)
# I want rows which...
#
# ...have no OBJECT...
needs_object = im_table['object'].mask
# ...and have coordinates.
needs_object &= ~ (im_table['ra'].mask | im_table['dec'].mask)
logger.debug('Looking for objects for %s images', needs_object.sum())
# Qualifying rows need a search for a match.
# the search returns a match for every row provided, but some matches
# may be farther away than desired, so...
#
# ...`and` the previous index mask with those that matched, and
# ...construct list of object names for those images.
default_angle_units = (u.hour, u.degree)
img_pos = SkyCoord(im_table['ra'][needs_object],
im_table['dec'][needs_object],
unit=default_angle_units,
frame='fk5')
match_idx, d2d, d3d = img_pos.match_to_catalog_sky(ra_dec)
good_match = (d2d.arcmin <= match_radius)
found_object = np.array(needs_object)
found_object[needs_object] = good_match
matched_object_name = object_names[match_idx][good_match]
no_match_found = needs_object & ~found_object
if no_match_found.any():
for fname in np.array(images.files)[no_match_found]:
warn_msg = "No object found for image {0}".format(fname)
logger.warn(warn_msg)
if not found_object.any():
logger.info('NO OBJECTS MATCHED TO IMAGES IN: {0}'.format(directory))
return
im_table['file'].mask = ~found_object
for idx, (header, fname) in enumerate(images.headers(save_with_name=new_file_ext,
overwrite=overwrite,
save_location=save_location,
return_fname=True)):
logger.info('START ATTEMPTING TO ADD OBJECT to: {0}'.format(fname))
object_name = matched_object_name[idx]
logger.debug('Found matching object named %s', object_name)
obj_keyword = FITSKeyword('object', value=object_name)
obj_keyword.add_to_header(header, history=True)
logger.info(obj_keyword.history_comment())
logger.info('END ATTEMPTING TO ADD OBJECT to: {0}'.format(fname))
[docs]def add_ra_dec_from_object_name(directory=None, new_file_ext=None,
object_list=None, object_list_dir=None):
"""
Add RA/Dec to FITS file that has object name but no pointing.
Parameters
----------
dir : str, optional
Directory containing the files to be patched. Default is the current
directory, ``.``
new_file_ext : str, optional
Name added to the FITS files with updated header information. It is
added to the base name of the input file, between the old file name
and the `.fit` or `.fits` extension. Default is 'new'.
object_list : str, optional
Name of file containing list of objects. Default is set by
:func:`read_object_list` which also explains the format of this file.
object_list_dir : str, optional
Directory in which the `object_list` is contained. Default is
`directory`.
"""
directory = directory or '.'
if new_file_ext is None:
new_file_ext = 'new'
images = ImageFileCollection(directory,
keywords=['imagetyp', 'ra',
'dec', 'object'])
summary = images.summary
missing_dec = summary[(np.logical_not(summary['object'].mask)) &
(summary['ra'].mask) &
(summary['dec'].mask) &
(summary['object'] != '') &
(summary['imagetyp'] == 'LIGHT')]
if not missing_dec:
return
objects = np.unique(missing_dec['object'])
try:
object_list, ra_dec_list = read_object_list(object_list_dir,
input_list=object_list)
except IOError:
object_list = []
ra_dec_list = []
object_dict = {}
if len(object_list) and len(ra_dec_list):
object_dict = {obj: ra_dec for obj, ra_dec in zip(object_list,
ra_dec_list)}
# checks prior to this mean this loop always happens at least once,
# which confuses coverage
for object_name in objects: # pragma: nobranch
try:
object_coords = object_dict[object_name]
except KeyError:
try:
object_coords = SkyCoord.from_name(object_name)
except (name_resolve.NameResolveError, timeout) as e:
logger.warning('Unable to lookup position for %s', object_name)
logger.warning(e)
return
common_format_keywords = {'sep': ':',
'precision': 2,
'pad': True}
feder.RA.value = object_coords.ra.to_string(unit=u.hour,
**common_format_keywords)
feder.DEC.value = object_coords.dec.to_string(unit=u.degree,
alwayssign=True,
**common_format_keywords)
these_files = missing_dec[missing_dec['object'] == object_name]
for image in these_files:
full_name = path.join(directory, image['file'])
hdulist = fits.open(full_name, do_not_scale_image_data=True)
header = hdulist[0].header
feder.RA.add_to_header(header, history=True)
feder.DEC.add_to_header(header, history=True)
if new_file_ext:
base, ext = path.splitext(full_name)
new_file_name = base + new_file_ext + ext
overwrite = False
else:
new_file_name = full_name
overwrite = True
hdulist.writeto(new_file_name, overwrite=overwrite)
hdulist.close()