"""
radar_control process
~~~~~~~~~~~~~~~~~~~~~
Radar_control is the process that runs the radar (sends pulses to the driver with
timing information and sends processing information to the signal processing process).
Experiment_handler provides the experiment for radar_control to run. It iterates
through the interface_class_base objects to control the radar.
"""
import argparse
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from functools import reduce
import mmap
import os
import sys
import threading
import time
import zmq
import numpy as np
import posix_ipc as ipc
from utils.experiment_prototype import experiment_handler, retrieve_experiment
from utils.interface_classes.averaging_periods import CFSAveragingPeriod
from utils.options import Options
import utils.message_formats as messages
from utils import socket_operations as so
sys.path.append(os.environ["BOREALISPATH"])
TIME_PROFILE = False
[docs]
@dataclass
class CFSParameters:
    """
    Parameters used to track clear frequency search (CFS) data. Each use
    of this class should be linked to a unique aveperiod.

    All fields are required at construction; none has a default value.

    NOTE(review): ``cfs_block`` in this module indexes the same-named attributes of the
    averaging period by slice_id first and then by beam iterator, and stores a timezone-aware
    ``datetime`` in ``last_cfs_set_time``. Confirm whether the index orders and the
    "epoch time" wording below still describe this class.

    :param cfs_freq: list of frequencies sampled by CFS
    :param cfs_mags: power measurements corresponding to cfs_freq, indexed by beam iterator
        then by slice order in an aveperiod
    :param cfs_range: lower and upper frequency bound of CFS
    :param cfs_masks: mask of frequencies in cfs range that cannot be used for tx,
        indexed by beam_iter, then slice_id
    :param last_cfs_set_time: epoch time since a CFS scan was last run. Indexed by
        beam iterator
    :param beam_frequency: last frequency assigned to a slice on a beam. Indexed first
        by the beam iterator, then by the slice_id
    :param set_new_freq: boolean flag indicating if a new freq should be set for the
        slices on a beam. Indexed by beam iterator
    """

    cfs_freq: list
    cfs_mags: dict
    cfs_range: dict
    cfs_masks: dict
    last_cfs_set_time: dict
    beam_frequency: dict
    set_new_freq: dict
[docs]
@dataclass
class RadctrlParameters:
    """
    Class holding parameters that are passed between processes during the radar
    operation.

    ``experiment``, ``aveperiod``, ``sequence``, ``options``, ``seqnum_start`` and
    ``startup_flag`` must be supplied at construction; every other field has a default.

    Fields populated by ``__post_init__`` rather than by the caller:

    * ``slice_dict`` - taken from ``experiment.slice_dict``
    * ``decimation_scheme`` - taken from ``sequence.decimation_scheme`` when ``sequence`` is
      truthy, otherwise ``None``
    * ``dsp_cfs_identity``, ``router_address``, ``radctrl_cfs_identity``, ``dw_cfs_identity``
      and ``brian_to_radctrl_identity`` - always overwritten from ``options``, so any value
      passed for them to the constructor is ignored

    ``start_time`` and ``averaging_period_start_time`` default to the UTC time at which the
    instance is created.
    """

    experiment: ...
    aveperiod: ...
    sequence: ...
    options: ...
    decimation_scheme: ... = field(init=False)
    seqnum_start: int
    startup_flag: bool
    num_beams: int = 0
    num_sequences: int = 0
    last_sequence_num: int = 0
    sequence_index: int = 0
    # A default_factory is required here: a plain ``datetime.now(...)`` default would be
    # evaluated once when the class is defined and then shared by every instance.
    start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    averaging_period_start_time: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    averaging_period_time: timedelta = timedelta(seconds=0)
    debug_samples: list = field(default_factory=list)
    pulse_transmit_data_tracker: dict = field(default_factory=dict)
    pulse_buffer_offset: int = 0
    slice_dict: dict = field(default_factory=dict, init=False)
    cfs_scan_flag: bool = False
    scan_flag: bool = False
    dsp_cfs_identity: str = ""
    router_address: str = ""
    radctrl_cfs_identity: str = ""
    dw_cfs_identity: str = ""
    brian_to_radctrl_identity: str = ""

    def __post_init__(self):
        # Set slice_dict after an experiment has been assigned
        self.slice_dict = self.experiment.slice_dict

        # Define the decimation scheme only if a sequence was defined
        if self.sequence:
            self.decimation_scheme = self.sequence.decimation_scheme
        else:
            self.decimation_scheme = None

        # Socket identities and the router address always come from the config options
        self.dsp_cfs_identity = self.options.dsp_cfs_identity
        self.router_address = self.options.router_address
        self.radctrl_cfs_identity = self.options.radctrl_cfs_identity
        self.dw_cfs_identity = self.options.dw_cfs_identity
        self.brian_to_radctrl_identity = self.options.brian_to_radctrl_identity
[docs]
def driver_comms_thread(radctrl_driver_iden, driver_socket_iden, router_addr):
    """
    Thread for handling communication between radar_control and usrp_driver.

    Loops forever: receives a pickled object from the main thread over the
    ``inproc://radctrl_driver`` PAIR socket and forwards it to the driver with
    ``so.send_bytes``. After the first forwarded message only, it waits for the driver's
    reply and then sends ``"Driver ready to go"`` back to the main thread.

    :param radctrl_driver_iden: identity passed to ``so.create_sockets`` for this thread's
        socket to the driver
    :param driver_socket_iden: identity of the driver, used as the peer for send and receive
    :param router_addr: router address passed to ``so.create_sockets``
    """
    # Context.instance() is a classmethod returning the shared context; calling it on the
    # class avoids constructing (and immediately discarding) an extra Context object.
    driver_comms_socket = zmq.Context.instance().socket(zmq.PAIR)
    driver_comms_socket.connect("inproc://radctrl_driver")
    radctrl_driver_socket = so.create_sockets(router_addr, radctrl_driver_iden)

    first_time = True
    while True:
        # Wait until the main thread sends a message intended for the driver
        message = driver_comms_socket.recv_pyobj()

        # Send the message to the driver
        so.send_bytes(radctrl_driver_socket, driver_socket_iden, message)

        if first_time:
            # Driver sends back an ACK on the first communication
            so.recv_string(radctrl_driver_socket, driver_socket_iden, log)

            # Let the main thread know that the driver is ready to go
            driver_comms_socket.send_string("Driver ready to go")
            first_time = False
