Source code for src.utils.message_formats

from dataclasses import dataclass, field, fields
import numpy as np

from .decimation_scheme import DecimationScheme


@dataclass
class DebugDataStage:
    """
    One stage of debug data: either filtered data or antennas_iq data,
    together with the metadata that goes with it.

    Every field defaults to ``None``, so the class can be instantiated with
    no arguments and populated attribute by attribute.
    """

    stage_name: str = None
    main_shm: str = None
    intf_shm: str = None
    num_samps: int = None
@dataclass
class OutputDataset:
    """
    Message describing a single output dataset.

    Scalar fields default to ``None``; ``cfs_data`` is built by a default
    factory so every instance owns its own list.
    """

    slice_id: int = None

    # Dimensions of the dataset.
    num_beams: int = None
    num_ranges: int = None
    num_lags: int = None

    # String handles, one per correlation product.
    main_acf_shm: str = None
    intf_acf_shm: str = None
    xcf_shm: str = None

    cfs_data: list = field(default_factory=list)
@dataclass
class ProcessedSequenceMessage:
    """
    Metadata about one processed sequence of data.

    This is the message format used for communication from
    rx_signal_processing to data_write.

    Scalar fields default to ``None``. The list-valued fields
    (``debug_data``, ``output_datasets``, ``cfs_freq``) use a default factory
    so that instances never share a list object.
    """

    # Sequence identification, sample rates and timing.
    sequence_num: int = None
    rx_sample_rate: float = None
    output_sample_rate: float = None
    initialization_time: float = None
    sequence_start_time: float = None
    gps_to_system_time_diff: float = None

    # Status words and GPS lock flag.
    agc_status_bank_h: int = None
    lp_status_bank_h: int = None
    agc_status_bank_l: int = None
    lp_status_bank_l: int = None
    gps_locked: bool = None

    # Beamformed IQ handles and dimensions.
    bfiq_main_shm: str = None
    bfiq_intf_shm: str = None
    max_num_beams: int = None
    num_samps: int = None

    # Correlation handles.
    main_corrs_shm: str = None
    intf_corrs_shm: str = None
    cross_corrs_shm: str = None

    # Raw RF handle and its sample count.
    rawrf_shm: str = None
    rawrf_num_samps: int = None

    # Per-instance collections.
    debug_data: list[DebugDataStage] = field(default_factory=list)
    output_datasets: list[OutputDataset] = field(default_factory=list)
    cfs_freq: list = field(default_factory=list)
@dataclass
class Lag:
    """
    A single lag entry, used inside an RxChannel dataclass.

    All fields default to ``None``.
    """

    pulse_1: int = None
    pulse_2: int = None
    lag_num: int = None

    # Real and imaginary parts are carried as two separate floats.
    phase_offset_real: float = None
    phase_offset_imag: float = None
@dataclass
class RxChannel:
    """
    The rx_channel structure carried inside a SequenceMetadataMessage.

    Scalar fields default to ``None``, the three processing flags default to
    ``False``, and list-valued fields are created per instance by a default
    factory.
    """

    slice_id: int = None
    tau_spacing: int = None
    rx_freq: float = None
    cfs_flag: bool = None

    # NOTE(review): RxChannelMetadata annotates first_range as float and
    # range_sep as int, the opposite of what is declared here - confirm which
    # pair of annotations is the intended one.
    num_ranges: int = None
    first_range: int = None
    range_sep: float = None

    rx_intf_antennas: list[int] = field(default_factory=list)
    beam_phases: np.ndarray = None
    lags: list[Lag] = field(default_factory=list)
    pulses: list = field(default_factory=list)

    # Unlike the fields above, these flags default to False rather than None.
    acf: bool = False
    xcf: bool = False
    acfint: bool = False
