"""Defines messages transmitted between purpledrop microcontroller and driver
via the USB channel
"""
import struct
from typing import Optional, Sequence, Type, Union
[docs]class PurpleDropMessage(object):
[docs] @classmethod
def predictSize(cls, buf: bytes) -> int:
if len(buf) == 0:
return 0
msg_class = cls.findClassById(buf[0])
if msg_class is None:
return -1
return msg_class.predictSize(buf)
[docs] @classmethod
def findClassById(cls, id: int) -> Optional[Type]:
for sub_type in cls.__subclasses__():
if getattr(sub_type, 'ID') == id:
return sub_type
return None
[docs] @classmethod
def from_bytes(cls, buf: bytes) -> object:
if len(buf) == 0:
return None
msg_class = cls.findClassById(buf[0])
if msg_class is None:
return None
# TODO: Handle errors here. There's no guarantee that the frame we receive
# is proper.
return msg_class(buf)
[docs] def to_bytes(self) -> bytes:
raise RuntimeError("Abstract method called")
[docs]class ActiveCapacitanceMsg(PurpleDropMessage):
ID = 3
def __init__(self, fill_data: Optional[bytes]=None):
self.baseline = 0
self.measurement = 0
self.settings = 0
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 6
[docs] def fill(self, fill_data):
self.baseline, self.measurement, self.settings = struct.unpack_from("<HHB", fill_data, 1)
def __str__(self):
return f"ActiveCapacitanceMsg(baseline={self.baseline}, measurement={self.measurement}, settings={self.settings})"
[docs]class BulkCapacitanceMsg(PurpleDropMessage):
ID = 2
def __init__(self, fill_data: Optional[bytes]=None):
self.group_scan = 0
self.start_index = 0
self.count = 0
self.measurements: Sequence[int] = []
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
if len(buf) < 4:
return 0
else:
return buf[3] * 2 + 4
[docs] def fill(self, buf):
if len(buf) < 4:
raise ValueError("Need at least 4 bytes to parse a BulkCapacitanceMsg")
self.group_scan, self.start_index, self.count = struct.unpack_from("<BBB", buf, 1)
if len(buf) < self.count * 2 + 4:
raise ValueError(f"Not enough data for BulkCapacitanceMsg with count {self.count}")
self.measurements = struct.unpack_from("<" + "H" * self.count, buf, 4)
[docs]class CalibrateCommandMsg(PurpleDropMessage):
ID = 13
CAP_OFFSET_CMD = 0
def __init__(self, fill_data: Optional[bytes]=None):
self.command: Optional[int] = None
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 2
[docs] def fill(self, fill_data: bytes):
if len(fill_data) < 2:
raise ValueError("Need at least 2 bytes to parse a CalibrateCommandMsg")
self.command = int(fill_data[1])
[docs] def to_bytes(self) -> bytes:
return struct.pack("<BB", self.ID, self.command)
[docs]class CommandAckMsg(PurpleDropMessage):
ID = 4
def __init__(self, fill_data: Optional[bytes]=None):
self.acked_id = 0
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 2
[docs] def fill(self, buf):
if len(buf) < 2:
raise ValueError("Require at least 2 bytes to parse a CommandAckMsg")
self.acked_id = buf[1]
def __str__(self):
return f"CommandAckMsg(acked_id={self.acked_id})"
[docs]class DataBlobMsg(PurpleDropMessage):
ID = 10
# Types of blob data that can be requested
SOFTWARE_VERSION_ID = 0
OFFSET_CALIBRATION_ID = 1
def __init__(self, fill_data: Optional[bytes]=None):
self.blob_id = 0
self.chunk_index = 0
self.payload_size = 0
self.payload = bytes([])
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
if len(buf) < 3:
return 0
else:
return buf[2] + 5
[docs] def fill(self, fill_data: bytes):
if len(fill_data) < 5:
raise ValueError("Need at least 5 bytes for a DataBlobMsg")
self.blob_id = fill_data[1]
self.payload_size = fill_data[2]
self.chunk_index = struct.unpack_from("<H", fill_data, 3)[0]
if len(fill_data) < 5 + self.payload_size:
print(f"Insufficient data for DataBlobMsg. "\
"payload_size={self.payload_size}, only {len(fill_data)} bytes")
self.payload = fill_data[5:5+self.payload_size]
[docs] def to_bytes(self) -> bytes:
ret = struct.pack("<BBBH", self.ID, self.blob_id, self.payload_size, self.chunk_index)
ret += self.payload
return ret
[docs]class DutyCycleUpdatedMsg(PurpleDropMessage):
ID = 15
def __init__(self, fill_data: Optional[bytes]=None):
self.duty_cycle_A = 0
self.duty_cycle_B = 0
if(fill_data):
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 3
[docs] def fill(self, fill_data: bytes):
if len(fill_data) < 3:
raise ValueError("Need at least 3 bytes for a DutyCycleUpdated message")
self.duty_cycle_A = fill_data[1]
self.duty_cycle_B = fill_data[2]
[docs]class FeedbackCommandMsg(PurpleDropMessage):
ID = 16
# Modes
DISABLED = 0
NORMAL = 1
DIFFERENTIAL = 2
def __init__(self, fill_data: Optional[bytes]=None):
self.target = 0.0
self.mode = 0
self.input_groups_p_mask = 0
self.input_groups_n_mask = 0
self.baseline = 0
if(fill_data):
raise RuntimeError("Receiving FeedbackCommandMsg unimplemented")
[docs] def to_bytes(self) -> bytes:
return struct.pack(
"<BfBBBB",
self.ID,
self.target,
self.mode,
self.input_groups_p_mask,
self.input_groups_n_mask,
self.baseline
)
[docs]class ElectrodeEnableMsg(PurpleDropMessage):
ID = 0
def __init__(self, fill_data: Optional[bytes]=None):
self.group_id = 0
self.setting = 0
self.values = [0] * 16
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 19
[docs] def to_bytes(self):
return struct.pack("<BBB" + "B" * len(self.values),
*([self.ID, self.group_id, self.setting] + self.values))
[docs]class GpioControlMsg(PurpleDropMessage):
ID = 14
VALUE_FLAG = 1
OUTPUT_FLAG = 2
READ_FLAG = 128
def __init__(self, fill_data: Optional[bytes]=None):
self.pin = 0
self.flags = 0
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 3
[docs] def fill(self, fill_data: bytes):
if len(fill_data) < 3:
raise ValueError("Need at least 3 bytes for a GpioControlMsg")
self.pin = fill_data[1]
self.flags = fill_data[2]
[docs] def to_bytes(self) -> bytes:
return struct.pack("<BBB", self.ID, self.pin, self.flags)
@property
def value(self):
return (self.flags & self.VALUE_FLAG) != 0
@value.setter
def value(self, value):
if value:
self.flags |= self.VALUE_FLAG
else:
self.flags &= ~self.VALUE_FLAG
@property
def output_enable(self):
return (self.flags & self.OUTPUT_FLAG) != 0
@output_enable.setter
def output_enable(self, value):
if value:
self.flags |= self.OUTPUT_FLAG
else:
self.flags &= ~self.OUTPUT_FLAG
@property
def read(self):
return (self.flags & self.READ_FLAG) != 0
@read.setter
def read(self, value):
if value:
self.flags |= self.READ_FLAG
else:
self.flags &= ~self.READ_FLAG
[docs]class ParameterDescriptorMsg(PurpleDropMessage):
ID = 12
def __init__(self, fill_data: Optional[bytes]=None):
self.param_id: Optional[int] = None
self.value: Optional[Union[float, int]] = None
self.sequence_number: Optional[int] = None
self.sequence_total: Optional[int] = None
self.name: Optional[str] = None
self.description: Optional[str] = None
self.type: Optional[str] = None
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
if len(buf) < 3:
return 0
str_size = struct.unpack_from("<H", buf, 1)[0]
return str_size + 15
[docs] def fill(self, fill_data: bytes):
str_size, self.param_id = struct.unpack_from("<HI", fill_data, 1)
str_section = fill_data[15:]
separators = [i for i, b in enumerate(str_section) if b == 0]
if len(separators) != 2:
raise ValueError(f"Expected two string separators in ParameterDescriptorMsg, found {len(separators)}")
self.name = str_section[0:separators[0]].decode('utf-8')
self.description = str_section[separators[0]+1:separators[1]].decode('utf-8')
self.type = str_section[separators[1]+1:].decode('utf-8')
if self.type == 'float':
self.value = struct.unpack_from("<f", fill_data, 7)[0]
else:
self.value = struct.unpack_from("<i", fill_data, 7)[0]
self.sequence_number, self.sequence_total = struct.unpack_from("<HH", fill_data, 11)
[docs] def to_bytes(self) -> bytes:
# Send request message
return struct.pack("<B", self.ID)
[docs]class SetGainMsg(PurpleDropMessage):
ID = 11
def __init__(self, fill_data: Optional[bytes]=None):
self.gains: Sequence[int] = []
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return -1
[docs] def to_bytes(self):
# Store a count byte, and then 2 bits per gain
data = [self.ID, len(self.gains)]
counter = 0
for g in self.gains:
if counter == 0:
data.append(0)
data[-1] |= (g & 0x3) << (counter * 2)
counter = (counter + 1) % 4
return bytes(data)
[docs]class SetParameterMsg(PurpleDropMessage):
ID = 6
def __init__(self, fill_data: Optional[bytes]=None):
if fill_data is not None:
if len(fill_data) < 10:
raise RuntimeError("Need at least 10 bytes to fill a SetParameterMsg")
self._buf = bytearray(fill_data)
else:
self._buf = bytearray([self.ID] + [0]*9)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 10
[docs] def param_idx(self) -> int:
return struct.unpack_from("<I", self._buf, 1)[0]
[docs] def set_param_idx(self, value: int):
struct.pack_into("<I", self._buf, 1, value)
[docs] def param_value_float(self) -> float:
return struct.unpack_from("<f", self._buf, 5)[0]
[docs] def set_param_value_float(self, value: float):
struct.pack_into("<f", self._buf, 5, value)
[docs] def param_value_int(self) -> int:
return struct.unpack_from("<i", self._buf, 5)[0]
[docs] def set_param_value_int(self, value: int):
struct.pack_into("<i", self._buf, 5, value)
[docs] def write_flag(self) -> bool:
if self._buf[9] == 0:
return False
else:
return True
[docs] def set_write_flag(self, flag: bool):
if flag:
self._buf[9] = 1
else:
self._buf[9] = 0
[docs] def fill(self, fill_data: bytes):
self._buf = bytearray(fill_data)
[docs] def to_bytes(self) -> bytes:
return bytes(self._buf)
def __str__(self):
return "SetParameterMsg(param_idx=%d, param_value=%d, write_flag=%d)" % \
(self.param_idx(), self.param_value_int(), self.write_flag())
[docs]class SetPwmMsg(PurpleDropMessage):
ID = 9
def __init__(self, fill_data: Optional[bytes]=None):
self.chan = 0
self.duty_cycle = 0.0
if fill_data is not None:
self.fill(fill_data)
[docs] def fill(self, buf: bytes):
raise RuntimeError("Not implemented")
[docs] def to_bytes(self) -> bytes:
return struct.pack("<BBH", self.ID, self.chan, int(self.duty_cycle * 4096))
[docs]class TemperatureMsg(PurpleDropMessage):
ID = 7
def __init__(self, fill_data: Optional[bytes]=None):
self.measurements: Sequence[int] = []
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
if(len(buf) < 2):
return 0
else:
return buf[1]*2 + 2
[docs] def fill(self, buf: bytes):
if len(buf) < 2:
raise ValueError("Insufficient bytes for TemperatureMsg")
count = buf[1]
if len(buf) < count * 2 + 2:
raise ValueError("Insufficient bytes for TemperatureMsg")
self.measurements = struct.unpack_from("<" + "h"*count, buf, 2)
[docs] def to_bytes(self) -> bytes:
count = len(self.measurements)
return struct.pack("<BB"+"h"*count, [self.ID, count] + list(self.measurements))
def __str__(self):
return "TemperatureMsg(measurements=%s)" % str(self.measurements)
[docs]class HvRegulatorMsg(PurpleDropMessage):
ID = 8
def __init__(self, fill_data: Optional[bytes]=None):
self.voltage = 0.0
self.v_target_out = 0
if fill_data is not None:
self.fill(fill_data)
[docs] @staticmethod
def predictSize(buf: bytes) -> int:
return 7
[docs] def fill(self, buf: bytes):
if len(buf) < 7:
raise ValueError("Insufficient bytes for HvRegulatorMsg")
self.voltage = struct.unpack_from("<f" , buf, 1)[0]
self.v_target_out = struct.unpack_from("<h", buf, 5)[0]
[docs] def to_bytes(self) -> bytes:
return struct.pack("<Bfh", [self.ID, self.voltage, self.v_target_out])
def __str__(self):
return "HvRegulatorMsg(voltage=%0.1f, v_target_out=%d)" % \
(self.voltage, self.v_target_out)