"""
Aggregator class for collecting data from each sequence within an averaging period.
The :mod:`src.data_write` module uses an :class:`Aggregator` object to ingest processed data from
:mod:`src.rx_signal_processing`. Once :mod:`src.radar_control` sends metadata for an averaging period, data from
the :class:`Aggregator` is collected into numpy arrays for writing to file.
"""
import collections
import copy
from dataclasses import dataclass, fields, field
from multiprocessing import shared_memory
from typing import Union
import numpy as np
from utils.message_formats import DebugDataStage, ProcessedSequenceMessage
@dataclass
class Aggregator:
    """
    Aggregator for ingesting :class:`ProcessedSequenceMessage` data during an averaging period.

    **Usage**::

        aggregator = Aggregator(
            num_main_antennas=16,
            rx_main_antennas=[i for i in range(16)],
            rx_intf_antennas=[i for i in range(4)],
        )
        packet = ProcessedSequenceMessage()  # this is generated by rx_signal_processing
        aggregator.update(packet)  # can do this for as many ProcessedSequenceMessage's as you get
        aggregator.finalize()  # Now internally aggregator contains numpy.array objects

    Consider an averaging period with two slices, IDs ``0`` and ``1``, that are ``SEQUENCE`` interfaced. These two
    slices run one after the other, so the number of pulse sequences for each at the end of the averaging period may
    differ by at most one. These slices may have different beam orders, numbers of range gates, pulse sequences,
    etc. At the end of the averaging period, the accumulator fields could look something like this::

        antenna_iq_accumulator = {
            0: {  # all antennas sampled for this slice
                'stage_0': {
                    0: np.ndarray,  # shape [19, 45084] ([num_sequences, num_samples])
                    1: np.ndarray,  # shape [19, 45084]
                    ...,
                    19: np.ndarray,  # shape [19, 45084]
                },
                'antennas_iq': {
                    0: np.ndarray,  # shape [19, 299]
                    1: np.ndarray,  # shape [19, 299]
                    ...,
                    19: np.ndarray,  # shape [19, 299]
                },
            },
            1: {  # only antennas 2 and 3 were sampled for this slice
                'stage_0': {
                    2: np.ndarray,  # shape [18, 54000]
                    3: np.ndarray,  # shape [18, 54000]
                },
                'antennas_iq': {
                    2: np.ndarray,  # shape [18, 324]
                    3: np.ndarray,  # shape [18, 324]
                },
            },
        }
        bfiq_accumulator = {
            0: {
                'main': np.ndarray,  # shape [19, 1, 299] ([num_sequences, num_beams, num_samples])
                'intf': np.ndarray,  # shape [19, 1, 299]
            },
            1: {
                'main': np.ndarray,  # shape [18, 16, 324]
            },
        }
        mainacfs_accumulator = {
            0: np.ndarray,  # shape [19, 1, 75, 22] ([num_sequences, num_beams, num_ranges, num_lags])
            1: np.ndarray,  # shape [18, 16, 100, 26]
        }
        intfacfs_accumulator = {
            0: np.ndarray,  # shape [19, 1, 75, 22]
        }
        xcfs_accumulator = {
            0: np.ndarray,  # shape [19, 1, 75, 22]
        }
    """

    agc_status_word: int = 0b0  #: AGC status bit-mapped to transmitter box
    gps_locked: bool = True
    """
    GPS lock status.
    Initialized to True for updating with logical AND in :func:`update()`
    """
    gps_to_system_time_diff: float = 0.0  #: Clock difference between computer and USRPs
    lp_status_word: int = 0b0  #: Low-power status bit-mapped to transmitter box
    # A default is required so that the generated __repr__/__eq__ work on an
    # instance that has not yet seen a call to update().
    sequence_num: int = field(init=False, default=0)
    """
    Sequence number of the most recent message passed to :func:`update()`. ``0`` until the first update.
    """
    slice_ids: set[int] = field(
        default_factory=set
    )  #: All slice IDs in the averaging period
    timestamps: list[float] = field(
        default_factory=list
    )  #: Timestamps of first pulse in each pulse sequence
    num_main_antennas: int = 0  #: Number of physical antennas in main array
    rx_main_antennas: list[int] = field(
        default_factory=list
    )  #: Indices of main array antennas connected to USRP RX channels
    rx_intf_antennas: list[int] = field(
        default_factory=list
    )  #: Indices of intf array antennas connected to USRP RX channels
    antenna_iq_accumulator: dict[int, dict[str, dict[int, Union[list, np.ndarray]]]] = (
        field(default_factory=dict)
    )
    """
    Complex voltage samples for all slices, filtering stages, and RX channels.
    By level of nesting, the keys for this field are:

    1. Slice ID
    2. Filter stage name. The final stage is always named ``"antennas_iq"``, intermediate stages are named
       ``"stage_x"`` where ``x`` is the stage of filtering that has been conducted. ``"stage_0"`` is the output
       after filter 0, ``"stage_1"`` is the output after filters 0 and 1, and so forth.
    3. Antenna index. This is the physical antenna index, not receive channel index. Indices start at 0 for the
       main array. Interferometer antennas are numbered after the main array, typically starting at 16.
    """
    antennas_iq_available: bool = False  #: Set once :func:`update()` has parsed antenna IQ data
    bfiq_accumulator: dict[int, dict[str, Union[list, np.ndarray]]] = field(
        default_factory=dict
    )
    """
    Beamformed samples for all slices, beam directions, and antenna arrays.
    By level of nesting, the keys for this field are:

    1. Slice ID
    2. Antenna array name. This is either ``"main"`` or ``"intf"``.
    """
    bfiq_available: bool = False  #: Set once :func:`update()` has parsed beamformed IQ data
    intfacfs_accumulator: dict[int, Union[list, np.ndarray]] = field(
        default_factory=dict
    )
    """
    Interferometer array autocorrelations for all slices. Keyed by slice ID.
    """
    intf_acf_slices: set[int] = field(
        default_factory=set
    )  #: Slice IDs that have interferometer array ACF data
    mainacfs_accumulator: dict[int, Union[list, np.ndarray]] = field(
        default_factory=dict
    )
    """
    Main array autocorrelations for all slices. Keyed by slice ID.
    """
    main_acf_slices: set[int] = field(
        default_factory=set
    )  #: Slice IDs that have main array ACF data
    rawrf_available: bool = False  #: Set once a message with a non-empty ``rawrf_shm`` is seen
    rawrf_locations: list[str] = field(
        default_factory=list
    )  #: Rawrf data names in shared memory
    rawrf_num_samps: int = 0  #: number of IQ samples per sequence for rawrf data
    xcfs_accumulator: dict[int, Union[list, np.ndarray]] = field(default_factory=dict)
    """
    Antenna array cross-correlations for all slices. Keyed by slice ID.
    """
    xcf_slices: set[int] = field(default_factory=set)  #: Slice IDs that have XCF data

    def _get_accumulators(self):
        """
        Returns a list of all accumulator dictionaries in this object.

        A field is treated as an accumulator when its name contains ``"accumulator"``.

        :returns: The accumulator dictionaries, in field declaration order.
        :rtype: list[dict]
        """
        return [getattr(self, f.name) for f in fields(self) if "accumulator" in f.name]

    @staticmethod
    def _extract_from_shm(location: str, dims: tuple[int, int, int]):
        """
        Copies a numpy array out of shared memory, then destroys the shared memory segment.

        The segment is closed and unlinked even if reading it fails, so a malformed message cannot
        leak a handle or leave the segment behind.

        :param location: shared memory object name
        :type location: str
        :param dims: shape of the ``complex64`` array stored in the segment
        :type dims: tuple[int, int, int]
        :returns: A copy of the data that does not reference the shared memory buffer.
        :rtype: np.ndarray
        """
        shm = shared_memory.SharedMemory(name=location)
        try:
            # The copy must be made before close(); the temporary array only views shm.buf.
            return np.ndarray(dims, dtype=np.complex64, buffer=shm.buf).copy()
        finally:
            shm.close()
            shm.unlink()

    @staticmethod
    def _accumulate_acfs(
        accumulator: dict,
        slice_id: int,
        data_shape: tuple[int, int, int],
        shm_name: str,
    ):
        """
        Copies a numpy array out of shared memory and appends it to ``accumulator[slice_id]``.

        :param accumulator: accumulator to hold data
        :type accumulator: dict
        :param slice_id: slice identifier
        :type slice_id: int
        :param data_shape: shape of the numpy array
        :type data_shape: tuple[int, int, int]
        :param shm_name: shared memory object name
        :type shm_name: str
        """
        accumulator.setdefault(slice_id, []).append(
            Aggregator._extract_from_shm(shm_name, data_shape)
        )

    def _parse_acfs(self, processed_data: "ProcessedSequenceMessage"):
        """
        Populates ``self.mainacfs_accumulator``, ``self.intfacfs_accumulator``, and ``self.xcfs_accumulator``
        with data from ``processed_data``.

        Each data set contributes only the correlation types whose shared memory name is not ``None``.

        :param processed_data: Processed sequence from rx_signal_processing module.
        :type processed_data: ProcessedSequenceMessage
        """
        for data_set in processed_data.output_datasets:
            slice_id = data_set.slice_id
            data_shape = (data_set.num_beams, data_set.num_ranges, data_set.num_lags)

            if data_set.main_acf_shm is not None:
                self.main_acf_slices.add(slice_id)
                self._accumulate_acfs(
                    self.mainacfs_accumulator,
                    slice_id,
                    data_shape,
                    data_set.main_acf_shm,
                )

            if data_set.xcf_shm is not None:
                self.xcf_slices.add(slice_id)
                self._accumulate_acfs(
                    self.xcfs_accumulator, slice_id, data_shape, data_set.xcf_shm
                )

            if data_set.intf_acf_shm is not None:
                self.intf_acf_slices.add(slice_id)
                self._accumulate_acfs(
                    self.intfacfs_accumulator,
                    slice_id,
                    data_shape,
                    data_set.intf_acf_shm,
                )

    @staticmethod
    def _accumulate_bfiq(
        accumulator: dict,
        name: str,
        dims: tuple[int, int, int],
        sqn: "ProcessedSequenceMessage",
    ):
        """
        Extracts data of shape ``dims`` from shared memory and adds it to ``accumulator``.

        The shared memory name is read from the ``bfiq_<name>_shm`` attribute of ``sqn``. The first axis of
        the extracted array is indexed by position in ``sqn.output_datasets``; only the first ``num_beams``
        beams of each data set are kept.

        :param accumulator: accumulator to hold data, keyed by slice ID then by ``name``
        :type accumulator: dict
        :param name: antenna array name, used both as the accumulator key and to find the shared memory name
        :type name: str
        :param dims: shape of the array in shared memory
        :type dims: tuple[int, int, int]
        :param sqn: Processed sequence from rx_signal_processing module.
        :type sqn: ProcessedSequenceMessage
        """
        data = Aggregator._extract_from_shm(getattr(sqn, f"bfiq_{name}_shm"), dims)
        for i, data_set in enumerate(sqn.output_datasets):
            per_slice = accumulator.setdefault(data_set.slice_id, {})
            per_slice.setdefault(name, []).append(data[i, : data_set.num_beams, :])

    def _parse_bfiq(self, processed_data: "ProcessedSequenceMessage"):
        """
        Populates ``self.bfiq_accumulator`` with beamformed IQ data from ``processed_data``.

        Main array data is always read; interferometer data is read only when
        ``processed_data.bfiq_intf_shm`` is truthy.

        :param processed_data: Processed sequence from rx_signal_processing module.
        :type processed_data: ProcessedSequenceMessage
        """
        shape = (
            len(processed_data.output_datasets),
            processed_data.max_num_beams,
            processed_data.num_samps,
        )
        self._accumulate_bfiq(self.bfiq_accumulator, "main", shape, processed_data)
        if processed_data.bfiq_intf_shm:
            self._accumulate_bfiq(self.bfiq_accumulator, "intf", shape, processed_data)
        self.bfiq_available = True

    @staticmethod
    def _accumulate_iq(
        stage_data: "list[DebugDataStage]",
        num_slices: int,
        num_main_antennas: int,
        num_intf_antennas: int,
    ):
        """
        Retrieves all intermediate filtered IQ data from shared memory into a list of per-stage dictionaries.

        :param stage_data: Filter stage descriptors to read; iterated in order.
        :type stage_data: list[DebugDataStage]
        :param num_slices: number of slices in the sequence
        :type num_slices: int
        :param num_main_antennas: number of main array antennas in each stage's main data
        :type num_main_antennas: int
        :param num_intf_antennas: number of interferometer array antennas in each stage's intf data
        :type num_intf_antennas: int
        :returns: One ``{"name": stage_name, "data": array}`` dict per stage, where ``array`` has shape
            ``[num_slices, num_antennas, num_samps]`` with main antennas first and interferometer antennas
            (if any) after them.
        :rtype: list[dict]
        """
        stages = []
        for filter_stage in stage_data:
            stage_samps = filter_stage.num_samps
            samples = Aggregator._extract_from_shm(
                filter_stage.main_shm, (num_slices, num_main_antennas, stage_samps)
            )
            if filter_stage.intf_shm:
                intf_samples = Aggregator._extract_from_shm(
                    filter_stage.intf_shm, (num_slices, num_intf_antennas, stage_samps)
                )
                # hstack joins along axis 1, i.e. the antenna axis of these 3-D arrays
                samples = np.hstack((samples, intf_samples))
            stages.append({"name": filter_stage.stage_name, "data": samples})
        return stages

    def _parse_antennas_iq(self, processed_data: "ProcessedSequenceMessage"):
        """
        Populates ``self.antenna_iq_accumulator`` with IQ data from ``processed_data``.

        :param processed_data: Processed sequence from rx_signal_processing module.
        :type processed_data: ProcessedSequenceMessage
        """
        # Get data dimensions for reading in the shared memory
        stages = self._accumulate_iq(
            processed_data.debug_data,
            len(processed_data.output_datasets),
            len(self.rx_main_antennas),
            len(self.rx_intf_antennas),
        )
        self.antennas_iq_available = True

        # All possible antenna numbers, given the config file.
        # The interferometer antenna numbers start after the last main antenna number.
        antenna_indices = list(self.rx_main_antennas)
        antenna_indices.extend(
            ant + self.num_main_antennas for ant in self.rx_intf_antennas
        )

        # Iterate over every data set, one data set per slice
        for i, data_set in enumerate(processed_data.output_datasets):
            slice_stages = self.antenna_iq_accumulator.setdefault(
                data_set.slice_id, {}
            )
            for debug_stage in stages:
                stage_name = debug_stage["name"]
                if stage_name not in slice_stages:
                    slice_stages[stage_name] = collections.OrderedDict()
                antenna_iq_stage = slice_stages[stage_name]

                # Row k of the data array belongs to antenna number antenna_indices[k]
                for ant_name, samples in zip(antenna_indices, debug_stage["data"][i]):
                    antenna_iq_stage.setdefault(ant_name, []).append(samples)

    def finalize(self):
        """
        Consolidates data for each data type to one numpy array.

        When :func:`update()` is called, new data arrays are appended to a list for speed considerations.
        This function converts these lists into ``complex64`` numpy arrays, in place.
        """
        for slice_data in self.antenna_iq_accumulator.values():
            for stage in slice_data.values():
                for channel_name, channel_data in stage.items():
                    stage[channel_name] = np.array(channel_data, dtype=np.complex64)

        for slice_data in self.bfiq_accumulator.values():
            for param_name, param_data in slice_data.items():
                slice_data[param_name] = np.array(param_data, dtype=np.complex64)

        for accumulator in (
            self.mainacfs_accumulator,
            self.intfacfs_accumulator,
            self.xcfs_accumulator,
        ):
            for slice_id, acfs in accumulator.items():
                accumulator[slice_id] = np.array(acfs, dtype=np.complex64)

    def update(self, sqn: "ProcessedSequenceMessage"):
        """
        Parses the message and updates the accumulators and metadata fields with the new data.

        :param sqn: Processed sequence from rx_signal_processing module.
        :type sqn: ProcessedSequenceMessage
        """
        self.sequence_num = sqn.sequence_num
        self.timestamps.append(sqn.sequence_start_time)
        self.slice_ids.update(data_set.slice_id for data_set in sqn.output_datasets)

        if sqn.rawrf_shm != "":
            self.rawrf_available = True
            self.rawrf_num_samps = sqn.rawrf_num_samps
            self.rawrf_locations.append(sqn.rawrf_shm)

        # Logical AND to catch any time the GPS may have been unlocked during the integration period
        self.gps_locked = self.gps_locked and sqn.gps_locked

        # Find the max time diff between GPS and system time to report for this integration period
        if abs(self.gps_to_system_time_diff) < abs(sqn.gps_to_system_time_diff):
            self.gps_to_system_time_diff = sqn.gps_to_system_time_diff

        # Bitwise OR to catch any AGC faults during the integration period
        self.agc_status_word |= sqn.agc_status_bank_h

        # Bitwise OR to catch any low power conditions during the integration period
        self.lp_status_word |= sqn.lp_status_bank_h

        self._parse_acfs(sqn)
        self._parse_bfiq(sqn)
        self._parse_antennas_iq(sqn)