# Source code for src.utils.data_aggregator

"""
Aggregator class for collecting data from each sequence within an averaging period.

The :mod:`src.data_write` module uses an :class:`Aggregator` object to ingest processed data from
:mod:`src.rx_signal_processing`. Once :mod:`src.radar_control` sends metadata for an averaging period, data from
the :class:`Aggregator` is collected into numpy arrays for writing to file.
"""

import collections
import copy
from dataclasses import dataclass, fields, field
from multiprocessing import shared_memory
from typing import Union

import numpy as np

from utils.message_formats import DebugDataStage, ProcessedSequenceMessage


@dataclass
class Aggregator:
    """
    Aggregator for ingesting :class:`ProcessedSequenceMessage` data during an averaging period.

    **Usage**::

        aggregator = Aggregator(
            num_main_antennas=16,
            rx_main_antennas=[i for i in range(16)],
            rx_intf_antennas=[i for i in range(4)],
        )
        packet = ProcessedSequenceMessage()  # this is generated by rx_signal_processing
        aggregator.update(packet)  # can do this for as many ProcessedSequenceMessage's as you get
        aggregator.finalize()  # Now internally aggregator contains numpy.array objects

    Consider an averaging period with two slices, IDs ``0`` and ``1``, that are ``SEQUENCE``
    interfaced. These two slices run one after the other, so the number of pulse sequences for
    each at the end of the averaging period may differ by at most one. These slices may have
    different beam orders, numbers of range gates, pulse sequences, etc. At the end of the
    averaging period, the accumulator fields could look something like this::

        antenna_iq_accumulator = {
            0: {  # all antennas sampled for this slice
                'stage_0': {
                    0: np.ndarray,  # shape [19, 45084] ([num_sequences, num_samples])
                    1: np.ndarray,  # shape [19, 45084]
                    ...,
                    19: np.ndarray,  # shape [19, 45084]
                },
                'antennas_iq': {
                    0: np.ndarray,  # shape [19, 299]
                    1: np.ndarray,  # shape [19, 299]
                    ...,
                    19: np.ndarray,  # shape [19, 299]
                },
            },
            1: {  # only antennas 2 and 3 were sampled for this slice
                'stage_0': {
                    2: np.ndarray,  # shape [18, 54000]
                    3: np.ndarray,  # shape [18, 54000]
                },
                'antennas_iq': {
                    2: np.ndarray,  # shape [18, 324]
                    3: np.ndarray,  # shape [18, 324]
                },
            },
        }
        bfiq_accumulator = {
            0: {
                'main': np.ndarray,  # shape [19, 1, 299] ([num_sequences, num_beams, num_samples])
                'intf': np.ndarray,  # shape [19, 1, 299]
            },
            1: {
                'main': np.ndarray,  # shape [18, 16, 324]
            },
        }
        mainacfs_accumulator = {
            0: np.ndarray,  # shape [19, 1, 75, 22] ([num_sequences, num_beams, num_ranges, num_lags])
            1: np.ndarray,  # shape [18, 16, 100, 26]
        }
        intfacfs_accumulator = {
            0: np.ndarray,  # shape [19, 1, 75, 22]
        }
        xcfs_accumulator = {
            0: np.ndarray,  # shape [19, 1, 75, 22]
        }

    While :func:`update()` is being called the innermost values are lists holding one array per
    sequence; :func:`finalize()` stacks each list into a single ``np.complex64`` array as shown.
    """

    agc_status_word: int = 0b0  #: AGC status bit-mapped to transmitter box
    gps_locked: bool = True
    """
    GPS lock status. Initialized to True for updating with logical AND in :func:`update()`
    """
    gps_to_system_time_diff: float = 0.0  #: Clock difference between computer and USRPs
    lp_status_word: int = 0b0  #: Low-power status bit-mapped to transmitter box
    # A default is required: without one the attribute does not exist until the first update()
    # and the dataclass-generated __repr__ raises AttributeError.
    sequence_num: int = field(init=False, default=0)  #: Sequence number from the latest update()
    slice_ids: set[int] = field(
        default_factory=set
    )  #: All slice IDs in the averaging period
    timestamps: list[float] = field(
        default_factory=list
    )  #: Timestamps of first pulse in each pulse sequence
    num_main_antennas: int = 0  #: Number of physical antennas in main array
    rx_main_antennas: list[int] = field(
        default_factory=list
    )  #: Indices of main array antennas connected to USRP RX channels
    rx_intf_antennas: list[int] = field(
        default_factory=list
    )  #: Indices of intf array antennas connected to USRP RX channels
    antenna_iq_accumulator: dict[int, dict[str, dict[int, Union[list, np.ndarray]]]] = (
        field(default_factory=dict)
    )
    """
    Complex voltage samples for all slices, filtering stages, and RX channels.

    By level of nesting, the keys for this field are:

    1. Slice ID
    2. Filter stage name. The final stage is always named ``"antennas_iq"``, intermediate stages
       are named ``"stage_x"`` where ``x`` is the stage of filtering that has been conducted.
       ``"stage_0"`` is the output after filter 0, ``"stage_1"`` is the output after filters 0
       and 1, and so forth.
    3. Antenna index. This is the physical antenna index, not receive channel index. Indices
       start at 0 for the main array. Interferometer antennas are numbered after the main array,
       typically starting at 16.
    """
    antennas_iq_available: bool = False  #: Set True once update() has parsed antennas IQ data
    bfiq_accumulator: dict[int, dict[str, Union[list, np.ndarray]]] = field(
        default_factory=dict
    )
    """
    Beamformed samples for all slices, beam directions, and antenna arrays.

    By level of nesting, the keys for this field are:

    1. Slice ID
    2. Antenna array name. This is either ``"main"`` or ``"intf"``.
    """
    bfiq_available: bool = False  #: Set True once update() has parsed beamformed IQ data
    intfacfs_accumulator: dict[int, Union[list, np.ndarray]] = field(
        default_factory=dict
    )
    """
    Interferometer array autocorrelations for all slices. Keyed by slice ID.
    """
    intf_acf_slices: set[int] = field(
        default_factory=set
    )  #: Slice IDs that have interferometer array ACF data
    mainacfs_accumulator: dict[int, Union[list, np.ndarray]] = field(
        default_factory=dict
    )
    """
    Main array autocorrelations for all slices. Keyed by slice ID.
    """
    main_acf_slices: set[int] = field(
        default_factory=set
    )  #: Slice IDs that have main array ACF data
    rawrf_available: bool = False  #: Set True once update() sees a sequence carrying rawrf data
    rawrf_locations: list[str] = field(
        default_factory=list
    )  #: Rawrf data names in shared memory
    rawrf_num_samps: int = 0  #: number of IQ samples per sequence for rawrf data
    xcfs_accumulator: dict[int, Union[list, np.ndarray]] = field(default_factory=dict)
    """
    Antenna array cross-correlations for all slices. Keyed by slice ID.
    """
    xcf_slices: set[int] = field(default_factory=set)  #: Slice IDs that have XCF data

    def _get_accumulators(self) -> list[dict]:
        """Returns a list of all accumulator dictionaries (fields whose name contains
        ``"accumulator"``) in this object."""
        return [getattr(self, f.name) for f in fields(self) if "accumulator" in f.name]

    @staticmethod
    def _extract_from_shm(location: str, dims: tuple[int, ...]) -> np.ndarray:
        """
        Copies a ``np.complex64`` array of shape ``dims`` out of shared memory, then closes and
        unlinks the shared memory block.

        The block is closed and unlinked even if reading fails (e.g. ``dims`` needs more bytes
        than the block holds), so a malformed message cannot leak the segment.

        :param location: name of the shared memory block
        :type location: str
        :param dims: shape of the array stored in the block
        :type dims: tuple[int, ...]
        :returns: an array that owns its data (independent of the shared memory)
        :rtype: np.ndarray
        """
        shm = shared_memory.SharedMemory(name=location)
        try:
            view = np.ndarray(dims, dtype=np.complex64, buffer=shm.buf)
            owned_array = view.copy()
            # Drop the view into shm.buf before closing, so no reference into the buffer
            # outlives the mapping.
            del view
        finally:
            shm.close()
            shm.unlink()
        return owned_array

    @staticmethod
    def _accumulate_acfs(
        accumulator: dict,
        slice_id: int,
        data_shape: tuple[int, int, int],
        shm_name: str,
    ):
        """
        Copies a numpy array from shared memory and appends it to ``accumulator[slice_id]``.

        :param accumulator: accumulator to hold data
        :type accumulator: dict
        :param slice_id: slice identifier
        :type slice_id: int
        :param data_shape: shape of the numpy array
        :type data_shape: tuple[int, int, int]
        :param shm_name: shared memory object name
        :type shm_name: str
        """
        accumulator.setdefault(slice_id, []).append(
            Aggregator._extract_from_shm(shm_name, data_shape)
        )

    def _parse_acfs(self, processed_data: "ProcessedSequenceMessage"):
        """
        Populates ``self.mainacfs_accumulator``, ``self.intfacfs_accumulator``, and
        ``self.xcfs_accumulator`` with data from ``processed_data``.

        A correlation type is skipped for a data set whose shared memory name is ``None``.

        :param processed_data: Processed sequence from rx_signal_processing module.
        :type processed_data: ProcessedSequenceMessage
        """
        for data_set in processed_data.output_datasets:
            slice_id = data_set.slice_id
            data_shape = (data_set.num_beams, data_set.num_ranges, data_set.num_lags)
            targets = (
                (data_set.main_acf_shm, self.main_acf_slices, self.mainacfs_accumulator),
                (data_set.xcf_shm, self.xcf_slices, self.xcfs_accumulator),
                (data_set.intf_acf_shm, self.intf_acf_slices, self.intfacfs_accumulator),
            )
            for shm_name, slice_set, accumulator in targets:
                if shm_name is None:
                    continue
                slice_set.add(slice_id)
                self._accumulate_acfs(accumulator, slice_id, data_shape, shm_name)

    @staticmethod
    def _accumulate_bfiq(
        accumulator: dict,
        name: str,
        dims: tuple[int, int, int],
        sqn: "ProcessedSequenceMessage",
    ):
        """
        Extracts data of shape ``dims`` from the shared memory named by ``sqn.bfiq_{name}_shm``
        and appends each slice's beams to ``accumulator[slice_id][name]``.

        Row ``i`` of the extracted data belongs to ``sqn.output_datasets[i]``; only its first
        ``num_beams`` beams are kept.
        """
        data = Aggregator._extract_from_shm(getattr(sqn, f"bfiq_{name}_shm"), dims)
        for i, data_set in enumerate(sqn.output_datasets):
            slice_data = accumulator.setdefault(data_set.slice_id, {})
            slice_data.setdefault(name, []).append(data[i, : data_set.num_beams, :])

    def _parse_bfiq(self, processed_data: "ProcessedSequenceMessage"):
        """
        Populates ``self.bfiq_accumulator`` with beamformed IQ data from ``processed_data``.

        Main array data is always read; interferometer data only when
        ``processed_data.bfiq_intf_shm`` is set. Both are read with shape
        ``(num_slices, max_num_beams, num_samps)``.

        :param processed_data: Processed sequence from rx_signal_processing module.
        :type processed_data: ProcessedSequenceMessage
        """
        shape = (
            len(processed_data.output_datasets),
            processed_data.max_num_beams,
            processed_data.num_samps,
        )
        self._accumulate_bfiq(self.bfiq_accumulator, "main", shape, processed_data)
        if processed_data.bfiq_intf_shm:
            self._accumulate_bfiq(self.bfiq_accumulator, "intf", shape, processed_data)
        self.bfiq_available = True

    @staticmethod
    def _accumulate_iq(
        stage_data: "list[DebugDataStage]",
        num_slices: int,
        num_main_antennas: int,
        num_intf_antennas: int,
    ):
        """
        Retrieves all intermediate filtered IQ data from shared memory.

        :param stage_data: filter-stage descriptors to read, in order
        :type stage_data: list[DebugDataStage]
        :param num_slices: number of slices in the sequence
        :type num_slices: int
        :param num_main_antennas: number of main array RX channels
        :type num_main_antennas: int
        :param num_intf_antennas: number of interferometer array RX channels
        :type num_intf_antennas: int
        :returns: one dict per stage with keys ``"name"`` (the stage name) and ``"data"``. The
            data array has shape ``(num_slices, num_antennas, num_samps)``. Interferometer
            channels, when present, follow the main channels along axis 1.
        :rtype: list[dict]
        """
        stages = []
        for filter_stage in stage_data:
            stage_samps = filter_stage.num_samps
            samples = Aggregator._extract_from_shm(
                filter_stage.main_shm, (num_slices, num_main_antennas, stage_samps)
            )
            if filter_stage.intf_shm:
                intf_samples = Aggregator._extract_from_shm(
                    filter_stage.intf_shm, (num_slices, num_intf_antennas, stage_samps)
                )
                # Append interferometer channels after the main channels (antenna axis)
                samples = np.concatenate((samples, intf_samples), axis=1)
            stages.append({"name": filter_stage.stage_name, "data": samples})
        return stages

    def _parse_antennas_iq(self, processed_data: "ProcessedSequenceMessage"):
        """
        Populates ``self.antenna_iq_accumulator`` with IQ data from ``processed_data``.

        :param processed_data: Processed sequence from rx_signal_processing module.
        :type processed_data: ProcessedSequenceMessage
        """
        stages = self._accumulate_iq(
            processed_data.debug_data,
            len(processed_data.output_datasets),
            len(self.rx_main_antennas),
            len(self.rx_intf_antennas),
        )
        self.antennas_iq_available = True

        # Maps channel position in the data array to the physical antenna number. The
        # interferometer antenna numbers start after the last main antenna number.
        antenna_indices = list(self.rx_main_antennas)
        antenna_indices.extend(ant + self.num_main_antennas for ant in self.rx_intf_antennas)

        # One data set per slice; row i of every stage belongs to output_datasets[i]
        for i, data_set in enumerate(processed_data.output_datasets):
            slice_stages = self.antenna_iq_accumulator.setdefault(data_set.slice_id, {})
            for debug_stage in stages:
                antenna_iq_stage = slice_stages.setdefault(
                    debug_stage["name"], collections.OrderedDict()
                )
                for ant_num, samples in enumerate(debug_stage["data"][i]):
                    ant_name = antenna_indices[ant_num]
                    antenna_iq_stage.setdefault(ant_name, []).append(samples)

    def finalize(self):
        """
        Consolidates data for each data type to one numpy array.

        When :func:`update()` is called, new data arrays are appended to a list for speed
        considerations. This function stacks each of these lists into a single
        ``np.complex64`` array, in place.
        """
        for slice_data in self.antenna_iq_accumulator.values():
            for stage in slice_data.values():
                for channel_name, channel_data in stage.items():
                    stage[channel_name] = np.array(channel_data, dtype=np.complex64)

        for slice_data in self.bfiq_accumulator.values():
            for array_name, array_data in slice_data.items():
                slice_data[array_name] = np.array(array_data, dtype=np.complex64)

        for accumulator in (
            self.mainacfs_accumulator,
            self.intfacfs_accumulator,
            self.xcfs_accumulator,
        ):
            for slice_id, acfs in accumulator.items():
                accumulator[slice_id] = np.array(acfs, dtype=np.complex64)

    def update(self, sqn: "ProcessedSequenceMessage"):
        """
        Parses the message and updates the accumulators and metadata fields with the new data.

        :param sqn: Processed sequence from rx_signal_processing module.
        :type sqn: ProcessedSequenceMessage
        """
        self.sequence_num = sqn.sequence_num
        self.timestamps.append(sqn.sequence_start_time)
        self.slice_ids.update(data_set.slice_id for data_set in sqn.output_datasets)

        # Truthiness (rather than ``!= ""``) also rejects a missing (None) location, which would
        # otherwise be recorded as a rawrf shared memory name.
        if sqn.rawrf_shm:
            self.rawrf_available = True
            self.rawrf_num_samps = sqn.rawrf_num_samps
            self.rawrf_locations.append(sqn.rawrf_shm)

        # Logical AND to catch any time the GPS may have been unlocked during the integration period
        self.gps_locked = self.gps_locked and sqn.gps_locked

        # Keep the largest-magnitude (signed) GPS-to-system time difference for this period
        if abs(self.gps_to_system_time_diff) < abs(sqn.gps_to_system_time_diff):
            self.gps_to_system_time_diff = sqn.gps_to_system_time_diff

        # Bitwise OR to catch any AGC faults / low power conditions during the integration period
        self.agc_status_word |= sqn.agc_status_bank_h
        self.lp_status_word |= sqn.lp_status_bank_h

        self._parse_acfs(sqn)
        self._parse_bfiq(sqn)
        self._parse_antennas_iq(sqn)