Message Formats

class src.utils.message_formats.AveperiodMetadataMessage[source]

Bases: object

Defines a message containing metadata about an averaging period of data. Message is sent from radar_control to data_write.

__init__(
experiment_id=None,
experiment_name=None,
experiment_comment=None,
rx_ctr_freq=None,
num_sequences=None,
last_sqn_num=None,
scan_flag=None,
aveperiod_time=None,
input_sample_rate=None,
data_normalization_factor=None,
scheduling_mode=None,
sequences=<factory>,
cfs_freqs=<factory>,
cfs_noise=<factory>,
cfs_range=<factory>,
cfs_masks=<factory>,
cfs_slice_ids=<factory>,
)
Parameters:
Return type:

None

aveperiod_time: float = None
cfs_freqs: list
cfs_masks: dict
cfs_noise: dict
cfs_range: dict
cfs_slice_ids: list
data_normalization_factor: float = None
experiment_comment: str = None
experiment_id: int = None
experiment_name: str = None
input_sample_rate: float = None
last_sqn_num: int = None
num_sequences: int = None
rx_ctr_freq: float = None
scan_flag: bool = None
scheduling_mode: str = None
sequences: list[Sequence]
class src.utils.message_formats.Beam[source]

Bases: object

Defines a beam structure for inclusion in an RxChannelMetadata

__init__(beam_azimuth=None, beam_num=None)
Parameters:
Return type:

None

beam_azimuth: float = None
beam_num: int = None
class src.utils.message_formats.CustomSerialization[source]

Bases: object

CustomSerialization()

classmethod parse(message)[source]

Parses a string of k1=v1 k2=v2 into object

Parameters:

message (str)

__init__()
Return type:

None

format_for_ipc()[source]
class src.utils.message_formats.DebugDataStage[source]

Bases: object

Defines a stage of debug data (filtered data or antennas_iq data plus associated metadata).

__init__(
stage_name=None,
main_shm=None,
intf_shm=None,
num_samps=None,
)
Parameters:
  • stage_name (str)

  • main_shm (str)

  • intf_shm (str)

  • num_samps (int)

Return type:

None

intf_shm: str = None
main_shm: str = None
num_samps: int = None
stage_name: str = None
class src.utils.message_formats.DriverPacket[source]

Bases: CustomSerialization

Message from radar_control to usrp_driver.

__init__(
sequence_num=0,
rxrate=0.0,
txrate=0.0,
txcenterfreq=0.0,
rxcenterfreq=0.0,
num_rx_samps=0,
num_tx_samps=0,
seqtime=0.0,
sample_timing=0.0,
burst_start=False,
burst_end=False,
align_sequences=False,
buffer_offset=0,
)
Parameters:
Return type:

None

align_sequences: bool = False
buffer_offset: int = 0
burst_end: bool = False
burst_start: bool = False
num_rx_samps: int = 0
num_tx_samps: int = 0
rxcenterfreq: float = 0.0
rxrate: float = 0.0
sample_timing: float = 0.0
seqtime: float = 0.0
sequence_num: int = 0
txcenterfreq: float = 0.0
txrate: float = 0.0
class src.utils.message_formats.Lag[source]

Bases: object

Defines a lag structure within an RxChannel dataclass

__init__(
pulse_1=None,
pulse_2=None,
lag_num=None,
phase_offset_real=None,
phase_offset_imag=None,
)
Parameters:
  • pulse_1 (int)

  • pulse_2 (int)

  • lag_num (int)

  • phase_offset_real (float)

  • phase_offset_imag (float)

Return type:

None

lag_num: int = None
phase_offset_imag: float = None
phase_offset_real: float = None
pulse_1: int = None
pulse_2: int = None
class src.utils.message_formats.LagTable[source]

Bases: object

Defines a ltab structure for inclusion in a RxChannelMetadata

__init__(pulse_position=<factory>, lag_num=None)
Parameters:
Return type:

None

lag_num: int = None
pulse_position: list[int]
class src.utils.message_formats.OutputDataset[source]

Bases: object

Defines an output dataset message.

__init__(
slice_id=None,
num_beams=None,
num_ranges=None,
num_lags=None,
main_acf_shm=None,
intf_acf_shm=None,
xcf_shm=None,
cfs_data=<factory>,
)
Parameters:
  • slice_id (int)

  • num_beams (int)

  • num_ranges (int)

  • num_lags (int)

  • main_acf_shm (str)

  • intf_acf_shm (str)

  • xcf_shm (str)

  • cfs_data (list)

Return type:

None

cfs_data: list
intf_acf_shm: str = None
main_acf_shm: str = None
num_beams: int = None
num_lags: int = None
num_ranges: int = None
slice_id: int = None
xcf_shm: str = None
class src.utils.message_formats.ProcessedSequenceMessage[source]

Bases: object

Defines a message containing metadata about a processed sequence of data. This message format is for communication from rx_signal_processing to data_write.

__init__(
sequence_num=None,
rx_sample_rate=None,
output_sample_rate=None,
initialization_time=None,
sequence_start_time=None,
gps_to_system_time_diff=None,
agc_status_bank_h=None,
lp_status_bank_h=None,
agc_status_bank_l=None,
lp_status_bank_l=None,
gps_locked=None,
bfiq_main_shm=None,
bfiq_intf_shm=None,
max_num_beams=None,
num_samps=None,
main_corrs_shm=None,
intf_corrs_shm=None,
cross_corrs_shm=None,
rawrf_shm=None,
rawrf_num_samps=None,
debug_data=<factory>,
output_datasets=<factory>,
cfs_freq=<factory>,
)
Parameters:
Return type:

None

