#!/usr/bin/env python
# $Id: NP_pyfits.py 356 2007-12-18 19:13:37Z jtaylor2 $
"""
A module for reading and writing Flexible Image Transport System
(FITS) files and manipulating their contents. This file format was
endorsed by the International
Astronomical Union in 1999 and mandated by NASA as the standard format
for storing high energy astrophysics data. For details of the FITS
standard, see the NASA/Science Office of Standards and Technology
publication, NOST 100-2.0.
License: http://www.stsci.edu/resources/software_hardware/pyraf/LICENSE
For detailed examples of usage, see the I{PyFITS User's Manual} available from
U{http://www.stsci.edu/resources/software_hardware/pyfits/Users_Manual1.pdf}
Epydoc markup is used for all docstrings in this module.
@group Header-related Classes: Card, CardList, _Card_with_continue,
Header, _Hierarch
@group HDU Classes: _AllHDU, BinTableHDU, _CorruptedHDU, _ExtensionHDU,
GroupsHDU, ImageHDU, _ImageBaseHDU, PrimaryHDU, TableHDU,
_TableBaseHDU, _TempHDU, _ValidHDU
@group Table-related Classes: ColDefs, Column, FITS_rec, _FormatP,
_FormatX, _VLF
"""
"""
Do you mean: "Profits"?
- Google Search, when asked for "PyFITS"
"""
import re, os, tempfile, exceptions
import operator
import __builtin__
import urllib
import gzip
import zipfile
import numpy as np
from numpy import char as chararray
import rec
from numpy import memmap as Memmap
from string import maketrans
import types
import signal
import threading
import sys
import warnings
# Module variables
_blockLen = 2880 # the FITS block size
_python_mode = {'readonly':'rb', 'copyonwrite':'rb', 'update':'rb+', 'append':'ab+'} # open modes
_memmap_mode = {'readonly':'r', 'copyonwrite':'c', 'update':'r+'}
TRUE = True # deprecated
FALSE = False # deprecated
_INDENT = " "
DELAYED = "delayed" # used for lazy instantiation of data
ASCIITNULL = 0 # value for ASCII table cell with value = TNULL
# this can be reset by user.
_isInt = "isinstance(val, (int, long, np.integer))"
# Warnings routines
_showwarning = warnings.showwarning
def showwarning(message, category, filename, lineno, file=None):
if file is None:
file = sys.stdout
_showwarning(message, category, filename, lineno, file)
def formatwarning(message, category, filename, lineno):
return str(message)+'\n'
warnings.showwarning = showwarning
warnings.formatwarning = formatwarning
warnings.filterwarnings('always',category=UserWarning,append=True)
# Functions
def _padLength(stringLen):
"""Bytes needed to pad the input stringLen to the next FITS block."""
return (_blockLen - stringLen%_blockLen) % _blockLen
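# Illustrative example (not part of the original module): padding needed
# to reach the next 2880-byte FITS block boundary:
#     >>> _padLength(2880)   # already on a block boundary
#     0
#     >>> _padLength(160)    # (2880 - 160) % 2880 bytes of padding
#     2720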
def _tmpName(input):
"""Create a temporary file name which should not already exist.
Use the directory of the input file and the base name of the mktemp()
output.
"""
dirName = os.path.dirname(input)
if dirName != '':
dirName += '/'
_name = dirName + os.path.basename(tempfile.mktemp())
if not os.path.exists(_name):
return _name
else:
raise IOError, "%s exists" % _name
class VerifyError(exceptions.Exception):
"""Verify exception class."""
pass
class _ErrList(list):
"""Verification errors list class. It has a nested list structure
constructed by error messages generated by verifications at different
class levels.
"""
def __init__(self, val, unit="Element"):
list.__init__(self, val)
self.unit = unit
def __str__(self, tab=0):
"""Print out nested structure with corresponding indentations.
A tricky use of __str__, since normally __str__ has only one
argument.
"""
result = ""
element = 0
# go through the list twice, first time print out all top level messages
for item in self:
if not isinstance(item, _ErrList):
result += _INDENT*tab+"%s\n" % item
# second time go through the next level items; each of the next
# level items must be present, even if it has nothing
for item in self:
if isinstance(item, _ErrList):
_dummy = item.__str__(tab=tab+1)
# print out a message only if there is something
if _dummy.strip():
if self.unit:
result += _INDENT*tab+"%s %s:\n" % (self.unit, element)
result += _dummy
element += 1
return result
class _Verify:
"""Shared methods for verification."""
def run_option(self, option="warn", err_text="", fix_text="Fixed.", fix = "pass", fixable=1):
"""Execute the verification with selected option."""
_text = err_text
if not fixable:
option = 'unfixable'
if option in ['warn', 'exception']:
#raise VerifyError, _text
#elif option == 'warn':
pass
# fix the value
elif option == 'unfixable':
_text = "Unfixable error: %s" % _text
else:
exec(fix)
#if option != 'silentfix':
_text += ' ' + fix_text
return _text
def verify(self, option='warn'):
"""Wrapper for _verify."""
_option = option.lower()
if _option not in ['fix', 'silentfix', 'ignore', 'warn', 'exception']:
raise ValueError, 'Option %s not recognized.' % option
if (_option == "ignore"):
return
x = str(self._verify(_option)).rstrip()
if _option in ['fix', 'silentfix'] and x.find('Unfixable') != -1:
raise VerifyError, '\n'+x
if (_option != "silentfix"and _option != 'exception') and x:
warnings.warn('Output verification result:')
warnings.warn(x)
if _option == 'exception' and x:
raise VerifyError, '\n'+x
def _pad(input):
"""Pad balnk space to the input string to be multiple of 80."""
_len = len(input)
if _len == Card.length:
return input
elif _len > Card.length:
strlen = _len % Card.length
if strlen == 0:
return input
else:
return input + ' ' * (Card.length-strlen)
# minimum length is 80
else:
strlen = _len % Card.length
return input + ' ' * (Card.length-strlen)
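# Illustrative example: _pad always returns a string whose length is a
# multiple of Card.length (80):
#     >>> len(_pad('SIMPLE  =                    T'))
#     80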
def _floatFormat(value):
"""Format the floating number to make sure it gets the decimal point."""
valueStr = "%.16G" % value
if "." not in valueStr and "E" not in valueStr:
valueStr += ".0"
return valueStr
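# Illustrative example: the decimal point is forced so the value reads
# back as a FITS float rather than an integer:
#     >>> _floatFormat(1.0)
#     '1.0'
#     >>> _floatFormat(3e40)
#     '3E+40'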
class Undefined:
"""Undefined value."""
pass
class Delayed:
"""Delayed file-reading data."""
def __init__(self, hdu=None, field=None):
self.hdu = hdu
self.field = field
# translation table for floating value string
_fix_table = maketrans('de', 'DE')
_fix_table2 = maketrans('dD', 'eE')
class Card(_Verify):
# string length of a card
length = 80
# String for a FITS standard compliant (FSC) keyword.
_keywd_FSC = r'[A-Z0-9_-]* *$'
_keywd_FSC_RE = re.compile(_keywd_FSC)
# A number sub-string, either an integer or a float in fixed or
# scientific notation. One for FSC and one for non-FSC (NFSC) format:
# NFSC allows lower case of DE for exponent, allows space between sign,
# digits, exponent sign, and exponents
_digits_FSC = r'(\.\d+|\d+(\.\d*)?)([DE][+-]?\d+)?'
_digits_NFSC = r'(\.\d+|\d+(\.\d*)?) *([deDE] *[+-]? *\d+)?'
_numr_FSC = r'[+-]?' + _digits_FSC
_numr_NFSC = r'[+-]? *' + _digits_NFSC
# This regex helps delete leading zeros from numbers, otherwise
# Python might evaluate them as octal values.
_number_FSC_RE = re.compile(r'(?P<sign>[+-])?0*(?P<digt>' + _digits_FSC+')')
_number_NFSC_RE = re.compile(r'(?P<sign>[+-])? *0*(?P<digt>' + _digits_NFSC + ')')
# FSC commentary card string which must contain printable ASCII characters.
_ASCII_text = r'[ -~]*$'
_comment_FSC_RE = re.compile(_ASCII_text)
# Checks for a valid value/comment string. It returns a match object
# for a valid value/comment string.
# The valu group will return a match if a FITS string, boolean,
# number, or complex value is found, otherwise it will return
# None, meaning the keyword is undefined. The comment field will
# return a match if the comment separator is found, though the
# comment may be an empty string.
_value_FSC_RE = re.compile(
r'(?P<valu_field> *'
r'(?P<valu>'
# The <strg> regex is not correct for all cases, but
# it comes pretty darn close. It appears to find the
# end of a string rather well, but will accept
# strings with an odd number of single quotes,
# instead of issuing an error. The FITS standard
# appears vague on this issue and only states that a
# string should not end with two single quotes,
# whereas it should not end with an even number of
# quotes to be precise.
#
# Note that a non-greedy match is done for a string,
# since a greedy match will find a single-quote after
# the comment separator resulting in an incorrect
# match.
r'\'(?P<strg>([ -~]+?|\'\'|)) *?\'(?=$|/| )|'
r'(?P<bool>[FT])|'
r'(?P<numr>' + _numr_FSC + ')|'
r'(?P<cplx>\( *'
r'(?P<real>' + _numr_FSC + ') *, *(?P<imag>' + _numr_FSC + ') *\))'
r')? *)'
r'(?P<comm_field>'
r'(?P<sepr>/ *)'
r'(?P<comm>[!-~][ -~]*)?'
r')?$')
_value_NFSC_RE = re.compile(
r'(?P<valu_field> *'
r'(?P<valu>'
r'\'(?P<strg>([ -~]+?|\'\'|)) *?\'(?=$|/| )|'
r'(?P<bool>[FT])|'
r'(?P<numr>' + _numr_NFSC + ')|'
r'(?P<cplx>\( *'
r'(?P<real>' + _numr_NFSC + ') *, *(?P<imag>' + _numr_NFSC + ') *\))'
r')? *)'
r'(?P<comm_field>'
r'(?P<sepr>/ *)'
r'(?P<comm>.*)'
r')?$')
# keys of commentary cards
_commentaryKeys = ['', 'COMMENT', 'HISTORY']
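# Illustrative example (a sketch, not exhaustive): the FSC value/comment
# regex exposes the named groups described above:
#     >>> m = Card._value_FSC_RE.match("                   T / conforms")
#     >>> m.group('bool'), m.group('comm')
#     ('T', 'conforms')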
def __init__(self, key='', value='', comment=''):
"""Construct a card from key, value, and (optionally) comment.
Any specified arguments, other than the defaults, must be compliant
with the FITS standard.
key: keyword name, default=''.
value: keyword value, default=''.
comment: comment, default=''.
"""
if key != '' or value != '' or comment != '':
self._setkey(key)
self._setvalue(value)
self._setcomment(comment)
# for commentary cards, value can only be strings and there
# is no comment
if self.key in Card._commentaryKeys:
if not isinstance(self.value, str):
raise ValueError, 'Value in a commentary card must be a string'
else:
self.__dict__['_cardimage'] = ' '*80
def __repr__(self):
return self._cardimage
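# Illustrative example: a Card renders as a fixed 80-column image:
#     >>> c = Card('EXTNAME', 'SCI', 'extension name')
#     >>> len(str(c))
#     80
#     >>> str(c).startswith("EXTNAME = 'SCI     '")
#     True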
def __getattr__(self, name):
""" instanciate specified attribute object."""
if name == '_cardimage':
self.ascardimage()
elif name == 'key':
self._extractKey()
elif name in ['value', 'comment']:
self._extractValueComment(name)
else:
raise AttributeError, name
return getattr(self, name)
def _setkey(self, val):
"""Set the key attribute, surrogate for the __setattr__ key case."""
if isinstance(val, str):
val = val.strip()
if len(val) <= 8:
val = val.upper()
if val == 'END':
raise ValueError, "keyword 'END' not allowed"
self._checkKey(val)
else:
if val[:8].upper() == 'HIERARCH':
val = val[8:].strip()
self.__class__ = _Hierarch
else:
raise ValueError, 'keyword name %s is too long (> 8), use HIERARCH.' % val
else:
raise ValueError, 'keyword name %s is not a string' % val
self.__dict__['key'] = val
def _setvalue(self, val):
"""Set the value attribute."""
if isinstance(val, (str, int, long, float, complex, bool, Undefined,
np.floating, np.integer, np.complexfloating)):
if isinstance(val, str):
self._checkText(val)
self.__dict__['_valueModified'] = 1
else:
raise ValueError, 'Illegal value %s' % str(val)
self.__dict__['value'] = val
def _setcomment(self, val):
"""Set the comment attribute."""
if isinstance(val,str):
self._checkText(val)
else:
if val is not None:
raise ValueError, 'comment %s is not a string' % val
self.__dict__['comment'] = val
def __setattr__(self, name, val):
if name == 'key':
raise SyntaxError, 'keyword name cannot be reset.'
elif name == 'value':
self._setvalue(val)
elif name == 'comment':
self._setcomment(val)
else:
raise AttributeError, name
# when an attribute (value or comment) is changed, reconstruct
# the card image
self._ascardimage()
def ascardimage(self, option='silentfix'):
"""Generate a (new) card image from the attributes: key, value,
and comment, or from raw string.
option: verification option, default=silentfix.
"""
# fix the card image first, but only if it already exists
# (to avoid an infinite loop)
if self.__dict__.has_key('_cardimage'):
self._check(option)
self._ascardimage()
return self.__dict__['_cardimage']
def _ascardimage(self):
"""Generate a (new) card image from the attributes: key, value,
and comment. Core code for ascardimage.
"""
# keyword string
if self.__dict__.has_key('key') or self.__dict__.has_key('_cardimage'):
if isinstance(self, _Hierarch):
keyStr = 'HIERARCH %s ' % self.key
else:
keyStr = '%-8s' % self.key
else:
keyStr = ' '*8
# value string
# check if both value and _cardimage attributes are missing,
# to avoid infinite loops
if not (self.__dict__.has_key('value') or self.__dict__.has_key('_cardimage')):
valStr = ''
# a string value should occupy at least 8 columns, unless it is
# a null string
elif isinstance(self.value, str):
if self.value == '':
valStr = "''"
else:
_expValStr = self.value.replace("'","''")
valStr = "'%-8s'" % _expValStr
valStr = '%-20s' % valStr
# must be before int checking since bool is also int
elif isinstance(self.value ,(bool,np.bool_)):
valStr = '%20s' % `self.value`[0]
elif isinstance(self.value , (int, long, np.integer)):
valStr = '%20d' % self.value
# XXX need to consider platform dependence of the format (e.g. E-009 vs. E-09)
elif isinstance(self.value, (float, np.floating)):
if self._valueModified:
valStr = '%20s' % _floatFormat(self.value)
else:
valStr = '%20s' % self._valuestring
elif isinstance(self.value, (complex,np.complexfloating)):
if self._valueModified:
_tmp = '(' + _floatFormat(self.value.real) + ', ' + _floatFormat(self.value.imag) + ')'
valStr = '%20s' % _tmp
else:
valStr = '%20s' % self._valuestring
elif isinstance(self.value, Undefined):
valStr = ''
# conserve space for HIERARCH cards
if isinstance(self, _Hierarch):
valStr = valStr.strip()
# comment string
if keyStr.strip() in Card._commentaryKeys: # do NOT use self.key
commentStr = ''
elif self.__dict__.has_key('comment') or self.__dict__.has_key('_cardimage'):
if self.comment in [None, '']:
commentStr = ''
else:
commentStr = ' / ' + self.comment
else:
commentStr = ''
# equal sign string
eqStr = '= '
if keyStr.strip() in Card._commentaryKeys: # not using self.key
eqStr = ''
if self.__dict__.has_key('value'):
valStr = str(self.value)
# put all parts together
output = keyStr + eqStr + valStr + commentStr
# need this in case card-with-continue's value is shortened
if not isinstance(self, _Hierarch):
self.__class__ = Card
else:
# does not support CONTINUE for HIERARCH
if len(keyStr + eqStr + valStr) > Card.length:
raise ValueError, "The keyword %s with its value is too long." % self.key
if len(output) <= Card.length:
output = "%-80s" % output
# longstring case (CONTINUE card)
else:
# try not to use CONTINUE if the string value can fit in one line.
# Instead, just truncate the comment
if isinstance(self.value, str) and len(valStr) > (Card.length-10):
self.__class__ = _Card_with_continue
output = self._breakup_strings()
else:
warnings.warn('card is too long, comment is truncated.')
output = output[:Card.length]
self.__dict__['_cardimage'] = output
def _checkText(self, val):
"""Verify val to be printable ASCII text."""
if Card._comment_FSC_RE.match(val) is None:
self.__dict__['_err_text'] = 'Unprintable string %s' % repr(val)
self.__dict__['_fixable'] = 0
raise ValueError, self._err_text
def _checkKey(self, val):
"""Verify the keyword to be FITS standard."""
# use repr (not str) in case of control character
if Card._keywd_FSC_RE.match(val) is None:
self.__dict__['_err_text'] = 'Illegal keyword name %s' % repr(val)
self.__dict__['_fixable'] = 0
raise ValueError, self._err_text
def _extractKey(self):
"""Returns the keyword name parsed from the card image."""
head = self._getKeyString()
if isinstance(self, _Hierarch):
self.__dict__['key'] = head.strip()
else:
self.__dict__['key'] = head.strip().upper()
def _extractValueComment(self, name):
"""Exatrct the keyword value or comment from the card image."""
# for commentary cards, no need to parse further
if self.key in Card._commentaryKeys:
self.__dict__['value'] = self._cardimage[8:].rstrip()
self.__dict__['comment'] = ''
return
valu = self._check(option='parse')
if name == 'value':
if valu is None:
raise ValueError, "Unparsable card, fix it first with .verify('fix')."
if valu.group('bool') != None:
_val = valu.group('bool')=='T'
elif valu.group('strg') != None:
_val = re.sub("''", "'", valu.group('strg'))
elif valu.group('numr') != None:
# Check for numbers with leading 0s.
numr = Card._number_NFSC_RE.match(valu.group('numr'))
_digt = numr.group('digt').translate(_fix_table2, ' ')
if numr.group('sign') == None:
_val = eval(_digt)
else:
_val = eval(numr.group('sign')+_digt)
elif valu.group('cplx') != None:
# Check for numbers with leading 0s.
real = Card._number_NFSC_RE.match(valu.group('real'))
_rdigt = real.group('digt').translate(_fix_table2, ' ')
if real.group('sign') == None:
_val = eval(_rdigt)
else:
_val = eval(real.group('sign')+_rdigt)
imag = Card._number_NFSC_RE.match(valu.group('imag'))
_idigt = imag.group('digt').translate(_fix_table2, ' ')
if imag.group('sign') == None:
_val += eval(_idigt)*1j
else:
_val += eval(imag.group('sign') + _idigt)*1j
else:
_val = UNDEFINED
self.__dict__['value'] = _val
if '_valuestring' not in self.__dict__:
self.__dict__['_valuestring'] = valu.group('valu')
if '_valueModified' not in self.__dict__:
self.__dict__['_valueModified'] = 0
elif name == 'comment':
self.__dict__['comment'] = ''
if valu is not None:
_comm = valu.group('comm')
if isinstance(_comm, str):
self.__dict__['comment'] = _comm.rstrip()
def _fixValue(self, input):
"""Fix the card image for fixable non-standard compliance."""
_valStr = None
# for the unparsable case
if input is None:
_tmp = self._getValueCommentString()
try:
slashLoc = _tmp.index("/")
self.__dict__['value'] = _tmp[:slashLoc].strip()
self.__dict__['comment'] = _tmp[slashLoc+1:].strip()
except:
self.__dict__['value'] = _tmp.strip()
elif input.group('numr') != None:
numr = Card._number_NFSC_RE.match(input.group('numr'))
_valStr = numr.group('digt').translate(_fix_table, ' ')
if numr.group('sign') is not None:
_valStr = numr.group('sign')+_valStr
elif input.group('cplx') != None:
real = Card._number_NFSC_RE.match(input.group('real'))
_realStr = real.group('digt').translate(_fix_table, ' ')
if real.group('sign') is not None:
_realStr = real.group('sign')+_realStr
imag = Card._number_NFSC_RE.match(input.group('imag'))
_imagStr = imag.group('digt').translate(_fix_table, ' ')
if imag.group('sign') is not None:
_imagStr = imag.group('sign') + _imagStr
_valStr = '(' + _realStr + ', ' + _imagStr + ')'
self.__dict__['_valuestring'] = _valStr
self._ascardimage()
def _locateEq(self):
"""Locate the equal sign in the card image before column 10 and
return its location. It returns None if equal sign is not present,
or it is a commentary card.
"""
# no equal sign for commentary cards (i.e. part of the string value)
_key = self._cardimage[:8].strip().upper()
if _key in Card._commentaryKeys:
eqLoc = None
else:
if _key == 'HIERARCH':
_limit = Card.length
else:
_limit = 10
try:
eqLoc = self._cardimage[:_limit].index("=")
except:
eqLoc = None
return eqLoc
def _getKeyString(self):
"""Locate the equal sign in the card image and return the string
before the equal sign. If there is no equal sign, return the
string before column 9.
"""
eqLoc = self._locateEq()
if eqLoc is None:
eqLoc = 8
_start = 0
if self._cardimage[:8].upper() == 'HIERARCH':
_start = 8
self.__class__ = _Hierarch
return self._cardimage[_start:eqLoc]
def _getValueCommentString(self):
"""Locate the equal sign in the card image and return the string
after the equal sign. If there is no equal sign, return the
string after column 8.
"""
eqLoc = self._locateEq()
if eqLoc is None:
eqLoc = 7
return self._cardimage[eqLoc+1:]
def _check(self, option='ignore'):
"""Verify the card image with the specified option. """
self.__dict__['_err_text'] = ''
self.__dict__['_fix_text'] = ''
self.__dict__['_fixable'] = 1
if option == 'ignore':
return
elif option == 'parse':
# check the value only, no need to check key and comment for 'parse'
result = Card._value_NFSC_RE.match(self._getValueCommentString())
# if not parsable (i.e. everything else) result = None
return result
else:
# verify the equal sign position
if self.key not in Card._commentaryKeys and self._cardimage.find('=') != 8:
if option in ['exception', 'warn']:
self.__dict__['_err_text'] = 'Card image is not FITS standard (equal sign not at column 8).'
raise ValueError, self._err_text + '\n%s' % self._cardimage
elif option in ['fix', 'silentfix']:
result = self._check('parse')
self._fixValue(result)
if option == 'fix':
self.__dict__['_fix_text'] = 'Fixed card to be FITS standard: %s' % self.key
# verify the key, it is never fixable
# always fix silently the case where "=" is before column 9,
# since there is no way to communicate back to the _keylist.
self._checkKey(self.key)
# verify the value, it may be fixable
result = Card._value_FSC_RE.match(self._getValueCommentString())
if result is not None or self.key in Card._commentaryKeys:
return result
else:
if option in ['fix', 'silentfix']:
result = self._check('parse')
self._fixValue(result)
if option == 'fix':
self.__dict__['_fix_text'] = 'Fixed card to be FITS standard: %s' % self.key
else:
self.__dict__['_err_text'] = 'Card image is not FITS standard (unparsable value string).'
raise ValueError, self._err_text + '\n%s' % self._cardimage
# verify the comment (string), it is never fixable
if result is not None:
_str = result.group('comm')
if _str is not None:
self._checkText(_str)
def fromstring(self, input):
"""Construct a Card object from a (raw) string. It will pad the
string if it is not the length of a card image (80 columns).
If the card image is longer than 80, assume it contains CONTINUE
card(s).
"""
self.__dict__['_cardimage'] = _pad(input)
if self._cardimage[:8].upper() == 'HIERARCH':
self.__class__ = _Hierarch
# for card image longer than 80, assume it contains CONTINUE card(s).
elif len(self._cardimage) > Card.length:
self.__class__ = _Card_with_continue
# remove the key/value/comment attributes, some of them may not exist
for name in ['key', 'value', 'comment', '_valueModified']:
if self.__dict__.has_key(name):
delattr(self, name)
return self
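# Illustrative example: fromstring pads a short raw image to 80 columns
# and re-parses the key and value lazily on attribute access:
#     >>> c = Card().fromstring("NAXIS   =                    2 / axes")
#     >>> c.key, c.value
#     ('NAXIS', 2)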
def _ncards(self):
return len(self._cardimage) / Card.length
def _verify(self, option='warn'):
"""Card class verification method."""
_err = _ErrList([])
try:
self._check(option)
except ValueError:
# Trapping the ValueError raised by _check method. Want execution to continue while printing
# exception message.
pass
_err.append(self.run_option(option, err_text=self._err_text, fix_text=self._fix_text, fixable=self._fixable))
return _err
class _Hierarch(Card):
"""Cards begins with HIERARCH which allows keyword name longer than 8
characters.
"""
def _verify(self, option='warn'):
"""No verification (for now)."""
return _ErrList([])
class _Card_with_continue(Card):
"""Cards having more than one 80-char "physical" cards, the cards after
the first one must start with CONTINUE and the whole card must have
string value.
"""
def __str__(self):
"""Format a list of cards into a printable string."""
kard = self._cardimage
output = ''
for i in range(len(kard)/80):
output += kard[i*80:(i+1)*80] + '\n'
return output[:-1]
def _extractValueComment(self, name):
"""Exatrct the keyword value or comment from the card image."""
longstring = ''
ncards = self._ncards()
for i in range(ncards):
# take each 80-char card as a regular card and use its methods.
_card = Card().fromstring(self._cardimage[i*80:(i+1)*80])
if i > 0 and _card.key != 'CONTINUE':
raise ValueError, 'Long card image must have CONTINUE cards after the first card.'
if not isinstance(_card.value, str):
raise ValueError, 'Cards with CONTINUE must have string value.'
if name == 'value':
_val = re.sub("''", "'", _card.value).rstrip()
# drop the ending "&"
if _val[-1] == '&':
_val = _val[:-1]
longstring = longstring + _val
elif name == 'comment':
_comm = _card.comment
if isinstance(_comm, str) and _comm != '':
longstring = longstring + _comm.rstrip() + ' '
self.__dict__[name] = longstring.rstrip()
def _breakup_strings(self):
"""Break up long string value/comment into CONTINUE cards.
This is a primitive implementation: it puts the value string in
one block and the comment string in another, so the result may
not look pretty.
"""
val_len = 67
comm_len = 64
output = ''
# do the value string
valfmt = "'%-s&'"
val = self.value.replace("'", "''")
val_list = self._words_group(val, val_len)
for i in range(len(val_list)):
if i == 0:
headstr = "%-8s= " % self.key
else:
headstr = "CONTINUE "
valstr = valfmt % val_list[i]
output = output + '%-80s' % (headstr + valstr)
# do the comment string
if self.comment is None:
comm = ''
else:
comm = self.comment
commfmt = "%-s"
if not comm == '':
nlines = len(comm) / comm_len + 1
comm_list = self._words_group(comm, comm_len)
for i in comm_list:
commstr = "CONTINUE '&' / " + commfmt % i
output = output + '%-80s' % commstr
return output
def _words_group(self, input, strlen):
"""Split a long string into parts where each part is no longer than
strlen and no word is cut into two pieces. But if there is one
single word which is longer than strlen, then it will be split in
the middle of the word.
"""
words = []
_nblanks = input.count(' ')
nmax = max(_nblanks, len(input)/strlen+1)
arr = chararray.array(input+' ', itemsize=1)
# locations of the blanks
blank_loc = np.nonzero(arr == ' ')[0]
offset = 0
xoffset = 0
for i in range(nmax):
try:
loc = np.nonzero(blank_loc >= strlen+offset)[0][0]
offset = blank_loc[loc-1] + 1
if loc == 0:
offset = -1
except:
offset = len(input)
# check for one word longer than strlen, break in the middle
if offset <= xoffset:
offset = xoffset + strlen
# collect the pieces in a list
tmp = input[xoffset:offset]
words.append(tmp)
if len(input) == offset:
break
xoffset = offset
return words
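# Illustrative example: _words_group breaks only at blanks, unless a
# single word exceeds strlen:
#     >>> _Card_with_continue()._words_group('a bb ccc dddd', 5)
#     ['a bb ', 'ccc ', 'dddd']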
class Header:
"""FITS header class."""
def __init__(self, cards=[]):
"""Construct a Header from a CardList.
cards: A list of Cards, default=[].
"""
# decide which kind of header it belongs to
try:
if cards[0].key == 'SIMPLE':
if 'GROUPS' in cards._keylist and cards['GROUPS'].value == True:
self._hdutype = GroupsHDU
elif cards[0].value == True:
self._hdutype = PrimaryHDU
else:
self._hdutype = _ValidHDU
elif cards[0].key == 'XTENSION':
xtension = cards[0].value.rstrip()
if xtension == 'TABLE':
self._hdutype = TableHDU
elif xtension == 'IMAGE':
self._hdutype = ImageHDU
elif xtension in ('BINTABLE', 'A3DTABLE'):
self._hdutype = BinTableHDU
else:
self._hdutype = _ExtensionHDU
else:
self._hdutype = _ValidHDU
except:
self._hdutype = _CorruptedHDU
# populate the cardlist
self.ascard = CardList(cards)
def __getitem__ (self, key):
"""Get a header keyword value."""
return self.ascard[key].value
def __setitem__ (self, key, value):
"""Set a header keyword value."""
self.ascard[key].value = value
self._mod = 1
def __delitem__(self, key):
"""Delete card(s) with the name 'key'."""
# delete ALL cards with the same keyword name
if isinstance(key, str):
while 1:
try:
del self.ascard[key]
self._mod = 1
except:
return
# for integer key only delete once
else:
del self.ascard[key]
self._mod = 1
def __str__(self):
return self.ascard.__str__()
def ascardlist(self):
"""Returns a CardList."""
return self.ascard
def items(self):
"""Return a list of all keyword-value pairs from the CardList."""
pairs = []
for card in self.ascard:
pairs.append((card.key, card.value))
return pairs
def has_key(self, key):
"""Check for existence of a keyword. Returns 1 if found, otherwise, 0.
key: keyword name. If given an index, always returns 0.
"""
try:
key = key.strip().upper()
if key[:8] == 'HIERARCH':
key = key[8:].strip()
_index = self.ascard._keylist.index(key)
return 1
except:
return 0
def rename_key(self, oldkey, newkey, force=0):
"""Rename a card's keyword in the header.
oldkey: old keyword, can be a name or index.
newkey: new keyword, must be a string.
force: if the new key name already exists, force the creation of a duplicate name.
"""
oldkey = oldkey.strip().upper()
newkey = newkey.strip().upper()
if newkey == 'CONTINUE':
raise ValueError, 'Can not rename to CONTINUE'
if newkey in Card._commentaryKeys or oldkey in Card._commentaryKeys:
if not (newkey in Card._commentaryKeys and oldkey in Card._commentaryKeys):
raise ValueError, 'Regular and commentary keys can not be renamed to each other.'
elif (force == 0) and (newkey in self.ascard._keylist):
raise ValueError, 'Intended keyword %s already exists in header.' % newkey
_index = self.ascard.index_of(oldkey)
_comment = self.ascard[_index].comment
_value = self.ascard[_index].value
self.ascard[_index] = Card(newkey, _value, _comment)
# self.ascard[_index].__dict__['key']=newkey
# self.ascard[_index].ascardimage()
# self.ascard._keylist[_index] = newkey
def get(self, key, default=None):
"""Get a keyword value from the CardList.
If no keyword is found, return the default value.
key: keyword name or index
default: if no keyword is found, the value to be returned.
"""
try:
return self[key]
except KeyError:
return default
def update(self, key, value, comment=None, before=None, after=None):
"""Update one header card."""
"""
If the keyword already exists, it's value/comment will be updated.
If it does not exist, a new card will be created and it will be
placed before or after the specified location. If no "before"
or "after" is specified, it will be appended at the end.
key: keyword name
value: keyword value (to be used for updating)
comment: keyword comment (to be used for updating), default=None.
before: name of the keyword, or index of the Card before which
the new card will be placed. The argument `before' takes
precedence over `after' if both specified. default=None.
after: name of the keyword, or index of the Card after which
the new card will be placed. default=None.
"""
if self.has_key(key):
j = self.ascard.index_of(key)
if comment is not None:
_comment = comment
else:
_comment = self.ascard[j].comment
self.ascard[j] = Card(key, value, _comment)
elif before != None or after != None:
_card = Card(key, value, comment)
self.ascard._pos_insert(_card, before=before, after=after)
else:
self.ascard.append(Card(key, value, comment))
self._mod = 1
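# Illustrative example: update() replaces an existing card in place and
# positions a new one with before/after:
#     >>> h = Header()
#     >>> h.update('TELESCOP', 'HST')
#     >>> h.update('INSTRUME', 'ACS', comment='instrument', after='TELESCOP')
#     >>> h['INSTRUME']
#     'ACS'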
def add_history(self, value, before=None, after=None):
"""Add a HISTORY card.
value: History text to be added.
before: [same as in update()]
after: [same as in update()]
"""
self._add_commentary('history', value, before=before, after=after)
def add_comment(self, value, before=None, after=None):
"""Add a COMMENT card.
value: Comment text to be added.
before: [same as in update()]
after: [same as in update()]
"""
self._add_commentary('comment', value, before=before, after=after)
def add_blank(self, value='', before=None, after=None):
"""Add a blank card.
value: Text to be added.
before: [same as in update()]
after: [same as in update()]
"""
self._add_commentary(' ', value, before=before, after=after)
def get_history(self):
"""Get all histories as a list of string texts."""
output = []
for _card in self.ascardlist():
if _card.key == 'HISTORY':
output.append(_card.value)
return output
def get_comment(self):
"""Get all comments as a list of string texts."""
output = []
for _card in self.ascardlist():
if _card.key == 'COMMENT':
output.append(_card.value)
return output
def _add_commentary(self, key, value, before=None, after=None):
"""Add a commentary card.
If both before and after are None, add the card after the last
occurrence of cards with the same name (except blank cards); if
there is no such card (or it is a blank card), append at the end.
"""
new_card = Card(key, value)
if before != None or after != None:
self.ascard._pos_insert(new_card, before=before, after=after)
else:
if key[0] == ' ':
useblanks = new_card._cardimage != ' '*80
self.ascard.append(new_card, useblanks=useblanks, bottom=1)
else:
try:
_last = self.ascard.index_of(key, backward=1)
self.ascard.insert(_last+1, new_card)
except:
self.ascard.append(new_card, bottom=1)
self._mod = 1
def copy(self):
"""Make a copy of the Header."""
tmp = Header(self.ascard.copy())
# also copy the class
tmp._hdutype = self._hdutype
return tmp
def _strip(self):
"""Strip cards specific to a certain kind of header.
Strip cards like SIMPLE, BITPIX, etc. so the rest of the header
can be used to reconstruct another kind of header.
"""
try:
# have both SIMPLE and XTENSION to accommodate Extension
# and Corrupted cases
del self['SIMPLE']
del self['XTENSION']
del self['BITPIX']
_naxis = self['NAXIS']
if issubclass(self._hdutype, _TableBaseHDU):
_tfields = self['TFIELDS']
del self['NAXIS']
for i in range(_naxis):
del self['NAXIS'+`i+1`]
if issubclass(self._hdutype, PrimaryHDU):
del self['EXTEND']
del self['PCOUNT']
del self['GCOUNT']
if issubclass(self._hdutype, PrimaryHDU):
del self['GROUPS']
if issubclass(self._hdutype, _ImageBaseHDU):
del self['BSCALE']
del self['BZERO']
if issubclass(self._hdutype, _TableBaseHDU):
del self['TFIELDS']
for name in ['TFORM', 'TSCAL', 'TZERO', 'TNULL', 'TTYPE', 'TUNIT']:
for i in range(_tfields):
del self[name+`i+1`]
if issubclass(self._hdutype, BinTableHDU):
for name in ['TDISP', 'TDIM', 'THEAP']:
for i in range(_tfields):
del self[name+`i+1`]
if issubclass(self._hdutype, TableHDU):
for i in range(_tfields):
del self['TBCOL'+`i+1`]
except KeyError:
pass
class CardList(list):
"""FITS header card list class."""
def __init__(self, cards=[], keylist=None):
"""Construct the CardList object from a list of Cards.
cards: A list of Cards, default=[].
"""
list.__init__(self, cards)
self._cards = cards
# if the key list is not supplied (as in reading in the FITS file),
# it will be constructed from the card list.
if keylist is None:
self._keylist = [k.upper() for k in self.keys()]
else:
self._keylist = keylist
# find out how many blank cards are *directly* before the END card
self._blanks = 0
self.count_blanks()
def __getitem__(self, key):
"""Get a Card by indexing or by the keyword name."""
_key = self.index_of(key)
return super(CardList, self).__getitem__(_key)
def __getslice__(self, start, end):
_cards = super(CardList, self).__getslice__(start,end)
result = CardList(_cards, self._keylist[start:end])
return result
def __setitem__(self, key, value):
"""Set a Card by indexing or by the keyword name."""
if isinstance (value, Card):
_key = self.index_of(key)
# only set if the value is different from the old one
if str(self[_key]) != str(value):
super(CardList, self).__setitem__(_key, value)
self._keylist[_key] = value.key.upper()
self.count_blanks()
self._mod = 1
else:
raise SyntaxError, "%s is not a Card" % str(value)
def __delitem__(self, key):
"""Delete a Card from the CardList."""
_key = self.index_of(key)
super(CardList, self).__delitem__(_key)
del self._keylist[_key] # update the keylist
self.count_blanks()
self._mod = 1
def count_blanks(self):
"""Find out how many blank cards are *directly* before the END card."""
for i in range(1, len(self)):
if str(self[-i]) != ' '*Card.length:
self._blanks = i - 1
break
def append(self, card, useblanks=1, bottom=0):
"""Append a Card to the CardList.
card: The Card to be appended.
useblanks: Use any *extra* blank cards? default=1.
If useblanks != 0, and if there are blank cards directly
before END, it will use this space first, instead of
appending after these blank cards, so the total space
will not increase (default). When useblanks == 0, the
card will be appended at the end, even if there are
blank cards in front of END.
bottom: If =0 (default) the card will be appended after the last
non-commentary card. If =1, the card will be appended
after the last non-blank card.
"""
if isinstance (card, Card):
nc = len(self) - self._blanks
i = nc - 1
if not bottom:
for i in range(nc-1, -1, -1): # locate last non-commentary card
if self[i].key not in Card._commentaryKeys:
break
super(CardList, self).insert(i+1, card)
self._keylist.insert(i+1, card.key.upper())
if useblanks:
self._use_blanks(card._ncards())
self.count_blanks()
self._mod = 1
else:
raise SyntaxError, "%s is not a Card" % str(card)
def _pos_insert(self, card, before, after, useblanks=1):
"""Insert a Card to the location specified by before or after.
The argument `before' takes precedence over `after' if both
specified. They can be either a keyword name or index.
"""
if before != None:
loc = self.index_of(before)
self.insert(loc, card, useblanks=useblanks)
elif after != None:
loc = self.index_of(after)
self.insert(loc+1, card, useblanks=useblanks)
def insert(self, pos, card, useblanks=1):
"""Insert a Card to the CardList.
pos: the position (an index; a keyword name is not allowed) at
which to insert. The new card will be inserted before it.
card: The Card to be inserted.
useblanks: Use any *extra* blank cards? default=1.
If useblanks != 0, and if there are blank cards directly
before END, it will use this space first, instead of
appending after these blank cards, so the total space
will not increase (default). When useblanks == 0, the
card will be appended at the end, even if there are
blank cards in front of END.
"""
if isinstance (card, Card):
super(CardList, self).insert(pos, card)
self._keylist.insert(pos, card.key) # update the keylist
self.count_blanks()
if useblanks:
self._use_blanks(card._ncards())
self.count_blanks()
self._mod = 1
else:
raise SyntaxError, "%s is not a Card" % str(card)
def _use_blanks(self, how_many):
if self._blanks > 0:
for i in range(min(self._blanks, how_many)):
del self[-1] # this also deletes the keylist item
def keys(self):
"""Return a list of all keywords from the CardList."""
return map(lambda x: getattr(x,'key'), self)
def index_of(self, key, backward=0):
"""Get the index of a keyword in the CardList.
key: the keyword name (a string) or the index (an integer).
backward: search the index from the END, i.e. backward? default=0.
If backward = 1, search from the end.
"""
if isinstance(key, (int, long,np.integer)):
return key
elif isinstance(key, str):
_key = key.strip().upper()
if _key[:8] == 'HIERARCH':
_key = _key[8:].strip()
_keylist = self._keylist
if backward:
_keylist = self._keylist[:] # make a copy
_keylist.reverse()
try:
_indx = _keylist.index(_key)
if backward:
_indx = len(_keylist) - _indx - 1
return _indx
except ValueError:
raise KeyError, 'Keyword %s not found.' % `key`
else:
raise KeyError, 'Illegal key data type %s' % type(key)
def copy(self):
"""Make a (deep)copy of the CardList."""
return CardList([Card('').fromstring(repr(c)) for c in self])
def __repr__(self):
"""Format a list of cards into a string."""
return ''.join(map(repr,self))
def __str__(self):
"""Format a list of cards into a printable string."""
return '\n'.join(map(str,self))
# ----------------------------- HDU classes ------------------------------------
class _AllHDU(object):
"""Base class for all HDU (header data unit) classes."""
pass
class _CorruptedHDU(_AllHDU):
"""A Corrupted HDU class."""
""" This class is used when one or more mandatory Cards are
corrupted (unparsable), such as the 'BITPIX', 'NAXIS', or 'END' cards.
A corrupted HDU usually means that the data size cannot be
calculated or the 'END' card is not found. In the case of a
missing 'END' card, the Header may also contain the binary data(*).
(*) In future it may be possible to decipher where the last block
of the Header ends, but this task may be difficult when the
extension is a TableHDU containing ASCII data.
"""
def __init__(self, data=None, header=None):
self._file, self._offset, self._datLoc = None, None, None
self.header = header
self.data = data
self.name = None
def size(self):
"""Returns the size (in bytes) of the HDU's data part."""
self._file.seek(0, 2)
return self._file.tell() - self._datLoc
def _summary(self):
return "%-10s %-11s" % (self.name, "CorruptedHDU")
def verify(self):
pass
class _ValidHDU(_AllHDU, _Verify):
"""Base class for all HDUs which are not corrupted."""
# 0.6.5.5
def size(self):
"""Size (in bytes) of the data portion of the HDU."""
size = 0
naxis = self.header.get('NAXIS', 0)
if naxis > 0:
size = 1
for j in range(naxis):
size = size * self.header['NAXIS'+`j+1`]
bitpix = self.header['BITPIX']
gcount = self.header.get('GCOUNT', 1)
pcount = self.header.get('PCOUNT', 0)
size = abs(bitpix) * gcount * (pcount + size) / 8
return size
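# Illustrative worked example: for a 100 x 100 image with BITPIX = 32
# (and the default GCOUNT = 1, PCOUNT = 0), the data size is
# abs(32) * 1 * (0 + 100*100) / 8 = 40000 bytes, which the file then
# pads out to a multiple of the 2880-byte block size.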
def copy(self):
"""Make a copy of the HDU, both header and data are copied."""
if self.data is not None:
_data = self.data.copy()
else:
_data = None
return self.__class__(data=_data, header=self.header.copy())
def writeto(self, name, output_verify='exception', clobber=False,
classExtensions={}):
"""Write the HDU to a new file. This is a convenience method
to provide a user easier output interface if only one HDU
needs to be written to a file.
name: output FITS file name to be written to.
output_verify: output verification option, default='exception'.
clobber: Overwrite the output file if exists, default = False.
classExtensions: A dictionary that maps pyfits classes to extensions
of those classes. When present in the dictionary,
the extension class will be constructed in place of
the pyfits class.
"""
if isinstance(self, _ExtensionHDU):
if classExtensions.has_key(HDUList):
hdulist = classExtensions[HDUList]([PrimaryHDU(),self])
else:
hdulist = HDUList([PrimaryHDU(), self])
elif isinstance(self, PrimaryHDU):
if classExtensions.has_key(HDUList):
hdulist = classExtensions[HDUList]([self])
else:
hdulist = HDUList([self])
hdulist.writeto(name, output_verify, clobber=clobber,
classExtensions=classExtensions)
def _verify(self, option='warn'):
_err = _ErrList([], unit='Card')
isValid = "val in [8, 16, 32, 64, -32, -64]"
# Verify location and value of mandatory keywords.
# Do the first card here, instead of in the respective HDU classes,
# so the checking is in order, in case of required cards in wrong order.
if isinstance(self, _ExtensionHDU):
firstkey = 'XTENSION'
firstval = self._xtn
else:
firstkey = 'SIMPLE'
firstval = True
self.req_cards(firstkey, '== 0', '', firstval, option, _err)
self.req_cards('BITPIX', '== 1', _isInt+" and "+isValid, 8, option, _err)
self.req_cards('NAXIS', '== 2', _isInt+" and val >= 0 and val <= 999", 0, option, _err)
naxis = self.header.get('NAXIS', 0)
if naxis < 1000:
for j in range(3, naxis+3):
self.req_cards('NAXIS'+`j-2`, '== '+`j`, _isInt+" and val>= 0", 1, option, _err)
# verify each card
for _card in self.header.ascard:
_err.append(_card._verify(option))
return _err
def req_cards(self, keywd, pos, test, fix_value, option, errlist):
"""Check the existence, location, and value of a required Card."""
"""If pos = None, it can be anywhere. If the card does not exist,
the new card will have the fix_value as its value when created.
Also check the card's value by using the "test" argument.
"""
_err = errlist
fix = ''
cards = self.header.ascard
try:
_index = cards.index_of(keywd)
except:
_index = None
fixable = fix_value is not None
# if pos is a string, it must be of the syntax of "> n",
# where n is an int
if isinstance(pos, str):
_parse = pos.split()
if _parse[0] in ['>=', '==']:
insert_pos = eval(_parse[1])
# if the card does not exist
if _index is None:
err_text = "'%s' card does not exist." % keywd
fix_text = "Fixed by inserting a new '%s' card." % keywd
if fixable:
# use repr to accommodate both string and non-string types
# Boolean is also OK in this constructor
_card = "Card('%s', %s)" % (keywd, `fix_value`)
fix = "self.header.ascard.insert(%d, %s)" % (insert_pos, _card)
_err.append(self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix, fixable=fixable))
else:
# if the supposed location is specified
if pos is not None:
test_pos = '_index '+ pos
if not eval(test_pos):
err_text = "'%s' card at the wrong place (card %d)." % (keywd, _index)
fix_text = "Fixed by moving it to the right place (card %d)." % insert_pos
fix = "_cards=self.header.ascard; dummy=_cards[%d]; del _cards[%d];_cards.insert(%d, dummy)" % (_index, _index, insert_pos)
_err.append(self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix))
# if value checking is specified
if test:
val = self.header[keywd]
if not eval(test):
err_text = "'%s' card has invalid value '%s'." % (keywd, val)
fix_text = "Fixed by setting a new value '%s'." % fix_value
if fixable:
fix = "self.header['%s'] = %s" % (keywd, `fix_value`)
_err.append(self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix, fixable=fixable))
return _err
class _TempHDU(_ValidHDU):
"""Temporary HDU, used when the file is first opened. This is to
speed up the open. Any header will not be initialized till the
HDU is accessed.
"""
def _getname(self):
"""Get the extname and extver from the header."""
re_extname = re.compile(r"EXTNAME\s*=\s*'([ -&(-~]*)'")
re_extver = re.compile(r"EXTVER\s*=\s*(\d+)")
mo = re_extname.search(self._raw)
if mo:
name = mo.group(1).rstrip()
else:
name = ''
mo = re_extver.search(self._raw)
if mo:
extver = int(mo.group(1))
else:
extver = 1
return name, extver
def _getsize(self, block):
"""Get the size from the first block of the HDU."""
re_simple = re.compile(r'SIMPLE =\s*')
re_bitpix = re.compile(r'BITPIX =\s*(-?\d+)')
re_naxis = re.compile(r'NAXIS =\s*(\d+)')
re_naxisn = re.compile(r'NAXIS(\d) =\s*(\d+)')
re_gcount = re.compile(r'GCOUNT =\s*(-?\d+)')
re_pcount = re.compile(r'PCOUNT =\s*(-?\d+)')
re_groups = re.compile(r'GROUPS =\s*(T)')
simple = re_simple.search(block[:80])
mo = re_bitpix.search(block)
if mo is not None:
bitpix = int(mo.group(1))
else:
raise ValueError("BITPIX not found where expected")
mo = re_gcount.search(block)
if mo is not None:
gcount = int(mo.group(1))
else:
gcount = 1
mo = re_pcount.search(block)
if mo is not None:
pcount = int(mo.group(1))
else:
pcount = 0
mo = re_groups.search(block)
if mo and simple:
groups = 1
else:
groups = 0
mo = re_naxis.search(block)
if mo is not None:
naxis = int(mo.group(1))
pos = mo.end(0)
else:
raise ValueError("NAXIS not found where expected")
if naxis == 0:
datasize = 0
else:
dims = [0]*naxis
for i in range(naxis):
mo = re_naxisn.search(block, pos)
pos = mo.end(0)
dims[int(mo.group(1))-1] = int(mo.group(2))
datasize = reduce(operator.mul, dims[groups:])
size = abs(bitpix) * gcount * (pcount + datasize) / 8
if simple and not groups:
name = 'PRIMARY'
else:
name = ''
return size, name
def setupHDU(self, classExtensions={}):
"""Read one FITS HDU, data portions are not actually read here, but
the beginning locations are computed.
"""
_cardList = []
_keyList = []
blocks = self._raw
if (len(blocks) % _blockLen) != 0:
raise IOError, 'Header size is not multiple of %d: %d' % (_blockLen, len(blocks))
elif (blocks[:8] not in ['SIMPLE ', 'XTENSION']):
raise IOError, 'Block does not begin with SIMPLE or XTENSION'
for i in range(0, len(blocks), Card.length):
_card = Card('').fromstring(blocks[i:i+Card.length])
_key = _card.key
if _key == 'END':
break
else:
_cardList.append(_card)
_keyList.append(_key)
# Deal with CONTINUE cards
# if a long string has CONTINUE cards, the "Card" is considered
# to be more than one 80-char "physical" cards.
_max = _keyList.count('CONTINUE')
_start = 0
for i in range(_max):
_where = _keyList[_start:].index('CONTINUE') + _start
for nc in range(1, _max+1):
if _where+nc >= len(_keyList):
break
if _cardList[_where+nc]._cardimage[:10].upper() != 'CONTINUE ':
break
# combine contiguous CONTINUE cards with their parent card
if nc > 0:
_longstring = _cardList[_where-1]._cardimage
for c in _cardList[_where:_where+nc]:
_longstring += c._cardimage
_cardList[_where-1] = _Card_with_continue().fromstring(_longstring)
del _cardList[_where:_where+nc]
del _keyList[_where:_where+nc]
_start = _where
# if not the real CONTINUE card, skip to the next card to search
# to avoid starting at the same CONTINUE card
else:
_start = _where + 1
if _keyList[_start:].count('CONTINUE') == 0:
break
# construct the Header object, using the cards.
try:
header = Header(CardList(_cardList, keylist=_keyList))
if classExtensions.has_key(header._hdutype):
header._hdutype = classExtensions[header._hdutype]
hdu = header._hdutype(data=DELAYED, header=header)
# pass these attributes
hdu._file = self._file
hdu._hdrLoc = self._hdrLoc
hdu._datLoc = self._datLoc
hdu._datSpan = self._datSpan
hdu._ffile = self._ffile
hdu.name = self.name
hdu._extver = self._extver
hdu._new = 0
hdu.header._mod = 0
hdu.header.ascard._mod = 0
except:
pass
return hdu
class _ExtensionHDU(_ValidHDU):
"""An extension HDU class.
This class is the base class for the TableHDU, ImageHDU, and
BinTableHDU classes.
"""
def __init__(self, data=None, header=None):
self._file, self._offset, self._datLoc = None, None, None
self.header = header
self.data = data
self._xtn = ' '
def __setattr__(self, attr, value):
"""Set an HDU attribute."""
if attr == 'name' and value:
if not isinstance(value, str):
raise TypeError, 'bad value type'
value = value.upper()
if self.header.has_key('EXTNAME'):
self.header['EXTNAME'] = value
else:
self.header.ascard.append(Card('EXTNAME', value, 'extension name'))
_ValidHDU.__setattr__(self,attr,value)
def _verify(self, option='warn'):
_err = _ValidHDU._verify(self, option=option)
# Verify location and value of mandatory keywords.
naxis = self.header.get('NAXIS', 0)
self.req_cards('PCOUNT', '== '+`naxis+3`, _isInt+" and val >= 0", 0, option, _err)
self.req_cards('GCOUNT', '== '+`naxis+4`, _isInt+" and val == 1", 1, option, _err)
return _err
# 0.8.8
def _iswholeline(indx, naxis):
if isinstance(indx, (int, long,np.integer)):
if indx >= 0 and indx < naxis:
if naxis > 1:
return _SinglePoint(1, indx)
elif naxis == 1:
return _OnePointAxis(1, 0)
else:
raise IndexError, 'Index %s out of range.' % indx
elif isinstance(indx, slice):
indx = _normalize_slice(indx, naxis)
if (indx.start == 0) and (indx.stop == naxis) and (indx.step == 1):
return _WholeLine(naxis, 0)
else:
if indx.step == 1:
return _LineSlice(indx.stop-indx.start, indx.start)
else:
return _SteppedSlice((indx.stop-indx.start)/indx.step, indx.start)
else:
raise IndexError, 'Illegal index %s' % indx
def _normalize_slice(input, naxis):
"""Set the slice's start/stop in the regular range."""
def _normalize(indx, npts):
if indx < -npts:
indx = 0
elif indx < 0:
indx += npts
elif indx > npts:
indx = npts
return indx
_start = input.start
if _start is None:
_start = 0
elif isinstance(_start, (int, long,np.integer)):
_start = _normalize(_start, naxis)
else:
raise IndexError, 'Illegal slice %s, start must be integer.' % input
_stop = input.stop
if _stop is None:
_stop = naxis
elif isinstance(_stop, (int, long,np.integer)):
_stop = _normalize(_stop, naxis)
else:
raise IndexError, 'Illegal slice %s, stop must be integer.' % input
if _stop < _start:
raise IndexError, 'Illegal slice %s, stop < start.' % input
_step = input.step
if _step is None:
_step = 1
elif isinstance(_step, (int, long, np.integer)):
if _step <= 0:
raise IndexError, 'Illegal slice %s, step must be positive.' % input
else:
raise IndexError, 'Illegal slice %s, step must be integer.' % input
return slice(_start, _stop, _step)
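# Illustrative example: out-of-range bounds are clipped and defaults are
# filled in, mirroring numpy slice semantics:
#     >>> _normalize_slice(slice(-5, None), 3)
#     slice(0, 3, 1)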
class _KeyType:
def __init__(self, npts, offset):
self.npts = npts
self.offset = offset
class _WholeLine(_KeyType):
pass
class _SinglePoint(_KeyType):
pass
class _OnePointAxis(_KeyType):
pass
class _LineSlice(_KeyType):
pass
class _SteppedSlice(_KeyType):
pass
class Section:
"""Image section."""
def __init__(self, hdu):
self.hdu = hdu
def __getitem__(self, key):
dims = []
if not isinstance(key, tuple):
key = (key,)
naxis = self.hdu.header['NAXIS']
if naxis < len(key):
raise IndexError, 'too many indices.'
elif naxis > len(key):
key = key + (slice(None),) * (naxis-len(key))
offset = 0
for i in range(naxis):
_naxis = self.hdu.header['NAXIS'+`naxis-i`]
indx = _iswholeline(key[i], _naxis)
offset = offset * _naxis + indx.offset
# all elements after the first WholeLine must be WholeLine or
# OnePointAxis
if isinstance(indx, (_WholeLine, _LineSlice)):
dims.append(indx.npts)
break
elif isinstance(indx, _SteppedSlice):
raise IndexError, 'Subsection data must be contiguous.'
for j in range(i+1,naxis):
_naxis = self.hdu.header['NAXIS'+`naxis-j`]
indx = _iswholeline(key[j], _naxis)
dims.append(indx.npts)
if not isinstance(indx, _WholeLine):
raise IndexError, 'Subsection data is not contiguous.'
# the offset needs to multiply the length of all remaining axes
else:
offset *= _naxis
if dims == []:
dims = [1]
npt = 1
for n in dims:
npt *= n
# Now, get the data (does not include bscale/bzero for now XXX)
_bitpix = self.hdu.header['BITPIX']
code = _ImageBaseHDU.NumCode[_bitpix]
self.hdu._file.seek(self.hdu._datLoc+offset*abs(_bitpix)/8)
nelements = 1
for dim in dims:
nelements = nelements*dim
raw_data = np.fromfile(self.hdu._file, dtype=code, count=nelements, sep="")
raw_data.shape = dims
# raw_data._byteorder = 'big'
raw_data.dtype = raw_data.dtype.newbyteorder(">")
return raw_data
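# Illustrative usage sketch (assumes a file 'test.fits' whose primary HDU
# holds a 2-D image; open() here refers to the module-level FITS-open
# function defined later in this module, not the builtin):
#     >>> hdu = open('test.fits')[0]
#     >>> row = hdu.section[5, :]   # reads one row from disk, not the whole array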
class _ImageBaseHDU(_ValidHDU):
"""FITS image HDU base class."""
"""Attributes:
header: image header
data: image data
_file: file associated with array (None)
_datLoc: starting byte location of data block in file (None)
"""
# mappings between FITS and numpy typecodes
# NumCode = {8:'int8', 16:'int16', 32:'int32', 64:'int64', -32:'float32', -64:'float64'}
# ImgCode = {'<i2':8, '<i4':16, '<i8':32, '<i16':64, '<f8':-32, '<f16':-64}
NumCode = {8:'uint8', 16:'int16', 32:'int32', 64:'int64', -32:'float32', -64:'float64'}
ImgCode = {'uint8':8, 'int16':16, 'int32':32, 'int64':64, 'float32':-32, 'float64':-64}
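# Illustrative example: the two tables are inverses of each other, e.g.
# BITPIX = -32 maps to a 4-byte IEEE float:
#     >>> _ImageBaseHDU.NumCode[-32]
#     'float32'
#     >>> _ImageBaseHDU.ImgCode['float32']
#     -32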
def __init__(self, data=None, header=None):
self._file, self._datLoc = None, None
if header is not None:
if not isinstance(header, Header):
raise ValueError, "header must be a Header object"
if data is DELAYED:
# this should never happen
if header is None:
raise ValueError, "No header to setup HDU."
# if the file is being read for the first time, no need to copy; keep it unchanged
else:
self.header = header
else:
# construct a list of cards of minimal header
if isinstance(self, _ExtensionHDU):
c0 = Card('XTENSION', 'IMAGE', 'Image extension')
else:
c0 = Card('SIMPLE', True, 'conforms to FITS standard')
_list = CardList([
c0,
Card('BITPIX', 8, 'array data type'),
Card('NAXIS', 0, 'number of array dimensions'),
])
if isinstance(self, GroupsHDU):
_list.append(Card('GROUPS', True, 'has groups'))
if isinstance(self, (_ExtensionHDU, GroupsHDU)):
_list.append(Card('PCOUNT', 0, 'number of parameters'))
_list.append(Card('GCOUNT', 1, 'number of groups'))
if header is not None:
hcopy = header.copy()
hcopy._strip()
_list.extend(hcopy.ascardlist())
self.header = Header(_list)
self._bzero = self.header.get('BZERO', 0)
self._bscale = self.header.get('BSCALE', 1)
if (data is DELAYED): return
self.data = data
# update the header
self.update_header()
self._bitpix = self.header['BITPIX']
# delete the keywords BSCALE and BZERO
del self.header['BSCALE']
del self.header['BZERO']
def update_header(self):
"""Update the header keywords to agree with the data."""
old_naxis = self.header.get('NAXIS', 0)
if isinstance(self.data, GroupData):
self.header['BITPIX'] = _ImageBaseHDU.ImgCode[self.data.dtype.name]
axes = list(self.data.data.getshape())[1:]
axes.reverse()
axes = [0] + axes
elif isinstance(self.data, np.ndarray):
self.header['BITPIX'] = _ImageBaseHDU.ImgCode[self.data.dtype.name]
axes = list(self.data.shape)
axes.reverse()
elif self.data is None:
axes = []
else:
raise ValueError, "incorrect array type"
self.header['NAXIS'] = len(axes)
# add NAXISi if it does not exist
for j in range(len(axes)):
try:
self.header['NAXIS'+`j+1`] = axes[j]
except KeyError:
if (j == 0):
_after = 'naxis'
else :
_after = 'naxis'+`j`
self.header.update('naxis'+`j+1`, axes[j], after = _after)
# delete extra NAXISi's
for j in range(len(axes)+1, old_naxis+1):
try:
del self.header.ascard['NAXIS'+`j`]
except KeyError:
pass
if isinstance(self.data, GroupData):
self.header.update('GROUPS', True, after='NAXIS'+`len(axes)`)
self.header.update('PCOUNT', len(self.data.parnames), after='GROUPS')
self.header.update('GCOUNT', len(self.data), after='PCOUNT')
npars = len(self.data.parnames)
(_scale, _zero) = self.data._get_scale_factors(npars)[3:5]
if _scale:
self.header.update('BSCALE', self.data._coldefs.bscales[npars])
if _zero:
self.header.update('BZERO', self.data._coldefs.bzeros[npars])
for i in range(npars):
self.header.update('PTYPE'+`i+1`, self.data.parnames[i])
(_scale, _zero) = self.data._get_scale_factors(i)[3:5]
if _scale:
self.header.update('PSCAL'+`i+1`, self.data._coldefs.bscales[i])
if _zero:
self.header.update('PZERO'+`i+1`, self.data._coldefs.bzeros[i])
def __getattr__(self, attr):
"""Get the data attribute."""
if attr == 'section':
return Section(self)
elif attr == 'data':
self.__dict__[attr] = None
if self.header['NAXIS'] > 0:
_bitpix = self.header['BITPIX']
self._file.seek(self._datLoc)
if isinstance(self, GroupsHDU):
dims = self.size()*8/abs(_bitpix)
else:
dims = self._dimShape()
code = _ImageBaseHDU.NumCode[self.header['BITPIX']]
if self._ffile.memmap:
self._ffile.code = code
self._ffile.dims = dims
self._ffile.offset = self._datLoc
raw_data = self._ffile._mm
else:
nelements = 1
for x in range(len(dims)):
nelements = nelements * dims[x]
raw_data = np.fromfile(self._file, dtype=code, count=nelements,sep="")
raw_data.shape=dims
# print "raw_data.shape: ",raw_data.shape
# raw_data._byteorder = 'big'
raw_data.dtype = raw_data.dtype.newbyteorder('>')
if (self._bzero != 0 or self._bscale != 1):
if _bitpix > 16: # scale integers to Float64
self.data = np.array(raw_data, dtype=np.float64)
elif _bitpix > 0: # scale integers to Float32
self.data = np.array(raw_data, dtype=np.float32)
else: # floating point cases
if self._ffile.memmap:
self.data = raw_data.copy()
# if not memmap, use the space already in memory
else:
self.data = raw_data
if self._bscale != 1:
np.multiply(self.data, self._bscale, self.data)
if self._bzero != 0:
self.data += self._bzero
# delete the keywords BSCALE and BZERO after scaling
del self.header['BSCALE']
del self.header['BZERO']
self.header['BITPIX'] = _ImageBaseHDU.ImgCode[self.data.dtype.name]
else:
self.data = raw_data
try:
return self.__dict__[attr]
except KeyError:
raise AttributeError(attr)
def _dimShape(self):
"""Returns a tuple of image dimensions, reverse the order of NAXIS."""
naxis = self.header['NAXIS']
axes = naxis*[0]
for j in range(naxis):
axes[j] = self.header['NAXIS'+`j+1`]
axes.reverse()
# print "axes in _dimShape line 2081:",axes
return tuple(axes)
def _summary(self):
"""Summarize the HDU: name, dimensions, and formats."""
class_name = str(self.__class__)
type = class_name[class_name.rfind('.')+1:-2]
if type.find('_') != -1:
type = type[type.find('_')+1:]
# if data is touched, use data info.
if 'data' in dir(self):
if self.data is None:
_shape, _format = (), ''
else:
# the shape will be in the order of NAXIS's which is the
# reverse of the numarray shape
if isinstance(self, GroupsHDU):
_shape = list(self.data.data.shape)[1:]
_format = `self.data._parent.field(0).dtype.name`
else:
_shape = list(self.data.shape)
_format = `self.data.dtype.name`
_shape.reverse()
_shape = tuple(_shape)
_format = _format[_format.rfind('.')+1:]
# if data is not touched yet, use header info.
else:
_shape = ()
for j in range(self.header['NAXIS']):
if isinstance(self, GroupsHDU) and j == 0:
continue
_shape += (self.header['NAXIS'+`j+1`],)
_format = self.NumCode[self.header['BITPIX']]
if isinstance(self, GroupsHDU):
_gcount = ' %d Groups %d Parameters' % (self.header['GCOUNT'], self.header['PCOUNT'])
else:
_gcount = ''
return "%-10s %-11s %5d %-12s %s%s" % \
(self.name, type, len(self.header.ascard), _shape, _format, _gcount)
def scale(self, type=None, option="old", bscale=1, bzero=0):
"""Scale image data by using BSCALE/BZERO.
Call to this method will scale self.data and update the keywords
of BSCALE and BZERO in self.header. This method should only be
used right before writing to the output file, as the data will be
scaled and is therefore not very usable after the call.
type (string): destination data type, use numpy attribute format,
(e.g. 'uint8', 'int16', 'float32' etc.). If is None, use the
current data type.
option: how to scale the data: if "old", use the original BSCALE
and BZERO values when the data was read/created. If
"minmax", use the minimum and maximum of the data to scale.
The option will be overwritten by any user specified
bscale/bzero values.
bscale/bzero: user specified BSCALE and BZERO values.
"""
if self.data is None:
return
# Determine the destination (numpy) data type
if type is None:
type = self.NumCode[self._bitpix]
_type = getattr(np, type)
# Determine how to scale the data
# bscale and bzero takes priority
if (bscale != 1 or bzero !=0):
_scale = bscale
_zero = bzero
else:
if option == 'old':
_scale = self._bscale
_zero = self._bzero
elif option == 'minmax':
if issubclass(_type, np.floating):
_scale = 1
_zero = 0
else:
# flatten the shape temporarily to save memory
dims = self.data.shape
self.data.shape = self.data.size
_min = np.minimum.reduce(self.data)
_max = np.maximum.reduce(self.data)
self.data.shape = dims
if _type == np.uint8: # uint8 case
_zero = _min
_scale = (_max - _min) / (2.**8 - 1)
else:
_zero = (_max + _min) / 2.
# throw away -2^N
_scale = (_max - _min) / (2.**(8*np.dtype(_type).itemsize) - 2)
# Do the scaling
if _zero != 0:
self.data += -_zero # 0.9.6.3 to avoid out of range error for BZERO = +32768
self.header.update('BZERO', _zero)
else:
del self.header['BZERO']
if _scale != 1:
self.data /= _scale
self.header.update('BSCALE', _scale)
else:
del self.header['BSCALE']
if self.data.dtype.type != _type:
self.data = np.array(np.around(self.data), dtype=_type) #0.7.7.1
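# Example (illustrative sketch, file name hypothetical): scale image data
# to 16-bit integers as the last step before writing:
#
#     hdu = pyfits.PrimaryHDU(data=my_float_array)
#     hdu.scale('int16', option='minmax')  # updates BSCALE/BZERO cards
#     hdu.writeto('scaled.fits')
#
# Since self.data is modified in place, call scale() only right before
# the HDU is written out, as the docstring above warns.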
class PrimaryHDU(_ImageBaseHDU):
"""FITS primary HDU class."""
def __init__(self, data=None, header=None):
"""Construct a primary HDU.
data: the data in the HDU, default=None.
header: the header to be used (as a template), default=None.
If header=None, a minimal Header will be provided.
"""
_ImageBaseHDU.__init__(self, data=data, header=header)
self.name = 'PRIMARY'
# insert the keywords EXTEND
if header is None:
dim = `self.header['NAXIS']`
if dim == '0':
dim = ''
self.header.update('EXTEND', True, after='NAXIS'+dim)
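# Example (illustrative): a minimal primary HDU built from a numpy array;
# BITPIX and the NAXISn cards are derived from the data by update_header().
#
#     data = np.arange(100, dtype=np.int32).reshape(10, 10)
#     hdu = pyfits.PrimaryHDU(data=data)
#     print hdu.header['NAXIS']  # 2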
class ImageHDU(_ExtensionHDU, _ImageBaseHDU):
"""FITS image extension HDU class."""
def __init__(self, data=None, header=None, name=None):
"""Construct an image HDU.
data: the data in the HDU, default=None.
header: the header to be used (as a template), default=None.
If header=None, a minimal Header will be provided.
name: The name of the HDU, will be the value of the keyword EXTNAME,
default=None.
"""
# no need to run _ExtensionHDU.__init__ since it is not doing anything.
_ImageBaseHDU.__init__(self, data=data, header=header)
self._xtn = 'IMAGE'
self.header._hdutype = ImageHDU
# the required keywords PCOUNT and GCOUNT have already been inserted
# by _ImageBaseHDU for extension HDUs
# set extension name
if (name is None) and self.header.has_key('EXTNAME'):
name = self.header['EXTNAME']
self.name = name
def _verify(self, option='warn'):
"""ImageHDU verify method."""
_err = _ExtensionHDU._verify(self, option=option)
self.req_cards('PCOUNT', None, _isInt+" and val == 0", 0, option, _err)
return _err
class GroupsHDU(PrimaryHDU):
"""FITS Random Groups HDU class."""
_dict = {8:'B', 16:'I', 32:'J', 64:'K', -32:'E', -64:'D'}
def __init__(self, data=None, header=None, name=None):
PrimaryHDU.__init__(self, data=data, header=header)
self.header._hdutype = GroupsHDU
self.name = name
if self.header['NAXIS'] <= 0:
self.header['NAXIS'] = 1
self.header.update('NAXIS1', 0, after='NAXIS')
def __getattr__(self, attr):
"""Get the 'data' or 'columns' attribute. The data of random group
FITS file will be like a binary table's data.
"""
if attr == 'data': # same code as in _TableBaseHDU
size = self.size()
if size:
self._file.seek(self._datLoc)
data = GroupData(_get_tbdata(self))
data._coldefs = self.columns
data.formats = self.columns.formats
data.parnames = self.columns._pnames
else:
data = None
self.__dict__[attr] = data
elif attr == 'columns':
_cols = []
_pnames = []
_pcount = self.header['PCOUNT']
_format = GroupsHDU._dict[self.header['BITPIX']]
for i in range(self.header['PCOUNT']):
_bscale = self.header.get('PSCAL'+`i+1`, 1)
_bzero = self.header.get('PZERO'+`i+1`, 0)
_pnames.append(self.header['PTYPE'+`i+1`].lower())
_cols.append(Column(name='c'+`i+1`, format = _format, bscale = _bscale, bzero = _bzero))
data_shape = self._dimShape()[:-1]
dat_format = `int(np.array(data_shape).sum())` + _format
_bscale = self.header.get('BSCALE', 1)
_bzero = self.header.get('BZERO', 0)
_cols.append(Column(name='data', format = dat_format, bscale = _bscale, bzero = _bzero))
_coldefs = ColDefs(_cols)
_coldefs._shape = self.header['GCOUNT']
_coldefs._dat_format = _fits2rec[_format]
_coldefs._pnames = _pnames
self.__dict__[attr] = _coldefs
elif attr == '_theap':
self.__dict__[attr] = 0
try:
return self.__dict__[attr]
except KeyError:
raise AttributeError(attr)
# 0.6.5.5
def size(self):
"""Returns the size (in bytes) of the HDU's data part."""
size = 0
naxis = self.header.get('NAXIS', 0)
# for random group image, NAXIS1 should be 0, so we skip NAXIS1.
if naxis > 1:
size = 1
for j in range(1, naxis):
size = size * self.header['NAXIS'+`j+1`]
bitpix = self.header['BITPIX']
gcount = self.header.get('GCOUNT', 1)
pcount = self.header.get('PCOUNT', 0)
size = abs(bitpix) * gcount * (pcount + size) / 8
return size
def _verify(self, option='warn'):
_err = PrimaryHDU._verify(self, option=option)
# Verify locations and values of mandatory keywords.
self.req_cards('NAXIS', '== 2', _isInt+" and val >= 1 and val <= 999", 1, option, _err)
self.req_cards('NAXIS1', '== 3', _isInt+" and val == 0", 0, option, _err)
_after = self.header['NAXIS'] + 3
# if the card EXTEND exists, must be after it.
try:
_dum = self.header['EXTEND']
#_after += 1
except KeyError:
pass
_pos = '>= '+`_after`
self.req_cards('GCOUNT', _pos, _isInt, 1, option, _err)
self.req_cards('PCOUNT', _pos, _isInt, 0, option, _err)
self.req_cards('GROUPS', _pos, 'val == True', True, option, _err)
return _err
# --------------------------Table related code----------------------------------
# lists of column/field definition common names and keyword names, make
# sure to preserve the one-to-one correspondence when updating the list(s).
# Use lists, instead of dictionaries so the names can be displayed in a
# preferred order.
_commonNames = ['name', 'format', 'unit', 'null', 'bscale', 'bzero', 'disp', 'start', 'dim']
_keyNames = ['TTYPE', 'TFORM', 'TUNIT', 'TNULL', 'TSCAL', 'TZERO', 'TDISP', 'TBCOL', 'TDIM']
# mapping from TFORM data type to numpy data type (code)
_booltype = 'i1'
_fits2rec = {'L':_booltype, 'B':'u1', 'I':'i2', 'E':'f4', 'D':'f8', 'J':'i4', 'A':'a', 'C':'c8', 'M':'c16', 'K':'i8'}
# the reverse dictionary of the above
_rec2fits = {}
for key in _fits2rec.keys():
_rec2fits[_fits2rec[key]]=key
class _FormatX(str):
"""For X format in binary tables."""
pass
class _FormatP(str):
"""For P format in variable length table."""
pass
# TFORM regular expression
_tformat_re = re.compile(r'(?P<repeat>^[0-9]*)(?P<dtype>[A-Za-z])(?P<option>[!-~]*)')
# table definition keyword regular expression
_tdef_re = re.compile(r'(?P<label>^T[A-Z]*)(?P<num>[1-9][0-9 ]*$)')
def _parse_tformat(tform):
"""Parse the TFORM value into repeat, data type, and option."""
try:
(repeat, dtype, option) = _tformat_re.match(tform.strip()).groups()
except:
raise ValueError, 'Format "%s" is not recognized.' % tform
if repeat == '': repeat = 1
else: repeat = int(repeat)
return (repeat, dtype, option)
def _convert_format(input_format, reverse=0):
"""Convert FITS format spec to record format spec. Do the opposite
if reverse = 1.
"""
fmt = input_format
(repeat, dtype, option) = _parse_tformat(fmt)
if reverse == 0:
if dtype in _fits2rec.keys(): # FITS format
if dtype == 'A':
output_format = _fits2rec[dtype]+`repeat`
# to accommodate both the ASCII table and binary table column
# format spec, i.e. A7 in ASCII table is the same as 7A in
# binary table, so both will produce 'a7'.
if fmt.lstrip()[0] == 'A' and option != '':
output_format = _fits2rec[dtype]+`int(option)` # make sure option is integer
else:
_repeat = ''
if repeat != 1:
_repeat = `repeat`
output_format = _repeat+_fits2rec[dtype]
elif dtype == 'X':
nbytes = ((repeat-1) / 8) + 1
# use an array, even if it is only ONE u1 (i.e. use tuple always)
output_format = _FormatX(`(nbytes,)`+'u1')
output_format._nx = repeat
elif dtype == 'P':
output_format = _FormatP('2i4')
output_format._dtype = _fits2rec[option[0]]
elif dtype == 'F':
output_format = 'f8'
else:
raise ValueError, "Illegal format %s" % fmt
else:
if dtype == 'a':
output_format = option+_rec2fits[dtype]
elif isinstance(dtype, _FormatX):
warnings.warn('X format')
elif dtype+option in _rec2fits.keys(): # record format
_repeat = ''
if repeat != 1:
_repeat = `repeat`
output_format = _repeat+_rec2fits[dtype+option]
else:
raise ValueError, "Illegal format %s" % fmt
return output_format
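# A few mappings produced by _convert_format, following directly from the
# _fits2rec/_rec2fits tables above:
#
#     _convert_format('J')             -> 'i4'
#     _convert_format('3E')            -> '3f4'
#     _convert_format('10A')           -> 'a10'
#     _convert_format('i4', reverse=1) -> 'J'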
def _convert_ASCII_format(input_format):
"""Convert ASCII table format spec to record format spec. """
ascii2rec = {'A':'a', 'I':'i4', 'F':'f4', 'E':'f4', 'D':'f8'}
_re = re.compile(r'(?P<dtype>[AIFED])(?P<width>[0-9]*)')
# Parse the TFORM value into data type and width.
try:
(dtype, width) = _re.match(input_format.strip()).groups()
dtype = ascii2rec[dtype]
if width == '':
width = None
else:
width = int(width)
except (KeyError, AttributeError):
raise ValueError, 'Illegal format `%s` for ASCII table.' % input_format
return (dtype, width)
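# For instance, per the ascii2rec table above:
#
#     _convert_ASCII_format('I10')   -> ('i4', 10)
#     _convert_ASCII_format('F16.7') -> ('f4', 16)
#     _convert_ASCII_format('A')     -> ('a', None)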
def _get_index(nameList, key):
"""
Get the index of the key in the name list.
The key can be an integer or string. If integer, it is the index
in the list. If string,
(a) Field (column) names are case sensitive: you can have two
different columns called 'abc' and 'ABC' respectively.
(b) When you *refer* to a field (presumably with the field method),
it will try to match the exact name first, so in the example in
(a), field('abc') will get the first field, and field('ABC') will
get the second field.
If there is no exact name matched, it will try to match the name
with case insensitivity. So, in the last example, field('Abc')
will cause an exception since there is no unique mapping. If
there is a field named "XYZ" and no other field name is a case
variant of "XYZ", then field('xyz'), field('Xyz'), etc. will get
this field.
"""
if isinstance(key, (int, long,np.integer)):
indx = int(key)
elif isinstance(key, str):
# try to find exact match first
try:
indx = nameList.index(key.rstrip())
except ValueError:
# try to match case-insensitively,
_key = key.lower().rstrip()
_list = map(lambda x: x.lower().rstrip(), nameList)
_count = operator.countOf(_list, _key) # occurrence of _key in _list
if _count == 1:
indx = _list.index(_key)
elif _count == 0:
raise NameError, "Key '%s' does not exist." % key
else: # multiple match
raise NameError, "Ambiguous key name '%s'." % key
else:
raise NameError, "Illegal key '%s'." % `key`
return indx
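# Illustration of the matching rules described in the docstring:
#
#     names = ['abc', 'ABC', 'XYZ']
#     _get_index(names, 1)      -> 1  (integer key is the index itself)
#     _get_index(names, 'abc')  -> 0  (exact match wins)
#     _get_index(names, 'xyz')  -> 2  (unique case-insensitive match)
#     _get_index(names, 'Abc')  -> NameError (ambiguous: 'abc' and 'ABC')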
def _unwrapx(input, output, nx):
"""Unwrap the X format column into a Boolean array.
input: input Uint8 array of shape (s, nbytes)
output: output Boolean array of shape (s, nx)
nx: number of bits
"""
pow2 = [128, 64, 32, 16, 8, 4, 2, 1]
nbytes = ((nx-1) / 8) + 1
for i in range(nbytes):
_min = i*8
_max = min((i+1)*8, nx)
for j in range(_min, _max):
np.bitwise_and(input[...,i], pow2[j-i*8], output[...,j])
def _wrapx(input, output, nx):
"""Wrap the X format column Boolean array into an UInt8 array.
input: input Boolean array of shape (s, nx)
output: output Uint8 array of shape (s, nbytes)
nx: number of bits
"""
output[...] = 0 # reset the output
nbytes = ((nx-1) / 8) + 1
unused = nbytes*8 - nx
for i in range(nbytes):
_min = i*8
_max = min((i+1)*8, nx)
for j in range(_min, _max):
if j != _min:
np.left_shift(output[...,i], 1, output[...,i])
np.add(output[...,i], input[...,j], output[...,i])
# shift the unused bits
np.left_shift(output[...,i], unused, output[...,i])
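# Round-trip sketch for the X (bit) format helpers, assuming a 10-bit
# column packed into 2 bytes per row:
#
#     nx = 10
#     bits = np.zeros((5, nx), dtype=np.bool_)  # 5 rows of 10 bit flags
#     bits[0, 3] = True
#     packed = np.zeros((5, (nx-1)/8+1), dtype=np.uint8)
#     _wrapx(bits, packed, nx)                  # pack bits into bytes
#     unpacked = np.zeros((5, nx), dtype=np.bool_)
#     _unwrapx(packed, unpacked, nx)            # recover the bit flags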
def _makep(input, desp_output, dtype):
"""Construct the P format column array, both the data descriptors and
the data. It returns the output "data" array of data type dtype.
The descriptor location will have a zero offset for all columns
after this call. The final offset will be calculated when the file
is written.
input: input object array
desp_output: output "descriptor" array of data type 2Int32
dtype: data type of the variable array
"""
_offset = 0
data_output = _VLF([None]*len(input))
data_output._dtype = dtype
if dtype == 'a':
_nbytes = 1
else:
_nbytes = np.array([],dtype=np.typeDict[dtype]).itemsize
for i in range(len(input)):
if dtype == 'a':
data_output[i] = chararray.array(input[i], itemsize=1)
else:
data_output[i] = np.array(input[i], dtype=dtype)
desp_output[i,0] = len(data_output[i])
desp_output[i,1] = _offset
_offset += len(data_output[i]) * _nbytes
return data_output
class _VLF(np.ndarray):
"""variable length field object."""
def __new__(subtype, input):
"""
input: a sequence of variable-sized elements.
"""
self = np.ndarray.__new__(subtype, shape=(len(input)), dtype=np.object)
self._max = 0
return self
def __array_finalize__(self,obj):
if obj is None:
return
self._max = obj._max
def __setitem__(self, key, value):
"""To make sure the new item has consistent data type to avoid
misalignment.
"""
if isinstance(value, np.ndarray) and value.dtype == self.dtype:
pass
elif isinstance(value, chararray.chararray) and value.itemsize == 1:
pass
elif self._dtype == 'a':
value = chararray.array(value, itemsize=1)
else:
value = np.array(value, dtype=self._dtype)
np.ndarray.__setitem__(self, key, value)
self._max = max(self._max, len(value))
class Column:
"""Column class which contains the definition of one column, e.g.
ttype, tform, etc. and the array. Does not support theap yet.
"""
def __init__(self, name=None, format=None, unit=None, null=None, \
bscale=None, bzero=None, disp=None, start=None, \
dim=None, array=None):
"""Construct a Column by specifying attributes. All attributes
except format can be optional.
name: column name, corresponding to TTYPE keyword
format: column format, corresponding to TFORM keyword
unit: column unit, corresponding to TUNIT keyword
null: null value, corresponding to TNULL keyword
bscale: bscale value, corresponding to TSCAL keyword
bzero: bzero value, corresponding to TZERO keyword
disp: display format, corresponding to TDISP keyword
start: column starting position (ASCII table only),
corresponding to TBCOL keyword
dim: column dimension corresponding to TDIM keyword
"""
# any of the input argument (except array) can be a Card or just
# a number/string
for cname in _commonNames:
value = eval(cname) # get the argument's value
keyword = _keyNames[_commonNames.index(cname)]
if isinstance(value, Card):
setattr(self, cname, value.value)
else:
setattr(self, cname, value)
# if the column data is not ndarray, make it to be one, i.e.
# input arrays can be just list or tuple, not required to be ndarray
if format is not None:
# check format
try:
# legit FITS format? convert to record format (e.g. '3J'->'3i4')
recfmt = _convert_format(format)
except ValueError:
try:
# legit recarray format?
recfmt = format
format = _convert_format(recfmt, reverse=1)
except ValueError:
raise ValueError, "Illegal format `%s`." % format
self.format = format
# does not include Object array because there is no guarantee
# the elements in the object array are consistent.
if not isinstance(array, (np.ndarray, chararray.chararray, Delayed)):
try: # try to convert to a ndarray first
if array is not None:
array = np.array(array)
except:
try: # then try to convert it to a strings array
array = chararray.array(array, itemsize=int(recfmt[1:]))
# then try variable length array
except:
if isinstance(recfmt, _FormatP):
try:
_func = lambda x: np.array(x, dtype=recfmt._dtype)
array = _VLF(map(_func, array))
except:
try:
# this handles ['abc'] and [['a','b','c']]
# equally, beautiful!
_func = lambda x: chararray.array(x, itemsize=1)
array = _VLF(map(_func, array))
except:
raise ValueError, "Inconsistent input data array: %s" % array
array._dtype = recfmt._dtype
else:
raise ValueError, "Data is inconsistent with the format `%s`." % format
else:
raise ValueError, "Must specify format to construct Column"
# scale the array back to storage values if there is bscale/bzero
if isinstance(array, np.ndarray):
# boolean fields need to be scaled back to FITS 'T'/'F' bytes too
if recfmt == _booltype:
array = np.where(array==0, ord('F'), ord('T'))
# make a copy if scaled, so as not to corrupt the original array
if bzero not in ['', None, 0] or bscale not in ['', None, 1]:
array = array.copy()
if bzero not in ['', None, 0]:
array += -bzero
if bscale not in ['', None, 1]:
array /= bscale
array = self.__checkValidDataType(array,self.format)
self.array = array
def __checkValidDataType(self,array,format):
# Convert the format to a type we understand
if isinstance(array,Delayed):
return array
elif (array is None):
return array
else:
if (format.find('A') != -1):
if str(array.dtype).find('S') != -1:
# For ASCII arrays, reconstruct the array and ensure
# that all elements have enough characters to comply
# with the format. The new array will have the data
# left justified in the field with trailing blanks
# added to complete the format requirements.
fsize = int(_convert_format(format)[1:])
if fsize > array.itemsize:
l = []
for i in range(len(array)):
l.append(array[i][:min(fsize,array.itemsize)]+
' '*(fsize-array.itemsize))
return chararray.array(l)
else:
return array
else:
numpyFormat = _convert_format(format)
return array.astype(numpyFormat)
elif (format.find('X') == -1 and format.find('P') == -1):
(repeat, fmt, option) = _parse_tformat(format)
numpyFormat = _convert_format(fmt)
return array.astype(numpyFormat)
elif (format.find('X') !=-1):
return array.astype(np.uint8)
else:
return array
def __repr__(self):
text = ''
for cname in _commonNames:
value = getattr(self, cname)
if value is not None:
text += cname + ' = ' + `value` + '\n'
return text[:-1]
def copy(self):
tmp = Column(format='I') # just use a throw-away format
tmp.__dict__=self.__dict__.copy()
return tmp
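# Example (illustrative): column definitions with FITS TFORM codes; list
# inputs for `array` are converted to ndarrays by the constructor above.
#
#     c1 = pyfits.Column(name='target', format='10A',
#                        array=['NGC1', 'NGC2'])
#     c2 = pyfits.Column(name='flux', format='E', unit='count',
#                        array=[1.5, 2.5])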
class ColDefs(object):
"""Column definitions class. It has attributes corresponding to the
Column attributes (e.g. ColDefs has the attribute .names while Column
has .name). Each attribute in ColDefs is a list of corresponding
attribute values from all Columns.
"""
def __init__(self, input, tbtype='BinTableHDU'):
"""input: a list of Columns, an (table) HDU
tbtype: which table HDU, 'BinTableHDU' (default) or
'TableHDU' (text table).
"""
ascii_fmt = {'A':'A1', 'I':'I10', 'E':'E14.6', 'F':'F16.7', 'D':'D24.16'}
self._tbtype = tbtype
if isinstance(input, ColDefs):
self.data = [col.copy() for col in input.data]
# if the input is a list of Columns
elif isinstance(input, (list, tuple)):
for col in input:
if not isinstance(col, Column):
raise "Element %d in the ColDefs input is not a Column." % input.index(col)
self.data = [col.copy() for col in input]
# if the format of an ASCII column has no width, add one
if tbtype == 'TableHDU':
for i in range(len(self)):
(type, width) = _convert_ASCII_format(self.data[i].format)
if width is None:
self.data[i].format = ascii_fmt[self.data[i].format[0]]
elif isinstance(input, _TableBaseHDU):
hdr = input.header
_nfields = hdr['TFIELDS']
self._width = hdr['NAXIS1']
self._shape = hdr['NAXIS2']
# go through header keywords to pick out column definition keywords
_dicts = [{} for i in range(_nfields)] # definition dictionaries for each field
for _card in hdr.ascardlist():
_key = _tdef_re.match(_card.key)
try:
keyword = _key.group('label')
except AttributeError:
continue # skip if there is no match
if (keyword in _keyNames):
col = int(_key.group('num'))
if col <= _nfields and col > 0:
cname = _commonNames[_keyNames.index(keyword)]
_dicts[col-1][cname] = _card.value
# data reading will be delayed
for col in range(_nfields):
_dicts[col]['array'] = Delayed(input, col)
# now build the columns
tmp = [Column(**attrs) for attrs in _dicts]
self.data = tmp
else:
raise TypeError, "input to ColDefs must be a table HDU or a list of Columns"
def __getattr__(self, name):
"""Populate the attributes."""
cname = name[:-1]
if cname in _commonNames:
attr = [''] * len(self)
for i in range(len(self)):
val = getattr(self[i], cname)
if val is not None:
attr[i] = val
elif name == '_arrays':
attr = [col.array for col in self.data]
elif name == '_recformats':
if self._tbtype == 'BinTableHDU':
attr = [_convert_format(fmt) for fmt in self.formats]
elif self._tbtype == 'TableHDU':
self._Formats = self.formats
if len(self) == 1:
dummy = []
else:
dummy = map(lambda x, y: x-y, self.starts[1:], [self.starts[0]]+self.starts[1:-1])
dummy.append(self._width-self.starts[-1]+1)
attr = map(lambda y: 'a'+`y`, dummy)
elif name == 'spans':
# make sure to consider the case that the starting column of
# a field may not be the column right after the last field
if self._tbtype == 'TableHDU':
last_end = 0
attr = [0] * len(self)
for i in range(len(self)):
(_format, _width) = _convert_ASCII_format(self.formats[i])
if self.starts[i] == '':
self.starts[i] = last_end + 1
_end = self.starts[i] + _width - 1
attr[i] = _width
last_end = _end
self._width = _end
else:
raise KeyError, 'Attribute %s not defined.' % name
self.__dict__[name] = attr
return self.__dict__[name]
"""
# make sure to consider the case that the starting column of
# a field may not be the column right after the last field
elif tbtype == 'TableHDU':
(_format, _width) = _convert_ASCII_format(self.formats[i])
if self.starts[i] is '':
self.starts[i] = last_end + 1
_end = self.starts[i] + _width - 1
self.spans[i] = _end - last_end
last_end = _end
self._Formats = self.formats
self._arrays[i] = input[i].array
"""
def __getitem__(self, key):
x = self.data[key]
if isinstance(key, (int, long, np.integer)):
return x
else:
return ColDefs(x)
def __len__(self):
return len(self.data)
def __repr__(self):
return 'ColDefs'+ `tuple(self.data)`
def __coerce__(self, other):
pass # needed for __add__
def __add__(self, other, option='left'):
if isinstance(other, Column):
b = [other]
elif isinstance(other, ColDefs):
b = list(other.data)
else:
raise TypeError, 'Wrong type of input'
if option == 'left':
tmp = list(self.data) + b
else:
tmp = b + list(self.data)
return ColDefs(tmp)
def __radd__(self, other):
return self.__add__(other, 'right')
def __sub__(self, other):
if not isinstance(other, (list, tuple)):
other = [other]
_other = [_get_index(self.names, key) for key in other]
indx=range(len(self))
for x in _other:
indx.remove(x)
tmp = [self[i] for i in indx]
return ColDefs(tmp)
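# ColDefs supports list-like arithmetic (illustrative; c1, c2 and c3 are
# Column objects as in the Column example above):
#
#     cols = ColDefs([c1, c2])
#     wider = cols + c3        # append a column definition
#     slimmer = cols - 'flux'  # drop a column by name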
def _setup(self):
""" Initialize all attributes to be a list of null strings."""
for cname in _commonNames:
setattr(self, cname+'s', ['']*self._nfields)
setattr(self, '_arrays', [None]*self._nfields)
def add_col(self, column):
"""Append one Column to the column definition."""
return self+column
def del_col(self, col_name):
"""Delete (the definition of) one Column."""
indx = _get_index(self.names, col_name)
for cname in _commonNames:
attr = getattr(self, cname+'s')
del attr[indx]
del self._arrays[indx]
self._nfields -= 1
def change_attrib(self, col_name, attrib, new_value):
"""Change an attribute (in the commonName list) of a Column."""
indx = _get_index(self.names, col_name)
getattr(self, attrib+'s')[indx] = new_value
def change_name(self, col_name, new_name):
"""Change a Column's name."""
if new_name != col_name and new_name in self.names:
raise ValueError, 'New name %s already exists.' % new_name
else:
self.change_attrib(col_name, 'name', new_name)
def change_unit(self, col_name, new_unit):
"""Change a Column's unit."""
self.change_attrib(col_name, 'unit', new_unit)
def info(self, attrib='all'):
"""Get attribute(s) information of the column definition.
The attrib can be one or more of the attributes listed in
_commonNames. The default is "all", which will print out
all attributes. It forgives plurals and blanks. If there are
two or more attribute names, they must be separated by comma(s).
"""
if attrib.strip().lower() in ['all', '']:
_lst = _commonNames
else:
_lst = attrib.split(',')
for i in range(len(_lst)):
_lst[i] = _lst[i].strip().lower()
if _lst[i][-1] == 's':
_lst[i] = _lst[i][:-1]
for att in _lst:
if att not in _commonNames:
print "'%s' is not an attribute of the column definitions." % att
continue
print "%s:" % att
print ' ', getattr(self, att+'s')
#def change_format(self, col_name, new_format):
#new_format = _convert_format(new_format)
#self.change_attrib(col_name, 'format', new_format)
def _get_tbdata(hdu):
""" Get the table data from input (an HDU object)."""
tmp = hdu.columns
# get the right shape for the data part of the random group,
# since binary table does not support ND yet
if isinstance(hdu, GroupsHDU):
tmp._recformats[-1] = `hdu._dimShape()[:-1]` + tmp._dat_format
if hdu._ffile.memmap:
_mmap = hdu._ffile._mm[hdu._datLoc:hdu._datLoc+hdu._datSpan]
_data = rec.recarray(_mmap, formats=tmp._recformats, names=tmp.names, shape=tmp._shape)
else:
if isinstance(hdu, TableHDU):
itemsize = tmp.spans[-1]+tmp.starts[-1]-1
dtype = {}
for j in range(len(tmp)):
data_type = 'S'+str(tmp.spans[j])
if j == len(tmp)-1:
if hdu.header['NAXIS1'] > itemsize:
data_type = 'S'+str(tmp.spans[j]+ \
hdu.header['NAXIS1']-itemsize)
dtype[tmp.names[j]] = (data_type,tmp.starts[j]-1)
_data = rec.array(hdu._file, dtype=dtype, names=tmp.names, shape=tmp._shape)
else:
_data = rec.array(hdu._file, formats=",".join(tmp._recformats), names=tmp.names, shape=tmp._shape)
if isinstance(hdu._ffile, _File):
# _data._byteorder = 'big'
_data.dtype = _data.dtype.newbyteorder(">")
# pass datLoc, for P format
_data._heapoffset = hdu._theap + hdu._datLoc
_data._file = hdu._file
_tbsize = hdu.header['NAXIS1']*hdu.header['NAXIS2']
_data._gap = hdu._theap - _tbsize
# comment out to avoid circular reference of _pcount
# pass the attributes
for attr in ['formats', 'names']:
setattr(_data, attr, getattr(tmp, attr))
for i in range(len(tmp)):
tmp._arrays[i] = _data.field(i)
return FITS_rec(_data)
def new_table (input, header=None, nrows=0, fill=0, tbtype='BinTableHDU'):
"""Create a new table from the input column definitions."""
"""
input: a list of Columns or a ColDefs object.
header: header to be used to populate the non-required keywords
nrows: number of rows in the new table
fill: if = 1, will fill all cells with zeros or blanks
if = 0, copy the data from input, undefined cells will still
be filled with zeros/blanks.
tbtype: table type to be created (BinTableHDU or TableHDU)
"""
# construct a table HDU
hdu = eval(tbtype)(header=header)
if isinstance(input, ColDefs):
if input._tbtype == tbtype:
tmp = hdu.columns = input
else:
raise ValueError, 'column definitions have a different table type'
elif isinstance(input, FITS_rec): # input is a FITS_rec
tmp = hdu.columns = input._coldefs
elif isinstance(input, np.ndarray):
tmp = hdu.columns = eval(tbtype)(input).data._coldefs
else: # input is a list of Columns
tmp = hdu.columns = ColDefs(input, tbtype)
# read the delayed data
for i in range(len(tmp)):
_arr = tmp._arrays[i]
if isinstance(_arr, Delayed):
if _arr.hdu.data is None:
tmp._arrays[i] = None
else:
tmp._arrays[i] = rec.recarray.field(_arr.hdu.data,_arr.field)
# use the largest column shape as the shape of the record
if nrows == 0:
for arr in tmp._arrays:
if (arr is not None):
dim = arr.shape[0]
else:
dim = 0
if dim > nrows:
nrows = dim
if tbtype == 'TableHDU':
_itemsize = tmp.spans[-1]+tmp.starts[-1]-1
dtype = {}
for j in range(len(tmp)):
data_type = 'S'+str(tmp.spans[j])
dtype[tmp.names[j]] = (data_type,tmp.starts[j]-1)
hdu.data = FITS_rec(rec.array(' '*_itemsize*nrows, dtype=dtype, shape=nrows))
hdu.data.setflags(write=True)
else:
hdu.data = FITS_rec(rec.array(None, formats=",".join(tmp._recformats), names=tmp.names, shape=nrows))
hdu.data._coldefs = hdu.columns
hdu.data.formats = hdu.columns.formats
# populate data to the new table
for i in range(len(tmp)):
if tmp._arrays[i] is None:
size = 0
else:
size = len(tmp._arrays[i])
n = min(size, nrows)
if fill:
n = 0
(_scale, _zero, bscale, bzero) = hdu.data._get_scale_factors(i)[3:]
if n > 0:
if isinstance(tmp._recformats[i], _FormatX):
if tmp._arrays[i][:n].shape[-1] == tmp._recformats[i]._nx:
_wrapx(tmp._arrays[i][:n], rec.recarray.field(hdu.data,i)[:n], tmp._recformats[i]._nx)
else: # from a table parent data, just pass it
rec.recarray.field(hdu.data,i)[:n] = tmp._arrays[i][:n]
elif isinstance(tmp._recformats[i], _FormatP):
hdu.data._convert[i] = _makep(tmp._arrays[i][:n], rec.recarray.field(hdu.data,i)[:n], tmp._recformats[i]._dtype)
else:
if tbtype == 'TableHDU':
# string no need to convert,
if isinstance(tmp._arrays[i], chararray.chararray):
rec.recarray.field(hdu.data,i)[:n] = tmp._arrays[i][:n]
else:
hdu.data._convert[i] = np.zeros(nrows, dtype=tmp._arrays[i].dtype)
if _scale or _zero:
_arr = tmp._arrays[i].copy()
else:
_arr = tmp._arrays[i]
if _scale:
_arr *= bscale
if _zero:
_arr += bzero
hdu.data._convert[i][:n] = _arr[:n]
else:
rec.recarray.field(hdu.data,i)[:n] = tmp._arrays[i][:n]
if n < nrows:
if tbtype == 'BinTableHDU':
# resize the data in the hdu ColDefs attribute
hdu.columns._arrays[i] = rec.recarray.field(hdu.data,i)
# initialize the new data to zero
if isinstance(rec.recarray.field(hdu.data,i), np.ndarray):
# make the scaled data = 0, not the stored data
rec.recarray.field(hdu.data,i)[n:] = -bzero/bscale
else:
rec.recarray.field(hdu.data,i)[n:] = ''
else:
# resize the data in the hdu ColDefs attribute
hdu.columns._arrays[i] = rec.recarray.field(hdu.data,i)
rec.recarray.field(hdu.data,i)[n:] = ' '*hdu.data._coldefs.spans[i]
hdu.update()
return hdu
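# Typical use of new_table (illustrative sketch, file name hypothetical):
#
#     c1 = pyfits.Column(name='target', format='10A', array=['NGC1'])
#     c2 = pyfits.Column(name='flux', format='E', array=[1.5])
#     tbhdu = pyfits.new_table([c1, c2])  # a BinTableHDU with one row
#     tbhdu.writeto('table.fits')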
class FITS_record(object):
"""FITS record class. FITS record class is used to access records of
the FITS_rec object. This will allow us to deal with scaled columns.
The FITS_record class expects a FITS_rec object as input
"""
def __init__(self, input, row=0):
self.array = input
self.row = row
def field(self, fieldName):
"""Get the field data of the record."""
return self.array.field(fieldName)[self.row]
def setfield(self, fieldName, value):
"""Set the field data of the record."""
self.array.field(fieldName)[self.row] = value
def __str__(self):
"""Print one row."""
outlist = []
for i in range(self.array._nfields):
outlist.append(`self.array.field(i)[self.row]`)
return "(" + ", ".join(outlist) + ")"
def __repr__(self):
return self.__str__()
def __getitem__(self,key):
return self.array.field(key)[self.row]
def __setitem__(self, fieldName, value):
self.array.field(fieldName)[self.row] = value
class FITS_rec(rec.recarray):
"""FITS record array class. FITS record array is the data part of a
table HDU's data part. This is a layer over the recarray, so we
can deal with scaled columns.
"""
def __new__(subtype, input):
"""Construct a FITS record array from a recarray."""
# input should be a record array
if input.dtype.subdtype is None:
self = rec.recarray.__new__(subtype, input.shape, input.dtype,
buf=input.data,heapoffset=input._heapoffset,file=input._file)
else:
self = rec.recarray.__new__(subtype, input.shape, input.dtype,
buf=input.data, strides=input.strides,heapoffset=input._heapoffset,file=input._file)
self._nfields = len(self.dtype.names)
self._convert = [None]*len(self.dtype.names)
self._coldefs = None
self._gap = 0
self.names = self.dtype.names
self._names = self.dtype.names # This attribute added for backward compatibility with numarray version of FITS_rec
self.formats = None
return self
def __array_finalize__(self,obj):
if obj is None:
return
self._convert = obj._convert
self._coldefs = obj._coldefs
self._nfields = obj._nfields
self.names = obj.names
self._names = obj._names
self._gap = obj._gap
self.formats = obj.formats
def _clone(self, shape):
"""Overload this to make mask array indexing work properly."""
hdu = new_table(self._coldefs, nrows=shape[0])
return hdu.data
def __repr__(self):
tmp = rec.recarray.__repr__(self)
return tmp
def __getitem__(self, key):
tmp = rec.recarray.__getitem__(self, key)
if isinstance(key, slice) or isinstance(key,np.ndarray):
out = tmp
out._convert = [None]*len(self.dtype.names)
for i in range(len(self.dtype.names)):
# touch all fields to expand the original ._convert list
# so the sliced FITS_rec will view the same scaled columns as
# the original
dummy = self.field(i)
if self._convert[i] is not None:
out._convert[i] = np.ndarray.__getitem__(self._convert[i], key)
del dummy
return out
# if not a slice, do this because Record has no __getstate__.
# also more efficient.
else:
newrecord = FITS_record(self,key)
return newrecord
def __setitem__(self,row,value):
for i in range(self._nfields):
self.field(self.names[i])[row] = value.field(self.names[i])
def _get_scale_factors(self, indx):
"""
Get the scaling flags and factors for one field.
indx is the index of the field.
"""
if self._coldefs._tbtype == 'BinTableHDU':
_str = 'a' in self._coldefs.formats[indx]
_bool = self._coldefs._recformats[indx][-2:] == _booltype
else:
_str = self._coldefs.formats[indx][0] == 'A'
_bool = 0 # there is no boolean in ASCII table
_number = not(_bool or _str)
bscale = self._coldefs.bscales[indx]
bzero = self._coldefs.bzeros[indx]
_scale = bscale not in ['', None, 1]
_zero = bzero not in ['', None, 0]
# ensure bscale/bzero are numbers
if not _scale:
bscale = 1
if not _zero:
bzero = 0
return (_str, _bool, _number, _scale, _zero, bscale, bzero)
def field(self, key):
"""A view of a Column's data as an array."""
indx = _get_index(self._coldefs.names, key)
if (self._convert[indx] is None):
# for X format
if isinstance(self._coldefs._recformats[indx], _FormatX):
_nx = self._coldefs._recformats[indx]._nx
dummy = np.zeros(self.shape+(_nx,), dtype=np.bool_)
_unwrapx(rec.recarray.field(self,indx), dummy, _nx)
self._convert[indx] = dummy
return self._convert[indx]
(_str, _bool, _number, _scale, _zero, bscale, bzero) = self._get_scale_factors(indx)
# for P format
if isinstance(self._coldefs._recformats[indx], _FormatP):
dummy = _VLF([None]*len(self))
dummy._dtype = self._coldefs._recformats[indx]._dtype
for i in range(len(self)):
_offset = rec.recarray.field(self,indx)[i,1] + self._heapoffset
self._file.seek(_offset)
if self._coldefs._recformats[indx]._dtype == 'a':
dummy[i] = chararray.array(self._file, itemsize=rec.recarray.field(self,indx)[i,0], shape=1)
else:
count = rec.recarray.field(self,indx)[i,0]
dummy[i] = np.fromfile(self._file, dtype=self._coldefs._recformats[indx]._dtype,count =count,sep="")
dummy[i].dtype = dummy[i].dtype.newbyteorder(">")
# scale by TSCAL and TZERO
if _scale or _zero:
for i in range(len(self)):
dummy[i][:] = dummy[i]*bscale+bzero
# Boolean (logical) column
if self._coldefs._recformats[indx]._dtype == _booltype:
for i in range(len(self)):
dummy[i] = np.equal(dummy[i], ord('T'))
self._convert[indx] = dummy
return self._convert[indx]
if _str:
return rec.recarray.field(self,indx)
# ASCII table, convert strings to numbers
if self._coldefs._tbtype == 'TableHDU':
_dict = {'I':np.int32, 'F':np.float32, 'E':np.float32, 'D':np.float64}
_type = _dict[self._coldefs._Formats[indx][0]]
# if the string = TNULL, return ASCIITNULL
nullval = self._coldefs.nulls[indx].strip()
dummy = np.zeros(len(self), dtype=_type)
dummy[:] = ASCIITNULL
self._convert[indx] = dummy
for i in range(len(self)):
if rec.recarray.field(self,indx)[i].strip() != nullval:
dummy[i] = float(rec.recarray.field(self,indx)[i].replace('D', 'E'))
else:
dummy = rec.recarray.field(self,indx)
# further conversion for both ASCII and binary tables
if _number and (_scale or _zero):
# only do the scaling the first time and store it in _convert
self._convert[indx] = np.array(dummy, dtype=np.float64)
if _scale:
np.multiply(self._convert[indx], bscale, self._convert[indx])
if _zero:
self._convert[indx] += bzero
elif _bool:
self._convert[indx] = np.equal(dummy, ord('T'))
else:
return dummy
return self._convert[indx]
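# Example (illustrative): field() returns the column scaled by TSCAL/TZERO,
# caching the converted array in self._convert so repeated access is cheap:
#
#     tbdata = hdulist[1].data       # a FITS_rec
#     flux = tbdata.field('flux')    # by name (rules of _get_index apply)
#     first = tbdata.field(0)        # or by 0-based column index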
def _scale_back(self):
"""Update the parent array, using the (latest) scaled array."""
_dict = {'A':'s', 'I':'d', 'F':'f', 'E':'E', 'D':'E'}
# calculate the starting point and width of each field for ASCII table
if self._coldefs._tbtype == 'TableHDU':
# work on a copy of the starts list so the end-position sentinel
# appended below does not mutate the column definitions
_loc = self._coldefs.starts[:]
_width = []
for i in range(len(self.dtype.names)):
_width.append(_convert_ASCII_format(self._coldefs._Formats[i])[1])
# sentinel: position just past the end of the last field
_loc.append(_loc[-1]+rec.recarray.field(self,i).itemsize)
self._heapsize = 0
for indx in range(len(self.dtype.names)):
if (self._convert[indx] is not None):
if isinstance(self._coldefs._recformats[indx], _FormatX):
_wrapx(self._convert[indx], rec.recarray.field(self,indx), self._coldefs._recformats[indx]._nx)
continue
(_str, _bool, _number, _scale, _zero, bscale, bzero) = self._get_scale_factors(indx)
# add the location offset of the heap area for each
# variable length column
if isinstance(self._coldefs._recformats[indx], _FormatP):
desc = rec.recarray.field(self,indx)
desc[:] = 0 # reset
_npts = map(len, self._convert[indx])
desc[:len(_npts),0] = _npts
_dtype= np.array([],dtype=self._coldefs._recformats[indx]._dtype)
desc[1:,1] = np.add.accumulate(desc[:-1,0])*_dtype.itemsize
desc[:,1][:] += self._heapsize
self._heapsize += desc[:,0].sum()*_dtype.itemsize
# conversion for both ASCII and binary tables
if _number or _str:
if _number and (_scale or _zero):
dummy = self._convert[indx].copy()
if _zero:
dummy -= bzero
if _scale:
dummy /= bscale
elif self._coldefs._tbtype == 'TableHDU':
dummy = self._convert[indx]
else:
continue
# ASCII table, convert numbers to strings
if self._coldefs._tbtype == 'TableHDU':
_format = self._coldefs._Formats[indx].strip()
_lead = self._coldefs.starts[indx] - _loc[indx]
if _lead < 0:
raise ValueError, "column `%s` starting point overlaps to the previous column" % indx+1
_trail = _loc[indx+1] - _width[indx] - self._coldefs.starts[indx]
if _trail < 0:
raise ValueError, "column `%s` ending point overlaps to the next column" % indx+1
if 'A' in _format:
_pc = '%-'
else:
_pc = '%'
_fmt = ' '*_lead + _pc + _format[1:] + _dict[_format[0]] + ' '*_trail
# not using numarray.strings's num2char because the
# result is not allowed to expand (as C/Python does).
for i in range(len(dummy)):
x = _fmt % dummy[i]
if len(x) > (_loc[indx+1]-_loc[indx]):
raise ValueError, "number `%s` does not fit into the output's itemsize of %s" % (x, _width[indx])
else:
rec.recarray.field(self,indx)[i] = x
if 'D' in _format:
# replace() returns a new array, so assign the result back
rec.recarray.field(self,indx)[:] = rec.recarray.field(self,indx).replace('E', 'D')
# binary table
else:
if isinstance(rec.recarray.field(self,indx)[0], np.integer):
dummy = np.around(dummy)
rec.recarray.field(self,indx)[:] = dummy.astype(rec.recarray.field(self,indx).dtype)
del dummy
# ASCII table does not have Boolean type
elif _bool:
rec.recarray.field(self,indx)[:] = np.choose(self._convert[indx],
(np.array([ord('F')],dtype=np.int8)[0],
np.array([ord('T')],dtype=np.int8)[0]))
class GroupData(FITS_rec):
"""Random groups data object.
Allows structured access to FITS Group data in a manner analogous to tables
"""
def __new__(subtype, input=None, bitpix=None, pardata=None, parnames=[],
bscale=None, bzero=None, parbscales=None, parbzeros=None):
"""input: input data, either the group data itself (a numarray) or
a record array (FITS_rec) which will contain both group
parameter info and the data. The rest of the arguments are
used only for the first case.
bitpix: data type as expressed in FITS BITPIX value
(8, 16, 32, 64, -32, or -64)
pardata: parameter data, as a list of (numeric) arrays.
parnames: list of parameter names.
bscale: BSCALE of the data
bzero: BZERO of the data
parbscales: list of bscales for the parameters
parbzeros: list of bzeros for the parameters
"""
if not isinstance(input, FITS_rec):
_formats = ''
_cols = []
if pardata is None:
npars = 0
else:
npars = len(pardata)
if parbscales is None:
parbscales = [None]*npars
if parbzeros is None:
parbzeros = [None]*npars
if bitpix is None:
bitpix = _ImageBaseHDU.ImgCode[input.dtype.name]
fits_fmt = GroupsHDU._dict[bitpix] # -32 -> 'E'
_fmt = _fits2rec[fits_fmt] # 'E' -> 'f4'
_formats = (_fmt+',') * npars
data_fmt = '%s%s' % (`input.shape[1:]`, _fmt)
_formats += data_fmt
gcount = input.shape[0]
for i in range(npars):
_cols.append(Column(name='c'+`i+1`, format = fits_fmt, bscale = parbscales[i], bzero = parbzeros[i]))
_cols.append(Column(name='data', format = fits_fmt, bscale = bscale, bzero = bzero))
subtype._coldefs = ColDefs(_cols)
subtype.parnames = [i.lower() for i in parnames]
# need to inherit from FITS rec. What is being done here?
# tmp = FITS_rec(rec.array(None, formats=_formats, shape=gcount, names= self._coldefs.names))
# self.__setstate__(tmp.__getstate__())
self = FITS_rec(rec.array(None, formats=_formats, names=subtype._coldefs.names, shape=gcount))
for i in range(npars):
(_scale, _zero) = self._get_scale_factors(i)[3:5]
if _scale or _zero:
self._convert[i] = pardata[i]
else:
# self._parent.field(i)[:] = pardata[i]
rec.recarray.field(self,i)[:] = pardata[i]
(_scale, _zero) = self._get_scale_factors(npars)[3:5]
if _scale or _zero:
self._convert[npars] = input
else:
# self._parent.field(npars)[:] = input
rec.recarray.field(self,npars)[:] = input
else:
# self.__setstate__(input.__getstate__())
self = FITS_rec.__new__(subtype,input)
return self
def __str__(self):
outlist = []
for i in self:
outlist.append(FITS_record.__str__(i))
return "RecArray[ \n" + ",\n".join(outlist) + "\n]"
def __getattr__(self, attr):
if attr == 'data':
self.__dict__[attr] = self.field('data')
elif attr == '_unique':
_unique = {}
for i in range(len(self.parnames)):
_name = self.parnames[i]
if _name in _unique:
_unique[_name].append(i)
else:
_unique[_name] = [i]
self.__dict__[attr] = _unique
try:
return self.__dict__[attr]
except KeyError:
raise AttributeError(attr)
def par(self, parName):
"""Get the group parameter values."""
if isinstance(parName, (int, long, np.integer)):
result = self.field(parName)
else:
indx = self._unique[parName.lower()]
if len(indx) == 1:
result = self.field(indx[0])
# if more than one group parameter have the same name
else:
result = self.field(indx[0]).astype('f8')
for i in indx[1:]:
result += self.field(i)
return result
def setpar(self, parName, value):
"""Set the group parameter values."""
if isinstance(parName, (int, long, np.integer)):
self.field(parName)[:] = value
else:
indx = self._unique[parName]
if len(indx) == 1:
self.field(indx[0])[:] = value
# if more than one group parameter have the same name, the
# value must be a list (or tuple) containing arrays
else:
if isinstance(value, (list, tuple)) and len(indx) == len(value):
for i in range(len(indx)):
self.field(indx[i])[:] = value[i]
else:
raise ValueError, "parameter value must be a sequence with %d arrays/numbers." % len(indx)
def _getitem(self, offset):
row = (offset - self._byteoffset) / self._strides[0]
return _Group(self, row)
class _Group(rec.record):
"""One group of the random group data."""
def __init__(self, input, row=0):
rec.record.__init__(self, input, row)
def par(self, fieldName):
"""Get the group parameter value."""
return self.array.par(fieldName)[self.row]
def setpar(self, fieldName, value):
"""Set the group parameter value."""
self.array[self.row:self.row+1].setpar(fieldName, value)
class _TableBaseHDU(_ExtensionHDU):
"""FITS table extension base HDU class."""
def __init__(self, data=None, header=None, name=None):
"""
header: header to be used
data: data to be used
name: name to be populated in EXTNAME keyword
"""
if header is not None:
if not isinstance(header, Header):
raise ValueError, "header must be a Header object"
if data is DELAYED:
# this should never happen
if header is None:
raise ValueError, "No header to setup HDU."
# if the header comes from reading the file the first time, keep it unchanged without copying
else:
self.header = header
else:
# construct a list of cards of minimal header
_list = CardList([
Card('XTENSION', '', ''),
Card('BITPIX', 8, 'array data type'),
Card('NAXIS', 2, 'number of array dimensions'),
Card('NAXIS1', 0, 'length of dimension 1'),
Card('NAXIS2', 0, 'length of dimension 2'),
Card('PCOUNT', 0, 'number of group parameters'),
Card('GCOUNT', 1, 'number of groups'),
Card('TFIELDS', 0, 'number of table fields')
])
if header is not None:
# Make a "copy" (not just a view) of the input header, since it
# may get modified. the data is still a "view" (for now)
hcopy = header.copy()
hcopy._strip()
_list.extend(hcopy.ascardlist())
self.header = Header(_list)
if (data is not DELAYED):
if isinstance(data,np.ndarray) and data.dtype.fields is not None:
if isinstance(data,FITS_rec):
self.data = data
elif isinstance(data,rec.recarray):
self.data = FITS_rec(data)
else:
d = data.view(rec.recarray)
self.data = FITS_rec(d)
self.header['NAXIS1'] = self.data.itemsize
self.header['NAXIS2'] = self.data.shape[0]
self.header['TFIELDS'] = self.data._nfields
if self.data._coldefs is None:
#
# The data does not have a _coldefs attribute so
# create one from the underlying recarray.
#
columns = []
for i in range(len(data.dtype.names)):
cname = data.dtype.names[i]
if data.dtype.fields[cname][0].type == np.string_:
format = \
'A'+str(data.dtype.fields[cname][0].itemsize)
else:
format = \
_convert_format(data.dtype.fields[cname][0].str[1:],
True)
c = Column(name=cname,format=format,array=data[cname])
columns.append(c)
try:
tbtype = 'BinTableHDU'
if self._xtn == 'TABLE':
tbtype = 'TableHDU'
except AttributeError:
pass
self.data._coldefs = ColDefs(columns,tbtype=tbtype)
self.columns = self.data._coldefs
self.update()
elif data is None:
pass
else:
raise TypeError, "table data has incorrect type"
# set extension name
if not name and self.header.has_key('EXTNAME'):
name = self.header['EXTNAME']
self.name = name
def __getattr__(self, attr):
"""Get the 'data' or 'columns' attribute."""
if attr == 'data':
size = self.size()
if size:
self._file.seek(self._datLoc)
data = _get_tbdata(self)
data._coldefs = self.columns
data.formats = self.columns.formats
# print "Got data?"
else:
data = None
self.__dict__[attr] = data
elif attr == 'columns':
class_name = str(self.__class__)
class_name = class_name[class_name.rfind('.')+1:-2]
self.__dict__[attr] = ColDefs(self, tbtype=class_name)
elif attr == '_theap':
self.__dict__[attr] = self.header.get('THEAP', self.header['NAXIS1']*self.header['NAXIS2'])
elif attr == '_pcount':
self.__dict__[attr] = self.header.get('PCOUNT', 0)
try:
return self.__dict__[attr]
except KeyError:
raise AttributeError(attr)
def _summary(self):
"""Summarize the HDU: name, dimensions, and formats."""
class_name = str(self.__class__)
type = class_name[class_name.rfind('.')+1:-2]
# if data is touched, use data info.
if 'data' in dir(self):
if self.data is None:
_shape, _format = (), ''
_nrows = 0
_ncols = 0
else:
_nrows = len(self.data)
_ncols = len(self.columns.formats)
_format = self.columns.formats
# if data is not touched yet, use header info.
else:
_shape = ()
_nrows = self.header['NAXIS2']
_ncols = self.header['TFIELDS']
_format = '['
for j in range(_ncols):
_format += self.header['TFORM'+`j+1`] + ', '
_format = _format[:-2] + ']'
_dims = "%dR x %dC" % (_nrows, _ncols)
return "%-10s %-11s %5d %-12s %s" % \
(self.name, type, len(self.header.ascard), _dims, _format)
def get_coldefs(self):
"""Returns the table's column definitions."""
return self.columns
def update(self):
""" Update header keywords to reflect recent changes of columns."""
_update = self.header.update
_append = self.header.ascard.append
_cols = self.columns
_update('naxis1', self.data.itemsize, after='naxis')
_update('naxis2', self.data.shape[0], after='naxis1')
_update('tfields', len(_cols), after='gcount')
# Wipe out the old table definition keywords. Mark them first,
# then delete from the end so as not to confuse the indexing.
_list = []
for i in range(len(self.header.ascard)-1,-1,-1):
_card = self.header.ascard[i]
_key = _tdef_re.match(_card.key)
try: keyword = _key.group('label')
except AttributeError: continue # skip if there is no match
if (keyword in _keyNames):
_list.append(i)
for i in _list:
del self.header.ascard[i]
del _list
# populate the new table definition keywords
for i in range(len(_cols)):
for cname in _commonNames:
val = getattr(_cols, cname+'s')[i]
if val != '':
keyword = _keyNames[_commonNames.index(cname)]+`i+1`
if cname == 'format' and isinstance(self, BinTableHDU):
val = _cols._recformats[i]
if isinstance(val, _FormatX):
val = `val._nx` + 'X'
elif isinstance(val, _FormatP):
VLdata = self.data.field(i)
VLdata._max = max(map(len, VLdata))
val = 'P' + _convert_format(val._dtype, reverse=1) + '(%d)' % VLdata._max
else:
val = _convert_format(val, reverse=1)
#_update(keyword, val)
_append(Card(keyword, val))
def copy(self):
"""Make a copy of the table HDU, both header and data are copied."""
# touch the data, so it's defined (in the case of reading from a
# FITS file)
self.data
return new_table(self.columns, header=self.header, tbtype=self.columns._tbtype)
def _verify(self, option='warn'):
"""_TableBaseHDU verify method."""
_err = _ExtensionHDU._verify(self, option=option)
self.req_cards('NAXIS', None, 'val == 2', 2, option, _err)
self.req_cards('BITPIX', None, 'val == 8', 8, option, _err)
self.req_cards('TFIELDS', '== 7', _isInt+" and val >= 0 and val <= 999", 0, option, _err)
tfields = self.header['TFIELDS']
for i in range(tfields):
self.req_cards('TFORM'+`i+1`, None, None, None, option, _err)
return _err
class TableHDU(_TableBaseHDU):
"""FITS ASCII table extension HDU class."""
__format_RE = re.compile(
r'(?P<code>[ADEFI])(?P<width>\d+)(?:\.(?P<prec>\d+))?')
def __init__(self, data=None, header=None, name=None):
"""data: data of the table
header: header to be used for the HDU
name: the EXTNAME value
"""
self._xtn = 'TABLE'
_TableBaseHDU.__init__(self, data=data, header=header, name=name)
if self.header[0].rstrip() != self._xtn:
self.header[0] = self._xtn
self.header.ascard[0].comment = 'ASCII table extension'
'''
def format(self):
strfmt, strlen = '', 0
for j in range(self.header['TFIELDS']):
bcol = self.header['TBCOL'+`j+1`]
valu = self.header['TFORM'+`j+1`]
fmt = self.__format_RE.match(valu)
if fmt:
code, width, prec = fmt.group('code', 'width', 'prec')
else:
raise ValueError, valu
size = eval(width)+1
strfmt = strfmt + 's'+str(size) + ','
strlen = strlen + size
else:
strfmt = '>' + strfmt[:-1]
return strfmt
'''
def _verify(self, option='warn'):
"""TableHDU verify method."""
_err = _TableBaseHDU._verify(self, option=option)
self.req_cards('PCOUNT', None, 'val == 0', 0, option, _err)
tfields = self.header['TFIELDS']
for i in range(tfields):
self.req_cards('TBCOL'+`i+1`, None, _isInt, None, option, _err)
return _err
class BinTableHDU(_TableBaseHDU):
"""Binary table HDU class."""
def __init__(self, data=None, header=None, name=None):
"""data: data of the table
header: header to be used for the HDU
name: the EXTNAME value
"""
self._xtn = 'BINTABLE'
_TableBaseHDU.__init__(self, data=data, header=header, name=name)
hdr = self.header
if hdr[0] != self._xtn:
hdr[0] = self._xtn
hdr.ascard[0].comment = 'binary table extension'
class StreamingHDU:
"""
A class that provides the capability to stream data to a FITS file
instead of requiring data to all be written at once.
The following pseudocode illustrates its use:
header = pyfits.Header()
for all the cards you need in the header:
header.update(key,value,comment)
shdu = pyfits.StreamingHDU('filename.fits',header)
for each piece of data:
shdu.write(data)
shdu.close()
"""
def __init__(self, name, header):
"""
Construct a StreamingHDU object given a file name and a header.
:Parameters:
name : string
The name of the file to which the header and data will be
streamed.
header : Header
The header object associated with the data to be written
to the file.
:Returns:
None
Notes
-----
The file will be opened and the header appended to the end of
the file. If the file does not already exist, it will be created
and if the header represents a Primary header, it will be written
to the beginning of the file. If the file does not exist and the
provided header is not a Primary header, a default Primary HDU will
be inserted at the beginning of the file and the provided header
will be added as the first extension. If the file does already
exist, but the provided header represents a Primary header, the
header will be modified to an image extension header and appended
to the end of the file.
"""
self.header = header.copy()
#
# Check if the file already exists. If it does not, check to see
# if we were provided with a Primary Header. If not we will need
# to prepend a default PrimaryHDU to the file before writing the
# given header.
#
if not os.path.exists(name):
if not self.header.has_key('SIMPLE'):
hdulist = HDUList([PrimaryHDU()])
hdulist.writeto(name, 'exception')
else:
if self.header.has_key('SIMPLE') and os.path.getsize(name) > 0:
#
# This will not be the first extension in the file so we
# must change the Primary header provided into an image
# extension header.
#
self.header.update('XTENSION','IMAGE','Image extension',
after='SIMPLE')
del self.header['SIMPLE']
if not self.header.has_key('PCOUNT'):
dim = self.header['NAXIS']
if dim == 0:
dim = ''
else:
dim = str(dim)
self.header.update('PCOUNT', 0, 'number of parameters', after='NAXIS'+dim)
if not self.header.has_key('GCOUNT'):
self.header.update('GCOUNT', 1, 'number of groups', after='PCOUNT')
self._ffo = _File(name, 'append')
self._ffo.getfile().seek(0,2)
self._hdrLoc = self._ffo.writeHDUheader(self)
self._datLoc = self._ffo.getfile().tell()
self._size = self.size()
if self._size != 0:
self.writeComplete = 0
else:
self.writeComplete = 1
def write(self,data):
"""
Write the given data to the stream.
:Parameters:
data : ndarray
Data to stream to the file.
:Returns:
writeComplete : integer
Flag that when true indicates that all of the required data
has been written to the stream.
Notes
-----
Only the amount of data specified in the header provided to the
class constructor may be written to the stream. If the provided
data would cause the stream to overflow, an IOError exception is
raised and the data is not written. Once sufficient data has been
written to the stream to satisfy the amount specified in the header,
the stream is padded to fill a complete FITS block and no more data
will be accepted. An attempt to write more data after the stream
has been filled will raise an IOError exception. If the dtype of
the input data does not match what is expected by the header, a
TypeError exception is raised.
"""
curDataSize = self._ffo.getfile().tell() - self._datLoc
if self.writeComplete or curDataSize + data.nbytes > self._size:
raise IOError, \
"Attempt to write more data to the stream than the header specified"
if _ImageBaseHDU.NumCode[self.header['BITPIX']] != data.dtype.name:
raise TypeError, \
"Supplied data does not match the type specified in the header."
if data.dtype.str[0] != '>':
#
# byteswap little endian arrays before writing
#
output = data.byteswap()
else:
output = data
output.tofile(self._ffo.getfile())
if self._ffo.getfile().tell() - self._datLoc == self._size:
#
# the stream is full so pad the data to the next FITS block
#
self._ffo.getfile().write(_padLength(self._size)*'\0')
self.writeComplete = 1
self._ffo.getfile().flush()
return self.writeComplete
def size(self):
"""
Return the size (in bytes) of the data portion of the HDU.
:Parameters:
None
:Returns:
size : integer
The number of bytes of data required to fill the stream
per the header provided in the constructor.
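Notes
-----
The size follows the standard FITS formula,
abs(BITPIX)/8 * GCOUNT * (PCOUNT + NAXIS1*...*NAXISn). For example,
a header describing a 100x100 image with BITPIX=-32, PCOUNT=0, and
GCOUNT=1 yields 32/8 * 1 * (0 + 100*100) = 40000 bytes.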
"""
size = 0
naxis = self.header.get('NAXIS', 0)
if naxis > 0:
simple = self.header.get('SIMPLE', False)
randomGroups = self.header.get('GROUPS', False)
if simple and randomGroups:
groups = 1
else:
groups = 0
size = 1
for j in range(groups,naxis):
size = size * self.header['NAXIS'+`j+1`]
bitpix = self.header['BITPIX']
gcount = self.header.get('GCOUNT', 1)
pcount = self.header.get('PCOUNT', 0)
size = abs(bitpix) * gcount * (pcount + size) / 8
return size
def close(self):
"""
Close the 'physical' FITS file.
:Parameters:
None
:Returns:
None
"""
self._ffo.close()
class ErrorURLopener(urllib.FancyURLopener):
"""A class to use with urlretrieve to allow IOError exceptions to be
raised when a file specified by a URL cannot be accessed"""
def http_error_default(self, url, fp, errcode, errmsg, headers):
raise IOError, (errcode, errmsg, url)
urllib._urlopener = ErrorURLopener() # Assign the locally subclassed opener
# class to the urllib library
urllib._urlopener.tempcache = {} # Initialize tempcache with an empty
# dictionary to enable file caching
class _File:
"""A file I/O class"""
def __init__(self, name, mode='copyonwrite', memmap=0):
if mode not in _python_mode.keys():
raise "Mode '%s' not recognized" % mode
if isinstance(name, file):
self.name = name.name
elif mode != 'append' and not os.path.exists(name) and \
not os.path.splitdrive(name)[0]:
#
# Not writing file and file does not exist on local machine and
# name does not begin with a drive letter (Windows), try to
# get it over the web.
#
self.name, fileheader = urllib.urlretrieve(name)
else:
self.name = name
self.mode = mode
self.memmap = memmap
self.code = None
self.dims = None
self.offset = 0
if memmap and mode not in ['readonly', 'copyonwrite', 'update']:
raise "Memory mapping is not implemented for mode `%s`." % mode
else:
if isinstance(name, file) and not name.closed:
if _python_mode[mode] != name.mode:
raise "Input mode '%s' (%s) " \
% (mode, _python_mode[mode]) + \
"does not match mode of the input file (%s)." \
% name.mode
self.__file = name
elif os.path.splitext(self.name)[1] == '.gz':
# Handle gzip files
if mode in ['update', 'append']:
raise "Writing to gzipped fits files is not supported"
zfile = gzip.GzipFile(self.name)
self.tfile = tempfile.NamedTemporaryFile('rb+',-1,'.fits')
self.name = self.tfile.name
self.__file = self.tfile.file
self.__file.write(zfile.read())
zfile.close()
elif os.path.splitext(self.name)[1] == '.zip':
# Handle zip files
if mode in ['update', 'append']:
raise "Writing to zipped fits files is not supported"
zfile = zipfile.ZipFile(self.name)
namelist = zfile.namelist()
if len(namelist) != 1:
raise "Zip files with multiple members are not supported."
self.tfile = tempfile.NamedTemporaryFile('rb+',-1,'.fits')
self.name = self.tfile.name
self.__file = self.tfile.file
self.__file.write(zfile.read(namelist[0]))
zfile.close()
else:
self.__file = __builtin__.open(self.name, _python_mode[mode])
# For 'ab+' mode, the pointer is at the end after the open in
# Linux, but is at the beginning in Solaris.
self.__file.seek(0, 2)
self._size = self.__file.tell()
self.__file.seek(0)
def __getattr__(self, attr):
"""Get the _mm attribute."""
if attr == '_mm':
self.__dict__[attr] = Memmap(self.name,offset=self.offset,mode=_memmap_mode[self.mode],dtype=self.code,shape=self.dims)
try:
return self.__dict__[attr]
except KeyError:
raise AttributeError(attr)
def getfile(self):
return self.__file
def _readheader(self, cardList, keyList, block):
"""Read blocks of header, and put each card into a list of cards.
Will deal with CONTINUE cards in a later stage as CONTINUE cards
may span across blocks.
"""
if len(block) != _blockLen:
raise IOError, 'Block length is not %d: %d' % (_blockLen, len(block))
elif block[:8] not in ['SIMPLE  ', 'XTENSION']:
raise IOError, 'Block does not begin with SIMPLE or XTENSION'
for i in range(0, _blockLen, Card.length):
_card = Card('').fromstring(block[i:i+Card.length])
_key = _card.key
cardList.append(_card)
keyList.append(_key)
if _key == 'END':
break
def _readHDU(self):
"""Read the skeleton structure of the HDU."""
end_RE = re.compile('END'+' '*77)
_hdrLoc = self.__file.tell()
# Read the first header block.
block = self.__file.read(_blockLen)
if block == '':
raise EOFError
hdu = _TempHDU()
hdu._raw = ''
# continue reading header blocks until END card is reached
while 1:
# find the END card
mo = end_RE.search(block)
if mo is None:
hdu._raw += block
block = self.__file.read(_blockLen)
if block == '':
break
else:
break
hdu._raw += block
_size, hdu.name = hdu._getsize(hdu._raw)
# get extname and extver
if hdu.name == '':
hdu.name, hdu._extver = hdu._getname()
elif hdu.name == 'PRIMARY':
hdu._extver = 1
hdu._file = self.__file
hdu._hdrLoc = _hdrLoc # beginning of the header area
hdu._datLoc = self.__file.tell() # beginning of the data area
# data area size, including padding
hdu._datSpan = _size + _padLength(_size)
hdu._new = 0
self.__file.seek(hdu._datSpan, 1)
if self.__file.tell() > self._size:
warnings.warn('Warning: File size is smaller than specified data size. File may have been truncated.')
hdu._ffile = self
return hdu
def writeHDU(self, hdu):
"""Write *one* FITS HDU. Must seek to the correct location before
calling this method.
"""
if isinstance(hdu, _ImageBaseHDU):
hdu.update_header()
return (self.writeHDUheader(hdu),) + self.writeHDUdata(hdu)
def writeHDUheader(self, hdu):
"""Write FITS HDU header part."""
blocks = repr(hdu.header.ascard) + _pad('END')
blocks = blocks + _padLength(len(blocks))*' '
if len(blocks)%_blockLen != 0:
raise IOError, 'Header size is not a multiple of the FITS block size.'
self.__file.flush()
loc = self.__file.tell()
self.__file.write(blocks)
# flush, to make sure the content is written
self.__file.flush()
return loc
def writeHDUdata(self, hdu):
"""Write FITS HDU data part."""
byteswapped = False
self.__file.flush()
loc = self.__file.tell()
_size = 0
if hdu.data is not None:
# if image, need to deal with byte order
if isinstance(hdu, _ImageBaseHDU):
# if the data is littleendian
if hdu.data.dtype.str[0] != '>':
byteswapped = True
output = hdu.data.byteswap(True)
output.dtype = output.dtype.newbyteorder('>')
else:
output = hdu.data
# Binary table byteswap
elif isinstance(hdu, BinTableHDU):
for i in range(hdu.data._nfields):
coldata = hdu.data.field(i)
if not isinstance(coldata, chararray.chararray):
# only swap unswapped
# deal with var length table
if isinstance(coldata, _VLF):
k = 0
for j in coldata:
if not isinstance(j, chararray.chararray):
#if hdu.data.field(i).itemsize > 1:
#if hdu.data.field(i).dtype.str[0] != '>':
#hdu.data.field(i)[:] = hdu.data.field(i).byteswap()
if j.itemsize > 1:
if j.dtype.str[0] != '>':
j[:] = j.byteswap()
j.dtype = j.dtype.newbyteorder('>')
if rec.recarray.field(hdu.data,i)[k:k+1].dtype.str[0] != '>':
rec.recarray.field(hdu.data,i)[k:k+1].byteswap(True)
k = k + 1
else:
if coldata.itemsize > 1:
if hdu.data.field(i).dtype.str[0] != '>':
hdu.data.field(i)[:] = hdu.data.field(i).byteswap()
# In case the FITS_rec was created in a LittleEndian machine
hdu.data.dtype = hdu.data.dtype.newbyteorder('>')
output = hdu.data
else:
output = hdu.data
output.tofile(self.__file)
_size = output.size * output.itemsize
# if the image data was byteswapped in this method, return it to
# its original little-endian order
if byteswapped:
output.byteswap(True)
output.dtype = output.dtype.newbyteorder('<')
# write out the heap of variable length array columns
# this has to be done after the "regular" data is written (above)
_where = self.__file.tell()
if isinstance(hdu, BinTableHDU):
self.__file.write(hdu.data._gap*'\0')
for i in range(hdu.data._nfields):
if isinstance(hdu.data._coldefs._recformats[i], _FormatP):
for j in range(len(hdu.data.field(i))):
coldata = hdu.data.field(i)[j]
if len(coldata) > 0:
coldata.tofile(self.__file)
_shift = self.__file.tell() - _where
hdu.data._heapsize = _shift - hdu.data._gap
_size = _size + _shift
# pad the FITS data block
if _size > 0:
if isinstance(hdu, TableHDU):
self.__file.write(_padLength(_size)*' ')
else:
self.__file.write(_padLength(_size)*'\0')
# flush, to make sure the content is written
self.__file.flush()
# return both the location and the size of the data area
return loc, _size+_padLength(_size)
def close(self):
"""Close the 'physical' FITS file."""
self.__file.close()
class HDUList(list, _Verify):
"""HDU list class. This is the top-level FITS object. When a FITS
file is opened, a HDUList object is returned.
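A typical interactive session, sketched with a hypothetical file:
>>> hdulist = pyfits.open('in.fits')
>>> hdulist[0]         # index HDUs by number ...
>>> hdulist['SCI']     # ... or by EXTNAME
>>> hdulist.close()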
"""
def __init__(self, hdus=None, file=None):
"""Construct a HDUList object.
hdus: Input, can be a list of HDU's or a single HDU. Default = None,
i.e. an empty HDUList.
file: The opened physical file associated with the HDUList.
Default = None.
"""
self.__file = file
if hdus is None:
hdus = []
# can take one HDU, as well as a list of HDU's as input
if isinstance(hdus, _ValidHDU):
hdus = [hdus]
elif not isinstance(hdus, (HDUList, list)):
raise "Invalid input for HDUList."
for hdu in hdus:
if not isinstance(hdu, _AllHDU):
raise "Element %d in the HDUList input is not an HDU." % hdus.index(hdu)
list.__init__(self, hdus)
def __iter__(self):
return [self[i] for i in range(len(self))].__iter__()
def __getitem__(self, key, classExtensions={}):
"""Get an HDU from the HDUList, indexed by number or name."""
key = self.index_of(key)
_item = super(HDUList, self).__getitem__(key)
if isinstance(_item, _TempHDU):
super(HDUList, self).__setitem__(key,
_item.setupHDU(classExtensions))
return super(HDUList, self).__getitem__(key)
def __getslice__(self, start, end):
_hdus = super(HDUList, self).__getslice__(start,end)
result = HDUList(_hdus)
return result
def __setitem__(self, key, hdu):
"""Set an HDU to the HDUList, indexed by number or name."""
_key = self.index_of(key)
if isinstance(hdu, (slice, list)):
if isinstance(_key, (int,np.integer)):
raise ValueError, "An element in the HDUList must be an HDU."
for item in hdu:
if not isinstance(item, _AllHDU):
raise ValueError, "%s is not an HDU." % item
else:
if not isinstance(hdu, _AllHDU):
raise ValueError, "%s is not an HDU." % hdu
try:
super(HDUList, self).__setitem__(_key, hdu)
except IndexError:
raise IndexError, 'Extension %s is out of bounds or not found.' % key
self._resize = 1
self._truncate = 0
def __delitem__(self, key):
"""Delete an HDU from the HDUList, indexed by number or name."""
key = self.index_of(key)
endIndex = len(self)-1
super(HDUList, self).__delitem__(key)
if (key == endIndex or key == -1) and not self._resize:
self._truncate = 1
else:
self._truncate = 0
self._resize = 1
def __delslice__(self, i, j):
"""Delete a slice of HDUs from the HDUList, indexed by number only."""
endIndex = len(self)
super(HDUList, self).__delslice__(i, j)
if (j == endIndex or j == sys.maxint) and not self._resize:
self._truncate = 1
else:
self._truncate = 0
self._resize = 1
def _verify (self, option='warn'):
_text = ''
_err = _ErrList([], unit='HDU')
# the first (0th) element must be a primary HDU
if len(self) > 0 and (not isinstance(self[0], PrimaryHDU)):
err_text = "HDUList's 0th element is not a primary HDU."
fix_text = 'Fixed by inserting one as 0th HDU.'
fix = "self.insert(0, PrimaryHDU())"
_text = self.run_option(option, err_text=err_text, fix_text=fix_text, fix=fix)
_err.append(_text)
# each element calls their own verify
for i in range(len(self)):
if i > 0 and (not isinstance(self[i], _ExtensionHDU)):
err_text = "HDUList's element %s is not an extension HDU." % `i`
_text = self.run_option(option, err_text=err_text, fixable=0)
_err.append(_text)
else:
_result = self[i]._verify(option)
if _result:
_err.append(_result)
return _err
def append(self, hdu):
"""Append a new HDU to the HDUList."""
if isinstance(hdu, _AllHDU):
super(HDUList, self).append(hdu)
hdu._new = 1
self._resize = 1
self._truncate = 0
else:
raise "HDUList can only append an HDU"
# make sure the EXTEND keyword is in primary HDU if there is extension
if len(self) > 1:
self.update_extend()
def index_of(self, key):
"""Get the index of an HDU from the HDUList. The key can be an
integer, a string, or a tuple of (string, integer).
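Sketched examples, assuming 'hdulist' holds one extension with
EXTNAME='SCI' and EXTVER=2 after the primary HDU:
>>> hdulist.index_of(1)            # integers pass through unchanged
1
>>> hdulist.index_of('sci')        # EXTNAME match is case-insensitive
1
>>> hdulist.index_of(('sci', 2))   # (EXTNAME, EXTVER) tuple
1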
"""
if isinstance(key, (int, np.integer,slice)):
return key
elif isinstance(key, tuple):
_key = key[0]
_ver = key[1]
else:
_key = key
_ver = None
if not isinstance(_key, str):
raise KeyError, key
_key = (_key.strip()).upper()
nfound = 0
for j in range(len(self)):
_name = self[j].name
if isinstance(_name, str):
_name = _name.strip().upper()
if _name == _key:
# if only EXTNAME is specified, there can be only one extension
# with that name
if _ver is None:
found = j
nfound += 1
else:
# if the keyword EXTVER does not exist, default it to 1
_extver = self[j]._extver
if _ver == _extver:
found = j
nfound += 1
if (nfound == 0):
raise KeyError, 'extension %s not found' % `key`
elif (nfound > 1):
raise KeyError, 'there are %d extensions of %s' % (nfound, `key`)
else:
return found
def readall(self):
"""Read data of all HDU's into memory."""
for i in range(len(self)):
# referencing the .data attribute forces any delayed data to be read
if self[i].data is not None:
continue
def update_tbhdu(self):
"""Update all table HDU's for scaled fields."""
for hdu in self:
if 'data' in dir(hdu):
if isinstance(hdu, (GroupsHDU, _TableBaseHDU)) and hdu.data is not None:
hdu.data._scale_back()
if isinstance(hdu, _TableBaseHDU) and hdu.data is not None:
# check TFIELDS and NAXIS2
hdu.header['TFIELDS'] = hdu.data._nfields
hdu.header['NAXIS2'] = hdu.data.shape[0]
# calculate PCOUNT, for variable length tables
_tbsize = hdu.header['NAXIS1']*hdu.header['NAXIS2']
_heapstart = hdu.header.get('THEAP', _tbsize)
hdu.data._gap = _heapstart - _tbsize
_pcount = hdu.data._heapsize + hdu.data._gap
if _pcount > 0:
hdu.header['PCOUNT'] = _pcount
# update TFORM for variable length columns
for i in range(hdu.data._nfields):
if isinstance(hdu.data._coldefs.formats[i], _FormatP):
key = hdu.header['TFORM'+`i+1`]
hdu.header['TFORM'+`i+1`] = key[:key.find('(')+1] + `hdu.data.field(i)._max` + ')'
def flush(self, output_verify='exception', verbose=0, classExtensions={}):
"""Force a write of the HDUList back to the file (for append and
update modes only).
output_verify: output verification option, default = 'exception'.
verbose: print out verbose messages? default = 0.
classExtensions: A dictionary that maps pyfits classes to extensions
of those classes. When present in the dictionary,
the extension class will be constructed in place of
the pyfits class.
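A minimal sketch of the update-mode workflow ('in.fits' and the
OBSERVER keyword are hypothetical):
>>> f = pyfits.open('in.fits', mode='update')
>>> f[0].header.update('OBSERVER', 'unknown')
>>> f.flush()
>>> f.close()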
"""
# Get the current thread and determine whether this is a single-threaded application
curThread = threading.currentThread()
singleThread = (threading.activeCount() == 1) and (curThread.getName() == 'MainThread')
if self.__file.mode not in ('append', 'update'):
warnings.warn("flush for '%s' mode is not supported." % self.__file.mode)
return
# Define a new SIGINT handler that defers any KeyboardInterrupt until the
# flush is complete. A mutable container is used for the flag because the
# nested handler cannot rebind an outer local in Python 2 (no 'nonlocal').
if singleThread:
keyboardInterruptSent = [False]
def New_SIGINT(*args):
warnings.warn("KeyboardInterrupt ignored until flush is complete!")
keyboardInterruptSent[0] = True
# Install the new handler, saving the old one so it can be restored later
oldHandler = signal.signal(signal.SIGINT, New_SIGINT)
self.update_tbhdu()
self.verify(option=output_verify)
if self.__file.mode == 'append':
for hdu in self:
if (verbose):
try: _extver = `hdu.header['extver']`
except: _extver = ''
# only append HDU's which are "new"
if hdu._new:
self.__file.writeHDU(hdu)
if (verbose):
print "append HDU", hdu.name, _extver
hdu._new = 0
elif self.__file.mode == 'update':
if not self._resize:
# determine if any of the HDU is resized
for hdu in self:
# Header:
# Add 1 to .ascard to include the END card
_nch80 = reduce(operator.add, map(Card._ncards, hdu.header.ascard))
_bytes = (_nch80+1) * Card.length
_bytes = _bytes + _padLength(_bytes)
if _bytes != (hdu._datLoc-hdu._hdrLoc):
self._resize = 1
self._truncate = 0
if verbose:
print "One or more header is resized."
break
# Data:
if 'data' not in dir(hdu):
continue
if hdu.data is None:
continue
_bytes = hdu.data.nbytes
_bytes = _bytes + _padLength(_bytes)
if _bytes != hdu._datSpan:
self._resize = 1
self._truncate = 0
if verbose:
print "One or more data area is resized."
break
if self._truncate:
try:
self.__file.getfile().truncate(hdu._datLoc+hdu._datSpan)
except IOError:
self._resize = 1
self._truncate = 0
# if the HDUList is resized, need to write it to a tmp file,
# delete the original file, and rename the tmp to the original file
if self._resize:
oldName = self.__file.name
oldMemmap = self.__file.memmap
_name = _tmpName(oldName)
_hduList = open(_name, mode="append",
classExtensions=classExtensions)
if (verbose): print "open a temp file", _name
for hdu in self:
(hdu._hdrLoc, hdu._datLoc, hdu._datSpan) = _hduList.__file.writeHDU(hdu)
_hduList.__file.close()
self.__file.close()
os.remove(self.__file.name)
if (verbose): print "delete the original file", oldName
# reopen the renamed new file with "update" mode
os.rename(_name, oldName)
if classExtensions.has_key(_File):
ffo = classExtensions[_File](oldName, mode="update",
memmap=oldMemmap)
else:
ffo = _File(oldName, mode="update", memmap=oldMemmap)
self.__file = ffo
if (verbose): print "reopen the newly renamed file", oldName
# reset the resize attributes after updating
self._resize = 0
self._truncate = 0
for hdu in self:
hdu.header._mod = 0
hdu.header.ascard._mod = 0
hdu._new = 0
hdu._file = ffo.getfile()
# if not resized, update in place
else:
for hdu in self:
if (verbose):
try: _extver = `hdu.header['extver']`
except: _extver = ''
if hdu.header._mod or hdu.header.ascard._mod:
hdu._file.seek(hdu._hdrLoc)
self.__file.writeHDUheader(hdu)
if (verbose):
print "update header in place: Name =", hdu.name, _extver
if 'data' in dir(hdu):
if hdu.data is not None:
hdu._file.seek(hdu._datLoc)
self.__file.writeHDUdata(hdu)
if (verbose):
print "update data in place: Name =", hdu.name, _extver
# reset the modification attributes after updating
for hdu in self:
hdu.header._mod = 0
hdu.header.ascard._mod = 0
if singleThread:
# restore the original SIGINT handler, then deliver any deferred interrupt
signal.signal(signal.SIGINT, oldHandler)
if keyboardInterruptSent[0]:
raise KeyboardInterrupt
def update_extend(self):
"""Make sure if the primary header needs the keyword EXTEND or if
it has the proper value.
"""
hdr = self[0].header
if hdr.has_key('extend'):
if (hdr['extend'] == False):
hdr['extend'] = True
else:
if hdr['naxis'] == 0:
hdr.update('extend', True, after='naxis')
else:
n = hdr['naxis']
hdr.update('extend', True, after='naxis'+`n`)
def writeto(self, name, output_verify='exception', clobber=False,
classExtensions={}):
"""Write the HDUList to a new file.
name: output FITS file name to be written to.
output_verify: output verification option, default = 'exception'.
clobber: Overwrite the output file if exists, default = False.
classExtensions: A dictionary that maps pyfits classes to extensions
of those classes. When present in the dictionary,
the extension class will be constructed in place of
the pyfits class.
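A one-line sketch ('new.fits' is a hypothetical output name):
>>> hdulist.writeto('new.fits', clobber=True)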
"""
if (len(self) == 0):
warnings.warn("There is nothing to write.")
return
self.update_tbhdu()
if output_verify == 'warn':
output_verify = 'exception'
self.verify(option=output_verify)
# check if the output file already exists
if os.path.exists(name):
if clobber:
warnings.warn("Overwriting existing file '%s'." % name)
os.remove(name)
else:
raise IOError, "File '%s' already exists." % name
# make sure the EXTEND keyword is there if there is extension
if len(self) > 1:
self.update_extend()
hduList = open(name, mode="append", classExtensions=classExtensions)
for hdu in self:
hduList.__file.writeHDU(hdu)
hduList.close(output_verify=output_verify)
def close(self, output_verify='exception', verbose=0):
"""Close the associated FITS file and memmap object, if any.
output_verify: output verification option, default = 'exception'.
verbose: print out verbose messages? default = 0.
This simply calls the close method of the _File class. This
two-tier call is needed because _File has its own private
attribute __file.
"""
if self.__file is not None:
if self.__file.memmap == 1:
self.mmobject = self.__file._mm
self.mmobject.sync()
if self.__file.mode in ['append', 'update']:
self.flush(output_verify=output_verify, verbose=verbose)
self.__file.close()
# close the memmap object, it is designed to use an independent
# attribute of mmobject so if the HDUList object is created from files
# other than FITS, the close() call can also close the mm object.
# try:
# self.mmobject.close()
# except:
# pass
def info(self):
"""Summarize the info of the HDU's in this HDUList."""
if self.__file is None:
_name = '(No file associated with this HDUList)'
else:
_name = self.__file.name
results = "Filename: %s\nNo. Name Type"\
" Cards Dimensions Format\n" % _name
for j in range(len(self)):
results = results + "%-3d %s\n"%(j, self[j]._summary())
results = results[:-1]
print results
def open(name, mode="copyonwrite", memmap=0, classExtensions={}):
"""Factory function to open a FITS file and return an HDUList object.
name: Name of the FITS file to be opened or already opened file object.
mode: Open mode, 'copyonwrite' (default), 'readonly', 'update', or 'append'.
memmap: Is memory mapping to be used? default=0.
classExtensions: A dictionary that maps pyfits classes to extensions of
those classes. When present in the dictionary, the
extension class will be constructed in place of the
pyfits class.
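Sketched examples, using a hypothetical file name:
>>> hdulist = open('in.fits')                  # copy-on-write (default)
>>> hdulist = open('in.fits', mode='update')   # read/write in place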
"""
# instantiate a FITS file object (ffo)
if classExtensions.has_key(_File):
ffo = classExtensions[_File](name, mode=mode, memmap=memmap)
else:
ffo = _File(name, mode=mode, memmap=memmap)
if classExtensions.has_key(HDUList):
hduList = classExtensions[HDUList](file=ffo)
else:
hduList = HDUList(file=ffo)
# read all HDU's
while 1:
try:
hduList.append(ffo._readHDU())
except EOFError:
break
# check in the case there is extra space after the last HDU or corrupted HDU
except ValueError:
warnings.warn('Warning: Required keywords missing when trying to read HDU #%d.\n There may be extra bytes after the last HDU or the file is corrupted.' % (len(hduList)+1))
break
# initialize/reset attributes to be used in "update/append" mode
# CardList needs its own _mod attribute since it has methods to change
# the content of header without being able to pass it to the header object
hduList._resize = 0
hduList._truncate = 0
return hduList
fitsopen = open
# Convenience functions
class _Zero(int):
"""Sentinel meaning "no extension was specified"; int subclasses default to 0."""
def _getext(filename, mode, *ext1, **ext2):
"""Open the input file, return the HDUList and the extension."""
if ext2.has_key('classExtensions'):
hdulist = open(filename, mode=mode,
classExtensions=ext2['classExtensions'])
del ext2['classExtensions']
else:
hdulist = open(filename, mode=mode)
n_ext1 = len(ext1)
n_ext2 = len(ext2)
keys = ext2.keys()
# parse the extension spec
if n_ext1 > 2:
raise ValueError, "too many positional arguments"
elif n_ext1 == 1:
if n_ext2 == 0:
ext = ext1[0]
else:
if isinstance(ext1[0], (int, np.integer, tuple)):
raise KeyError, 'Redundant/conflicting keyword argument(s): %s' % ext2
if isinstance(ext1[0], str):
if n_ext2 == 1 and 'extver' in keys:
ext = ext1[0], ext2['extver']
else:
raise KeyError, 'Redundant/conflicting keyword argument(s): %s' % ext2
elif n_ext1 == 2:
if n_ext2 == 0:
ext = ext1
else:
raise KeyError, 'Redundant/conflicting keyword argument(s): %s' % ext2
elif n_ext1 == 0:
if n_ext2 == 0:
ext = _Zero()
elif 'ext' in keys:
if n_ext2 == 1:
ext = ext2['ext']
elif n_ext2 == 2 and 'extver' in keys:
ext = ext2['ext'], ext2['extver']
else:
raise KeyError, 'Redundant/conflicting keyword argument(s): %s' % ext2
else:
if 'extname' in keys:
if 'extver' in keys:
ext = ext2['extname'], ext2['extver']
else:
ext = ext2['extname']
else:
raise KeyError, 'Insufficient keyword argument: %s' % ext2
return hdulist, ext
def getheader(filename, *ext, **extkeys):
"""Get the header from an extension of a FITS file.
@param filename: input FITS file name
@type filename: string
@keyword classExtensions: (optional) A dictionary that maps pyfits
classes to extensions of those classes. When present in the
dictionary, the extension class will be constructed in place
of the pyfits class.
@param ext: The rest of the arguments are for extension specification.
See L{getdata} for explanations/examples.
@rtype: L{Header} object
@return: header
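For example (a hypothetical file with a 'SCI' extension):
>>> hdr = getheader('in.fits')            # the primary header
>>> hdr = getheader('in.fits', 'sci', 2)  # EXTNAME='SCI' & EXTVER=2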
"""
hdulist, _ext = _getext(filename, 'readonly', *ext, **extkeys)
hdu = hdulist[_ext]
hdr = hdu.header
hdulist.close()
return hdr
def getdata(filename, *ext, **extkeys):
"""Get the data from an extension of a FITS file (and optionally the header).
@type filename: string
@param filename: input FITS file name
@keyword classExtensions: (optional) A dictionary that maps pyfits
classes to extensions of those classes. When present in the
dictionary, the extension class will be constructed in place
of the pyfits class.
@param ext: The rest of the arguments are for extension specification. They are
flexible and are best illustrated by examples:
No extra arguments implies the primary header
>>> getdata('in.fits')
By extension number:
>>> getdata('in.fits', 0) # the primary header
>>> getdata('in.fits', 2) # the second extension
>>> getdata('in.fits', ext=2) # the second extension
By name, i.e., EXTNAME value (if unique):
>>> getdata('in.fits', 'sci')
>>> getdata('in.fits', extname='sci') # equivalent
Note that EXTNAMEs are not case-sensitive
By combination of EXTNAME and EXTVER, as separate arguments or as a tuple:
>>> getdata('in.fits', 'sci', 2) # EXTNAME='SCI' & EXTVER=2
>>> getdata('in.fits', extname='sci', extver=2) # equivalent
>>> getdata('in.fits', ('sci', 2)) # equivalent
Ambiguous or conflicting specifications will raise an exception, e.g.,
>>> getdata('in.fits', ext=('sci',1), extname='err', extver=2)
@return: an array, record array (i.e. table), or groups data object
depending on the type of the extension being referenced.
If the optional keyword 'header' is set to True, this function will
return a (data, header) tuple.
"""
if 'header' in extkeys:
_gethdr = extkeys['header']
del extkeys['header']
else:
_gethdr = False
hdulist, _ext = _getext(filename, 'readonly', *ext, **extkeys)
hdu = hdulist[_ext]
_data = hdu.data
if _data is None and isinstance(_ext, _Zero):
try:
hdu = hdulist[1]
_data = hdu.data
except IndexError:
raise IndexError, 'No data in this HDU.'
if _data is None:
raise IndexError, 'No data in this HDU.'
if _gethdr:
_hdr = hdu.header
hdulist.close()
if _gethdr:
return _data, _hdr
else:
return _data
def getval(filename, key, *ext, **extkeys):
"""Get a keyword's value from a header in a FITS file.
@type filename: string
@param filename: input FITS file name
@type key: string
@param key: keyword name
@keyword classExtensions: (optional) A dictionary that maps pyfits
classes to extensions of those classes. When present in the
dictionary, the extension class will be constructed in place
of the pyfits class.
@param ext: The rest of the arguments are for extension specification.
See L{getdata} for explanations/examples.
@return: keyword value
@rtype: string, integer, or float
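For example (hypothetical file and extension):
>>> naxis = getval('in.fits', 'NAXIS')            # from the primary header
>>> date = getval('in.fits', 'DATE-OBS', 'sci')   # from the SCI extension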
"""
_hdr = getheader(filename, *ext, **extkeys)
return _hdr[key]
def _makehdu(data, header, classExtensions={}):
if header is None:
if isinstance(data, FITS_rec):
hdu = BinTableHDU(data)
elif isinstance(data, np.ndarray):
if classExtensions.has_key(ImageHDU):
hdu = classExtensions[ImageHDU](data)
else:
hdu = ImageHDU(data)
else:
raise KeyError, 'data must be a numpy array or table data.'
else:
if classExtensions.has_key(header._hdutype):
header._hdutype = classExtensions[header._hdutype]
hdu=header._hdutype(data=data, header=header)
return hdu
def writeto(filename, data, header=None, **keys):
"""Create a new FITS file using the supplied data/header.
@type filename: string
@param filename: name of the new FITS file to write to
@type data: array, record array, or groups data object
@param data: data to write to the new file
@type header: L{Header} object or None
@param header: the header associated with 'data', if None, a
header of the appropriate type is created for the supplied
data. This argument is optional.
@keyword classExtensions: (optional) A dictionary that maps pyfits
classes to extensions of those classes. When present in the
dictionary, the extension class will be constructed in place
of the pyfits class.
@keyword clobber: (optional) if True and if filename already exists, it
will overwrite the file. Default is False.
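A minimal sketch, assuming numpy is imported as np:
>>> data = np.zeros((10, 10), dtype=np.float32)
>>> writeto('out.fits', data, clobber=True)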
"""
if header is None:
if 'header' in keys:
header = keys['header']
classExtensions = keys.get('classExtensions', {})
hdu=_makehdu(data, header, classExtensions)
if not isinstance(hdu, PrimaryHDU):
if classExtensions.has_key(PrimaryHDU):
hdu = classExtensions[PrimaryHDU](data, header=header)
else:
hdu = PrimaryHDU(data, header=header)
clobber = keys.get('clobber', False)
output_verify = keys.get('output_verify', 'exception')
hdu.writeto(filename, clobber=clobber, output_verify=output_verify,
classExtensions=classExtensions)
def append(filename, data, header=None, classExtensions={}):
"""Append the header/data to FITS file if filename exists, create if not.
If only data is supplied, a minimal header is created
@type filename: string
@param filename: name of the file to append to
@type data: array, table, or group data object
@param data: the new data used for appending
@type header: L{Header} object or None
@param header: the header associated with 'data', if None,
an appropriate header will be created for the data object
supplied.
@type classExtensions: dictionary
@param classExtensions: A dictionary that maps pyfits classes to
extensions of those classes. When present in
the dictionary, the extension class will be
constructed in place of the pyfits class.
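A minimal sketch, assuming 'data' is a numpy array and 'out.fits'
already exists:
>>> append('out.fits', data)    # appended as an image extension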
"""
if not os.path.exists(filename):
writeto(filename, data, header, classExtensions=classExtensions)
else:
hdu=_makehdu(data, header, classExtensions)
if isinstance(hdu, PrimaryHDU):
if classExtensions.has_key(ImageHDU):
hdu = classExtensions[ImageHDU](data, header)
else:
hdu = ImageHDU(data, header)
f = open(filename, mode='update', classExtensions=classExtensions)
f.append(hdu)
f.close()
def update(filename, data, *ext, **extkeys):
"""Update the specified extension with the input data/header.
@type filename: string
@param filename: name of the file to be updated
@param data: the new data used for updating
@keyword classExtensions: (optional) A dictionary that maps pyfits
classes to extensions of those classes. When present in the
dictionary, the extension class will be constructed in place
of the pyfits class.
The rest of the arguments are flexible:
the 3rd argument can be the header associated with the data.
If the 3rd argument is not a header, it (and other positional
arguments) are assumed to be the extension specification(s).
Header and extension specs can also be keyword arguments.
For example:
>>> update(file, dat, hdr, 'sci') # update the 'sci' extension
>>> update(file, dat, 3) # update the 3rd extension
>>> update(file, dat, hdr, 3) # update the 3rd extension
>>> update(file, dat, 'sci', 2) # update the 2nd SCI extension
>>> update(file, dat, 3, header=hdr) # update the 3rd extension
>>> update(file, dat, header=hdr, ext=5) # update the 5th extension
"""
# parse the arguments
header = None
if len(ext) > 0:
if isinstance(ext[0], Header):
header = ext[0]
ext = ext[1:]
elif not isinstance(ext[0], (int, long, np.integer, str, tuple)):
raise KeyError, 'Input argument has wrong data type.'
if 'header' in extkeys:
header = extkeys['header']
del extkeys['header']
classExtensions = extkeys.get('classExtensions', {})
new_hdu=_makehdu(data, header, classExtensions)
hdulist, _ext = _getext(filename, 'update', *ext, **extkeys)
hdulist[_ext] = new_hdu
hdulist.close()
def info(filename, classExtensions={}):
"""Print the summary information on a FITS file.
This includes the name, type, length of header, data shape and type
for each extension.
@type filename: string
@param filename: input FITS file name
@type classExtensions: dictionary
@param classExtensions: A dictionary that maps pyfits classes to
extensions of those classes. When present in
the dictionary, the extension class will be
constructed in place of the pyfits class.
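For example (hypothetical file):
>>> info('in.fits')    # prints the HDU summary to stdout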
"""
f = open(filename,classExtensions=classExtensions)
f.info()
f.close()
UNDEFINED = Undefined()
__credits__="""
Copyright (C) 2004 Association of Universities for Research in Astronomy (AURA)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
3. The name of AURA and its representatives may not be used to
endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY AURA ``AS IS'' AND ANY EXPRESS OR IMPLIED
WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL AURA BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
"""

Event Timeline