@dataclass
class SequenceMetadataMessage:
    """
    Metadata about one sequence of data.

    This is the message format used for communication from radar_control to
    rx_signal_processing.

    Scalar fields default to ``None``, boolean flags default to ``False``, and
    ``rx_channels`` is a fresh list for every instance.
    """

    # Sequence identification, timing and rates.
    sequence_num: int = None
    sequence_time: float = None
    offset_to_first_rx_sample: int = None
    rx_rate: float = None
    output_sample_rate: float = None
    rx_ctr_freq: float = None

    decimation_scheme: DecimationScheme = None
    rx_channels: list[RxChannel] = field(default_factory=list)

    # Processing flags.
    acf: bool = False
    xcf: bool = False
    acfint: bool = False

    # CFS-related settings.
    cfs_scan_flag: bool = False
    cfs_fft_n: int = None
@dataclass
class Beam:
    """
    A beam entry, for inclusion in an RxChannelMetadata.

    Both fields default to ``None``.
    """

    beam_azimuth: float = None
    beam_num: int = None
@dataclass
class LagTable:
    """
    An ltab entry, for inclusion in an RxChannelMetadata.

    ``pulse_position`` is created by a default factory, so each instance gets
    its own list; ``lag_num`` defaults to ``None``.
    """

    pulse_position: list[int] = field(default_factory=list)
    lag_num: int = None
@dataclass
class RxChannelMetadata:
    """
    Per-channel metadata, for inclusion in an AveperiodMetadataMessage.

    Scalar fields (including the ``acf`` / ``xcf`` / ``acfint`` flags) default
    to ``None``; every list-valued field is created per instance by a default
    factory.
    """

    # Slice description.
    slice_id: int = None
    slice_comment: str = None
    interfacing: str = None
    rx_only: bool = None
    pulse_len: int = None
    tau_spacing: int = None
    rx_freq: float = None

    ptab: list[int] = field(default_factory=list)
    sequence_encodings: list = field(default_factory=list)

    # Antenna lists and their excitations.
    rx_main_antennas: list[int] = field(default_factory=list)
    rx_intf_antennas: list[int] = field(default_factory=list)
    rx_main_excitations: list[complex] = field(default_factory=list)
    rx_intf_excitations: list[complex] = field(default_factory=list)
    tx_antennas: list[int] = field(default_factory=list)
    tx_excitations: list[complex] = field(default_factory=list)

    beams: list[Beam] = field(default_factory=list)

    # NOTE(review): RxChannel annotates first_range as int and range_sep as
    # float, the opposite of what is declared here - confirm which pair of
    # annotations is the intended one.
    first_range: float = None
    num_ranges: int = None
    range_sep: int = None

    # These flags default to None here (not False).
    acf: bool = None
    xcf: bool = None
    acfint: bool = None

    ltabs: list[LagTable] = field(default_factory=list)
    averaging_method: str = None
@dataclass
class Sequence:
    """
    A sequence entry, for inclusion in an AveperiodMetadataMessage.

    The two list fields are created per instance by a default factory;
    ``output_sample_rate`` defaults to ``None``.
    """

    blanks: list[int] = field(default_factory=list)
    output_sample_rate: float = None
    rx_channels: list[RxChannelMetadata] = field(default_factory=list)
@dataclass
class AveperiodMetadataMessage:
    """
    Metadata about one averaging period of data.

    The message is sent from radar_control to data_write.

    Scalar fields default to ``None``; list- and dict-valued fields are
    created per instance by a default factory.
    """

    # Experiment identification.
    experiment_id: int = None
    experiment_name: str = None
    experiment_comment: str = None

    # Averaging-period level values.
    rx_ctr_freq: float = None
    num_sequences: int = None
    last_sqn_num: int = None
    scan_flag: bool = None
    aveperiod_time: float = None
    input_sample_rate: float = None
    data_normalization_factor: float = None
    scheduling_mode: str = None

    sequences: list[Sequence] = field(default_factory=list)

    # CFS-related collections.
    cfs_freqs: list = field(default_factory=list)
    cfs_noise: dict = field(default_factory=dict)
    cfs_range: dict = field(default_factory=dict)
    cfs_masks: dict = field(default_factory=dict)
    cfs_slice_ids: list = field(default_factory=list)
