Source code for purpledrop.messages

"""Defines messages transmitted between purpledrop microcontroller and driver
via the USB channel
"""
import struct
from typing import Optional, Sequence, Type, Union

[docs]class PurpleDropMessage(object):
[docs] @classmethod def predictSize(cls, buf: bytes) -> int: if len(buf) == 0: return 0 msg_class = cls.findClassById(buf[0]) if msg_class is None: return -1 return msg_class.predictSize(buf)
[docs] @classmethod def findClassById(cls, id: int) -> Optional[Type]: for sub_type in cls.__subclasses__(): if getattr(sub_type, 'ID') == id: return sub_type return None
[docs] @classmethod def from_bytes(cls, buf: bytes) -> object: if len(buf) == 0: return None msg_class = cls.findClassById(buf[0]) if msg_class is None: return None # TODO: Handle errors here. There's no guarantee that the frame we receive # is proper. return msg_class(buf)
[docs] def to_bytes(self) -> bytes: raise RuntimeError("Abstract method called")
[docs]class ActiveCapacitanceMsg(PurpleDropMessage): ID = 3 def __init__(self, fill_data: Optional[bytes]=None): self.baseline = 0 self.measurement = 0 self.settings = 0 if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 6
[docs] def fill(self, fill_data): self.baseline, self.measurement, self.settings = struct.unpack_from("<HHB", fill_data, 1)
def __str__(self): return f"ActiveCapacitanceMsg(baseline={self.baseline}, measurement={self.measurement}, settings={self.settings})"
[docs]class BulkCapacitanceMsg(PurpleDropMessage): ID = 2 def __init__(self, fill_data: Optional[bytes]=None): self.group_scan = 0 self.start_index = 0 self.count = 0 self.measurements: Sequence[int] = [] if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: if len(buf) < 4: return 0 else: return buf[3] * 2 + 4
[docs] def fill(self, buf): if len(buf) < 4: raise ValueError("Need at least 4 bytes to parse a BulkCapacitanceMsg") self.group_scan, self.start_index, self.count = struct.unpack_from("<BBB", buf, 1) if len(buf) < self.count * 2 + 4: raise ValueError(f"Not enough data for BulkCapacitanceMsg with count {self.count}") self.measurements = struct.unpack_from("<" + "H" * self.count, buf, 4)
[docs]class CalibrateCommandMsg(PurpleDropMessage): ID = 13 CAP_OFFSET_CMD = 0 def __init__(self, fill_data: Optional[bytes]=None): self.command: Optional[int] = None if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 2
[docs] def fill(self, fill_data: bytes): if len(fill_data) < 2: raise ValueError("Need at least 2 bytes to parse a CalibrateCommandMsg") self.command = int(fill_data[1])
[docs] def to_bytes(self) -> bytes: return struct.pack("<BB", self.ID, self.command)
[docs]class CommandAckMsg(PurpleDropMessage): ID = 4 def __init__(self, fill_data: Optional[bytes]=None): self.acked_id = 0 if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 2
[docs] def fill(self, buf): if len(buf) < 2: raise ValueError("Require at least 2 bytes to parse a CommandAckMsg") self.acked_id = buf[1]
def __str__(self): return f"CommandAckMsg(acked_id={self.acked_id})"
[docs]class DataBlobMsg(PurpleDropMessage): ID = 10 # Types of blob data that can be requested SOFTWARE_VERSION_ID = 0 OFFSET_CALIBRATION_ID = 1 def __init__(self, fill_data: Optional[bytes]=None): self.blob_id = 0 self.chunk_index = 0 self.payload_size = 0 self.payload = bytes([]) if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: if len(buf) < 3: return 0 else: return buf[2] + 5
[docs] def fill(self, fill_data: bytes): if len(fill_data) < 5: raise ValueError("Need at least 5 bytes for a DataBlobMsg") self.blob_id = fill_data[1] self.payload_size = fill_data[2] self.chunk_index = struct.unpack_from("<H", fill_data, 3)[0] if len(fill_data) < 5 + self.payload_size: print(f"Insufficient data for DataBlobMsg. "\ "payload_size={self.payload_size}, only {len(fill_data)} bytes") self.payload = fill_data[5:5+self.payload_size]
[docs] def to_bytes(self) -> bytes: ret = struct.pack("<BBBH", self.ID, self.blob_id, self.payload_size, self.chunk_index) ret += self.payload return ret
[docs]class DutyCycleUpdatedMsg(PurpleDropMessage): ID = 15 def __init__(self, fill_data: Optional[bytes]=None): self.duty_cycle_A = 0 self.duty_cycle_B = 0 if(fill_data): self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 3
[docs] def fill(self, fill_data: bytes): if len(fill_data) < 3: raise ValueError("Need at least 3 bytes for a DutyCycleUpdated message") self.duty_cycle_A = fill_data[1] self.duty_cycle_B = fill_data[2]
[docs]class FeedbackCommandMsg(PurpleDropMessage): ID = 16 # Modes DISABLED = 0 NORMAL = 1 DIFFERENTIAL = 2 def __init__(self, fill_data: Optional[bytes]=None): self.target = 0.0 self.mode = 0 self.input_groups_p_mask = 0 self.input_groups_n_mask = 0 self.baseline = 0 if(fill_data): raise RuntimeError("Receiving FeedbackCommandMsg unimplemented")
[docs] def to_bytes(self) -> bytes: return struct.pack( "<BfBBBB", self.ID, self.target, self.mode, self.input_groups_p_mask, self.input_groups_n_mask, self.baseline )
[docs]class ElectrodeEnableMsg(PurpleDropMessage): ID = 0 def __init__(self, fill_data: Optional[bytes]=None): self.group_id = 0 self.setting = 0 self.values = [0] * 16
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 19
[docs] def to_bytes(self): return struct.pack("<BBB" + "B" * len(self.values), *([self.ID, self.group_id, self.setting] + self.values))
[docs]class GpioControlMsg(PurpleDropMessage): ID = 14 VALUE_FLAG = 1 OUTPUT_FLAG = 2 READ_FLAG = 128 def __init__(self, fill_data: Optional[bytes]=None): self.pin = 0 self.flags = 0 if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 3
[docs] def fill(self, fill_data: bytes): if len(fill_data) < 3: raise ValueError("Need at least 3 bytes for a GpioControlMsg") self.pin = fill_data[1] self.flags = fill_data[2]
[docs] def to_bytes(self) -> bytes: return struct.pack("<BBB", self.ID, self.pin, self.flags)
@property def value(self): return (self.flags & self.VALUE_FLAG) != 0 @value.setter def value(self, value): if value: self.flags |= self.VALUE_FLAG else: self.flags &= ~self.VALUE_FLAG @property def output_enable(self): return (self.flags & self.OUTPUT_FLAG) != 0 @output_enable.setter def output_enable(self, value): if value: self.flags |= self.OUTPUT_FLAG else: self.flags &= ~self.OUTPUT_FLAG @property def read(self): return (self.flags & self.READ_FLAG) != 0 @read.setter def read(self, value): if value: self.flags |= self.READ_FLAG else: self.flags &= ~self.READ_FLAG
[docs]class ParameterDescriptorMsg(PurpleDropMessage): ID = 12 def __init__(self, fill_data: Optional[bytes]=None): self.param_id: Optional[int] = None self.value: Optional[Union[float, int]] = None self.sequence_number: Optional[int] = None self.sequence_total: Optional[int] = None self.name: Optional[str] = None self.description: Optional[str] = None self.type: Optional[str] = None if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: if len(buf) < 3: return 0 str_size = struct.unpack_from("<H", buf, 1)[0] return str_size + 15
[docs] def fill(self, fill_data: bytes): str_size, self.param_id = struct.unpack_from("<HI", fill_data, 1) str_section = fill_data[15:] separators = [i for i, b in enumerate(str_section) if b == 0] if len(separators) != 2: raise ValueError(f"Expected two string separators in ParameterDescriptorMsg, found {len(separators)}") self.name = str_section[0:separators[0]].decode('utf-8') self.description = str_section[separators[0]+1:separators[1]].decode('utf-8') self.type = str_section[separators[1]+1:].decode('utf-8') if self.type == 'float': self.value = struct.unpack_from("<f", fill_data, 7)[0] else: self.value = struct.unpack_from("<i", fill_data, 7)[0] self.sequence_number, self.sequence_total = struct.unpack_from("<HH", fill_data, 11)
[docs] def to_bytes(self) -> bytes: # Send request message return struct.pack("<B", self.ID)
[docs]class SetGainMsg(PurpleDropMessage): ID = 11 def __init__(self, fill_data: Optional[bytes]=None): self.gains: Sequence[int] = []
[docs] @staticmethod def predictSize(buf: bytes) -> int: return -1
[docs] def to_bytes(self): # Store a count byte, and then 2 bits per gain data = [self.ID, len(self.gains)] counter = 0 for g in self.gains: if counter == 0: data.append(0) data[-1] |= (g & 0x3) << (counter * 2) counter = (counter + 1) % 4 return bytes(data)
[docs]class SetParameterMsg(PurpleDropMessage): ID = 6 def __init__(self, fill_data: Optional[bytes]=None): if fill_data is not None: if len(fill_data) < 10: raise RuntimeError("Need at least 10 bytes to fill a SetParameterMsg") self._buf = bytearray(fill_data) else: self._buf = bytearray([self.ID] + [0]*9)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 10
[docs] def param_idx(self) -> int: return struct.unpack_from("<I", self._buf, 1)[0]
[docs] def set_param_idx(self, value: int): struct.pack_into("<I", self._buf, 1, value)
[docs] def param_value_float(self) -> float: return struct.unpack_from("<f", self._buf, 5)[0]
[docs] def set_param_value_float(self, value: float): struct.pack_into("<f", self._buf, 5, value)
[docs] def param_value_int(self) -> int: return struct.unpack_from("<i", self._buf, 5)[0]
[docs] def set_param_value_int(self, value: int): struct.pack_into("<i", self._buf, 5, value)
[docs] def write_flag(self) -> bool: if self._buf[9] == 0: return False else: return True
[docs] def set_write_flag(self, flag: bool): if flag: self._buf[9] = 1 else: self._buf[9] = 0
[docs] def fill(self, fill_data: bytes): self._buf = bytearray(fill_data)
[docs] def to_bytes(self) -> bytes: return bytes(self._buf)
def __str__(self): return "SetParameterMsg(param_idx=%d, param_value=%d, write_flag=%d)" % \ (self.param_idx(), self.param_value_int(), self.write_flag())
[docs]class SetPwmMsg(PurpleDropMessage): ID = 9 def __init__(self, fill_data: Optional[bytes]=None): self.chan = 0 self.duty_cycle = 0.0 if fill_data is not None: self.fill(fill_data)
[docs] def fill(self, buf: bytes): raise RuntimeError("Not implemented")
[docs] def to_bytes(self) -> bytes: return struct.pack("<BBH", self.ID, self.chan, int(self.duty_cycle * 4096))
[docs]class TemperatureMsg(PurpleDropMessage): ID = 7 def __init__(self, fill_data: Optional[bytes]=None): self.measurements: Sequence[int] = [] if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: if(len(buf) < 2): return 0 else: return buf[1]*2 + 2
[docs] def fill(self, buf: bytes): if len(buf) < 2: raise ValueError("Insufficient bytes for TemperatureMsg") count = buf[1] if len(buf) < count * 2 + 2: raise ValueError("Insufficient bytes for TemperatureMsg") self.measurements = struct.unpack_from("<" + "h"*count, buf, 2)
[docs] def to_bytes(self) -> bytes: count = len(self.measurements) return struct.pack("<BB"+"h"*count, [self.ID, count] + list(self.measurements))
def __str__(self): return "TemperatureMsg(measurements=%s)" % str(self.measurements)
[docs]class HvRegulatorMsg(PurpleDropMessage): ID = 8 def __init__(self, fill_data: Optional[bytes]=None): self.voltage = 0.0 self.v_target_out = 0 if fill_data is not None: self.fill(fill_data)
[docs] @staticmethod def predictSize(buf: bytes) -> int: return 7
[docs] def fill(self, buf: bytes): if len(buf) < 7: raise ValueError("Insufficient bytes for HvRegulatorMsg") self.voltage = struct.unpack_from("<f" , buf, 1)[0] self.v_target_out = struct.unpack_from("<h", buf, 5)[0]
[docs] def to_bytes(self) -> bytes: return struct.pack("<Bfh", [self.ID, self.voltage, self.v_target_out])
def __str__(self): return "HvRegulatorMsg(voltage=%0.1f, v_target_out=%d)" % \ (self.voltage, self.v_target_out)