Source code for seabreeze.pyseabreeze.features.rawusb

from __future__ import annotations

import sys

if sys.version_info >= (3, 8):
    from typing import Literal
else:
    from typing_extensions import Literal

from seabreeze.pyseabreeze.features._base import SeaBreezeFeature
from seabreeze.pyseabreeze.transport import USBTransport
from seabreeze.pyseabreeze.types import PySeaBreezeProtocol


[docs]class SeaBreezeRawUSBBusAccessFeature(SeaBreezeFeature): r""" Reading and writing raw usb Example usage .. code-block:: python >>> import struct # needed for packing binary data into bytestrings >>> from seabreeze.spectrometers import Spectrometer >>> spec = Spectrometer.from_first_available() # features need to be accessed on the spectrometer instance via spec.f or spec.features # the data you provide needs to packed into a bytestring via struct.pack >>> spec.f.raw_usb_bus_access.raw_usb_write(data=struct.pack('<B', 0xFE), endpoint='primary_out') # when reading via the raw_usb feature you can easily block the spectrometer. # so make sure to only read when you expect new data (after you sent a command) # and only read as many bytes as the command returns >>> output = spec.f.raw_usb_bus_access.raw_usb_read(endpoint='primary_in', buffer_length=16) >>> print(output) b'\x00\x08p\x17\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x80U' # extract by providing the data format from the developer datasheet and `output` to struct.unpack """ identifier = "raw_usb_bus_access" _required_protocol_cls = PySeaBreezeProtocol @classmethod def supports_protocol(cls, protocol: PySeaBreezeProtocol) -> bool: return isinstance(protocol.transport, USBTransport) def raw_usb_write( self, data: bytes, endpoint: Literal["primary_out", "secondary_out"] ) -> None: if endpoint not in {"primary_out", "secondary_out"}: raise ValueError("endpoint has to be one of 'primary_out', 'secondary_out'") if endpoint == "secondary_out": pass self.protocol.transport.write(data) def raw_usb_read( self, endpoint: Literal["primary_in", "secondary_in", "secondary_in2"], buffer_length: int | None = None, ) -> bytes: if endpoint == "primary_in": return self.protocol.transport.read(size=buffer_length, mode="low_speed") elif endpoint == "secondary_in": return self.protocol.transport.read(size=buffer_length, mode="high_speed") elif endpoint == "secondary_in2": return self.protocol.transport.read( size=buffer_length, mode="high_speed_alt" ) else: raise ValueError( "endpoint has to be one of 'primary_in', 'secondary_in', 'secondary_in2'" )