[docs]
def create_driver_message(radctrl_params, pulse_transmit_data, pulse_buffer):
    """
    Format data for sending to usrp_driver via zeromq.

    When ``radctrl_params.startup_flag`` is set, only the center frequencies and the tx/rx
    rates are filled in and the packet is returned immediately; ``pulse_transmit_data`` and
    ``pulse_buffer`` are not touched in that case.

    :param radctrl_params: The current averaging period parameters dataclass. The parameters
        extracted are:

        * ``slice_dict[0].txctrfreq`` - the transmit center frequency to tune to (multiplied
          by 1000 to convert to Hz)
        * ``slice_dict[0].rxctrfreq`` - the receive center frequency to tune to (multiplied
          by 1000 to convert to Hz)
        * ``experiment.txrate`` - the tx sampling rate (Hz)
        * ``experiment.rxrate`` - the rx sampling rate (Hz)
        * ``sequence.numberofreceivesamples`` - number of samples to receive
        * ``sequence.seqtime`` - relative timing offset
        * ``seqnum_start`` + ``num_sequences`` - the sequence number placed in the packet
        * ``sequence.align_sequences`` - a boolean indicating whether to align the start of
          the sequence to a clean tenth of a second
        * ``pulse_buffer_offset`` - column of ``pulse_buffer`` at which the next pulse is
          written; updated by this function to point just past the samples it wrote
    :param pulse_transmit_data: dictionary of current transmit pulse data. The message
        extracts:

        * ``samples_array`` - 2-D array of samples; its second dimension is the number of
          samples in the pulse
        * ``startofburst`` - start of burst boolean, true for first pulse in sequence
        * ``endofburst`` - end of burst boolean, true for last pulse in sequence
        * ``timing`` - in us, the time past timezero to send this pulse. Timezero is the
          start of the sequence
    :param pulse_buffer: 2-D buffer that the pulse samples are copied into. The write position
        wraps back to column 0 when the pulse would reach or pass the end of the buffer.
    :return: The compiled ``DriverPacket`` message, as produced by its ``format_for_ipc()``
    """
    packet = messages.DriverPacket()

    center_slice = radctrl_params.slice_dict[0]
    packet.txcenterfreq = center_slice.txctrfreq * 1000  # convert to Hz
    packet.rxcenterfreq = radctrl_params.slice_dict[0].rxctrfreq * 1000  # convert to Hz
    packet.txrate = radctrl_params.experiment.txrate
    packet.rxrate = radctrl_params.experiment.rxrate

    # After startup the packet also carries the pulse timing and the location of the
    # samples; on first set-up only the rates and center frequencies above are sent.
    if not radctrl_params.startup_flag:
        packet.sample_timing = pulse_transmit_data["timing"]
        packet.burst_start = int(pulse_transmit_data["startofburst"])
        packet.burst_end = int(pulse_transmit_data["endofburst"])
        packet.sequence_num = (
            radctrl_params.seqnum_start + radctrl_params.num_sequences
        )
        packet.seqtime = radctrl_params.sequence.seqtime
        packet.align_sequences = int(radctrl_params.sequence.align_sequences)

        samples = pulse_transmit_data["samples_array"]
        sample_count = samples.shape[1]

        # Restart from the beginning of the buffer if this pulse would reach its end
        write_start = radctrl_params.pulse_buffer_offset
        if write_start + sample_count >= pulse_buffer.shape[1]:
            write_start = 0
        write_stop = write_start + sample_count

        pulse_buffer[:, write_start:write_stop] = samples
        packet.buffer_offset = write_start
        packet.num_tx_samps = sample_count
        radctrl_params.pulse_buffer_offset = write_stop

        packet.num_rx_samps = radctrl_params.sequence.numberofreceivesamples

    return packet.format_for_ipc()
[docs]
def dsp_comms_thread(radctrl_dsp_iden, dsp_socket_iden, router_addr):
    """
    Thread for handling communication between radar_control and rx_signal_processing.

    Loops forever: receives metadata from the main thread over the ``inproc://radctrl_dsp``
    PAIR socket, forwards it with ``so.send_pyobj``, waits for the acknowledgement string
    and then sends ``"DSP acknowledged metadata"`` back to the main thread.

    :param radctrl_dsp_iden: identity passed to ``so.create_sockets`` for this thread's
        socket to rx_signal_processing
    :param dsp_socket_iden: identity of rx_signal_processing, used as the peer for send and
        receive
    :param router_addr: router address passed to ``so.create_sockets``
    """
    # Context.instance() is a classmethod returning the shared context; calling it on the
    # class avoids constructing (and immediately discarding) an extra Context object.
    inproc_socket = zmq.Context.instance().socket(zmq.PAIR)
    inproc_socket.connect("inproc://radctrl_dsp")
    radctrl_dsp_socket = so.create_sockets(router_addr, radctrl_dsp_iden)

    while True:
        # Wait for metadata from the main thread
        message = inproc_socket.recv_pyobj()

        # Send the message to rx_signal_processing
        so.send_pyobj(radctrl_dsp_socket, dsp_socket_iden, message)

        # Wait for rx_signal_processing to acknowledge that it received the metadata
        so.recv_string(radctrl_dsp_socket, dsp_socket_iden, log)

        # Let the main thread know that DSP has received the metadata
        inproc_socket.send_string("DSP acknowledged metadata")
