Source code for src.radar_control
#!/usr/bin/env python3
"""
radar_control process
~~~~~~~~~~~~~~~~~~~~~
Radar_control is the process that runs the radar (sends pulses to the driver with
timing information and sends processing information to the signal processing process).
Experiment_handler provides the experiment for radar_control to run. It iterates
through the interface_class_base objects to control the radar.
:copyright: 2018 SuperDARN Canada
:author: Marci Detwiller
"""
import os
import sys
import time
from datetime import datetime, timedelta
import zmq
import pickle
import threading
import numpy as np
from functools import reduce
from experiment_prototype.experiment_prototype import ExperimentPrototype
from utils.options import Options
import utils.message_formats as messages
from utils import socket_operations
sys.path.append(os.environ["BOREALISPATH"])
if __debug__:
from build.debug.src.utils.protobuf.driverpacket_pb2 import DriverPacket
else:
from build.release.src.utils.protobuf.driverpacket_pb2 import DriverPacket
TIME_PROFILE = False
[docs]def setup_driver(
radctrl_to_driver, driver_to_radctrl_iden, txctrfreq, rxctrfreq, txrate, rxrate
):
"""
First packet sent to driver for setup.
:param radctrl_to_driver: the sender socket for sending the driverpacket
:param driver_to_radctrl_iden: the receiver socket identity on the driver side
:param txctrfreq: the transmit center frequency to tune to, kHz.
:param rxctrfreq: the receive center frequency to tune to. With rx_sample_rate from config.ini file, this
determines the received signal band, kHz.
:param txrate: the tx sampling rate (Hz).
:param rxrate: the rx sampling rate (Hz).
"""
driverpacket = DriverPacket()
driverpacket.txcenterfreq = txctrfreq * 1000 # convert to Hz
driverpacket.rxcenterfreq = rxctrfreq * 1000 # convert to Hz
driverpacket.txrate = txrate
driverpacket.rxrate = rxrate
socket_operations.send_pulse(
radctrl_to_driver, driver_to_radctrl_iden, driverpacket.SerializeToString()
)
socket_operations.recv_data(radctrl_to_driver, driver_to_radctrl_iden, log)
[docs]def data_to_driver(
radctrl_to_driver,
driver_to_radctrl_iden,
samples_array,
txctrfreq,
rxctrfreq,
txrate,
rxrate,
numberofreceivesamples,
seqtime,
SOB,
EOB,
timing,
seqnum,
align_sequences,
repeat=False,
):
"""
Place data in the driver packet and send it via zeromq to the driver.
:param radctrl_to_driver: the sender socket for sending the driverpacket
:param driver_to_radctrl_iden: the reciever socket identity on the driver side
:param samples_array: this is a list of length main_antenna_count from the config file. It contains one
numpy array of complex values per antenna. If the antenna will not be transmitted on, it contains a
numpy array of zeros of the same length as the rest. All arrays will have the same length according to
the pulse length.
:param txctrfreq: the transmit center frequency to tune to.
:param rxctrfreq: the receive center frequency to tune to. With rx_sample_rate from config.ini file, this
determines the received signal band.
:param txrate: the tx sampling rate (Hz).
:param rxrate: the rx sampling rate (Hz).
:param numberofreceivesamples: number of samples to receive at the rx_sample_rate from config.ini file. This
determines length of Scope Sync GPIO being high for this sequence.
:param seqtime: relative timing offset
:param SOB: start of burst boolean, true for first pulse in sequence.
:param EOB: end of burst boolean, true for last pulse in sequence.
:param timing: in us, the time past timezero to send this pulse. Timezero is the start of the sequence.
:param seqnum: the sequence number. This is a unique identifier for the sequence that is always increasing
with increasing sequences while radar_control is running. It is only reset when program restarts.
:param align_sequences: a boolean indicating whether to align the start of the sequence to a clean tenth
of a second.
:param repeat: a boolean indicating whether the pulse is the exact same as the last pulse
in the sequence, in which case we will save the time and not send the samples list and other
params that will be the same.
"""
driverpacket = DriverPacket()
driverpacket.timetosendsamples = timing
driverpacket.SOB = SOB
driverpacket.EOB = EOB
driverpacket.sequence_num = seqnum
driverpacket.numberofreceivesamples = numberofreceivesamples
driverpacket.seqtime = seqtime
driverpacket.align_sequences = align_sequences
if repeat:
# antennas empty
# samples empty
# ctrfreq empty
# rxrate and txrate empty
log.debug("repeat timing", timing=timing, sob=SOB, eob=EOB)
else:
# Setup data to send to driver for transmit.
for ant_idx in range(samples_array.shape[0]):
sample_add = driverpacket.channel_samples.add()
# Add one Samples message for each channel possible in config.
# Any unused channels will be sent zeros.
# Protobuf expects types: int, long, or float, will reject numpy types and
# throw a TypeError so we must convert the numpy arrays to lists
sample_add.real.extend(samples_array[ant_idx, :].real.tolist())
sample_add.imag.extend(samples_array[ant_idx, :].imag.tolist())
driverpacket.txcenterfreq = txctrfreq * 1000 # convert to Hz
driverpacket.rxcenterfreq = rxctrfreq * 1000 # convert to Hz
driverpacket.txrate = txrate
driverpacket.rxrate = rxrate
driverpacket.numberofreceivesamples = numberofreceivesamples
log.debug("non-repeat timing", timing=timing, sob=SOB, eob=EOB)
socket_operations.send_pulse(
radctrl_to_driver, driver_to_radctrl_iden, driverpacket.SerializeToString()
)
[docs]def send_dsp_metadata(
radctrl_to_dsp,
dsp_radctrl_iden,
radctrl_to_brian,
brian_radctrl_iden,
rxrate,
output_sample_rate,
seqnum,
slice_ids,
slice_dict,
beam_dict,
sequence_time,
first_rx_sample_start,
rxctrfreq,
pulse_phase_offsets,
decimation_scheme=None,
):
"""
Place data in the receiver packet and send it via zeromq to the signal processing unit and brian.
Happens every sequence.
:param radctrl_to_dsp: The sender socket for sending data to dsp
:param dsp_radctrl_iden: The receiver socket identity on the dsp side
:param radctrl_to_brian: The sender socket for sending data to brian
:param brian_radctrl_iden: The receiver socket identity on the brian side
:param rxrate: The receive sampling rate (Hz).
:param output_sample_rate: The output sample rate desired for the output data (Hz).
:param seqnum: the sequence number. This is a unique identifier for the sequence that is always increasing
with increasing sequences while radar_control is running. It is only reset when program restarts.
:param slice_ids: The identifiers of the slices that are combined in this sequence. These IDs tell us where to
look in the beam dictionary and slice dictionary for frequency information and beam direction information
about this sequence to give to the signal processing unit.
:param slice_dict: The slice dictionary, which contains information about all slices and will be referenced for
information about the slices in this sequence. Namely, we get the frequency we want to receive at, the
number of ranges and the first range information.
:param beam_dict: The dictionary containing beam directions for each slice.
:param sequence_time: entire duration of sequence, including receive time after all
transmissions.
:param first_rx_sample_start: The sample where the first rx sample will start relative to the
tx data.
:param rxctrfreq: the center frequency of receiving.
:param pulse_phase_offsets: Phase offsets (degrees) applied to each pulse in the sequence
:param decimation_scheme: object of type DecimationScheme that has all decimation and
filtering data.
"""
# TODO: does the for loop below need to happen every time. Could be only updated
# as necessary to make it more efficient.
message = messages.SequenceMetadataMessage()
message.sequence_time = sequence_time
message.sequence_num = seqnum
message.offset_to_first_rx_sample = first_rx_sample_start
message.rx_rate = rxrate
message.output_sample_rate = output_sample_rate
message.rx_ctr_freq = rxctrfreq * 1.0e3
if decimation_scheme is not None:
for stage in decimation_scheme.stages:
dm_stage_add = messages.DecimationStageMessage(
stage.stage_num, stage.input_rate, stage.dm_rate, stage.filter_taps
)
message.add_decimation_stage(dm_stage_add)
for slice_id in slice_ids:
chan_add = messages.RxChannel(slice_id)
chan_add.tau_spacing = slice_dict[slice_id].tau_spacing
# Send the translational frequencies to dsp in order to bandpass filter correctly.
if slice_dict[slice_id].clrfrqflag:
pass # TODO - get freq from clear frequency search.
else:
chan_add.rx_freq = slice_dict[slice_id].freq * 1.0e3
chan_add.num_ranges = slice_dict[slice_id].num_ranges
chan_add.first_range = slice_dict[slice_id].first_range
chan_add.range_sep = slice_dict[slice_id].range_sep
chan_add.rx_intf_antennas = slice_dict[slice_id].rx_intf_antennas
main_bms = beam_dict[slice_id]["main"]
intf_bms = beam_dict[slice_id]["intf"]
# Combine main and intf such that for a given beam all main phases come first.
beams = np.hstack((main_bms, intf_bms))
chan_add.beam_phases = np.array(beams)
for lag in slice_dict[slice_id].lag_table:
lag_add = messages.Lag(lag[0], lag[1], int(lag[1] - lag[0]))
# Get the phase offset for this pulse combination
if len(pulse_phase_offsets[slice_id]) != 0:
pulse_phase_offset = pulse_phase_offsets[slice_id][-1]
lag0_idx = slice_dict[slice_id].pulse_sequence.index(lag[0])
lag1_idx = slice_dict[slice_id].pulse_sequence.index(lag[1])
phase_in_rad = np.radians(
pulse_phase_offset[lag0_idx] - pulse_phase_offset[lag1_idx]
)
phase_offset = np.exp(1j * np.array(phase_in_rad, np.float32))
# Catch case where no pulse phase offsets are specified
else:
phase_offset = 1.0 + 0.0j
lag_add.phase_offset_real = np.real(phase_offset)
lag_add.phase_offset_imag = np.imag(phase_offset)
chan_add.add_lag(lag_add)
message.add_rx_channel(chan_add)
# Brian requests sequence metadata for timeouts
if TIME_PROFILE:
time_waiting = time.perf_counter()
request = socket_operations.recv_request(radctrl_to_brian, brian_radctrl_iden, log)
log.debug("brian requested", request=request)
if TIME_PROFILE:
time_done = time.perf_counter() - time_waiting
log.verbose(
"waiting time for metadata request",
metadata_time=time_done * 1e3,
metadata_time_units="ms",
)
bytes_packet = pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL)
socket_operations.send_obj(radctrl_to_brian, brian_radctrl_iden, bytes_packet)
socket_operations.send_obj(
radctrl_to_dsp,
dsp_radctrl_iden,
pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL),
)
[docs]def search_for_experiment(radar_control_to_exp_handler, exphan_to_radctrl_iden, status):
"""
Check for new experiments from the experiment handler
:param radar_control_to_exp_handler: TODO
:param exphan_to_radctrl_iden: The TODO
:param status: status string (EXP_NEEDED or NO_ERROR).
:returns new_experiment_received: boolean (True for new experiment received)
:returns experiment: experiment instance (or None if there is no new experiment)
"""
try:
socket_operations.send_request(
radar_control_to_exp_handler, exphan_to_radctrl_iden, status
)
except zmq.ZMQBaseError as e:
log.error("zmq failed request", error=e)
log.exception("zmq failed request", exception=e)
sys.exit(1)
experiment = None
new_experiment_received = False
try:
serialized_exp = socket_operations.recv_exp(
radar_control_to_exp_handler, exphan_to_radctrl_iden, log
)
except zmq.ZMQBaseError as e:
log.error("zmq failed receive", error=e)
log.exception("zmq failed receive", exception=e)
sys.exit(1)
new_exp = pickle.loads(serialized_exp) # Protocol detected automatically
if isinstance(new_exp, ExperimentPrototype):
experiment = new_exp
new_experiment_received = True
log.info("new experiment found")
elif new_exp is not None:
log.debug("received non experiment_prototype type")
else:
log.debug("experiment continuing without update")
# TODO decide what to do here. I think we need this case if someone doesn't build their experiment properly
return new_experiment_received, experiment
[docs]def send_datawrite_metadata(
radctrl_to_datawrite,
datawrite_radctrl_iden,
seqnum,
num_sequences,
scan_flag,
inttime,
sequences,
beam_iter,
experiment_id,
experiment_name,
scheduling_mode,
output_sample_rate,
experiment_comment,
filter_scaling_factors,
rx_center_freq,
debug_samples=None,
):
"""
Send the metadata about this averaging period to datawrite so that it can be recorded.
:param radctrl_to_datawrite: The socket to send the packet on.
:param datawrite_radctrl_iden: Identity of datawrite on the socket.
:param seqnum: The last sequence number (identifier) that is valid for this averaging
period. Used to verify and synchronize driver, dsp, datawrite.
:param num_sequences: The number of sequences that were sent in this averaging period. (number of
sequences to average together).
:param scan_flag: True if this averaging period is the first in a scan.
:param inttime: The time that expired during this averaging period.
:param sequences: The sequences of class Sequence for this averaging period (AveragingPeriod).
:param beam_iter: The beam iterator of this averaging period.
:param experiment_id: the ID of the experiment that is running
:param experiment_name: the experiment name to be placed in the data files.
:param scheduling_mode: the type of scheduling mode running at this time, to write to file.
:param output_sample_rate: The output sample rate of the output data, defined by the
experiment, in Hz.
:param experiment_comment: The comment string for the experiment, user-defined.
:param filter_scaling_factors: The decimation scheme scaling factors used for the experiment,
to get the scaling for the data for accurate power measurements between experiments.
:param rx_center_freq: The receive center frequency (kHz)
:param debug_samples: the debug samples for this averaging period, to be written to the
file if debug is set. This is a list of dictionaries for each Sequence in the
AveragingPeriod. The dictionary is set up in the sample_building module function
create_debug_sequence_samples. The keys are 'txrate', 'txctrfreq', 'pulse_timing',
'pulse_sample_start', 'sequence_samples', 'decimated_sequence', and 'dmrate'.
The 'sequence_samples' and 'decimated_samples' values are themselves dictionaries, where the
keys are the antenna numbers (there is a sample set for each transmit antenna).
"""
message = messages.AveperiodMetadataMessage()
message.experiment_id = experiment_id
message.experiment_name = experiment_name
message.experiment_comment = experiment_comment
message.rx_ctr_freq = rx_center_freq
message.num_sequences = num_sequences
message.last_sqn_num = seqnum
message.scan_flag = scan_flag
message.aveperiod_time = inttime.total_seconds()
message.output_sample_rate = output_sample_rate
message.data_normalization_factor = reduce(
lambda x, y: x * y, filter_scaling_factors
) # multiply all
message.scheduling_mode = scheduling_mode
for sequence_index, sequence in enumerate(sequences):
sequence_add = messages.Sequence()
sequence_add.blanks = sequence.blanks
if debug_samples:
tx_data = messages.TxData()
tx_data.tx_rate = debug_samples[sequence_index]["txrate"]
tx_data.tx_ctr_freq = debug_samples[sequence_index]["txctrfreq"]
tx_data.pulse_timing_us = debug_samples[sequence_index]["pulse_timing"]
tx_data.pulse_sample_start = debug_samples[sequence_index][
"pulse_sample_start"
]
tx_data.tx_samples = debug_samples[sequence_index]["sequence_samples"]
tx_data.dm_rate = debug_samples[sequence_index]["dmrate"]
tx_data.decimated_tx_samples = debug_samples[sequence_index][
"decimated_samples"
]
sequence_add.tx_data = tx_data
for slice_id in sequence.slice_ids:
sqn_slice = sequence.slice_dict[slice_id]
rxchannel = messages.RxChannelMetadata()
rxchannel.slice_id = slice_id
rxchannel.slice_comment = sqn_slice.comment
rxchannel.interfacing = "{}".format(sqn_slice.slice_interfacing)
rxchannel.rx_only = sqn_slice.rxonly
rxchannel.pulse_len = sqn_slice.pulse_len
rxchannel.tau_spacing = sqn_slice.tau_spacing
rxchannel.rx_freq = sqn_slice.freq
rxchannel.ptab = sqn_slice.pulse_sequence
# We always build one sequence in advance, so we trim the last one from when radar
# control stops processing the averaging period.
for encoding in sequence.output_encodings[slice_id][:num_sequences]:
rxchannel.add_sqn_encodings(encoding.flatten().tolist())
sequence.output_encodings[slice_id] = []
rxchannel.rx_main_antennas = sqn_slice.rx_main_antennas
rxchannel.rx_intf_antennas = sqn_slice.rx_intf_antennas
rxchannel.tx_antenna_phases = sequence.tx_main_phase_shifts[slice_id][
beam_iter
]
beams = sqn_slice.rx_beam_order[beam_iter]
if isinstance(beams, int):
beams = [beams]
rx_main_phases = []
rx_intf_phases = []
for beam in beams:
beam_add = messages.Beam(sqn_slice.beam_angle[beam], beam)
rxchannel.add_beam(beam_add)
rx_main_phases.append(
sequence.rx_beam_phases[slice_id]["main"][
beam, sequence.rx_main_antenna_indices[slice_id]
]
)
rx_intf_phases.append(
sequence.rx_beam_phases[slice_id]["intf"][
beam, sequence.rx_intf_antenna_indices[slice_id]
]
)
rxchannel.rx_main_phases = np.array(rx_main_phases, dtype=np.complex64)
rxchannel.rx_intf_phases = np.array(rx_intf_phases, dtype=np.complex64)
rxchannel.first_range = float(sqn_slice.first_range)
rxchannel.num_ranges = sqn_slice.num_ranges
rxchannel.range_sep = sqn_slice.range_sep
if sqn_slice.acf:
rxchannel.acf = sqn_slice.acf
rxchannel.xcf = sqn_slice.xcf
rxchannel.acfint = sqn_slice.acfint
for lag in sqn_slice.lag_table:
lag_add = messages.LagTable(lag, int(lag[1] - lag[0]))
rxchannel.add_ltab(lag_add)
rxchannel.averaging_method = sqn_slice.averaging_method
sequence_add.add_rx_channel(rxchannel)
message.sequences.append(sequence_add)
log.debug("sending metadata to data_write")
socket_operations.send_bytes(
radctrl_to_datawrite,
datawrite_radctrl_iden,
pickle.dumps(message, protocol=pickle.HIGHEST_PROTOCOL),
)
[docs]def round_up_time(dt=None, round_to=60):
"""
Round a datetime object to any time lapse in seconds
:param dt: datetime.datetime object, default now.
:param round_to: Closest number of seconds to round to, default 1 minute.
:author: Thierry Husson 2012 - Use it as you want but don't blame me.
:modified: K.Kotyk 2019
Will round to the nearest minute mark. Adds one minute if rounded down.
"""
if dt is None:
dt = datetime.utcnow()
midnight = dt.replace(hour=0, minute=0, second=0)
seconds = (dt.replace(tzinfo=None) - midnight).seconds
rounding = (seconds + round_to / 2) // round_to * round_to
result = dt + timedelta(0, rounding - seconds, -dt.microsecond)
if result < dt:
result += timedelta(minutes=1)
return result
[docs]def main():
"""
Run the radar with the experiment supplied by experiment_handler.
Receives an instance of an experiment. Iterates through the Scans,
AveragingPeriods, Sequences, and pulses of the experiment.
For every pulse, samples and other control information are sent to the n200_driver.
For every pulse sequence, processing information is sent to the signal processing
block.
After every averaging period, the experiment block is given the
opportunity to change the experiment (not currently implemented). If a new
experiment is sent, radar will halt the old one and begin with the new experiment.
"""
# Get config options
options = Options()
# The socket identities for radar_control, retrieved from options
ids = [
options.radctrl_to_exphan_identity,
options.radctrl_to_dsp_identity,
options.radctrl_to_driver_identity,
options.radctrl_to_brian_identity,
options.radctrl_to_dw_identity,
]
# Setup sockets
# Socket to send pulse samples over
# TODO test: need to make sure that we know that all sockets are set up after this try...except block.
# TODO test: starting the programs in different orders.
try:
sockets_list = socket_operations.create_sockets(ids, options.router_address)
except zmq.ZMQBaseError as e:
log.error("zmq failed setting up sockets", error=e)
log.exception("zmq failed setting up sockets", exception=e)
sys.exit(1)
radar_control_to_exp_handler = sockets_list[0]
radar_control_to_dsp = sockets_list[1]
radar_control_to_driver = sockets_list[2]
radar_control_to_brian = sockets_list[3]
radar_control_to_dw = sockets_list[4]
# seqnum is used as a identifier in all packets while radar is running so set it up here.
# seqnum will get increased by num_sequences (number of averages or sequences in the averaging period)
# at the end of every averaging period.
seqnum_start = 0
# Wait for experiment handler at the start until we have an experiment to run.
new_experiment_waiting = False
while not new_experiment_waiting:
new_experiment_waiting, experiment = search_for_experiment(
radar_control_to_exp_handler,
options.exphan_to_radctrl_identity,
"EXPNEEDED",
)
new_experiment_waiting = False
new_experiment_loaded = True
# Flag for starting the radar on the minute boundary
wait_for_first_scanbound = experiment.slice_dict.get("wait_for_first_scanbound")
# Send driver initial setup data - rates and center frequency from experiment.
# Wait for acknowledgment that USRP object is set up.
setup_driver(
radar_control_to_driver,
options.driver_to_radctrl_identity,
experiment.slice_dict[0].txctrfreq,
experiment.slice_dict[0].rxctrfreq,
experiment.txrate,
experiment.rxrate,
)
first_aveperiod = True
next_scan_start = None
while True:
# This loops through all scans in an experiment, or restarts this loop if a new experiment occurs.
# TODO : further documentation throughout in comments (high level) and in separate documentation.
# Iterate through Scans, AveragingPeriods, Sequences, Pulses.
# Start anew on first scan if we have a new experiment.
if new_experiment_waiting:
try:
experiment = new_experiment
except NameError as e:
# new_experiment does not exist, should never happen as flag only gets set when
# there is a new experiment.
log.error("experiment could not be found", error=e)
log.exception("experiment could not be found", exception=e)
sys.exit(1)
new_experiment_waiting = False
new_experiment = None
new_experiment_loaded = True
for scan_num, scan in enumerate(experiment.scan_objects):
log.debug("scan number", scan_num=scan_num)
# Scan iter is the iterator through the scanbound or through the number of averaging periods in the scan
if (
first_aveperiod
and scan.scanbound is not None
and not wait_for_first_scanbound
):
# On first integration, determine current averaging period and set scan_iter to it
now = datetime.utcnow()
current_minute = now.replace(second=0, microsecond=0)
scan_iter = next(
(
i
for i, v in enumerate(scan.scanbound)
if current_minute + timedelta(seconds=v) > now
),
0,
)
else:
# Otherwise start at first averaging period
scan_iter = 0
# If a new experiment was received during the last scan, it finished the integration period
# it was on and returned here with new_experiment_waiting set to True. Break to load new experiment.
# Start anew on first scan if we have a new experiment
if new_experiment_waiting:
break
if scan.scanbound:
if scan.align_scan_to_beamorder:
for aveperiod in scan.aveperiods:
# Always align first beam at start of scan
aveperiod.beam_iter = 0
# Find the start of the next scan with a scanbound so we can determine time remaining for end of scan
next_scanbound = None
next_scan_num = scan_num
while next_scanbound is None:
next_scan_num += 1
if next_scan_num == len(experiment.scan_objects):
next_scan_num = 0
next_scanbound = experiment.scan_objects[next_scan_num].scanbound
if first_aveperiod:
# On the very first averaging period of Borealis starting, calculate the start minute
# align scanbound reference time to find when to start
now = datetime.utcnow()
dt = now.replace(second=0, microsecond=0)
if dt + timedelta(seconds=scan.scanbound[scan_iter]) >= now:
start_minute = dt
else:
start_minute = round_up_time(now)
else:
# At the start of a scan object that has scanbound, recalculate the start minute to the
# previously calculated next_scan_start
start_minute = next_scan_start.replace(second=0, microsecond=0)
# Find the modulus of the number of aveperiod times to run in the scan and the number
# of AvePeriod classes. The classes will be alternated so we can determine which class
# will be running at the end of the scan.
index_of_last_aveperiod_in_scan = (
scan.num_aveperiods_in_scan + scan.aveperiod_iter
) % len(scan.aveperiods)
last_aveperiod_intt = scan.aveperiods[
index_of_last_aveperiod_in_scan
].intt
# A scanbound necessitates intt
end_of_scan = (
start_minute
+ timedelta(seconds=scan.scanbound[-1])
+ timedelta(seconds=last_aveperiod_intt * 1e-3)
)
end_minute = end_of_scan.replace(second=0, microsecond=0)
if end_minute + timedelta(seconds=next_scanbound[0]) >= end_of_scan:
next_scan_start = end_minute + timedelta(seconds=next_scanbound[0])
else:
next_scan_start = round_up_time(end_of_scan) + timedelta(
seconds=next_scanbound[0]
)
while (
scan_iter < scan.num_aveperiods_in_scan and not new_experiment_waiting
):
# If there are multiple aveperiods in a scan they are alternated (AVEPERIOD interfaced)
aveperiod = scan.aveperiods[scan.aveperiod_iter]
if TIME_PROFILE:
time_start_of_aveperiod = datetime.utcnow()
# Get new experiment here, before starting a new averaging period.
# If new_experiment_waiting is set here, implement new_experiment after this
# averaging period. There may be a new experiment waiting, or a new experiment.
if not new_experiment_waiting and not new_experiment_loaded:
new_experiment_waiting, new_experiment = search_for_experiment(
radar_control_to_exp_handler,
options.exphan_to_radctrl_identity,
"NOERROR",
)
elif new_experiment_loaded:
new_experiment_loaded = False
log.debug("new averaging period")
# All phases are set up for this averaging period for the beams required.
# Time to start averaging in the below loop.
if not scan.scanbound:
averaging_period_start_time = datetime.utcnow() # ms
log.verbose(
"averaging period start time",
averaging_period_start_time=averaging_period_start_time,
averaging_period_start_time_units="",
)
if aveperiod.intt is not None:
intt_break = True
if scan.scanbound:
# Calculate scan start time. First beam in the sequence will likely
# be ready to go if the first scan aligns directly to the minute. The
# rest will need to wait until their boundary time is up.
beam_scanbound = start_minute + timedelta(
seconds=scan.scanbound[scan_iter]
)
time_diff = beam_scanbound - datetime.utcnow()
if time_diff.total_seconds() > 0:
if first_aveperiod:
log.verbose(
"seconds to next avg period",
time_until_avg_period=time_diff.total_seconds(),
time_until_avg_period_units="s",
scan_iter=scan_iter,
beam_scanbound=beam_scanbound,
)
else:
log.debug(
"seconds to next avg period",
time_until_avg_period=time_diff.total_seconds(),
time_until_avg_period_units="s",
scan_iter=scan_iter,
beam_scanbound=beam_scanbound,
)
# TODO: reduce sleep if we want to use GPS timestamped transmissions
time.sleep(time_diff.total_seconds())
else:
# TODO: This will be wrong if the start time is in the past.
# TODO: maybe use datetime.utcnow() like below instead of beam_scanbound
# when the avg period should have started?
log.debug(
"expected avg period start time",
scan_iter=scan_iter,
beam_scanbound=beam_scanbound,
)
averaging_period_start_time = datetime.utcnow()
log.verbose(
"avg period start time",
avg_period_start_time=averaging_period_start_time,
avg_period_start_time_units="s",
scan_iter=scan_iter,
beam_scanbound=beam_scanbound,
)
# Here we find how much system time has elapsed to find the true amount
# of time we can integrate for this scan boundary. We can then see if
# we have enough time left to run the averaging period.
time_elapsed = averaging_period_start_time - start_minute
if scan_iter < len(scan.scanbound) - 1:
scanbound_time = scan.scanbound[scan_iter + 1]
# TODO: scanbound_time could be in the past if system has taken
# too long, perhaps calculate which 'beam' (scan_iter) instead by
# rewriting this code for an experiment-wide scanbound attribute instead
# of individual scanbounds inside the scan objects
# TODO: if scan_iter skips ahead, aveperiod.beam_iter may also need to
# if scan.align_to_beamorder is True
bound_time_remaining = (
scanbound_time - time_elapsed.total_seconds()
)
else:
bound_time_remaining = (
next_scan_start - averaging_period_start_time
)
bound_time_remaining = bound_time_remaining.total_seconds()
log.verbose(
"bound time remaining",
bound_time_remaining=bound_time_remaining,
bound_time_remaining_units="s",
scan_num=scan_num,
scan_iter=scan_iter, # scan_iter is averaging period number for some reason
beam_scanbound=beam_scanbound,
)
if bound_time_remaining < aveperiod.intt * 1e-3:
# Reduce the averaging period to only the time remaining until the next scan boundary
# TODO: Check for bound_time_remaining > 0
# to be sure there is actually time to run this intt
# (if bound_time_remaining < 0, we need a solution to reset)
averaging_period_done_time = (
averaging_period_start_time
+ timedelta(milliseconds=bound_time_remaining * 1e3)
)
else:
averaging_period_done_time = (
averaging_period_start_time
+ timedelta(milliseconds=aveperiod.intt)
)
else: # No scanbound for this scan
averaging_period_done_time = (
averaging_period_start_time
+ timedelta(milliseconds=aveperiod.intt)
)
else: # intt does not exist, therefore using intn
intt_break = False
ending_number_of_sequences = aveperiod.intn # this will exist
msg = {
x: y[aveperiod.beam_iter]
for x, y in aveperiod.slice_to_beamorder.items()
}
log.verbose("avg period slice and beam number", slice_and_beam=msg)
if TIME_PROFILE:
aveperiod_prep_time = datetime.utcnow() - time_start_of_aveperiod
log.verbose(
"time to prep aveperiod",
aveperiod_prep_time=aveperiod_prep_time,
aveperiod_prep_time_units="",
)
# Time to start averaging in the below loop
num_sequences = 0
time_remains = True
pulse_transmit_data_tracker = {}
debug_samples = []
while time_remains:
for sequence_index, sequence in enumerate(aveperiod.sequences):
# Alternating sequences if there are multiple in the averaging_period
start_time = datetime.utcnow()
if intt_break:
if start_time >= averaging_period_done_time:
time_remains = False
averaging_period_time = (
start_time - averaging_period_start_time
)
break
else: # Break at a certain number of sequences
if num_sequences == ending_number_of_sequences:
time_remains = False
averaging_period_time = (
start_time - averaging_period_start_time
)
break
# On first sequence, we make the first set of samples
if sequence_index not in pulse_transmit_data_tracker:
pulse_transmit_data_tracker[sequence_index] = {}
sqn, dbg = sequence.make_sequence(
aveperiod.beam_iter, num_sequences
)
if dbg:
debug_samples.append(dbg)
pulse_transmit_data_tracker[sequence_index][
num_sequences
] = sqn
decimation_scheme = sequence.decimation_scheme
def send_pulses():
for pulse_transmit_data in pulse_transmit_data_tracker[
sequence_index
][num_sequences]:
data_to_driver(
radar_control_to_driver,
options.driver_to_radctrl_identity,
pulse_transmit_data["samples_array"],
sequence.txctrfreq,
sequence.rxctrfreq,
experiment.txrate,
experiment.rxrate,
sequence.numberofreceivesamples,
sequence.seqtime,
pulse_transmit_data["startofburst"],
pulse_transmit_data["endofburst"],
pulse_transmit_data["timing"],
seqnum_start + num_sequences,
sequence.align_sequences,
repeat=pulse_transmit_data["isarepeat"],
)
if TIME_PROFILE:
pulses_to_driver_time = datetime.utcnow() - start_time
log.verbose(
"pulses to driver time",
pulses_to_driver_time=pulses_to_driver_time,
pulses_to_driver_time_units="s",
)
def send_dsp_meta():
rx_beam_phases = sequence.get_rx_phases(aveperiod.beam_iter)
send_dsp_metadata(
radar_control_to_dsp,
options.dsp_to_radctrl_identity,
radar_control_to_brian,
options.brian_to_radctrl_identity,
experiment.rxrate,
experiment.output_rx_rate,
seqnum_start + num_sequences,
sequence.slice_ids,
experiment.slice_dict,
rx_beam_phases,
sequence.seqtime,
sequence.first_rx_sample_start,
sequence.rxctrfreq,
sequence.output_encodings,
sequence.decimation_scheme,
)
if TIME_PROFILE:
sequence_metadata_time = datetime.utcnow() - start_time
log.verbose(
"metadata to dsp time",
sequence_metadata_time=sequence_metadata_time,
sequence_metadata_time_units="s",
)
def make_next_samples():
sqn, dbg = sequence.make_sequence(
aveperiod.beam_iter,
num_sequences + len(aveperiod.sequences),
)
if dbg:
debug_samples.append(dbg)
pulse_transmit_data_tracker[sequence_index][
num_sequences + len(aveperiod.sequences)
] = sqn
if TIME_PROFILE:
new_sequence_time = datetime.utcnow() - start_time
log.verbose(
"make new sequence time",
new_sequence_time=new_sequence_time,
new_sequence_time_units="s",
)
# These three things can happen simultaneously. We can spawn them as threads.
threads = [
threading.Thread(target=send_pulses),
threading.Thread(target=send_dsp_meta),
threading.Thread(target=make_next_samples),
]
for thread in threads:
thread.daemon = True
thread.start()
for thread in threads:
thread.join()
num_sequences += 1
if first_aveperiod:
first_aveperiod = False
# Sequence is done
if __debug__:
time.sleep(1)
if TIME_PROFILE:
avg_period_end_time = datetime.utcnow()
log.verbose(
"avg period end time",
avg_period_end_time=avg_period_end_time,
avg_period_end_time_units="s",
)
log.info(
"aveperiod done",
num_sequences=num_sequences,
slice_ids=aveperiod.slice_ids,
)
if scan.aveperiod_iter == 0 and aveperiod.beam_iter == 0:
# This is the first averaging period in the scan object.
# if scanbound is aligned to beamorder, the scan_iter will also = 0 at this point.
scan_flag = True
else:
scan_flag = False
last_sequence_num = seqnum_start + num_sequences - 1
def send_dw():
send_datawrite_metadata(
radar_control_to_dw,
options.dw_to_radctrl_identity,
last_sequence_num,
num_sequences,
scan_flag,
averaging_period_time,
aveperiod.sequences,
aveperiod.beam_iter,
experiment.cpid,
experiment.experiment_name,
experiment.scheduling_mode,
experiment.output_rx_rate,
experiment.comment_string,
decimation_scheme.filter_scaling_factors,
experiment.slice_dict[0].rxctrfreq,
debug_samples=debug_samples,
)
thread = threading.Thread(target=send_dw)
thread.daemon = True
thread.start()
# end of the averaging period loop - move onto the next averaging period.
# Increment the sequence number by the number of sequences that were in this
# averaging period.
seqnum_start += num_sequences
if TIME_PROFILE:
time_to_finish_aveperiod = datetime.utcnow() - avg_period_end_time
log.verbose(
"time to finish avg period",
avg_period_elapsed_time=time_to_finish_aveperiod,
avg_period_elapsed_time_units="s",
)
aveperiod.beam_iter += 1
if aveperiod.beam_iter == aveperiod.num_beams_in_scan:
aveperiod.beam_iter = 0
scan_iter += 1
scan.aveperiod_iter += 1
if scan.aveperiod_iter == len(scan.aveperiods):
scan.aveperiod_iter = 0
if __name__ == "__main__":
from utils import log_config
log = log_config.log()
log.info(f"RADAR_CONTROL BOOTED")
try:
main()
log.info(f"RADAR_CONTROL EXITED")
except Exception as main_exception:
log.critical("RADAR_CONTROL CRASHED", error=main_exception)
log.exception("RADAR_CONTROL CRASHED", exception=main_exception)