import struct
import numpy
from seabreeze.pyseabreeze.protocol import OOIProtocol, OBPProtocol
from seabreeze.pyseabreeze.exceptions import SeaBreezeError, SeaBreezeNotSupported
from seabreeze.pyseabreeze.features._base import SeaBreezeFeature
from seabreeze.pyseabreeze.features.eeprom import SeaBreezeEEPromFeatureOOI
# Definition
# ==========
#
[docs]class SeaBreezeSpectrometerFeature(SeaBreezeFeature):
identifier = 'spectrometer'
def set_trigger_mode(self, mode):
raise NotImplementedError("implement in derived class")
def set_integration_time_micros(self, integration_time_micros):
raise NotImplementedError("implement in derived class")
def get_integration_time_micros_limits(self):
raise NotImplementedError("implement in derived class")
def get_maximum_intensity(self):
raise NotImplementedError("implement in derived class")
def get_electric_dark_pixel_indices(self):
raise NotImplementedError("implement in derived class")
def _spectrum_length(self):
raise NotImplementedError("implement in derived class")
def get_wavelengths(self):
raise NotImplementedError("implement in derived class")
def get_intensities(self):
raise NotImplementedError("implement in derived class")
def _get_spectrum_raw(self):
raise NotImplementedError("implement in derived class")
def get_fast_buffer_spectrum(self):
raise SeaBreezeNotSupported("needs to be provided in the specific implementation if supported")
# Spectrometer Features based on USBCommOOI
# =========================================
#
class SeaBreezeSpectrometerFeatureOOI(SeaBreezeSpectrometerFeature):
_required_protocol_cls = OOIProtocol
_required_features = ('eeprom',)
_required_kwargs = (
'dark_pixel_indices',
'integration_time_min',
'integration_time_max',
'integration_time_base',
'spectrum_num_pixel',
'spectrum_raw_length',
'spectrum_max_value',
'trigger_modes',
)
_normalization_value = 1.0
# config
_dark_pixel_indices = None
_integration_time_min = None
_integration_time_max = None
_integration_time_base = None
_spectrum_num_pixel = None
_spectrum_raw_length = None
_spectrum_max_value = None
_trigger_modes = None
def __init__(self, device, feature_id, **kwargs):
super(SeaBreezeSpectrometerFeatureOOI, self).__init__(device, feature_id, **kwargs)
self._dark_pixel_indices = kwargs['dark_pixel_indices']
self._integration_time_min = kwargs['integration_time_min']
self._integration_time_max = kwargs['integration_time_max']
self._integration_time_base = kwargs['integration_time_base']
self._spectrum_num_pixel = kwargs['spectrum_num_pixel']
self._spectrum_raw_length = kwargs['spectrum_raw_length']
self._spectrum_max_value = kwargs['spectrum_max_value']
self._trigger_modes = kwargs['trigger_modes']
def set_trigger_mode(self, mode):
if mode in self._trigger_modes:
self.protocol.send(0x0A, mode)
else:
raise SeaBreezeError("Only supports: %s" % str(self._trigger_modes))
def set_integration_time_micros(self, integration_time_micros):
t_min = self._integration_time_min
t_max = self._integration_time_max
if t_min <= integration_time_micros < t_max:
i_time = int(integration_time_micros / self._integration_time_base)
self.protocol.send(0x02, i_time)
else:
raise SeaBreezeError("Integration not in [{:d}, {:d}]".format(t_min, t_max))
def get_integration_time_micros_limits(self):
return self._integration_time_min, self._integration_time_max
def get_maximum_intensity(self):
return float(self._spectrum_max_value)
def get_electric_dark_pixel_indices(self):
return numpy.array(self._dark_pixel_indices)
@property
def _spectrum_length(self):
return self._spectrum_num_pixel
def get_wavelengths(self):
indices = numpy.arange(self._spectrum_length, dtype=numpy.float64)
# OOI spectrometers store the wavelength calibration in slots 1,2,3,4
coeffs = []
for i in range(1, 5):
# noinspection PyProtectedMember
coeffs.append(float(SeaBreezeEEPromFeatureOOI._func_eeprom_read_slot(self.protocol, i)))
return sum(wl * (indices ** i) for i, wl in enumerate(coeffs))
def get_intensities(self):
tmp = self._get_spectrum_raw()
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:-1]), dtype=numpy.double)
return ret * self._normalization_value
def _get_spectrum_raw(self):
tmp = numpy.empty((self._spectrum_raw_length,), dtype=numpy.uint8)
self.protocol.send(0x09)
timeout = int(self._integration_time_max * 1e-3 + self.protocol.transport.default_timeout_ms)
tmp[:] = self.protocol.receive(size=self._spectrum_raw_length, timeout_ms=timeout, mode='high_speed')
return tmp
def get_fast_buffer_spectrum(self):
raise SeaBreezeNotSupported("needs to be provided in the specific implementation if supported")
class SeaBreezeSpectrometerFeatureOOI2K(SeaBreezeSpectrometerFeatureOOI):
def get_intensities(self):
tmp = self._get_spectrum_raw()
# The byte order is different for some models
N_raw = self._spectrum_raw_length - 1
N_pix = self._spectrum_length
idx = [(i // 2) % 64 + (i % 2) * 64 + (i // 128) * 128 for i in range(N_raw)]
# high nibble not guaranteed to be pulled low
tsorted = tmp[idx] & numpy.array((0xFF, 0x0F) * N_pix, dtype=numpy.uint8)
ret = numpy.array(struct.unpack("<" + "H" * N_pix, tsorted), dtype=numpy.double)
# sorted and parsed
return ret * self._normalization_value
class SeaBreezeSpectrometerFeatureOOIFPGA(SeaBreezeSpectrometerFeatureOOI):
def __init__(self, device, feature_id, **kwargs):
super(SeaBreezeSpectrometerFeatureOOIFPGA, self).__init__(device, feature_id, **kwargs)
self.protocol.send(0xFE)
ret = self.protocol.receive(size=16)
data = struct.unpack("<HLBBBBBBBBBB", ret[:])
# FIXME: check data[10] return values
self.protocol.transport._default_read_endpoint = 'low_speed' if data[10] else 'high_speed'
class SeaBreezeSpectrometerFeatureOOIFPGA4K(SeaBreezeSpectrometerFeatureOOIFPGA):
def _get_spectrum_raw(self):
tmp = numpy.empty((self._spectrum_raw_length,), dtype=numpy.uint8)
timeout = int(self._integration_time_max * 1e-3 + self.protocol.transport.default_timeout_ms)
self.protocol.send(0x09)
# noinspection PyProtectedMember
if self.protocol.transport._default_read_endpoint == 'low_speed':
tmp[:] = self.protocol.receive(size=self._spectrum_raw_length, timeout_ms=timeout)
else: # high_speed
tmp[:2048] = self.protocol.receive(size=2048, timeout_ms=timeout, mode='high_speed_alt')
tmp[2048:] = self.protocol.receive(size=self._spectrum_raw_length - 2048,
timeout_ms=timeout, mode='high_speed')
return tmp
class _SeaBreezeSpectrometerSaturationMixin(object):
def _saturation_unpack(self, ret):
return struct.unpack("<H", ret[6:8])[0]
def _saturation_not_initialized(self, x):
return x == 0
# noinspection PyUnresolvedReferences
def _saturation_get_normalization_value(self):
"""internal only"""
# noinspection PyProtectedMember
ret = SeaBreezeEEPromFeatureOOI._func_eeprom_read_slot(self.protocol, 17, raw=True)
# ret contains the first two response bytes, then the eeprom data
saturation = self._saturation_unpack(ret)
if self._saturation_not_initialized(saturation):
# pass # not initialized?
return self._normalization_value
else:
return float(self._spectrum_max_value) / saturation
class SeaBreezeSpectrometerFeatureOOIGain(SeaBreezeSpectrometerFeatureOOI,
_SeaBreezeSpectrometerSaturationMixin):
def __init__(self, device, feature_id, **kwargs):
# set the usbspeed
super(SeaBreezeSpectrometerFeatureOOIGain, self).__init__(device, feature_id, **kwargs)
# load the saturation value
self._normalization_value = self._saturation_get_normalization_value()
class SeaBreezeSpectrometerFeatureOOIFPGAGain(SeaBreezeSpectrometerFeatureOOIFPGA,
_SeaBreezeSpectrometerSaturationMixin):
def __init__(self, device, feature_id, **kwargs):
# set the usbspeed
super(SeaBreezeSpectrometerFeatureOOIFPGAGain, self).__init__(device, feature_id, **kwargs)
# load the saturation value
self._normalization_value = self._saturation_get_normalization_value()
class SeaBreezeSpectrometerFeatureOOIFPGA4KGain(SeaBreezeSpectrometerFeatureOOIFPGA4K,
_SeaBreezeSpectrometerSaturationMixin):
def __init__(self, device, feature_id, **kwargs):
# set the usbspeed
super(SeaBreezeSpectrometerFeatureOOIFPGA4KGain, self).__init__(device, feature_id, **kwargs)
# get the saturation value
self._normalization_value = self._saturation_get_normalization_value()
class SeaBreezeSpectrometerFeatureOOIGainAlt(SeaBreezeSpectrometerFeatureOOIGain):
# XXX: The NIRQUEST stores this value somewhere else
# and might also not have been programmed yet..
# TODO: And this is planned for the QE65000 apparently.
def _saturation_unpack(self, ret):
return struct.unpack("<L", ret[6:10])[0]
class SeaBreezeSpectrometerFeatureOOIFPGAGainAlt(SeaBreezeSpectrometerFeatureOOIFPGAGain):
# XXX: The Apex, Maya2000pro, MayaLSL store this value somewhere else
# and might also not have been programmed yet...
# ret contains the first two response bytes, then the eeprom data
def _saturation_unpack(self, ret):
return struct.unpack("<H", ret[2:4])[0]
def _saturation_not_initialized(self, x):
return x <= 32768 or x > self._spectrum_max_value
# Spectrometer Features based on USBCommOBP
# =========================================
#
class SeaBreezeSpectrometerFeatureOBP(SeaBreezeSpectrometerFeature):
_required_protocol_cls = OBPProtocol
# required_interface_cls = USBCommOBP
required_features = ()
_required_kwargs = (
'dark_pixel_indices',
'integration_time_min',
'integration_time_max',
'integration_time_base',
'spectrum_num_pixel',
'spectrum_raw_length',
'spectrum_max_value',
'trigger_modes',
)
_normalization_value = 1.0
# config
_dark_pixel_indices = None
_integration_time_min = None
_integration_time_max = None
_integration_time_base = None
_spectrum_num_pixel = None
_spectrum_raw_length = None
_spectrum_max_value = None
_trigger_modes = None
def __init__(self, device, feature_id, **kwargs):
super(SeaBreezeSpectrometerFeatureOBP, self).__init__(device, feature_id, **kwargs)
self._dark_pixel_indices = kwargs['dark_pixel_indices']
self._integration_time_min = kwargs['integration_time_min']
self._integration_time_max = kwargs['integration_time_max']
self._integration_time_base = kwargs['integration_time_base']
self._spectrum_num_pixel = kwargs['spectrum_num_pixel']
self._spectrum_raw_length = kwargs['spectrum_raw_length']
self._spectrum_max_value = kwargs['spectrum_max_value']
self._trigger_modes = kwargs['trigger_modes']
def set_trigger_mode(self, mode):
if mode in self._trigger_modes:
self.protocol.send(0x00110110, mode)
else:
raise SeaBreezeError("Only supports: %s" % str(self._trigger_modes))
def set_integration_time_micros(self, integration_time_micros):
t_min = self._integration_time_min
t_max = self._integration_time_max
if t_min <= integration_time_micros < t_max:
i_time = int(integration_time_micros / self._integration_time_base)
self.protocol.send(0x00110010, i_time)
else:
raise SeaBreezeError("Integration not in [{:d}, {:d}]".format(t_min, t_max))
def get_integration_time_micros_limits(self):
return self._integration_time_min, self._integration_time_max
def get_maximum_intensity(self):
return float(self._spectrum_max_value)
def get_electric_dark_pixel_indices(self):
return numpy.array(self._dark_pixel_indices)
@property
def _spectrum_length(self):
return self._spectrum_num_pixel
def get_wavelengths(self):
# get number of wavelength coefficients
data = self.protocol.query(0x00180100)
N = struct.unpack("<B", data)[0]
# now query the coefficients
coeffs = []
for i in range(N):
data = self.protocol.query(0x00180101, i)
coeffs.append(struct.unpack("<f", data)[0])
# and generate the wavelength array
indices = numpy.arange(self._spectrum_length, dtype=numpy.float64)
return sum(wl * (indices ** i) for i, wl in enumerate(coeffs))
def get_intensities(self):
tmp = self._get_spectrum_raw()
# noinspection PyTypeChecker
return numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp), dtype=numpy.double)
def _get_spectrum_raw(self):
timeout = int(self._integration_time_max * 1e-3 + self.protocol.transport.default_timeout_ms)
datastring = self.protocol.query(0x00101100, timeout_ms=timeout)
return numpy.fromstring(datastring, dtype=numpy.uint8)
def get_fast_buffer_spectrum(self):
raise SeaBreezeNotSupported("needs to be provided in the specific implementation if supported")
# Model specific changes
# ======================
#
class SeaBreezeSpectrometerFeatureUSB2000(SeaBreezeSpectrometerFeatureOOI2K):
pass
class SeaBreezeSpectrometerFeatureHR2000(SeaBreezeSpectrometerFeatureOOI2K):
pass
class SeaBreezeSpectrometerFeatureHR4000(SeaBreezeSpectrometerFeatureOOIFPGA4K):
def get_intensities(self):
tmp = self._get_spectrum_raw()
# The HR4000 needs to xor with 0x2000
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:-1])) ^ 0x2000
return ret.astype(numpy.double) * self._normalization_value
class SeaBreezeSpectrometerFeatureUSB650(SeaBreezeSpectrometerFeatureOOI2K):
pass
class SeaBreezeSpectrometerFeatureHR2000PLUS(SeaBreezeSpectrometerFeatureOOIFPGA):
def get_intensities(self):
tmp = self._get_spectrum_raw()
# The HR2000PLUS needs to xor with 0x2000
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:-1])) ^ 0x2000
return ret.astype(numpy.double) * self._normalization_value
class SeaBreezeSpectrometerFeatureQE65000(SeaBreezeSpectrometerFeatureOOIFPGA):
def get_wavelengths(self):
# QE65000 specific override
indices = numpy.arange(-10, self._spectrum_length - 10, dtype=numpy.float64)
# OOI spectrometers store the wavelength calibration in slots 1,2,3,4
coeffs = []
for i in range(1, 5):
# noinspection PyProtectedMember
coeffs.append(float(SeaBreezeEEPromFeatureOOI._func_eeprom_read_slot(self.protocol, i)))
return sum(wl * (indices ** i) for i, wl in enumerate(coeffs))
def get_intensities(self):
tmp = self._get_spectrum_raw()
# The QE65000 needs to xor with 0x8000
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:-1])) ^ 0x8000
return ret.astype(numpy.double) * self._normalization_value
class SeaBreezeSpectrometerFeatureUSB2000PLUS(SeaBreezeSpectrometerFeatureOOIFPGAGain):
pass
class SeaBreezeSpectrometerFeatureUSB4000(SeaBreezeSpectrometerFeatureOOIFPGA4KGain):
pass
class SeaBreezeSpectrometerFeatureNIRQUEST512(SeaBreezeSpectrometerFeatureOOIGainAlt):
def get_intensities(self):
tmp = self._get_spectrum_raw()
# The NIRQUEST512 needs to xor with 0x8000
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:-1])) ^ 0x8000
return ret.astype(numpy.double) * self._normalization_value
class SeaBreezeSpectrometerFeatureNIRQUEST256(SeaBreezeSpectrometerFeatureOOIGainAlt):
def get_intensities(self):
tmp = self._get_spectrum_raw()
# The NIRQUEST256 needs to xor with 0x8000
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:-1])) ^ 0x8000
return ret.astype(numpy.double) * self._normalization_value
class SeaBreezeSpectrometerFeatureMAYA2000PRO(SeaBreezeSpectrometerFeatureOOIFPGAGainAlt):
pass
class SeaBreezeSpectrometerFeatureMAYA2000(SeaBreezeSpectrometerFeatureOOIFPGA):
pass
class SeaBreezeSpectrometerFeatureTORUS(SeaBreezeSpectrometerFeatureOOIFPGAGain):
# The Torus uses the USB2000Plus spec feature
pass
class SeaBreezeSpectrometerFeatureAPEX(SeaBreezeSpectrometerFeatureOOIFPGAGainAlt):
pass
class SeaBreezeSpectrometerFeatureMAYALSL(SeaBreezeSpectrometerFeatureOOIFPGAGainAlt):
pass
class SeaBreezeSpectrometerFeatureJAZ(SeaBreezeSpectrometerFeatureOOIGain):
def get_intensities(self):
tmp = self._get_spectrum_raw()
# XXX: No sync byte for the Jaz
ret = numpy.array(struct.unpack("<" + "H" * self._spectrum_length, tmp[:]), dtype=numpy.double)
return ret * self._normalization_value
class SeaBreezeSpectrometerFeatureSTS(SeaBreezeSpectrometerFeatureOBP):
pass
class SeaBreezeSpectrometerFeatureQEPRO(SeaBreezeSpectrometerFeatureOBP):
def _get_spectrum_raw(self):
timeout = int(self._integration_time_max * 1e-3 + self.protocol.transport.default_timeout_ms)
datastring = self.protocol.query(0x00100928, timeout_ms=timeout)
return numpy.fromstring(datastring, dtype=numpy.uint8)
def get_intensities(self):
tmp = self._get_spectrum_raw()
# 32byte metadata block at beginning
ret = numpy.array(struct.unpack("<" + "I" * self._spectrum_length, tmp[32:]), dtype=numpy.double)
return ret * self._normalization_value
class SeaBreezeSpectrometerFeatureVENTANA(SeaBreezeSpectrometerFeatureOBP):
pass