[docs]
def create_dsp_message(radctrl_params):
    """
    Build the per-sequence metadata message for the signal processing unit and brian.
    This function only compiles and returns the message; sending is done by the caller.
    Happens every sequence.

    :param radctrl_params: The radar control parameter dataclass updated during the averaging
        period. The message grabs:

        * ``experiment.rxrate`` - The receiver sampling rate (Hz)
        * ``sequence.output_rx_rate`` - The output sample rate desired for the output data (Hz)
        * ``seqnum`` - the sequence number. This is a unique identifier for the sequence that
          is always increasing with increasing sequences while radar_control is running. It is
          only reset when program restarts. It is calculated from ``seqnum_start`` +
          ``num_sequences``
        * ``sequence.slice_ids`` - The identifiers of the slices that are combined in this
          sequence. These IDs tell us where to look in the beam dictionary and slice
          dictionary for frequency information and beam direction information about this
          sequence to give to the signal processing unit
        * ``slice_dict`` / ``sequence.slice_dict`` - The slice dictionary, which contains
          information about all slices and will be referenced for information about the slices
          in this sequence. Namely, we get the frequency we want to receive at, the number of
          ranges and the first range information
        * ``beam_dict`` - The dictionary containing beam directions for each slice, generated
          using ``sequence.get_rx_phases(aveperiod.beam_iter)``, or built per CFS slice when
          ``cfs_scan_flag`` is set
        * ``sequence.seqtime`` - entire duration of sequence, including receive time after all
          transmissions
        * ``sequence.first_rx_sample_start`` - The sample where the first rx sample will start
          relative to the tx data
        * ``sequence.rxctrfreq`` - the center frequency of receiving (multiplied by 1.0e3
          before being stored in the message)
        * ``sequence.acf``, ``sequence.xcf``, ``sequence.acfint`` - copied to the message as-is
        * ``sequence.output_encodings`` - Phase offsets (degrees) applied to each pulse in the
          sequence
        * ``decimation_scheme`` - object of type DecimationScheme that has all decimation and
          filtering data; only attached when it is not ``None``
        * ``cfs_scan_flag`` - flag indicating if the sequence is a clear frequency search rx
          only sequence; when set, ``aveperiod.cfs_fft_n`` is also attached
    :return: The compiled ``SequenceMetadataMessage`` message to send to rx signal processing
    :rtype: dataclass
    """
    message = messages.SequenceMetadataMessage()
    message.sequence_time = radctrl_params.sequence.seqtime
    message.sequence_num = radctrl_params.seqnum_start + radctrl_params.num_sequences
    message.offset_to_first_rx_sample = radctrl_params.sequence.first_rx_sample_start
    message.rx_rate = radctrl_params.experiment.rxrate
    message.output_sample_rate = radctrl_params.sequence.output_rx_rate
    message.rx_ctr_freq = radctrl_params.sequence.rxctrfreq * 1.0e3
    message.cfs_scan_flag = radctrl_params.cfs_scan_flag
    message.acf = radctrl_params.sequence.acf
    message.xcf = radctrl_params.sequence.xcf
    message.acfint = radctrl_params.sequence.acfint
    if radctrl_params.cfs_scan_flag:
        message.cfs_fft_n = radctrl_params.aveperiod.cfs_fft_n
    if radctrl_params.decimation_scheme is not None:
        message.decimation_scheme = radctrl_params.decimation_scheme

    slice_dict = radctrl_params.slice_dict
    if radctrl_params.cfs_scan_flag:
        # CFS sequence: build a single-beam phase entry for every CFS slice by hand
        beam_dict = dict()
        for slice_id in radctrl_params.aveperiod.cfs_slice_ids:
            # `slice_id` will correspond to a special CFS Sequence object in a CFSAveragingPeriod.
            # Likewise with `radctrl_params.sequence`
            if slice_dict[slice_id].rxonly:
                # No TX - use RX phases instead for CFS beamforming
                phase_dict = radctrl_params.sequence.get_rx_phases(
                    radctrl_params.aveperiod.beam_iter
                )
                main_phases = phase_dict[slice_id]["main"][0]
            else:
                # Use TX phases for CFS beamforming
                beam_num = slice_dict[slice_id].tx_beam_order[
                    radctrl_params.aveperiod.beam_iter
                ]
                tx_phases = radctrl_params.sequence.build_tx_phases(
                    slice_id,
                    slice_dict[slice_id],
                    np.average(slice_dict[slice_id].cfs_range),
                )
                main_phases = tx_phases[0, beam_num]
            # Interferometer phases are all zero for a CFS sequence
            intf_phases = np.zeros(
                len(slice_dict[slice_id].rx_intf_antennas), dtype=np.complex64
            )
            beam_dict[slice_id] = {
                "main": np.array([main_phases]),
                "intf": np.array([intf_phases]),
            }
    else:
        beam_dict = radctrl_params.sequence.get_rx_phases(
            radctrl_params.aveperiod.beam_iter
        )
        # NOTE(review): this assignment is placed in the non-CFS branch; the nesting was
        # reconstructed from flattened text - confirm against the repository.
        slice_dict = radctrl_params.sequence.slice_dict

    for slice_id in radctrl_params.sequence.slice_ids:
        rx_chan = messages.RxChannel(slice_id)
        rx_chan.tau_spacing = slice_dict[slice_id].tau_spacing

        # Send the translational frequencies to dsp in order to bandpass filter correctly.
        freq_khz = slice_dict[slice_id].freq
        if isinstance(freq_khz, list):
            # A list of frequencies is indexed through freq_order for the current beam iterator
            freq_khz = freq_khz[
                slice_dict[slice_id].freq_order[radctrl_params.aveperiod.beam_iter]
            ]
        rx_chan.rx_freq = freq_khz * 1.0e3
        rx_chan.num_ranges = slice_dict[slice_id].num_ranges
        rx_chan.first_range = slice_dict[slice_id].first_range
        rx_chan.range_sep = slice_dict[slice_id].range_sep
        rx_chan.rx_intf_antennas = slice_dict[slice_id].rx_intf_antennas
        rx_chan.pulses = slice_dict[slice_id].pulse_sequence
        rx_chan.acf = slice_dict[slice_id].acf
        rx_chan.xcf = slice_dict[slice_id].xcf
        rx_chan.acfint = slice_dict[slice_id].acfint

        main_bms = beam_dict[slice_id]["main"]
        intf_bms = beam_dict[slice_id]["intf"]

        # Combine main and intf such that for a given beam all main phases come first.
        beams = np.hstack((main_bms, intf_bms))
        rx_chan.beam_phases = np.array(beams)

        for lag in slice_dict[slice_id].lag_table:
            lag_add = messages.Lag(lag[0], lag[1], int(lag[1] - lag[0]))

            # Get the phase offset for this pulse combination
            pulse_phase_offsets = radctrl_params.sequence.output_encodings
            if len(pulse_phase_offsets[slice_id]) != 0:
                # Use the most recently appended encoding for this slice
                pulse_phase_offset = pulse_phase_offsets[slice_id][-1]
                lag0_idx = slice_dict[slice_id].pulse_sequence.index(lag[0])
                lag1_idx = slice_dict[slice_id].pulse_sequence.index(lag[1])
                phase_in_rad = np.radians(
                    pulse_phase_offset[lag0_idx] - pulse_phase_offset[lag1_idx]
                )
                phase_offset = np.exp(1j * np.array(phase_in_rad, np.float32))
            # Catch case where no pulse phase offsets are specified
            else:
                phase_offset = 1.0 + 0.0j

            lag_add.phase_offset_real = np.real(phase_offset)
            lag_add.phase_offset_imag = np.imag(phase_offset)
            rx_chan.lags.append(lag_add)
        message.rx_channels.append(rx_chan)

    return message
[docs]
def make_next_samples(radctrl_params):
    """
    Build the pulse data for an upcoming sequence and store it in the tracker.

    The sequence number used is ``num_sequences`` plus the number of sequences in the
    averaging period. The result of ``sequence.make_sequence`` is stored under that number in
    ``pulse_transmit_data_tracker[sequence_index]``. When ``TIME_PROFILE`` is enabled, the time
    elapsed since ``radctrl_params.start_time`` is logged.

    :param radctrl_params: The radar control parameter dataclass; its
        ``pulse_transmit_data_tracker`` is updated in place
    """
    upcoming_sqn_num = radctrl_params.num_sequences + len(
        radctrl_params.aveperiod.sequences
    )

    # The right-hand side is evaluated first, so the sequence is built before the tracker
    # entry is looked up.
    radctrl_params.pulse_transmit_data_tracker[radctrl_params.sequence_index][
        upcoming_sqn_num
    ] = radctrl_params.sequence.make_sequence(
        radctrl_params.aveperiod.beam_iter, upcoming_sqn_num
    )

    if not TIME_PROFILE:
        return

    elapsed = datetime.now(timezone.utc) - radctrl_params.start_time
    log.verbose("make new sequence time", time=elapsed, time_unit="s")
[docs]
def dw_comms_thread(radctrl_dw_iden, dw_socket_iden, router_addr):
    """
    Thread for handling communication between radar_control and data_write.

    Loops forever: receives metadata from the main thread over the ``inproc://radctrl_dw``
    PAIR socket and forwards it with ``so.send_pyobj``. Nothing is sent back to the main
    thread.

    :param radctrl_dw_iden: identity passed to ``so.create_sockets`` for this thread's socket
        to data_write
    :param dw_socket_iden: identity of data_write, used as the destination of each message
    :param router_addr: router address passed to ``so.create_sockets``
    """
    # Context.instance() is a classmethod returning the shared context; calling it on the
    # class avoids constructing (and immediately discarding) an extra Context object.
    inproc_socket = zmq.Context.instance().socket(zmq.PAIR)
    inproc_socket.connect("inproc://radctrl_dw")
    radctrl_dw_socket = so.create_sockets(router_addr, radctrl_dw_iden)

    while True:
        # Wait for metadata from the main thread
        message = inproc_socket.recv_pyobj()

        # Send the message to data write
        so.send_pyobj(radctrl_dw_socket, dw_socket_iden, message)