@dataclass
class CustomSerialization:
    """
    Base class giving dataclass messages a plain-text ``k1=v1 k2=v2`` format.

    :meth:`parse` converts each incoming value using the type of the value
    found on a freshly constructed instance, so subclasses must give every
    field a default of the field's real type (``0``, ``0.0``, ``False``, ...).
    A ``None`` default cannot be used for conversion.
    """

    @classmethod
    def parse(cls, message: str):
        """
        Parse a string of ``k1=v1 k2=v2`` into an object.

        :param message: Whitespace-separated ``key=value`` tokens. Runs of
            whitespace, as well as leading and trailing whitespace, are
            ignored. ``bytes`` / ``bytearray`` input (such as the output of
            :meth:`format_for_ipc`) is decoded as UTF-8 first.
        :returns: A new instance of ``cls``. Fields that are not named in
            ``message`` keep their default values.
        :raises IndexError: if ``message`` contains no tokens, or a token has
            no ``=`` separator.
        :raises AttributeError: if a key is not an attribute of ``cls``.
        :raises ValueError: if a value cannot be converted to the field type.
        """
        if isinstance(message, (bytes, bytearray)):
            message = message.decode("utf-8")

        packet = cls()

        # split() with no argument collapses repeated whitespace and drops
        # leading/trailing whitespace, so "a=1  b=2\n" yields no empty tokens.
        tokens = message.split()
        if not tokens:
            # IndexError is kept for compatibility: an empty message has
            # always surfaced as IndexError to callers.
            raise IndexError("cannot parse an empty message")

        for token in tokens:
            # partition() splits on the first "=" only, so a value that itself
            # contains "=" is kept intact instead of being truncated.
            key, separator, raw_value = token.partition("=")
            if not separator:
                # Same exception type as before for a token without "=".
                raise IndexError(
                    f"malformed token {token!r}: expected 'key=value'"
                )

            var_type = type(getattr(packet, key))
            if var_type is bool:
                # bool("0") -> True, bool(int("0")) -> False
                value = bool(int(raw_value))
            else:
                value = var_type(raw_value)
            setattr(packet, key, value)

        return packet

    def format_for_ipc(self) -> bytes:
        """
        Serialize this object as UTF-8 encoded ``k1=v1 k2=v2`` text.

        Fields are emitted in declaration order, separated by single spaces.
        Booleans are written as ``0`` / ``1`` so that :meth:`parse` can read
        them back with ``bool(int(...))``.

        :returns: The encoded message.
        """
        tokens = []
        for f in fields(self):
            value = getattr(self, f.name)
            if isinstance(value, bool):
                value = int(value)
            tokens.append(f"{f.name}={value}")
        return " ".join(tokens).encode("utf-8")
@dataclass
class RxSamplesMetadata(CustomSerialization):
    """
    Message sent from usrp_driver to rx_signal_processing.

    Every field has a concrete default of its own type (``0``, ``0.0`` or
    ``False``); ``CustomSerialization.parse`` relies on the type of these
    defaults to convert incoming text values.
    """

    sequence_num: int = 0
    num_rx_samps: int = 0
    rx_rate: float = 0.0

    # Timing values.
    sequence_time: float = 0.0
    initialization_time: float = 0.0
    sequence_start_time: float = 0.0

    ringbuffer_size: int = 0

    # Status words.
    agc_status_bank_h: int = 0
    lp_status_bank_h: int = 0
    agc_status_bank_l: int = 0
    lp_status_bank_l: int = 0

    # GPS state.
    gps_locked: bool = False
    gps_to_system_time_diff: float = 0.0
@dataclass
class DriverPacket(CustomSerialization):
    """
    Message sent from radar_control to usrp_driver.

    Every field has a concrete default of its own type (``0``, ``0.0`` or
    ``False``); ``CustomSerialization.parse`` relies on the type of these
    defaults to convert incoming text values.
    """

    sequence_num: int = 0

    # Rates and centre frequencies.
    rxrate: float = 0.0
    txrate: float = 0.0
    txcenterfreq: float = 0.0
    rxcenterfreq: float = 0.0

    # Sample counts.
    num_rx_samps: int = 0
    num_tx_samps: int = 0

    # Timing values.
    seqtime: float = 0.0
    sample_timing: float = 0.0

    # Boolean flags, serialized as 0 / 1 by format_for_ipc.
    burst_start: bool = False
    burst_end: bool = False
    align_sequences: bool = False

    buffer_offset: int = 0