Source code for src.radar_control

"""
radar_control process
~~~~~~~~~~~~~~~~~~~~~
Radar_control is the process that runs the radar (sends pulses to the driver with
timing information and sends processing information to the signal processing process).
Experiment_handler provides the experiment for radar_control to run. It iterates
through the interface_class_base objects to control the radar.
"""

import argparse
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from functools import reduce
import mmap
import os
import sys
import threading
import time

import zmq
import numpy as np
import posix_ipc as ipc

from utils.experiment_prototype import experiment_handler, retrieve_experiment
from utils.interface_classes.averaging_periods import CFSAveragingPeriod
from utils.options import Options
import utils.message_formats as messages
from utils import socket_operations as so

sys.path.append(os.environ["BOREALISPATH"])

TIME_PROFILE = False


[docs] @dataclass class CFSParameters: """ Parameters used to track clear frequency search data, each use of this class should be linked to a unique aveperiod. :param cfs_freq: list of frequencies sampled by CFS :param cfs_mags: power measurements corresponding to cfs_freq, indexed by beam iterator then by slice order in an aveperiod :param cfs_range: lower and upper frequency bound of CFS :param cfs_masks: mask of frequencies in cfs range that cannot be used for tx, indexed by beam_iter, then slice_id :param last_cfs_set_time: epoch time since a CFS scan was last run. Indexed by beam iterator :param beam_frequency: last frequency assigned to a slice on a beam. Indexed first by the beam iterator, then by the slice_id :param set_new_freq: boolean flag indicating if a new freq should be set for the slices on a beam. Indexed by beam iterator """ cfs_freq: list cfs_mags: dict cfs_range: dict cfs_masks: dict last_cfs_set_time: dict beam_frequency: dict set_new_freq: dict
[docs] @dataclass class RadctrlParameters: """ Class holding parameters that are passed between processes during the radar operation. """ experiment: ... aveperiod: ... sequence: ... options: ... decimation_scheme: ... = field(init=False) seqnum_start: int startup_flag: bool num_beams: int = 0 num_sequences: int = 0 last_sequence_num: int = 0 sequence_index: int = 0 start_time: datetime = datetime.now(timezone.utc) averaging_period_start_time: datetime = datetime.now(timezone.utc) averaging_period_time: timedelta = timedelta(seconds=0) debug_samples: list = field(default_factory=list) pulse_transmit_data_tracker: dict = field(default_factory=dict) pulse_buffer_offset: int = 0 slice_dict: dict = field(default_factory=dict, init=False) cfs_scan_flag: bool = False scan_flag: bool = False dsp_cfs_identity: str = "" router_address: str = "" radctrl_cfs_identity: str = "" dw_cfs_identity: str = "" brian_to_radctrl_identity: str = "" def __post_init__(self): self.slice_dict = self.experiment.slice_dict # Set slice_dict after an experiment has been assigned if self.sequence: self.decimation_scheme = self.sequence.decimation_scheme else: self.decimation_scheme = None # define decimation scheme only is a sequence was defined self.dsp_cfs_identity = self.options.dsp_cfs_identity self.router_address = self.options.router_address self.radctrl_cfs_identity = self.options.radctrl_cfs_identity self.dw_cfs_identity = self.options.dw_cfs_identity self.brian_to_radctrl_identity = self.options.brian_to_radctrl_identity
[docs] def driver_comms_thread(radctrl_driver_iden, driver_socket_iden, router_addr): """ Thread for handling communication between radar_control and usrp_driver. """ driver_comms_socket = zmq.Context().instance().socket(zmq.PAIR) driver_comms_socket.connect("inproc://radctrl_driver") radctrl_driver_socket = so.create_sockets(router_addr, radctrl_driver_iden) first_time = True while True: message = driver_comms_socket.recv_pyobj() # Wait until the main thread sends a message intended for the driver so.send_bytes(radctrl_driver_socket, driver_socket_iden, message) # Send the message to the driver if first_time: so.recv_string(radctrl_driver_socket, driver_socket_iden, log) # Driver sends back an ACK on the first communication driver_comms_socket.send_string("Driver ready to go") # Let the main thread know that the driver is ready to go first_time = False
[docs] def create_driver_message(radctrl_params, pulse_transmit_data, pulse_buffer): """ Format data for sending to usrp_driver via zeromq :param radctrl_params: The current averaging period parameters dataclass.The parameters extracted are: * ``sequence.txctrfreq`` - the transmit center frequency to tune to * ``sequence.rxctrfreq`` - the receive center frequency to tune to. With rx_sample_rate from config.ini file, this determines the received signal band * ``experiment.txrate`` - the tx sampling rate (Hz) * ``experiment.rxrate`` - the rx sampling rate (Hz) * ``sequence.numberofreceivesamples`` - number of samples to receive at the rx_sample_rate from config.ini file. This determines length of Scope Sync GPIO being high for this sequence * ``sequence.seqtime`` - relative timing offset * ``seqnum`` - the sequence number. This is a unique identifier for the sequence that is always increasing with increasing sequences while radar_control is running. It is only reset when program restarts. It is determined as ``seqnum_start`` + ``num_sequences`` * ``sequence.align_sequences`` - a boolean indicating whether to align the start of the sequence to a clean tenth of a second. :param pulse_transmit_data: dictionary of current transmit pulse data. The message extracts: * ``samples_array`` - this is a list of length main_antenna_count from the config file. It contains one numpy array of complex values per antenna. If the antenna will not be transmitted on, it contains a numpy array of zeros of the same length as the rest. All arrays will have the same length according to the pulse length * ``startofburst`` - start of burst boolean, true for first pulse in sequence * ``endofburst`` - end of burst boolean, true for last pulse in sequence * ``timing`` - in us, the time past timezero to send this pulse. Timezero is the start of the sequence * ``isarepeat`` - a boolean indicating whether the pulse is the exact same as the last pulse in the sequence, in which case we will save the time and not send the samples list and other params that will be the same :return: The compiled ``DriverPacket`` message to send to the usrp driver :rtype: dataclass """ message = messages.DriverPacket() message.txcenterfreq = ( radctrl_params.slice_dict[0].txctrfreq * 1000 ) # convert to Hz message.rxcenterfreq = ( radctrl_params.slice_dict[0].rxctrfreq * 1000 ) # convert to Hz message.txrate = radctrl_params.experiment.txrate message.rxrate = radctrl_params.experiment.rxrate # If this is the first time the driver is being set-up, only send tx and rx rates and center frequencies if radctrl_params.startup_flag: return message.format_for_ipc() message.sample_timing = pulse_transmit_data["timing"] message.burst_start = int(pulse_transmit_data["startofburst"]) message.burst_end = int(pulse_transmit_data["endofburst"]) message.sequence_num = radctrl_params.seqnum_start + radctrl_params.num_sequences message.seqtime = radctrl_params.sequence.seqtime message.align_sequences = int(radctrl_params.sequence.align_sequences) samples_array = pulse_transmit_data["samples_array"] num_samps = samples_array.shape[1] offset = radctrl_params.pulse_buffer_offset if offset + num_samps >= pulse_buffer.shape[1]: offset = 0 pulse_buffer[:, offset : offset + num_samps] = samples_array message.buffer_offset = offset message.num_tx_samps = num_samps radctrl_params.pulse_buffer_offset = offset + num_samps message.num_rx_samps = radctrl_params.sequence.numberofreceivesamples return message.format_for_ipc()
[docs] def dsp_comms_thread(radctrl_dsp_iden, dsp_socket_iden, router_addr): """ Thread for handling communication between radar_control and rx_signal_processing. """ inproc_socket = zmq.Context().instance().socket(zmq.PAIR) inproc_socket.connect("inproc://radctrl_dsp") radctrl_dsp_socket = so.create_sockets(router_addr, radctrl_dsp_iden) while True: message = inproc_socket.recv_pyobj() # Wait for metadata from the main thread so.send_pyobj(radctrl_dsp_socket, dsp_socket_iden, message) # Send the message to rx_signal_processing so.recv_string(radctrl_dsp_socket, dsp_socket_iden, log) # Wait for rx_signal_processing to acknowledge that it received the metadata inproc_socket.send_string("DSP acknowledged metadata")
# Let the main thread know that DSP has received the metadata
[docs] def create_dsp_message(radctrl_params): """ Place data in the receiver packet and send it via zeromq to the signal processing unit and brian. Happens every sequence. :param radctrl_params: The radar control parameter dataclass updated during the averaging period. The message grabs: * ``experiment.rxrate`` - The receiver sampling rate (Hz) * ``experiment.output_rx_rate`` - The output sample rate desired for the output data (Hz) * ``seqnum`` - the sequence number. This is a unique identifier for the sequence that is always increasing with increasing sequences while radar_control is running. It is only reset when program restarts. It is calculated from ``seqnum_start`` + ``num_sequences`` * ``sequence.slice_ids`` - The identifiers of the slices that are combined in this sequence. These IDs tell us where to look in the beam dictionary and slice dictionary for frequency information and beam direction information about this sequence to give to the signal processing unit * ``sequence.slice_dict`` - The slice dictionary, which contains information about all slices and will be referenced for information about the slices in this sequence. Namely, we get the frequency we want to receive at, the number of ranges and the first range information * ``beam_dict`` - The dictionary containing beam directions for each slice, generated using ``sequence.get_rx_phases(aveperiod.beam_iter)`` * ``sequence.seqtime`` - entire duration of sequence, including receive time after all transmissions * ``sequence.first_rx_sample_start`` - The sample where the first rx sample will start relative to the tx data * ``sequence.rxctrfreq`` - the center frequency of receiving * ``sequence.output_encodings`` - Phase offsets (degrees) applied to each pulse in the sequence * ``decimation_scheme`` - object of type DecimationScheme that has all decimation and filtering data * ``cfs_scan_flag`` - flag indicating of sequence is a clear frequency search rx only sequence :return: The compiled ``SequenceMetadataMessage`` message to send to rx signal processing :rtype: dataclass """ message = messages.SequenceMetadataMessage() message.sequence_time = radctrl_params.sequence.seqtime message.sequence_num = radctrl_params.seqnum_start + radctrl_params.num_sequences message.offset_to_first_rx_sample = radctrl_params.sequence.first_rx_sample_start message.rx_rate = radctrl_params.experiment.rxrate message.output_sample_rate = radctrl_params.sequence.output_rx_rate message.rx_ctr_freq = radctrl_params.sequence.rxctrfreq * 1.0e3 message.cfs_scan_flag = radctrl_params.cfs_scan_flag message.acf = radctrl_params.sequence.acf message.xcf = radctrl_params.sequence.xcf message.acfint = radctrl_params.sequence.acfint if radctrl_params.cfs_scan_flag: message.cfs_fft_n = radctrl_params.aveperiod.cfs_fft_n if radctrl_params.decimation_scheme is not None: message.decimation_scheme = radctrl_params.decimation_scheme slice_dict = radctrl_params.slice_dict if radctrl_params.cfs_scan_flag: beam_dict = dict() for slice_id in radctrl_params.aveperiod.cfs_slice_ids: # `slice_id` will correspond to a special CFS Sequence object in a CFSAveragingPeriod. # Likewise with `radctrl_params.sequence` if slice_dict[slice_id].rxonly: # No TX - use RX phases instead for CFS beamforming phase_dict = radctrl_params.sequence.get_rx_phases( radctrl_params.aveperiod.beam_iter ) main_phases = phase_dict[slice_id]["main"][0] else: # Use TX phases for CFS beamforming beam_num = slice_dict[slice_id].tx_beam_order[ radctrl_params.aveperiod.beam_iter ] tx_phases = radctrl_params.sequence.build_tx_phases( slice_id, slice_dict[slice_id], np.average(slice_dict[slice_id].cfs_range), ) main_phases = tx_phases[0, beam_num] intf_phases = np.zeros( len(slice_dict[slice_id].rx_intf_antennas), dtype=np.complex64 ) beam_dict[slice_id] = { "main": np.array([main_phases]), "intf": np.array([intf_phases]), } else: beam_dict = radctrl_params.sequence.get_rx_phases( radctrl_params.aveperiod.beam_iter ) slice_dict = radctrl_params.sequence.slice_dict for slice_id in radctrl_params.sequence.slice_ids: rx_chan = messages.RxChannel(slice_id) rx_chan.tau_spacing = slice_dict[slice_id].tau_spacing # Send the translational frequencies to dsp in order to bandpass filter correctly. freq_khz = slice_dict[slice_id].freq if isinstance(freq_khz, list): freq_khz = freq_khz[ slice_dict[slice_id].freq_order[radctrl_params.aveperiod.beam_iter] ] rx_chan.rx_freq = freq_khz * 1.0e3 rx_chan.num_ranges = slice_dict[slice_id].num_ranges rx_chan.first_range = slice_dict[slice_id].first_range rx_chan.range_sep = slice_dict[slice_id].range_sep rx_chan.rx_intf_antennas = slice_dict[slice_id].rx_intf_antennas rx_chan.pulses = slice_dict[slice_id].pulse_sequence rx_chan.acf = slice_dict[slice_id].acf rx_chan.xcf = slice_dict[slice_id].xcf rx_chan.acfint = slice_dict[slice_id].acfint main_bms = beam_dict[slice_id]["main"] intf_bms = beam_dict[slice_id]["intf"] # Combine main and intf such that for a given beam all main phases come first. beams = np.hstack((main_bms, intf_bms)) rx_chan.beam_phases = np.array(beams) for lag in slice_dict[slice_id].lag_table: lag_add = messages.Lag(lag[0], lag[1], int(lag[1] - lag[0])) # Get the phase offset for this pulse combination pulse_phase_offsets = radctrl_params.sequence.output_encodings if len(pulse_phase_offsets[slice_id]) != 0: pulse_phase_offset = pulse_phase_offsets[slice_id][-1] lag0_idx = slice_dict[slice_id].pulse_sequence.index(lag[0]) lag1_idx = slice_dict[slice_id].pulse_sequence.index(lag[1]) phase_in_rad = np.radians( pulse_phase_offset[lag0_idx] - pulse_phase_offset[lag1_idx] ) phase_offset = np.exp(1j * np.array(phase_in_rad, np.float32)) # Catch case where no pulse phase offsets are specified else: phase_offset = 1.0 + 0.0j lag_add.phase_offset_real = np.real(phase_offset) lag_add.phase_offset_imag = np.imag(phase_offset) rx_chan.lags.append(lag_add) message.rx_channels.append(rx_chan) return message
[docs] def make_next_samples(radctrl_params): sqn = radctrl_params.sequence.make_sequence( radctrl_params.aveperiod.beam_iter, radctrl_params.num_sequences + len(radctrl_params.aveperiod.sequences), ) radctrl_params.pulse_transmit_data_tracker[radctrl_params.sequence_index][ radctrl_params.num_sequences + len(radctrl_params.aveperiod.sequences) ] = sqn if TIME_PROFILE: new_sequence_time = datetime.now(timezone.utc) - radctrl_params.start_time log.verbose( "make new sequence time", time=new_sequence_time, time_unit="s", )
[docs] def dw_comms_thread(radctrl_dw_iden, dw_socket_iden, router_addr): """ Thread for handling communication between radar_control and data_write. """ inproc_socket = zmq.Context().instance().socket(zmq.PAIR) inproc_socket.connect("inproc://radctrl_dw") radctrl_dw_socket = so.create_sockets(router_addr, radctrl_dw_iden) while True: message = inproc_socket.recv_pyobj() # Wait for metadata from the main thread so.send_pyobj(radctrl_dw_socket, dw_socket_iden, message)
# Send the message to data write
[docs] def create_dw_message(radctrl_params): """ Send the metadata about this averaging period to datawrite so that it can be recorded. :param radctrl_params: The radar control parameter dataclass updated during the aveperiod. The message grabs: * ``seqnum`` - The last sequence number (identifier) that is valid for this averaging period. Used to verify and synchronize driver, dsp, datawrite. Calculated with ``seqnum_start`` + ``num_sequences`` * ``num_sequences``- The number of sequences that were sent in this averaging period. (number of sequences to average together) * ``scan_flag`` - True if this averaging period is the first in a scan * ``averaging_period_time`` - The time that expired during this averaging period * ``aveperiod.sequences`` - The sequences of class Sequence for this averaging period (AveragingPeriod) * ``aveperiod.beam_iter`` - The beam iterator of this averaging period * ``experiment.cpid`` - the ID of the experiment that is running * ``experiment.experiment_name`` - the experiment name to be placed in the data files * ``experiment.scheduling_mode`` - the type of scheduling mode running at this time, to write to file * ``experiment.output_rx_rate`` - The output sample rate of the output data, defined by the experiment, in Hz * ``experiment.comment_string`` - The comment string for the experiment, user-defined * ``decimation_scheme.filter_scaling_factors`` - The decimation scheme scaling factors used for the experiment, to get the scaling for the data for accurate power measurements between experiments * ``experiment.slice_dict[0].rxctrfreq`` - The receive center frequency (kHz) * ``debug_samples`` - the debug samples for this averaging period, to be written to the file if debug is set. This is a list of dictionaries for each Sequence in the AveragingPeriod. The dictionary is set up in the sample_building module function create_debug_sequence_samples. The keys are ``txrate``, ``txctrfreq``, ``pulse_timing`, ``pulse_sample_start``, ``sequence_samples``, ``decimated_sequence``, and ``dmrate``. The ``sequence_samples`` and ``decimated_samples`` values are themselves dictionaries, where the keys are the antenna numbers (there is a sample set for each transmit antenna) :return: The compiled ``AveperiodMetadataMessage`` message to send to data write :rtype: dataclass """ message = messages.AveperiodMetadataMessage() message.experiment_id = radctrl_params.experiment.cpid message.experiment_name = radctrl_params.experiment.experiment_name message.experiment_comment = radctrl_params.experiment.comment_string message.rx_ctr_freq = radctrl_params.experiment.slice_dict[0].rxctrfreq if isinstance(radctrl_params.aveperiod, CFSAveragingPeriod): message.num_sequences = ( radctrl_params.num_sequences - 1 ) # first sequence was CFS message.cfs_freqs = radctrl_params.aveperiod.cfs_freq message.cfs_noise = [ x[radctrl_params.aveperiod.beam_iter] for x in radctrl_params.aveperiod.cfs_mags.values() ] message.cfs_range = radctrl_params.aveperiod.cfs_range message.cfs_masks = [ x[radctrl_params.aveperiod.beam_iter] for x in radctrl_params.aveperiod.cfs_masks.values() ] message.cfs_slice_ids = radctrl_params.aveperiod.cfs_slice_ids else: message.num_sequences = radctrl_params.num_sequences message.last_sqn_num = radctrl_params.last_sequence_num message.scan_flag = radctrl_params.scan_flag message.aveperiod_time = radctrl_params.averaging_period_time.total_seconds() message.input_sample_rate = radctrl_params.experiment.rxrate message.data_normalization_factor = reduce( lambda x, y: x * y, radctrl_params.decimation_scheme.filter_scaling_factors ) # multiply all message.scheduling_mode = radctrl_params.experiment.scheduling_mode for sequence_index, sequence in enumerate(radctrl_params.aveperiod.sequences): sequence_add = messages.Sequence() sequence_add.blanks = sequence.blanks sequence_add.output_sample_rate = sequence.output_rx_rate for slice_id in sequence.slice_ids: sqn_slice = sequence.slice_dict[slice_id] freq_idx = sqn_slice.freq_order[radctrl_params.aveperiod.beam_iter] rxchannel = messages.RxChannelMetadata() rxchannel.slice_id = slice_id rxchannel.slice_comment = sqn_slice.comment rxchannel.interfacing = "{}".format(sqn_slice.slice_interfacing) rxchannel.rx_only = sqn_slice.rxonly rxchannel.pulse_len = sqn_slice.pulse_len rxchannel.tau_spacing = sqn_slice.tau_spacing freq_khz = sqn_slice.freq if isinstance(freq_khz, list): freq_khz = freq_khz[freq_idx] rxchannel.rx_freq = freq_khz rxchannel.ptab = sqn_slice.pulse_sequence # We always build one sequence in advance, so we trim the last one from when radar # control stops processing the averaging period. for encoding in sequence.output_encodings[slice_id][ : radctrl_params.num_sequences ]: rxchannel.sequence_encodings.append(encoding.flatten().tolist()) sequence.output_encodings[slice_id] = [] rxchannel.rx_main_antennas = sqn_slice.rx_main_antennas rxchannel.rx_intf_antennas = sqn_slice.rx_intf_antennas rxchannel.tx_antennas = sqn_slice.tx_antennas rxchannel.tx_excitations = sequence.tx_main_phase_shifts[slice_id][ freq_idx, sqn_slice.tx_beam_order[radctrl_params.aveperiod.beam_iter] ] beams = sqn_slice.rx_beam_order[radctrl_params.aveperiod.beam_iter] if isinstance(beams, int): beams = [beams] rx_main_excitations = [] rx_intf_excitations = [] for beam in beams: beam_add = messages.Beam(sqn_slice.beam_angle[beam], beam) rxchannel.beams.append(beam_add) rx_main_excitations.append( sequence.rx_beam_phases[slice_id]["main"][ freq_idx, beam, sequence.rx_main_antenna_indices[slice_id] ] ) rx_intf_excitations.append( sequence.rx_beam_phases[slice_id]["intf"][ freq_idx, beam, sequence.rx_intf_antenna_indices[slice_id] ] ) rxchannel.rx_main_excitations = np.array( rx_main_excitations, dtype=np.complex64 ) rxchannel.rx_intf_excitations = np.array( rx_intf_excitations, dtype=np.complex64 ) rxchannel.first_range = float(sqn_slice.first_range) rxchannel.num_ranges = sqn_slice.num_ranges rxchannel.range_sep = sqn_slice.range_sep if sqn_slice.acf: rxchannel.acf = sqn_slice.acf rxchannel.xcf = sqn_slice.xcf rxchannel.acfint = sqn_slice.acfint for lag in sqn_slice.lag_table: lag_add = messages.LagTable(lag, int(lag[1] - lag[0])) rxchannel.ltabs.append(lag_add) rxchannel.averaging_method = sqn_slice.averaging_method sequence_add.rx_channels.append(rxchannel) message.sequences.append(sequence_add) return message
[docs] def round_up_time(dt=None, round_to=60): """ Round a datetime object to any time-lapse in seconds :param dt: datetime object, default now. :param round_to: Closest number of seconds to round to, default 1 minute. :author: Thierry Husson 2012 - Use it as you want but don't blame me. :modified: K.Kotyk 2019 Will round to the nearest minute mark. Adds one minute if rounded down. """ if dt is None: dt = datetime.now(timezone.utc) midnight = dt.replace(hour=0, minute=0, second=0) seconds = (dt - midnight).seconds rounding = (seconds + round_to / 2) // round_to * round_to result = dt + timedelta(0, rounding - seconds, -dt.microsecond) if result < dt: result += timedelta(minutes=1) return result
[docs] def run_cfs_scan(radctrl_params, sockets, pulse_buffer): radctrl_process_socket = sockets[0] radctrl_brian_socket = sockets[1] to_driver_inproc_sock = sockets[2] dsp_socket_iden = radctrl_params.dsp_cfs_identity router_addr = radctrl_params.router_address cfs_socket = so.create_sockets(router_addr, radctrl_params.radctrl_cfs_identity) aveperiod = radctrl_params.aveperiod sequence = aveperiod.cfs_sequence sqn = sequence.make_sequence(aveperiod.beam_iter, 0) # todo: modify the sqn_num to avoid dimension mismatch? cfs_sqn_num = radctrl_params.seqnum_start + radctrl_params.num_sequences so.send_pyobj(cfs_socket, radctrl_params.dw_cfs_identity, cfs_sqn_num) brian_socket_iden = radctrl_params.brian_to_radctrl_identity so.recv_string(radctrl_brian_socket, brian_socket_iden, log) # The above call is blocking; will wait until brian says to start for pulse_transmit_data in sqn: driver_message = create_driver_message( radctrl_params, pulse_transmit_data, pulse_buffer ) to_driver_inproc_sock.send_pyobj(driver_message) radctrl_params.cfs_scan_flag = True dsp_message = create_dsp_message(radctrl_params) radctrl_process_socket.send_pyobj(dsp_message) # Send metadata to dsp_comms_thread, so it can package into a message for rx_signal_processing so.send_pyobj(radctrl_brian_socket, brian_socket_iden, dsp_message) # Send the metadata along to brian freq_data = so.recv_pyobj( cfs_socket, dsp_socket_iden, log, expected_type=messages.ProcessedSequenceMessage, ) radctrl_process_socket.recv_string() # Consume the DSP ACK that dsp_comms_thread will send return freq_data
[docs] def cfs_block(ave_params, cfs_sockets, pulse_buffer): aveperiod = ave_params.aveperiod beam = aveperiod.beam_iter if not ( aveperiod.last_cfs_set_time[beam] < datetime.now(timezone.utc) - timedelta(seconds=aveperiod.cfs_stable_time) or aveperiod.cfs_always_run ): return aveperiod.last_cfs_set_time[beam] = datetime.now(timezone.utc) # Only let CFS run after the user set stable time has # passed to prevent CFS from switching freqs too quickly ave_params.sequence = aveperiod.cfs_sequence ave_params.decimation_scheme = aveperiod.cfs_sequence.decimation_scheme processed_cfs_packet = run_cfs_scan(ave_params, cfs_sockets, pulse_buffer) ave_params.num_sequences += 1 ave_params.cfs_scan_flag = False aveperiod.cfs_freq = processed_cfs_packet.cfs_freq for ind, dset in enumerate(processed_cfs_packet.output_datasets): aveperiod.cfs_mags[aveperiod.cfs_slice_ids[ind]][beam] = dset.cfs_data if ( not any([x[beam] for x in aveperiod.set_new_freq.values()]) and aveperiod.cfs_pwr_threshold is not None ): # If using a user set power threshold to trigger CFS freq setting, check if any # power related conditions are triggered and set the corresponding flag aveperiod.check_update_freq( processed_cfs_packet, aveperiod.cfs_slice_ids, aveperiod.cfs_pwr_threshold, beam, ) if not ( any([x[beam] for x in aveperiod.set_new_freq.values()]) or aveperiod.cfs_pwr_threshold is None ): # Return if the frequency does not need to be changed. return # If using a power threshold and one of the power conditions were # triggered, or if not using a power threshold, set the CFS params slice_masks, last_set_cfs = aveperiod.select_cfs_freqs(processed_cfs_packet) for slice_id in aveperiod.cfs_slice_ids: aveperiod.set_new_freq[slice_id][beam] = False aveperiod.cfs_masks[slice_id][beam] = slice_masks[slice_id] aveperiod.beam_frequency[slice_id][beam] = last_set_cfs[slice_id]
[docs] def main(exp_name, scheduling_mode, embargo, **kwargs): """ Run the radar with the experiment supplied by experiment_handler. Receives an instance of an experiment. Iterates through the Scans, AveragingPeriods, Sequences, and pulses of the experiment. For every pulse, samples and other control information are sent to the n200_driver. For every pulse sequence, processing information is sent to the signal processing block. After every averaging period, the experiment block is given the opportunity to change the experiment (not currently implemented). If a new experiment is sent, radar will halt the old one and begin with the new experiment. """ # Get config options options = Options() # Setup sockets # Socket to send pulse samples over # TODO test: need to make sure that we know that all sockets are set up after this try...except block. # TODO test: starting the programs in different orders. try: radctrl_brian_socket = so.create_sockets( options.router_address, options.radctrl_to_brian_identity ) except zmq.ZMQBaseError as e: log.error("zmq failed setting up sockets", error=e) log.exception("zmq failed setting up sockets", exception=e) sys.exit(1) # Sockets for thread communication radctrl_inproc_socket = zmq.Context().instance().socket(zmq.PAIR) radctrl_inproc_socket.bind("inproc://radctrl_dsp") driver_comms_socket = zmq.Context().instance().socket(zmq.PAIR) driver_comms_socket.bind("inproc://radctrl_driver") dw_comms_socket = zmq.Context().instance().socket(zmq.PAIR) dw_comms_socket.bind("inproc://radctrl_dw") # seqnum is used as an identifier in all packets while radar is running so set it up here. # seqnum will get increased by num_sequences (number of averages or sequences in the averaging period) # at the end of every averaging period. seqnum_start = 0 # Wait for experiment handler at the start until we have an experiment to run. exp_class = retrieve_experiment(exp_name) experiment = experiment_handler(exp_class, scheduling_mode, embargo, **kwargs) # Flag for starting the radar on the minute boundary wait_for_first_scanbound = experiment.slice_dict.get("wait_for_first_scanbound") # Send driver initial setup data - rates and center frequency from experiment. # Wait for acknowledgment that USRP object is set up. start_up_params = RadctrlParameters( experiment, None, None, options, seqnum_start, True ) driver_start_message = create_driver_message(start_up_params, None, None) dsp_comms = threading.Thread( target=dsp_comms_thread, args=( options.radctrl_to_dsp_identity, options.dsp_to_radctrl_identity, options.router_address, ), ) driver_comms = threading.Thread( target=driver_comms_thread, args=( options.radctrl_to_driver_identity, options.driver_to_radctrl_identity, options.router_address, ), ) dw_comms = threading.Thread( target=dw_comms_thread, args=( options.radctrl_to_dw_identity, options.dw_to_radctrl_identity, options.router_address, ), ) dsp_comms.start() driver_comms.start() dw_comms.start() driver_comms_socket.send_pyobj(driver_start_message) driver_comms_socket.recv_string() # Wait until driver acknowledges that it is good to start shm = ipc.SharedMemory(options.pulse_buffer_name) mapped_mem = mmap.mmap(shm.fd, shm.size) pulse_buffer = np.ndarray( (len(options.tx_main_antennas), options.pulse_buffer_size), buffer=mapped_mem, dtype=np.complex64, ) first_aveperiod = True next_scan_start = None while True: # This loops through all scans in an experiment, or restarts this loop if a new experiment occurs. # TODO : further documentation throughout in comments (high level) and in separate documentation. # Iterate through Scans, AveragingPeriods, Sequences, Pulses. # Start anew on first scan if we have a new experiment. for scan_num, scan in enumerate(experiment.scan_objects): log.debug("scan number", scan_num=scan_num) # Scan iter is the iterator through the scanbound or through the number of averaging periods in the scan if ( first_aveperiod and scan.scanbound is not None and not wait_for_first_scanbound ): # On first integration, determine current averaging period and set scan_iter to it now = datetime.now(timezone.utc) current_minute = now.replace(second=0, microsecond=0) scan_iter = next( ( i for i, v in enumerate(scan.scanbound) if current_minute + timedelta(seconds=v) > now ), 0, ) else: # Otherwise start at first averaging period scan_iter = 0 if scan.scanbound: if scan.align_scan_to_beamorder: for aveperiod in scan.aveperiods: # Always align first beam at start of scan aveperiod.beam_iter = 0 # Find the start of the next scan with a scanbound so we can determine time remaining for end of scan next_scanbound = None next_scan_num = scan_num while next_scanbound is None: next_scan_num += 1 if next_scan_num == len(experiment.scan_objects): next_scan_num = 0 next_scanbound = experiment.scan_objects[next_scan_num].scanbound if first_aveperiod: # On the very first averaging period of Borealis starting, calculate the start minute # align scanbound reference time to find when to start now = datetime.now(timezone.utc) dt = now.replace(second=0, microsecond=0) if dt + timedelta(seconds=scan.scanbound[scan_iter]) >= now: start_minute = dt else: start_minute = round_up_time(now) else: # At the start of a scan object that has scanbound, recalculate the start minute to the # previously calculated next_scan_start start_minute = next_scan_start.replace(second=0, microsecond=0) # Find the modulus of the number of aveperiod times to run in the scan and the number # of AvePeriod classes. The classes will be alternated so we can determine which class # will be running at the end of the scan. index_of_last_aveperiod_in_scan = ( scan.num_aveperiods_in_scan + scan.aveperiod_iter ) % len(scan.aveperiods) last_aveperiod_intt = scan.aveperiods[ index_of_last_aveperiod_in_scan ].intt # A scanbound necessitates intt end_of_scan = ( start_minute + timedelta(seconds=scan.scanbound[-1]) + timedelta(seconds=last_aveperiod_intt * 1e-3) ) end_minute = end_of_scan.replace(second=0, microsecond=0) if end_minute + timedelta(seconds=next_scanbound[0]) >= end_of_scan: next_scan_start = end_minute + timedelta(seconds=next_scanbound[0]) else: next_scan_start = round_up_time(end_of_scan) + timedelta( seconds=next_scanbound[0] ) while scan_iter < scan.num_aveperiods_in_scan: # If there are multiple aveperiods in a scan they are alternated (AVEPERIOD interfaced) aveperiod = scan.aveperiods[scan.aveperiod_iter] if TIME_PROFILE: time_start_of_aveperiod = datetime.now(timezone.utc) log.debug("new averaging period") # All phases are set up for this averaging period for the beams required. # Time to start averaging in the below loop. if not scan.scanbound: averaging_period_start_time = datetime.now(timezone.utc) # ms log.verbose( "averaging period start time", time=averaging_period_start_time, time_unit="s", ) if aveperiod.intt is not None: intt_break = True if scan.scanbound: # Calculate scan start time. First beam in the sequence will likely # be ready to go if the first scan aligns directly to the minute. The # rest will need to wait until their boundary time is up. beam_scanbound = start_minute + timedelta( seconds=scan.scanbound[scan_iter] ) time_diff = beam_scanbound - datetime.now(timezone.utc) if time_diff.total_seconds() > 0: if first_aveperiod: log.verbose( "seconds to next avg period", time=time_diff.total_seconds(), time_unit="s", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) else: log.debug( "seconds to next avg period", time=time_diff.total_seconds(), time_unit="s", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) # TODO: reduce sleep if we want to use GPS timestamped transmissions time.sleep(time_diff.total_seconds()) else: # TODO: This will be wrong if the start time is in the past. # TODO: maybe use datetime.now(timezone.utc) like below # instead of beam_scanbound when the avg period should have # started? log.debug( "expected avg period start time", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) averaging_period_start_time = datetime.now(timezone.utc) log.verbose( "avg period start time", time=averaging_period_start_time, time_unit="s", scan_iter=scan_iter, beam_scanbound=beam_scanbound, ) # Here we find how much system time has elapsed to find the true amount # of time we can integrate for this scan boundary. We can then see if # we have enough time left to run the averaging period. time_elapsed = averaging_period_start_time - start_minute if scan_iter < len(scan.scanbound) - 1: scanbound_time = scan.scanbound[scan_iter + 1] # TODO: scanbound_time could be in the past if system has taken # too long, perhaps calculate which 'beam' (scan_iter) instead by # rewriting this code for an experiment-wide scanbound attribute instead # of individual scanbounds inside the scan objects # TODO: if scan_iter skips ahead, aveperiod.beam_iter may also need to # if scan.align_to_beamorder is True bound_time_remaining = ( scanbound_time - time_elapsed.total_seconds() ) else: bound_time_remaining = ( next_scan_start - averaging_period_start_time ) bound_time_remaining = bound_time_remaining.total_seconds() log.verbose( "bound time remaining", time=bound_time_remaining, time_unit="s", scan_num=scan_num, scan_iter=scan_iter, # scan_iter is averaging period number for some reason beam_scanbound=beam_scanbound, ) if bound_time_remaining < aveperiod.intt * 1e-3: # Reduce the averaging period to only the time remaining until the next scan boundary # TODO: Check for bound_time_remaining > 0 # to be sure there is actually time to run this intt # (if bound_time_remaining < 0, we need a solution to reset) averaging_period_done_time = ( averaging_period_start_time + timedelta(milliseconds=bound_time_remaining * 1e3) ) else: averaging_period_done_time = ( averaging_period_start_time + timedelta(milliseconds=aveperiod.intt) ) else: # No scanbound for this scan averaging_period_done_time = ( averaging_period_start_time + timedelta(milliseconds=aveperiod.intt) ) else: # intt does not exist, therefore using intn intt_break = False ending_number_of_sequences = aveperiod.intn # this will exist msg = { x: y[aveperiod.beam_iter] for x, y in aveperiod.slice_to_beamorder.items() } log.verbose("avg period slice and beam number", slice_and_beam=msg) if TIME_PROFILE: aveperiod_prep_time = ( datetime.now(timezone.utc) - time_start_of_aveperiod ) log.verbose( "time to prep aveperiod", time=aveperiod_prep_time, time_unit="", ) # Time to start averaging in the below loop ave_params = RadctrlParameters( experiment, aveperiod, aveperiod.sequences[0], options, seqnum_start, False, num_beams=aveperiod.num_beams_in_scan, ) time_remains = True while time_remains: if ave_params.num_sequences == 0 and isinstance( aveperiod, CFSAveragingPeriod ): cfs_time = time.time() cfs_block( ave_params, [ radctrl_inproc_socket, radctrl_brian_socket, driver_comms_socket, ], pulse_buffer, ) aveperiod.update_cfs_freqs() # always update, to use correct freq for beam log.verbose("CFS block run time", time=time.time() - cfs_time) for sequence_index, sequence in enumerate(aveperiod.sequences): # Alternating sequences if there are multiple in the averaging_period ave_params.start_time = datetime.now(timezone.utc) ave_params.sequence = sequence ave_params.sequence_index = sequence_index if intt_break: if ave_params.start_time >= averaging_period_done_time: time_remains = False ave_params.averaging_period_time = ( ave_params.start_time - averaging_period_start_time ) break else: # Break at a certain number of sequences if ave_params.num_sequences == ending_number_of_sequences: time_remains = False ave_params.averaging_period_time = ( ave_params.start_time - averaging_period_start_time ) break # On first sequence, we make the first set of samples if sequence_index not in ave_params.pulse_transmit_data_tracker: ave_params.pulse_transmit_data_tracker[sequence_index] = {} sqn = sequence.make_sequence( aveperiod.beam_iter, ave_params.num_sequences ) ave_params.pulse_transmit_data_tracker[sequence_index][ ave_params.num_sequences ] = sqn ave_params.decimation_scheme = sequence.decimation_scheme # This can happen simultaneously threads = [ threading.Thread(target=make_next_samples(ave_params)) ] for thread in threads: thread.daemon = True thread.start() for ( pulse_transmit_data ) in ave_params.pulse_transmit_data_tracker[sequence_index][ ave_params.num_sequences ]: message_create = time.perf_counter() driver_message = create_driver_message( ave_params, pulse_transmit_data, pulse_buffer ) log.debug( "driver_message creation time", duration=(time.perf_counter() - message_create) * 1000, units="ms", ) driver_comms_socket.send_pyobj(driver_message) so.recv_string( radctrl_brian_socket, options.brian_to_radctrl_identity, log ) # The above call is blocking; will wait until brian says to start dsp_message = create_dsp_message(ave_params) radctrl_inproc_socket.send_pyobj(dsp_message) # Send metadata to dsp_comms_thread, so it can pass it to rx_signal_processing so.send_pyobj( radctrl_brian_socket, options.brian_to_radctrl_identity, dsp_message, ) # Send the metadata along to brian for thread in threads: thread.join() radctrl_inproc_socket.recv_string() # wait for dsp to signal that it received the metadata ave_params.num_sequences += 1 if first_aveperiod: first_aveperiod = False # Sequence is done if __debug__: time.sleep(1) if TIME_PROFILE: avg_period_end_time = datetime.now(timezone.utc) log.verbose( "avg period end time", time=avg_period_end_time, time_unit="s", ) log.info( "aveperiod done", num_sequences=ave_params.num_sequences, slice_ids=aveperiod.slice_ids, ) if scan.aveperiod_iter == 0 and aveperiod.beam_iter == 0: # This is the first averaging period in the scan object. # if scanbound is aligned to beamorder, the scan_iter will also = 0 at this point. ave_params.scan_flag = True else: ave_params.scan_flag = False ave_params.last_sequence_num = ( ave_params.seqnum_start + ave_params.num_sequences - 1 ) dw_message = create_dw_message(ave_params) dw_comms_socket.send_pyobj(dw_message) # Send metadata to dw_comms_thread, so it can package into a message for data write # end of the averaging period loop - move onto the next averaging period. # Increment the sequence number by the number of sequences that were in this # averaging period. seqnum_start += ave_params.num_sequences if TIME_PROFILE: time_to_finish_aveperiod = ( datetime.now(timezone.utc) - avg_period_end_time ) log.verbose( "time to finish avg period", time=time_to_finish_aveperiod, time_unit="s", ) aveperiod.beam_iter += 1 if aveperiod.beam_iter == aveperiod.num_beams_in_scan: aveperiod.beam_iter = 0 scan_iter += 1 scan.aveperiod_iter += 1 if scan.aveperiod_iter == len(scan.aveperiods): scan.aveperiod_iter = 0
def radctrl_parser(): parser = argparse.ArgumentParser() parser.add_argument( "experiment_module", help="The name of the module in the experiment_prototype package that contains " "your Experiment class, e.g. normalscan", ) parser.add_argument( "scheduling_mode_type", help="The type of scheduling time for this experiment run, e.g. common, " "special, or discretionary.", ) parser.add_argument( "--embargo", action="store_true", help="Embargo the file (makes the CPID negative)", ) parser.add_argument( "--kwargs", nargs="+", default="", help="Keyword arguments for the experiment. Each must be formatted as kw=val", ) return parser if __name__ == "__main__": from utils import log_config log = log_config.log() log.info("RADAR_CONTROL BOOTED") try: args = radctrl_parser().parse_args() # parse kwargs and pass to experiment parsed_kwargs = {} for element in args.kwargs: kwarg = element.split("=") parsed_kwargs[kwarg[0]] = kwarg[1] main( args.experiment_module, args.scheduling_mode_type, args.embargo, **parsed_kwargs, ) log.info("RADAR_CONTROL EXITED") except Exception as main_exception: log.critical("RADAR_CONTROL CRASHED", error=main_exception) log.exception("RADAR_CONTROL CRASHED", exception=main_exception)