[docs]
def create_dw_message(radctrl_params):
    """
    Build the metadata about this averaging period for datawrite so that it can be recorded.
    This function only compiles and returns the message; sending is done by the caller.

    Side effect: ``sequence.output_encodings[slice_id]`` is reset to an empty list for every
    slice of every sequence in the averaging period once its encodings have been copied.

    :param radctrl_params: The radar control parameter dataclass updated during the aveperiod.
        The message grabs:

        * ``last_sequence_num`` - The last sequence number (identifier) that is valid for this
          averaging period. Used to verify and synchronize driver, dsp, datawrite
        * ``num_sequences``- The number of sequences that were sent in this averaging period
          (number of sequences to average together). For a ``CFSAveragingPeriod`` one is
          subtracted because the first sequence was the CFS sequence
        * ``scan_flag`` - True if this averaging period is the first in a scan
        * ``averaging_period_time`` - The time that expired during this averaging period
        * ``aveperiod.sequences`` - The sequences of class Sequence for this averaging period
          (AveragingPeriod)
        * ``aveperiod.beam_iter`` - The beam iterator of this averaging period
        * ``aveperiod.cfs_freq``, ``cfs_mags``, ``cfs_range``, ``cfs_masks``,
          ``cfs_slice_ids`` - CFS results, only for a ``CFSAveragingPeriod``
        * ``experiment.cpid`` - the ID of the experiment that is running
        * ``experiment.experiment_name`` - the experiment name to be placed in the data files
        * ``experiment.scheduling_mode`` - the type of scheduling mode running at this time,
          to write to file
        * ``experiment.rxrate`` - stored as the input sample rate
        * ``sequence.output_rx_rate`` - The output sample rate of the output data, in Hz
        * ``experiment.comment_string`` - The comment string for the experiment, user-defined
        * ``decimation_scheme.filter_scaling_factors`` - The decimation scheme scaling factors
          used for the experiment, to get the scaling for the data for accurate power
          measurements between experiments. All factors are multiplied together
        * ``experiment.slice_dict[0].rxctrfreq`` - The receive center frequency (kHz)
        * ``debug_samples`` - the debug samples for this averaging period, to be written to the
          file if debug is set. This is a list of dictionaries for each Sequence in the
          AveragingPeriod. The dictionary is set up in the sample_building module function
          create_debug_sequence_samples. The keys are ``txrate``, ``txctrfreq``,
          ``pulse_timing``, ``pulse_sample_start``, ``sequence_samples``,
          ``decimated_sequence``, and ``dmrate``. The ``sequence_samples`` and
          ``decimated_sequence`` values are themselves dictionaries, where the keys are the
          antenna numbers (there is a sample set for each transmit antenna).
          NOTE(review): ``debug_samples`` is not read anywhere in this function - confirm
          whether this entry is still current.
    :return: The compiled ``AveperiodMetadataMessage`` message to send to data write
    :rtype: dataclass
    """
    message = messages.AveperiodMetadataMessage()
    message.experiment_id = radctrl_params.experiment.cpid
    message.experiment_name = radctrl_params.experiment.experiment_name
    message.experiment_comment = radctrl_params.experiment.comment_string
    message.rx_ctr_freq = radctrl_params.experiment.slice_dict[0].rxctrfreq
    if isinstance(radctrl_params.aveperiod, CFSAveragingPeriod):
        message.num_sequences = (
            radctrl_params.num_sequences - 1
        )  # first sequence was CFS
        message.cfs_freqs = radctrl_params.aveperiod.cfs_freq
        # One entry per slice, taken for the current beam iterator
        message.cfs_noise = [
            x[radctrl_params.aveperiod.beam_iter]
            for x in radctrl_params.aveperiod.cfs_mags.values()
        ]
        message.cfs_range = radctrl_params.aveperiod.cfs_range
        message.cfs_masks = [
            x[radctrl_params.aveperiod.beam_iter]
            for x in radctrl_params.aveperiod.cfs_masks.values()
        ]
        message.cfs_slice_ids = radctrl_params.aveperiod.cfs_slice_ids
    else:
        message.num_sequences = radctrl_params.num_sequences
    message.last_sqn_num = radctrl_params.last_sequence_num
    message.scan_flag = radctrl_params.scan_flag
    message.aveperiod_time = radctrl_params.averaging_period_time.total_seconds()
    message.input_sample_rate = radctrl_params.experiment.rxrate
    message.data_normalization_factor = reduce(
        lambda x, y: x * y, radctrl_params.decimation_scheme.filter_scaling_factors
    )  # multiply all
    message.scheduling_mode = radctrl_params.experiment.scheduling_mode

    for sequence_index, sequence in enumerate(radctrl_params.aveperiod.sequences):
        sequence_add = messages.Sequence()
        sequence_add.blanks = sequence.blanks
        sequence_add.output_sample_rate = sequence.output_rx_rate

        for slice_id in sequence.slice_ids:
            sqn_slice = sequence.slice_dict[slice_id]
            # Index into per-frequency arrays for the current beam iterator
            freq_idx = sqn_slice.freq_order[radctrl_params.aveperiod.beam_iter]
            rxchannel = messages.RxChannelMetadata()
            rxchannel.slice_id = slice_id
            rxchannel.slice_comment = sqn_slice.comment
            rxchannel.interfacing = "{}".format(sqn_slice.slice_interfacing)
            rxchannel.rx_only = sqn_slice.rxonly
            rxchannel.pulse_len = sqn_slice.pulse_len
            rxchannel.tau_spacing = sqn_slice.tau_spacing
            freq_khz = sqn_slice.freq
            if isinstance(freq_khz, list):
                freq_khz = freq_khz[freq_idx]
            rxchannel.rx_freq = freq_khz
            rxchannel.ptab = sqn_slice.pulse_sequence

            # We always build one sequence in advance, so we trim the last one from when radar
            # control stops processing the averaging period.
            for encoding in sequence.output_encodings[slice_id][
                : radctrl_params.num_sequences
            ]:
                rxchannel.sequence_encodings.append(encoding.flatten().tolist())
            # Clear the stored encodings now that they have been copied into the message
            sequence.output_encodings[slice_id] = []

            rxchannel.rx_main_antennas = sqn_slice.rx_main_antennas
            rxchannel.rx_intf_antennas = sqn_slice.rx_intf_antennas
            rxchannel.tx_antennas = sqn_slice.tx_antennas
            rxchannel.tx_excitations = sequence.tx_main_phase_shifts[slice_id][
                freq_idx, sqn_slice.tx_beam_order[radctrl_params.aveperiod.beam_iter]
            ]

            beams = sqn_slice.rx_beam_order[radctrl_params.aveperiod.beam_iter]
            if isinstance(beams, int):
                # Normalize a single beam number to a one-element list
                beams = [beams]

            rx_main_excitations = []
            rx_intf_excitations = []
            for beam in beams:
                beam_add = messages.Beam(sqn_slice.beam_angle[beam], beam)
                rxchannel.beams.append(beam_add)
                rx_main_excitations.append(
                    sequence.rx_beam_phases[slice_id]["main"][
                        freq_idx, beam, sequence.rx_main_antenna_indices[slice_id]
                    ]
                )
                rx_intf_excitations.append(
                    sequence.rx_beam_phases[slice_id]["intf"][
                        freq_idx, beam, sequence.rx_intf_antenna_indices[slice_id]
                    ]
                )
            rxchannel.rx_main_excitations = np.array(
                rx_main_excitations, dtype=np.complex64
            )
            rxchannel.rx_intf_excitations = np.array(
                rx_intf_excitations, dtype=np.complex64
            )

            rxchannel.first_range = float(sqn_slice.first_range)
            rxchannel.num_ranges = sqn_slice.num_ranges
            rxchannel.range_sep = sqn_slice.range_sep

            # Correlation settings and the lag table are only recorded when ACFs are enabled
            if sqn_slice.acf:
                rxchannel.acf = sqn_slice.acf
                rxchannel.xcf = sqn_slice.xcf
                rxchannel.acfint = sqn_slice.acfint
                for lag in sqn_slice.lag_table:
                    lag_add = messages.LagTable(lag, int(lag[1] - lag[0]))
                    rxchannel.ltabs.append(lag_add)
                # NOTE(review): averaging_method is placed inside the acf branch; the nesting
                # was reconstructed from flattened text - confirm against the repository.
                rxchannel.averaging_method = sqn_slice.averaging_method
            sequence_add.rx_channels.append(rxchannel)
        message.sequences.append(sequence_add)

    return message