agc_status_bank_h: int = None
agc_status_bank_l: int = None
bfiq_intf_shm: str = None
bfiq_main_shm: str = None
cfs_freq: list
cross_corrs_shm: str = None
debug_data: list[DebugDataStage]
gps_locked: bool = None
gps_to_system_time_diff: float = None
initialization_time: float = None
intf_corrs_shm: str = None
lp_status_bank_h: int = None
lp_status_bank_l: int = None
main_corrs_shm: str = None
max_num_beams: int = None
num_samps: int = None
output_datasets: list[OutputDataset]
output_sample_rate: float = None
rawrf_num_samps: int = None
rawrf_shm: str = None
rx_sample_rate: float = None
sequence_num: int = None
sequence_start_time: float = None
class src.utils.message_formats.RxChannel[source]

Bases: object

Defines the rx_channel structure within a SequenceMetadataMessage

__init__(
slice_id=None,
tau_spacing=None,
rx_freq=None,
cfs_flag=None,
num_ranges=None,
first_range=None,
range_sep=None,
rx_intf_antennas=<factory>,
beam_phases=None,
lags=<factory>,
pulses=<factory>,
acf=False,
xcf=False,
acfint=False,
)
Parameters:
Return type:

None

acf: bool = False
acfint: bool = False
beam_phases: ndarray = None
cfs_flag: bool = None
first_range: int = None
lags: list[Lag]
num_ranges: int = None
pulses: list
range_sep: float = None
rx_freq: float = None
rx_intf_antennas: list[int]
slice_id: int = None
tau_spacing: int = None
xcf: bool = False
class src.utils.message_formats.RxChannelMetadata[source]

Bases: object

Defines an RxChannelMetadata structure for inclusion in an AveperiodMetadataMessage

__init__(
slice_id=None,
slice_comment=None,
interfacing=None,
rx_only=None,
pulse_len=None,
tau_spacing=None,
rx_freq=None,
ptab=<factory>,
sequence_encodings=<factory>,
rx_main_antennas=<factory>,
rx_intf_antennas=<factory>,
rx_main_excitations=<factory>,
rx_intf_excitations=<factory>,
tx_antennas=<factory>,
tx_excitations=<factory>,
beams=<factory>,
first_range=None,
num_ranges=None,
range_sep=None,
acf=None,
xcf=None,
acfint=None,
ltabs=<factory>,
averaging_method=None,
)
Parameters:
Return type:

None

acf: bool = None
acfint: bool = None
averaging_method: str = None
beams: list[Beam]
first_range: float = None
interfacing: str = None
ltabs: list[LagTable]
num_ranges: int = None
ptab: list[int]
pulse_len: int = None
range_sep: int = None
rx_freq: float = None
rx_intf_antennas: list[int]
rx_intf_excitations: list[complex]
rx_main_antennas: list[int]
rx_main_excitations: list[complex]
rx_only: bool = None
sequence_encodings: list
slice_comment: str = None
slice_id: int = None
tau_spacing: int = None
tx_antennas: list[int]
tx_excitations: list[complex]
xcf: bool = None
class src.utils.message_formats.RxSamplesMetadata[source]

Bases: CustomSerialization

Message from usrp_driver to rx_signal_processing.

__init__(
sequence_num=0,
num_rx_samps=0,
rx_rate=0.0,
sequence_time=0.0,
initialization_time=0.0,
sequence_start_time=0.0,
ringbuffer_size=0,
agc_status_bank_h=0,
lp_status_bank_h=0,
agc_status_bank_l=0,
lp_status_bank_l=0,
gps_locked=False,
gps_to_system_time_diff=0.0,
)
Parameters:
  • sequence_num (int)

  • num_rx_samps (int)

  • rx_rate (float)

  • sequence_time (float)

  • initialization_time (float)

  • sequence_start_time (float)

  • ringbuffer_size (int)

  • agc_status_bank_h (int)

  • lp_status_bank_h (int)

  • agc_status_bank_l (int)

  • lp_status_bank_l (int)

  • gps_locked (bool)

  • gps_to_system_time_diff (float)

Return type:

None

agc_status_bank_h: int = 0
agc_status_bank_l: int = 0
gps_locked: bool = False
gps_to_system_time_diff: float = 0.0
initialization_time: float = 0.0
lp_status_bank_h: int = 0
lp_status_bank_l: int = 0
num_rx_samps: int = 0
ringbuffer_size: int = 0
rx_rate: float = 0.0
sequence_num: int = 0
sequence_start_time: float = 0.0
sequence_time: float = 0.0
class src.utils.message_formats.Sequence[source]

Bases: object

Defines a sequence structure for inclusion in an AveperiodMetadataMessage

__init__(
blanks=<factory>,
output_sample_rate=None,
rx_channels=<factory>,
)
Parameters:
Return type:

None

blanks: list[int]
output_sample_rate: float = None
rx_channels: list[RxChannelMetadata]
class src.utils.message_formats.SequenceMetadataMessage[source]

Bases: object

Defines a message containing metadata about a sequence of data. This message format is for communication from radar_control to rx_signal_processing.

__init__(
sequence_num=None,
sequence_time=None,
offset_to_first_rx_sample=None,
rx_rate=None,
output_sample_rate=None,
rx_ctr_freq=None,
decimation_scheme=None,
rx_channels=<factory>,
acf=False,
xcf=False,
acfint=False,
cfs_scan_flag=False,
cfs_fft_n=None,
)
Parameters:
Return type:

None

acf: bool = False
acfint: bool = False
cfs_fft_n: int = None
cfs_scan_flag: bool = False
decimation_scheme: DecimationScheme = None
offset_to_first_rx_sample: int = None
output_sample_rate: float = None
rx_channels: list[RxChannel]
rx_ctr_freq: float = None
rx_rate: float = None
sequence_num: int = None
sequence_time: float = None
xcf: bool = False