Data Aggregator
Aggregator class for collecting data from each sequence within an averaging period.
The src.data_write module uses an Aggregator object to ingest processed data from
src.rx_signal_processing. Once src.radar_control sends metadata for an averaging period, data from
the Aggregator is collected into numpy arrays for writing to file.
- class src.utils.data_aggregator.Aggregator
Bases: object
Aggregator for ingesting ProcessedSequenceMessage data during an averaging period.
Usage:
    aggregator = Aggregator(
        num_main_antennas=16,
        rx_main_antennas=[i for i in range(16)],
        rx_intf_antennas=[i for i in range(4)],
    )
    packet = ProcessedSequenceMessage()  # this is generated by rx_signal_processing
    aggregator.update(packet)  # can do this for as many ProcessedSequenceMessage's as you get
    aggregator.finalize()
    # Now internally aggregator contains numpy.array objects
Consider an averaging period with two slices, IDs 0 and 1, that are SEQUENCE interfaced. These two slices run one after the other, so the number of pulse sequences for each at the end of the averaging period may differ by at most one. These slices may have different beam orders, numbers of range gates, pulse sequences, etc. At the end of the averaging period, the accumulator fields could look something like this:
    antenna_iq_accumulator = {
        0: {  # all antennas sampled for this slice
            'stage_0': {
                0: np.ndarray,   # shape [19, 45084] ([num_sequences, num_samples])
                1: np.ndarray,   # shape [19, 45084]
                ...,
                19: np.ndarray,  # shape [19, 45084]
            },
            'antennas_iq': {
                0: np.ndarray,   # shape [19, 299]
                1: np.ndarray,   # shape [19, 299]
                ...,
                19: np.ndarray,  # shape [19, 299]
            },
        },
        1: {  # only antennas 2 and 3 were sampled for this slice
            'stage_0': {
                2: np.ndarray,  # shape [18, 54000]
                3: np.ndarray,  # shape [18, 54000]
            },
            'antennas_iq': {
                2: np.ndarray,  # shape [18, 324]
                3: np.ndarray,  # shape [18, 324]
            },
        },
    }
    bfiq_accumulator = {
        0: {
            'main': np.ndarray,  # shape [19, 1, 299] ([num_sequences, num_beams, num_samples])
            'intf': np.ndarray,  # shape [19, 1, 299]
        },
        1: {
            'main': np.ndarray,  # shape [18, 16, 324]
        },
    }
    mainacfs_accumulator = {
        0: np.ndarray,  # shape [19, 1, 75, 22] ([num_sequences, num_beams, num_ranges, num_lags])
        1: np.ndarray,  # shape [18, 16, 100, 26]
    }
    intfacfs_accumulator = {
        0: np.ndarray,  # shape [19, 1, 75, 22]
    }
    xcfs_accumulator = {
        0: np.ndarray,  # shape [19, 1, 75, 22]
    }
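The "differ by at most one" property can be checked with a short sketch. It assumes the two slices strictly alternate, one sequence each in turn, starting with slice 0; `sequences_per_slice` is a hypothetical helper written for this illustration and is not part of the Aggregator API.

```python
from collections import Counter
from itertools import cycle, islice


def sequences_per_slice(slice_ids, total_sequences):
    """Count sequences per slice, assuming the slices strictly alternate."""
    # cycle() repeats the slice order; islice() stops after total_sequences.
    return Counter(islice(cycle(slice_ids), total_sequences))


# 37 sequences shared between slices 0 and 1 (illustrative total).
counts = sequences_per_slice([0, 1], 37)
print(counts[0], counts[1])  # 19 18
```

With 37 sequences in the averaging period, slice 0 ends up with 19 and slice 1 with 18, matching the leading dimensions in the example accumulators.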
- __init__(
- agc_status_word=0,
- gps_locked=True,
- gps_to_system_time_diff=0.0,
- lp_status_word=0,
- slice_ids=<factory>,
- timestamps=<factory>,
- num_main_antennas=0,
- rx_main_antennas=<factory>,
- rx_intf_antennas=<factory>,
- antenna_iq_accumulator=<factory>,
- antennas_iq_available=False,
- bfiq_accumulator=<factory>,
- bfiq_available=False,
- intfacfs_accumulator=<factory>,
- intf_acf_slices=<factory>,
- mainacfs_accumulator=<factory>,
- main_acf_slices=<factory>,
- rawrf_available=False,
- rawrf_locations=<factory>,
- rawrf_num_samps=0,
- xcfs_accumulator=<factory>,
- xcf_slices=<factory>,
)
- Return type: None
- finalize()
Consolidates data for each data type to one numpy array.
When update() is called, new data arrays are appended to lists, because appending to a list is faster than repeatedly growing a numpy array. This function converts those lists into numpy arrays.
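The append-then-consolidate pattern described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the module's implementation: `accumulator`, `update_sketch`, and `finalize_sketch` are hypothetical names, and the per-sequence array shape is borrowed from the ACF example in the class description.

```python
import numpy as np

# Hypothetical stand-in for one accumulator field, keyed by slice ID.
accumulator = {}


def update_sketch(slice_id, sequence_data):
    """Append one sequence's array to the list for its slice."""
    accumulator.setdefault(slice_id, []).append(sequence_data)


def finalize_sketch():
    """Convert each per-slice list into one array with a leading sequence axis."""
    for slice_id, chunks in accumulator.items():
        accumulator[slice_id] = np.array(chunks)


# 19 sequences, each with shape [num_beams, num_ranges, num_lags] = [1, 75, 22].
for _ in range(19):
    update_sketch(0, np.zeros((1, 75, 22), dtype=np.complex64))
finalize_sketch()
print(accumulator[0].shape)  # (19, 1, 75, 22)
```

Deferring the conversion means each sequence costs only a list append, and the arrays are copied into contiguous storage once per averaging period.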
- update(sqn)
Parses the message and updates the accumulators and metadata fields with the new data.
- Parameters:
sqn (ProcessedSequenceMessage) – Processed sequence from rx_signal_processing module.
- antenna_iq_accumulator: dict[int, dict[str, dict[int, list | ndarray]]]
Complex voltage samples for all slices, filtering stages, and RX channels.
By level of nesting, the keys for this field are:
Slice ID
Filter stage name. The final stage is always named "antennas_iq"; intermediate stages are named "stage_x", where x is the stage of filtering that has been conducted. "stage_0" is the output after filter 0, "stage_1" is the output after filters 0 and 1, and so forth.
Antenna index. This is the physical antenna index, not the receive channel index. Indices start at 0 for the main array. Interferometer antennas are numbered after the main array, typically starting at 16.
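As an illustration of the three key levels, the sketch below walks a finalized accumulator and lists the shape of every array. The dictionary is hand-built for the example with shrunken sample counts, and `describe` is a hypothetical helper, not part of the Aggregator class.

```python
import numpy as np

# Hypothetical, already-finalized accumulator. Shapes are shrunk for illustration.
antenna_iq_accumulator = {
    1: {  # slice ID
        "stage_0": {  # filter stage name
            2: np.zeros((18, 540), dtype=np.complex64),  # antenna index
            3: np.zeros((18, 540), dtype=np.complex64),
        },
        "antennas_iq": {  # final filter stage
            2: np.zeros((18, 32), dtype=np.complex64),
            3: np.zeros((18, 32), dtype=np.complex64),
        },
    },
}


def describe(accumulator):
    """Return (slice_id, stage_name, antenna_index, shape) for every array."""
    rows = []
    for slice_id, stages in accumulator.items():
        for stage_name, antennas in stages.items():
            for antenna_index, samples in antennas.items():
                rows.append((slice_id, stage_name, antenna_index, samples.shape))
    return rows


for row in describe(antenna_iq_accumulator):
    print(row)
```

Each array has shape [num_sequences, num_samples], so the sample count shrinks from stage to stage while the sequence count stays fixed within a slice.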
- bfiq_accumulator: dict[int, dict[str, list | ndarray]]
Beamformed samples for all slices, beam directions, and antenna arrays.
By level of nesting, the keys for this field are:
Slice ID
Antenna array name. This is either "main" or "intf".
- gps_locked: bool = True
GPS lock status.
Initialized to True so that it can be updated with a logical AND in update().
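A minimal sketch of why True is the right starting value: True is the identity for logical AND, so the combined flag stays True only if every sequence reported a lock. `combine_gps_lock` is a hypothetical helper for illustration; the exact expression used inside update() is not shown in this page.

```python
def combine_gps_lock(per_sequence_locks):
    """AND together per-sequence lock flags, starting from True."""
    locked = True  # identity element for AND, mirrors the field default
    for sequence_locked in per_sequence_locks:
        locked = locked and sequence_locked
    return locked


print(combine_gps_lock([True, True, True]))   # True
print(combine_gps_lock([True, False, True]))  # False
```

A single unlocked sequence therefore marks the whole averaging period as not GPS locked.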
- intfacfs_accumulator: dict[int, list | ndarray]
Interferometer array autocorrelations for all slices. Keyed by slice ID.