[docs]
def round_up_time(dt=None, round_to=60):
    """
    Round a datetime object up to the next multiple of ``round_to`` seconds.

    Boundaries are counted from midnight of ``dt``'s own day. A ``dt`` that already sits
    exactly on a boundary (with zero microseconds) is returned unchanged. Microseconds are
    always zeroed in the result.

    :param dt: datetime object, default now (UTC).
    :param round_to: Number of seconds to round up to, default 1 minute.
    :return: the first boundary that is not earlier than ``dt``
    :rtype: datetime
    :author: Thierry Husson 2012 - Use it as you want but don't blame me.
    :modified: K.Kotyk 2019

    The value is first rounded to the nearest mark; if that mark lies in the past, one
    ``round_to`` interval is added so the result is never earlier than ``dt``.
    """
    if dt is None:
        dt = datetime.now(timezone.utc)

    # Microseconds are kept on `midnight`, so the difference is a whole number of seconds
    midnight = dt.replace(hour=0, minute=0, second=0)
    seconds = (dt - midnight).seconds

    # Nearest multiple of round_to, then drop the microseconds
    rounding = (seconds + round_to / 2) // round_to * round_to
    result = dt + timedelta(0, rounding - seconds, -dt.microsecond)

    if result < dt:
        # Rounded down: step forward by one full interval (not a hard-coded minute) so
        # that the result stays on a round_to boundary for any interval length.
        result += timedelta(seconds=round_to)
    return result
[docs]
def run_cfs_scan(radctrl_params, sockets, pulse_buffer):
    """
    Run one clear frequency search (CFS) sequence and return the processed result.

    Steps, in order: create a socket with the radctrl CFS identity, build the CFS sequence
    for the current beam iterator, send the CFS sequence number to the ``dw_cfs_identity``
    peer, block until brian says to start, send one driver message per pulse to the driver
    thread, then send the sequence metadata to the DSP thread and to brian. Finally wait for
    the processed CFS data and consume the DSP acknowledgement.

    Side effect: ``radctrl_params.cfs_scan_flag`` is set to ``True`` and left set.

    :param radctrl_params: The radar control parameter dataclass for the current averaging
        period
    :param sockets: indexable of sockets: ``[0]`` inproc socket to the DSP comms thread,
        ``[1]`` socket to brian, ``[2]`` inproc socket to the driver comms thread
    :param pulse_buffer: buffer handed to ``create_driver_message`` for the pulse samples
    :return: the object received from DSP (expected type
        ``messages.ProcessedSequenceMessage``)
    """
    dsp_thread_socket, brian_socket, driver_thread_socket = (
        sockets[0],
        sockets[1],
        sockets[2],
    )
    dsp_iden = radctrl_params.dsp_cfs_identity
    cfs_socket = so.create_sockets(
        radctrl_params.router_address, radctrl_params.radctrl_cfs_identity
    )

    averaging_period = radctrl_params.aveperiod
    cfs_pulses = averaging_period.cfs_sequence.make_sequence(
        averaging_period.beam_iter, 0
    )

    # todo: modify the sqn_num to avoid dimension mismatch?
    so.send_pyobj(
        cfs_socket,
        radctrl_params.dw_cfs_identity,
        radctrl_params.seqnum_start + radctrl_params.num_sequences,
    )

    # Blocking call; will wait until brian says to start
    brian_iden = radctrl_params.brian_to_radctrl_identity
    so.recv_string(brian_socket, brian_iden, log)

    for pulse in cfs_pulses:
        driver_thread_socket.send_pyobj(
            create_driver_message(radctrl_params, pulse, pulse_buffer)
        )

    radctrl_params.cfs_scan_flag = True
    metadata = create_dsp_message(radctrl_params)

    # Hand the metadata to dsp_comms_thread, so it can package it into a message for
    # rx_signal_processing, and send the same metadata along to brian
    dsp_thread_socket.send_pyobj(metadata)
    so.send_pyobj(brian_socket, brian_iden, metadata)

    cfs_result = so.recv_pyobj(
        cfs_socket,
        dsp_iden,
        log,
        expected_type=messages.ProcessedSequenceMessage,
    )

    # Consume the DSP ACK that dsp_comms_thread will send
    dsp_thread_socket.recv_string()
    return cfs_result
[docs]
def cfs_block(ave_params, cfs_sockets, pulse_buffer):
    """
    Run a clear frequency search for the current beam if one is due, and apply its results.

    A scan is due when the last CFS set time for the beam is older than
    ``aveperiod.cfs_stable_time`` seconds, or when ``aveperiod.cfs_always_run`` is set.
    When a scan runs, ``ave_params.sequence`` and ``ave_params.decimation_scheme`` are
    switched to the CFS sequence, ``ave_params.num_sequences`` is incremented,
    ``ave_params.cfs_scan_flag`` is cleared afterwards, and the measured frequencies and
    magnitudes are stored on the averaging period. New frequencies are then selected unless
    a power threshold is in use and no slice on this beam is flagged for a new frequency.

    :param ave_params: The radar control parameter dataclass for the current averaging period
    :param cfs_sockets: sockets forwarded unchanged to ``run_cfs_scan``
    :param pulse_buffer: pulse buffer forwarded unchanged to ``run_cfs_scan``
    """
    aveperiod = ave_params.aveperiod
    beam = aveperiod.beam_iter

    def new_freq_flagged():
        # True when any slice has its set_new_freq flag raised for this beam
        return any([flags[beam] for flags in aveperiod.set_new_freq.values()])

    # Only let CFS run after the user set stable time has passed, to prevent CFS from
    # switching freqs too quickly (unless it is configured to always run)
    if (
        not aveperiod.last_cfs_set_time[beam]
        < datetime.now(timezone.utc) - timedelta(seconds=aveperiod.cfs_stable_time)
        and not aveperiod.cfs_always_run
    ):
        return
    aveperiod.last_cfs_set_time[beam] = datetime.now(timezone.utc)

    cfs_sequence = aveperiod.cfs_sequence
    ave_params.sequence = cfs_sequence
    ave_params.decimation_scheme = aveperiod.cfs_sequence.decimation_scheme

    cfs_result = run_cfs_scan(ave_params, cfs_sockets, pulse_buffer)

    ave_params.num_sequences += 1
    ave_params.cfs_scan_flag = False

    aveperiod.cfs_freq = cfs_result.cfs_freq
    for position, dataset in enumerate(cfs_result.output_datasets):
        cfs_slice = aveperiod.cfs_slice_ids[position]
        aveperiod.cfs_mags[cfs_slice][beam] = dataset.cfs_data

    # If using a user set power threshold to trigger CFS freq setting, check if any
    # power related conditions are triggered and set the corresponding flag
    if not new_freq_flagged() and aveperiod.cfs_pwr_threshold is not None:
        aveperiod.check_update_freq(
            cfs_result,
            aveperiod.cfs_slice_ids,
            aveperiod.cfs_pwr_threshold,
            beam,
        )

    # Return if the frequency does not need to be changed.
    if not new_freq_flagged() and aveperiod.cfs_pwr_threshold is not None:
        return

    # If using a power threshold and one of the power conditions were
    # triggered, or if not using a power threshold, set the CFS params
    slice_masks, last_set_cfs = aveperiod.select_cfs_freqs(cfs_result)
    for slice_id in aveperiod.cfs_slice_ids:
        aveperiod.set_new_freq[slice_id][beam] = False
        aveperiod.cfs_masks[slice_id][beam] = slice_masks[slice_id]
        aveperiod.beam_frequency[slice_id][beam] = last_set_cfs[slice_id]
[docs]
def main(exp_name, scheduling_mode, embargo, **kwargs):
    """
    Run the radar with the experiment supplied by experiment_handler.

    Receives an instance of an experiment. Iterates through the Scans,
    AveragingPeriods, Sequences, and pulses of the experiment.

    For every pulse, samples and other control information are sent to the n200_driver.

    For every pulse sequence, processing information is sent to the signal processing
    block.

    After every averaging period, the experiment block is given the
    opportunity to change the experiment (not currently implemented). If a new
    experiment is sent, radar will halt the old one and begin with the new experiment.

    This function loops forever over the scans of the experiment; it only returns by
    raising, or by exiting the process if the brian socket cannot be set up.

    :param exp_name: name of the experiment module, passed to ``retrieve_experiment``
    :param scheduling_mode: scheduling mode, passed through to ``experiment_handler``
    :param embargo: embargo flag, passed through to ``experiment_handler``
    :param kwargs: keyword arguments passed through to ``experiment_handler``
    """
    # Get config options
    options = Options()

    # Setup sockets
    # Socket to send pulse samples over
    # TODO test: need to make sure that we know that all sockets are set up after this try...except block.
    # TODO test: starting the programs in different orders.
    try:
        radctrl_brian_socket = so.create_sockets(
            options.router_address, options.radctrl_to_brian_identity
        )
    except zmq.ZMQBaseError as e:
        log.error("zmq failed setting up sockets", error=e)
        log.exception("zmq failed setting up sockets", exception=e)
        sys.exit(1)

    # Sockets for thread communication. Context.instance() is a classmethod returning
    # the process-wide context, so no throwaway Context object needs to be constructed.
    radctrl_inproc_socket = zmq.Context.instance().socket(zmq.PAIR)
    radctrl_inproc_socket.bind("inproc://radctrl_dsp")
    driver_comms_socket = zmq.Context.instance().socket(zmq.PAIR)
    driver_comms_socket.bind("inproc://radctrl_driver")
    dw_comms_socket = zmq.Context.instance().socket(zmq.PAIR)
    dw_comms_socket.bind("inproc://radctrl_dw")

    # seqnum is used as an identifier in all packets while radar is running so set it up here.
    # seqnum will get increased by num_sequences (number of averages or sequences in the averaging period)
    # at the end of every averaging period.
    seqnum_start = 0

    # Wait for experiment handler at the start until we have an experiment to run.
    exp_class = retrieve_experiment(exp_name)
    experiment = experiment_handler(exp_class, scheduling_mode, embargo, **kwargs)

    # Flag for starting the radar on the minute boundary
    wait_for_first_scanbound = experiment.slice_dict.get("wait_for_first_scanbound")

    # Send driver initial setup data - rates and center frequency from experiment.
    # Wait for acknowledgment that USRP object is set up.
    start_up_params = RadctrlParameters(
        experiment, None, None, options, seqnum_start, True
    )
    driver_start_message = create_driver_message(start_up_params, None, None)

    dsp_comms = threading.Thread(
        target=dsp_comms_thread,
        args=(
            options.radctrl_to_dsp_identity,
            options.dsp_to_radctrl_identity,
            options.router_address,
        ),
    )
    driver_comms = threading.Thread(
        target=driver_comms_thread,
        args=(
            options.radctrl_to_driver_identity,
            options.driver_to_radctrl_identity,
            options.router_address,
        ),
    )
    dw_comms = threading.Thread(
        target=dw_comms_thread,
        args=(
            options.radctrl_to_dw_identity,
            options.dw_to_radctrl_identity,
            options.router_address,
        ),
    )
    dsp_comms.start()
    driver_comms.start()
    dw_comms.start()

    driver_comms_socket.send_pyobj(driver_start_message)
    # Wait until driver acknowledges that it is good to start
    driver_comms_socket.recv_string()

    # View the shared-memory pulse buffer as a (tx antennas, buffer size) complex array
    shm = ipc.SharedMemory(options.pulse_buffer_name)
    mapped_mem = mmap.mmap(shm.fd, shm.size)
    pulse_buffer = np.ndarray(
        (len(options.tx_main_antennas), options.pulse_buffer_size),
        buffer=mapped_mem,
        dtype=np.complex64,
    )

    first_aveperiod = True
    next_scan_start = None

    while True:
        # This loops through all scans in an experiment, or restarts this loop if a new experiment occurs.
        # TODO : further documentation throughout in comments (high level) and in separate documentation.
        # Iterate through Scans, AveragingPeriods, Sequences, Pulses.
        # Start anew on first scan if we have a new experiment.
        for scan_num, scan in enumerate(experiment.scan_objects):
            log.debug("scan number", scan_num=scan_num)

            # Scan iter is the iterator through the scanbound or through the number of averaging periods in the scan
            if (
                first_aveperiod
                and scan.scanbound is not None
                and not wait_for_first_scanbound
            ):
                # On first integration, determine current averaging period and set scan_iter to it
                now = datetime.now(timezone.utc)
                current_minute = now.replace(second=0, microsecond=0)
                scan_iter = next(
                    (
                        i
                        for i, v in enumerate(scan.scanbound)
                        if current_minute + timedelta(seconds=v) > now
                    ),
                    0,
                )
            else:
                # Otherwise start at first averaging period
                scan_iter = 0

            if scan.scanbound:
                if scan.align_scan_to_beamorder:
                    for aveperiod in scan.aveperiods:
                        # Always align first beam at start of scan
                        aveperiod.beam_iter = 0

                # Find the start of the next scan with a scanbound so we can determine time remaining for end of scan
                next_scanbound = None
                next_scan_num = scan_num
                while next_scanbound is None:
                    next_scan_num += 1
                    if next_scan_num == len(experiment.scan_objects):
                        next_scan_num = 0
                    next_scanbound = experiment.scan_objects[next_scan_num].scanbound

                if first_aveperiod:
                    # On the very first averaging period of Borealis starting, calculate the start minute
                    # align scanbound reference time to find when to start
                    now = datetime.now(timezone.utc)
                    dt = now.replace(second=0, microsecond=0)
                    if dt + timedelta(seconds=scan.scanbound[scan_iter]) >= now:
                        start_minute = dt
                    else:
                        start_minute = round_up_time(now)
                else:
                    # At the start of a scan object that has scanbound, recalculate the start minute to the
                    # previously calculated next_scan_start
                    start_minute = next_scan_start.replace(second=0, microsecond=0)

                # Find the modulus of the number of aveperiod times to run in the scan and the number
                # of AvePeriod classes. The classes will be alternated so we can determine which class
                # will be running at the end of the scan.
                index_of_last_aveperiod_in_scan = (
                    scan.num_aveperiods_in_scan + scan.aveperiod_iter
                ) % len(scan.aveperiods)
                # A scanbound necessitates intt
                last_aveperiod_intt = scan.aveperiods[
                    index_of_last_aveperiod_in_scan
                ].intt
                end_of_scan = (
                    start_minute
                    + timedelta(seconds=scan.scanbound[-1])
                    + timedelta(seconds=last_aveperiod_intt * 1e-3)
                )
                end_minute = end_of_scan.replace(second=0, microsecond=0)
                if end_minute + timedelta(seconds=next_scanbound[0]) >= end_of_scan:
                    next_scan_start = end_minute + timedelta(seconds=next_scanbound[0])
                else:
                    next_scan_start = round_up_time(end_of_scan) + timedelta(
                        seconds=next_scanbound[0]
                    )

            while scan_iter < scan.num_aveperiods_in_scan:
                # If there are multiple aveperiods in a scan they are alternated (AVEPERIOD interfaced)
                aveperiod = scan.aveperiods[scan.aveperiod_iter]
                if TIME_PROFILE:
                    time_start_of_aveperiod = datetime.now(timezone.utc)
                log.debug("new averaging period")

                # All phases are set up for this averaging period for the beams required.
                # Time to start averaging in the below loop.
                if not scan.scanbound:
                    averaging_period_start_time = datetime.now(timezone.utc)
                    log.verbose(
                        "averaging period start time",
                        time=averaging_period_start_time,
                        time_unit="s",
                    )

                if aveperiod.intt is not None:
                    intt_break = True
                    if scan.scanbound:
                        # Calculate scan start time. First beam in the sequence will likely
                        # be ready to go if the first scan aligns directly to the minute. The
                        # rest will need to wait until their boundary time is up.
                        beam_scanbound = start_minute + timedelta(
                            seconds=scan.scanbound[scan_iter]
                        )
                        time_diff = beam_scanbound - datetime.now(timezone.utc)
                        if time_diff.total_seconds() > 0:
                            if first_aveperiod:
                                log.verbose(
                                    "seconds to next avg period",
                                    time=time_diff.total_seconds(),
                                    time_unit="s",
                                    scan_iter=scan_iter,
                                    beam_scanbound=beam_scanbound,
                                )
                            else:
                                log.debug(
                                    "seconds to next avg period",
                                    time=time_diff.total_seconds(),
                                    time_unit="s",
                                    scan_iter=scan_iter,
                                    beam_scanbound=beam_scanbound,
                                )
                            # TODO: reduce sleep if we want to use GPS timestamped transmissions
                            time.sleep(time_diff.total_seconds())
                        else:
                            # TODO: This will be wrong if the start time is in the past.
                            # TODO: maybe use datetime.now(timezone.utc) like below
                            #  instead of beam_scanbound when the avg period should have
                            #  started?
                            log.debug(
                                "expected avg period start time",
                                scan_iter=scan_iter,
                                beam_scanbound=beam_scanbound,
                            )

                        averaging_period_start_time = datetime.now(timezone.utc)
                        log.verbose(
                            "avg period start time",
                            time=averaging_period_start_time,
                            time_unit="s",
                            scan_iter=scan_iter,
                            beam_scanbound=beam_scanbound,
                        )

                        # Here we find how much system time has elapsed to find the true amount
                        # of time we can integrate for this scan boundary. We can then see if
                        # we have enough time left to run the averaging period.
                        time_elapsed = averaging_period_start_time - start_minute
                        if scan_iter < len(scan.scanbound) - 1:
                            scanbound_time = scan.scanbound[scan_iter + 1]
                            # TODO: scanbound_time could be in the past if system has taken
                            #  too long, perhaps calculate which 'beam' (scan_iter) instead by
                            #  rewriting this code for an experiment-wide scanbound attribute instead
                            #  of individual scanbounds inside the scan objects
                            # TODO: if scan_iter skips ahead, aveperiod.beam_iter may also need to
                            #  if scan.align_to_beamorder is True
                            bound_time_remaining = (
                                scanbound_time - time_elapsed.total_seconds()
                            )
                        else:
                            # Last boundary in this scan: time runs until the next scan starts
                            bound_time_remaining = (
                                next_scan_start - averaging_period_start_time
                            )
                            bound_time_remaining = bound_time_remaining.total_seconds()

                        log.verbose(
                            "bound time remaining",
                            time=bound_time_remaining,
                            time_unit="s",
                            scan_num=scan_num,
                            scan_iter=scan_iter,  # scan_iter is averaging period number for some reason
                            beam_scanbound=beam_scanbound,
                        )

                        if bound_time_remaining < aveperiod.intt * 1e-3:
                            # Reduce the averaging period to only the time remaining until the next scan boundary
                            # TODO: Check for bound_time_remaining > 0
                            #  to be sure there is actually time to run this intt
                            #  (if bound_time_remaining < 0, we need a solution to reset)
                            averaging_period_done_time = (
                                averaging_period_start_time
                                + timedelta(milliseconds=bound_time_remaining * 1e3)
                            )
                        else:
                            averaging_period_done_time = (
                                averaging_period_start_time
                                + timedelta(milliseconds=aveperiod.intt)
                            )
                    else:  # No scanbound for this scan
                        averaging_period_done_time = (
                            averaging_period_start_time
                            + timedelta(milliseconds=aveperiod.intt)
                        )
                else:  # intt does not exist, therefore using intn
                    intt_break = False
                    ending_number_of_sequences = aveperiod.intn  # this will exist

                msg = {
                    x: y[aveperiod.beam_iter]
                    for x, y in aveperiod.slice_to_beamorder.items()
                }
                log.verbose("avg period slice and beam number", slice_and_beam=msg)

                if TIME_PROFILE:
                    aveperiod_prep_time = (
                        datetime.now(timezone.utc) - time_start_of_aveperiod
                    )
                    log.verbose(
                        "time to prep aveperiod",
                        time=aveperiod_prep_time,
                        time_unit="",
                    )

                # Time to start averaging in the below loop
                ave_params = RadctrlParameters(
                    experiment,
                    aveperiod,
                    aveperiod.sequences[0],
                    options,
                    seqnum_start,
                    False,
                    num_beams=aveperiod.num_beams_in_scan,
                )

                time_remains = True
                while time_remains:
                    if ave_params.num_sequences == 0 and isinstance(
                        aveperiod, CFSAveragingPeriod
                    ):
                        cfs_time = time.time()
                        cfs_block(
                            ave_params,
                            [
                                radctrl_inproc_socket,
                                radctrl_brian_socket,
                                driver_comms_socket,
                            ],
                            pulse_buffer,
                        )
                        aveperiod.update_cfs_freqs()  # always update, to use correct freq for beam
                        log.verbose("CFS block run time", time=time.time() - cfs_time)

                    for sequence_index, sequence in enumerate(aveperiod.sequences):
                        # Alternating sequences if there are multiple in the averaging_period
                        ave_params.start_time = datetime.now(timezone.utc)
                        ave_params.sequence = sequence
                        ave_params.sequence_index = sequence_index

                        if intt_break:
                            if ave_params.start_time >= averaging_period_done_time:
                                time_remains = False
                                ave_params.averaging_period_time = (
                                    ave_params.start_time - averaging_period_start_time
                                )
                                break
                        else:  # Break at a certain number of sequences
                            if ave_params.num_sequences == ending_number_of_sequences:
                                time_remains = False
                                ave_params.averaging_period_time = (
                                    ave_params.start_time - averaging_period_start_time
                                )
                                break

                        # On first sequence, we make the first set of samples
                        if sequence_index not in ave_params.pulse_transmit_data_tracker:
                            ave_params.pulse_transmit_data_tracker[sequence_index] = {}
                            sqn = sequence.make_sequence(
                                aveperiod.beam_iter, ave_params.num_sequences
                            )
                            ave_params.pulse_transmit_data_tracker[sequence_index][
                                ave_params.num_sequences
                            ] = sqn

                        ave_params.decimation_scheme = sequence.decimation_scheme

                        # This can happen simultaneously. Pass the callable and its
                        # argument separately: writing target=make_next_samples(ave_params)
                        # would run the function right here in this thread and hand its
                        # return value to Thread, leaving the thread with nothing to do.
                        threads = [
                            threading.Thread(
                                target=make_next_samples, args=(ave_params,)
                            )
                        ]
                        for thread in threads:
                            thread.daemon = True
                            thread.start()

                        for (
                            pulse_transmit_data
                        ) in ave_params.pulse_transmit_data_tracker[sequence_index][
                            ave_params.num_sequences
                        ]:
                            message_create = time.perf_counter()
                            driver_message = create_driver_message(
                                ave_params, pulse_transmit_data, pulse_buffer
                            )
                            log.debug(
                                "driver_message creation time",
                                duration=(time.perf_counter() - message_create) * 1000,
                                units="ms",
                            )
                            driver_comms_socket.send_pyobj(driver_message)

                        # The call below is blocking; will wait until brian says to start
                        so.recv_string(
                            radctrl_brian_socket, options.brian_to_radctrl_identity, log
                        )

                        # Send metadata to dsp_comms_thread, so it can pass it to rx_signal_processing
                        dsp_message = create_dsp_message(ave_params)
                        radctrl_inproc_socket.send_pyobj(dsp_message)

                        # Send the metadata along to brian
                        so.send_pyobj(
                            radctrl_brian_socket,
                            options.brian_to_radctrl_identity,
                            dsp_message,
                        )

                        # The worker must be finished before num_sequences changes below
                        for thread in threads:
                            thread.join()

                        # wait for dsp to signal that it received the metadata
                        radctrl_inproc_socket.recv_string()

                        ave_params.num_sequences += 1
                        if first_aveperiod:
                            first_aveperiod = False

                        # Sequence is done
                        if __debug__:
                            time.sleep(1)

                if TIME_PROFILE:
                    avg_period_end_time = datetime.now(timezone.utc)
                    log.verbose(
                        "avg period end time",
                        time=avg_period_end_time,
                        time_unit="s",
                    )

                log.info(
                    "aveperiod done",
                    num_sequences=ave_params.num_sequences,
                    slice_ids=aveperiod.slice_ids,
                )

                if scan.aveperiod_iter == 0 and aveperiod.beam_iter == 0:
                    # This is the first averaging period in the scan object.
                    # if scanbound is aligned to beamorder, the scan_iter will also = 0 at this point.
                    ave_params.scan_flag = True
                else:
                    ave_params.scan_flag = False

                ave_params.last_sequence_num = (
                    ave_params.seqnum_start + ave_params.num_sequences - 1
                )

                # Send metadata to dw_comms_thread, so it can package into a message for data write
                dw_message = create_dw_message(ave_params)
                dw_comms_socket.send_pyobj(dw_message)

                # end of the averaging period loop - move onto the next averaging period.
                # Increment the sequence number by the number of sequences that were in this
                # averaging period.
                seqnum_start += ave_params.num_sequences

                if TIME_PROFILE:
                    time_to_finish_aveperiod = (
                        datetime.now(timezone.utc) - avg_period_end_time
                    )
                    log.verbose(
                        "time to finish avg period",
                        time=time_to_finish_aveperiod,
                        time_unit="s",
                    )

                # Advance the beam and averaging period iterators, wrapping at the end
                aveperiod.beam_iter += 1
                if aveperiod.beam_iter == aveperiod.num_beams_in_scan:
                    aveperiod.beam_iter = 0
                scan_iter += 1
                scan.aveperiod_iter += 1
                if scan.aveperiod_iter == len(scan.aveperiods):
                    scan.aveperiod_iter = 0
def radctrl_parser():
    """
    Build the command line parser for radar_control.

    The parser takes two positional arguments (``experiment_module`` and
    ``scheduling_mode_type``), an ``--embargo`` flag, and a ``--kwargs`` option that
    collects one or more ``kw=val`` strings (an empty string when not supplied).

    :returns: the configured parser
    :rtype: argparse.ArgumentParser
    """
    positionals = (
        (
            "experiment_module",
            "The name of the module in the experiment_prototype package that contains "
            "your Experiment class, e.g. normalscan",
        ),
        (
            "scheduling_mode_type",
            "The type of scheduling time for this experiment run, e.g. common, "
            "special, or discretionary.",
        ),
    )

    cli = argparse.ArgumentParser()
    # Registration order fixes the order of the positionals on the command line
    for arg_name, description in positionals:
        cli.add_argument(arg_name, help=description)

    cli.add_argument(
        "--embargo",
        help="Embargo the file (makes the CPID negative)",
        action="store_true",
    )
    cli.add_argument(
        "--kwargs",
        help="Keyword arguments for the experiment. Each must be formatted as kw=val",
        default="",
        nargs="+",
    )
    return cli
if __name__ == "__main__":
    from utils import log_config

    # Module-level logger; main() uses this global
    log = log_config.log()
    log.info("RADAR_CONTROL BOOTED")
    try:
        args = radctrl_parser().parse_args()

        # Parse kwargs and pass to experiment. Split on the first "=" only, so that a
        # value which itself contains "=" is passed through whole instead of truncated.
        parsed_kwargs = {}
        for element in args.kwargs:
            key, separator, value = element.partition("=")
            if not separator:
                # No "=" present; fail with a message naming the offending argument
                raise ValueError(
                    f"Malformed experiment kwarg {element!r}: expected kw=val"
                )
            parsed_kwargs[key] = value

        main(
            args.experiment_module,
            args.scheduling_mode_type,
            args.embargo,
            **parsed_kwargs,
        )
        log.info("RADAR_CONTROL EXITED")
    except Exception as main_exception:
        # Top-level boundary: record the crash, including the traceback, before exiting
        log.critical("RADAR_CONTROL CRASHED", error=main_exception)
        log.exception("RADAR_CONTROL CRASHED", exception=main_exception)