#!/usr/bin/python
"""
sequences
~~~~~~~~~
This is the module containing the Sequence class. The Sequence class contains the InterfaceClassBase
members, as well as a list of pulse dictionaries, the total_combined_pulses in the sequence,
power_divider, last_pulse_len, ssdelay, seqtime, which together give sstime (scope synce time,
or time for receiving, and numberofreceivesamples to sample during the receiving window
(calculated using the receive sampling rate).
:copyright: 2018 SuperDARN Canada
:author: Marci Detwiller
"""
# built-in
import collections
import copy
from functools import reduce
import inspect
import math
from pathlib import Path
# third-party
import numpy as np
import structlog
# local
from experiment_prototype.interface_classes.interface_class_base import (
InterfaceClassBase,
)
from experiment_prototype.experiment_exception import ExperimentException
from utils.signals import get_samples, get_phase_shift
# Obtain the module name that imported this log_config
caller = Path(inspect.stack()[-1].filename)
module_name = caller.name.split(".")[0]
log = structlog.getLogger(module_name)
[docs]class Sequence(InterfaceClassBase):
"""
Set up the sequence class.
**The members of the sequence are:**
ssdelay
delay past the end of the sequence to receive for (us) - function of num_ranges and
pulse_len. ss stands for scope sync.
seqtime
the amount of time for the whole sequence to transmit, until the logic signal
switches low on the last pulse in the sequence (us).
sstime
ssdelay + seqtime (total time for receiving) (us).
numberofreceivesamples
the number of receive samples to take, given the rx rate, during
the sstime.
first_rx_sample_start
The location of the first sample for the RX data from the start of the TX data (in number
of samples, unitless). This will be calculated as the center sample of the first
occurring pulse (uncombined).
blanks
A list of sample indices that should not be used for acfs because they were samples
taken when transmitting.
basic_slice_pulses
A dictionary that holds pre-computed tx samples for each slice. Each dictionary value is a
multi-dimensional array that holds a beamformed set of samples for each antenna for all
beam directions.
combined_pulses_metadata
This list holds dictionary metadata for all pulses in the sequence. This metadata holds all
the info needed to combine pulses if pulses are mixed.
- start_time_us - start time of the pulse in us, relative to the first pulse in sqn.
- total_pulse_len - total length of the pulse that includes len of all combined pulses.
- pulse_sample_start - The tx sample number at which the pulse starts.
- tr_window_num_samps - The number of tx samples of the tr window.
- component_info - a list of all the pre-combined pulse components (incl their length and
start time) that are in the combined pulseAlso in us.
- pulse_transmit_data - dictionary hold the transmit metadata that will be sent to driver.
output_encodings
This dict will hold a list of all the encodings used during an aveperiod for each slice.
These will be used for data write later.
:param seqn_keys: slice_ids that need to be included in this sequence.
:type seqn_keys: list
:param sequence_slice_dict: the slice dictionary that explains the parameters of each slice
that is included in this sequence. Keys are the slice_ids
included and values are dictionaries including all necessary
slice parameters as keys.
:type sequence_slice_dict: dict
:param sequence_interface: the interfacing dictionary that describes how to interface the
slices that are included in this sequence. Keys are tuples of
format (slice_id_1, slice_id_2) and values are of
interface_types set up in experiment_prototype.
:type sequence_interface: dict
:param transmit_metadata: metadata from the config file that is useful here.
:type transmit_metadata: dict
"""
def __init__(
self, seqn_keys, sequence_slice_dict, sequence_interface, transmit_metadata
):
InterfaceClassBase.__init__(
self, seqn_keys, sequence_slice_dict, sequence_interface, transmit_metadata
)
self.decimation_scheme = self.slice_dict[self.slice_ids[0]].decimation_scheme
for slice_id in self.slice_ids:
if self.slice_dict[slice_id].decimation_scheme != self.decimation_scheme:
errmsg = (
f"Slices {self.slice_ids[0]} and {slice_id} are CONCURRENT "
f"interfaced and do not have the same decimation scheme"
)
raise ExperimentException(errmsg)
dm_rate = 1
for stage in self.decimation_scheme.stages:
dm_rate *= stage.dm_rate
txrate = self.transmit_metadata["txrate"]
main_antenna_count = self.transmit_metadata["main_antenna_count"]
main_antenna_spacing = self.transmit_metadata["main_antenna_spacing"]
intf_antenna_count = self.transmit_metadata["intf_antenna_count"]
intf_antenna_spacing = self.transmit_metadata["intf_antenna_spacing"]
self.tx_main_antennas = self.transmit_metadata["tx_main_antennas"]
self.rx_main_antennas = self.transmit_metadata["rx_main_antennas"]
self.rx_intf_antennas = self.transmit_metadata["rx_intf_antennas"]
pulse_ramp_time = self.transmit_metadata["pulse_ramp_time"]
max_usrp_dac_amplitude = self.transmit_metadata["max_usrp_dac_amplitude"]
tr_window_time = self.transmit_metadata["tr_window_time"]
intf_offset = self.transmit_metadata["intf_offset"]
self.basic_slice_pulses = {}
self.rx_beam_phases = {}
self.tx_main_phase_shifts = {}
self.rx_main_antenna_indices = {}
self.rx_intf_antenna_indices = {}
self.tx_antenna_indices = {}
self.txctrfreq = self.slice_dict[self.slice_ids[0]].txctrfreq
self.rxctrfreq = self.slice_dict[self.slice_ids[0]].rxctrfreq
single_pulse_timing = []
# For each slice calculate beamformed samples and place into the basic_slice_pulses
# dictionary. Also populate the pulse timing metadata and place into single_pulse_timing
for slice_id in self.slice_ids:
exp_slice = self.slice_dict[slice_id]
freq_khz = float(exp_slice.freq)
wave_freq = freq_khz - self.txctrfreq
wave_freq_hz = wave_freq * 1000
# Now we set up the phases for receive side
if exp_slice.rx_antenna_pattern is not None:
# Returns an array of size [beam_angle] of complex numbers of magnitude <= 1
rx_main_phase_shift = exp_slice.rx_antenna_pattern(
exp_slice.beam_angle,
freq_khz,
main_antenna_count,
main_antenna_spacing,
)
rx_intf_phase_shift = exp_slice.rx_antenna_pattern(
exp_slice.beam_angle,
freq_khz,
intf_antenna_count,
intf_antenna_spacing,
intf_offset[0],
)
else:
rx_main_phase_shift = get_phase_shift(
exp_slice.beam_angle,
freq_khz,
main_antenna_count,
main_antenna_spacing,
)
rx_intf_phase_shift = get_phase_shift(
exp_slice.beam_angle,
freq_khz,
intf_antenna_count,
intf_antenna_spacing,
intf_offset[0],
)
# The antenna indices for receiving by this slice
slice_rx_main_antennas = exp_slice.rx_main_antennas
slice_rx_intf_antennas = exp_slice.rx_intf_antennas
# The index of the antennas for this slice, within the list of all antennas from the config file
main_indices = [
self.rx_main_antennas.index(ant) for ant in slice_rx_main_antennas
]
intf_indices = [
self.rx_intf_antennas.index(ant) for ant in slice_rx_intf_antennas
]
self.rx_main_antenna_indices[slice_id] = main_indices
self.rx_intf_antenna_indices[slice_id] = intf_indices
# Zero out the complex phase for any antenna that isn't used in this slice
main_phases = np.zeros(
(rx_main_phase_shift.shape[0], len(self.rx_main_antennas)),
dtype=rx_main_phase_shift.dtype,
)
intf_phases = np.zeros(
(rx_intf_phase_shift.shape[0], len(self.rx_intf_antennas)),
dtype=rx_intf_phase_shift.dtype,
)
main_phases[:, main_indices] = rx_main_phase_shift[
:, slice_rx_main_antennas
]
intf_phases[:, intf_indices] = rx_intf_phase_shift[
:, slice_rx_intf_antennas
]
self.rx_beam_phases[slice_id] = {"main": main_phases, "intf": intf_phases}
# Set up the tx pulses if transmitting
if not exp_slice.rxonly:
basic_samples = get_samples(
txrate,
wave_freq_hz,
float(exp_slice.pulse_len) / 1e6,
pulse_ramp_time,
1.0,
)
if exp_slice.tx_antenna_pattern is not None:
# Returns an array of size [tx_antennas] of complex numbers of magnitude <= 1
tx_main_phase_shift = exp_slice.tx_antenna_pattern(
freq_khz, exp_slice.tx_antennas, main_antenna_spacing
)
else:
tx_main_phase_shift = get_phase_shift(
exp_slice.beam_angle,
freq_khz,
main_antenna_count,
main_antenna_spacing,
)
# The antennas used for transmitting this slice
slice_tx_antennas = exp_slice.tx_antennas
# The index of the antennas for this slice, within the list of all antennas from the config file
tx_indices = [
self.tx_main_antennas.index(ant) for ant in slice_tx_antennas
]
self.tx_antenna_indices[slice_id] = tx_indices
# Zero out the complex phase of any antenna that isn't used in this slice
tx_phases = np.zeros(
(tx_main_phase_shift.shape[0], len(self.tx_main_antennas)),
dtype=tx_main_phase_shift.dtype,
)
tx_phases[:, tx_indices] = tx_main_phase_shift[:, slice_tx_antennas]
# tx_phases: [num_beams, num_antennas]
# basic_samples: [num_samples]
# phased_samps_for_beams: [num_beams, num_antennas, num_samples]
log.verbose(
"slice information",
slice_id=slice_id,
tx_main_phases=tx_phases,
tx_main_magnitudes=np.abs(tx_phases),
tx_main_angles=np.rad2deg(np.angle(tx_phases)),
)
phased_samps_for_beams = np.einsum(
"ij,k->ijk", tx_phases, basic_samples
)
self.basic_slice_pulses[slice_id] = phased_samps_for_beams
else:
self.basic_slice_pulses[slice_id] = []
tx_phases = np.zeros(
(rx_main_phase_shift.shape[0], len(self.tx_main_antennas)),
dtype=np.complex64,
)
self.tx_main_phase_shifts[slice_id] = tx_phases
for pulse_time in exp_slice.pulse_sequence:
pulse_timing_us = (
pulse_time * exp_slice.tau_spacing + exp_slice.seqoffset
)
pulse_sample_start = round((pulse_timing_us * 1e-6) * txrate)
pulse_num_samps = round((exp_slice.pulse_len * 1e-6) * txrate)
single_pulse_timing.append(
{
"start_time_us": pulse_timing_us,
"pulse_len_us": exp_slice.pulse_len,
"pulse_sample_start": pulse_sample_start,
"pulse_num_samps": pulse_num_samps,
"slice_id": slice_id,
}
)
single_pulse_timing = sorted(
single_pulse_timing, key=lambda d: d["start_time_us"]
)
# Combine any pulses closer than the minimum separation time into a single pulse data
# dictionary and append to the list of all combined pulses, combined_pulses_metadata.
tr_window_num_samps = round(tr_window_time * txrate)
def initialize_combined_pulse_dict(pulse_timing_info):
return {
"start_time_us": pulse_timing_info["start_time_us"],
"total_pulse_len": pulse_timing_info["pulse_len_us"],
"pulse_sample_start": pulse_timing_info["pulse_sample_start"],
"total_num_samps": pulse_timing_info["pulse_num_samps"],
"tr_window_num_samps": tr_window_num_samps,
"component_info": [pulse_timing_info],
}
pulse_data = initialize_combined_pulse_dict(single_pulse_timing[0])
combined_pulses_metadata = []
# Determine where pulses occur in the sequence. This will be important if there are overlaps
for pulse_time in single_pulse_timing[1:]:
pulse_timing_us = pulse_time["start_time_us"]
pulse_len_us = pulse_time["pulse_len_us"]
pulse_sample_start = pulse_time["pulse_sample_start"]
pulse_num_samps = pulse_time["pulse_num_samps"]
last_timing_us = pulse_data["start_time_us"]
last_pulse_len_us = pulse_data["total_pulse_len"]
last_sample_start = pulse_data["pulse_sample_start"]
last_pulse_num_samps = pulse_data["total_num_samps"]
# If there are overlaps (two pulses within minimum separation time) then make them into one single pulse
min_sep = self.transmit_metadata["min_pulse_separation"]
if pulse_timing_us < last_timing_us + last_pulse_len_us + min_sep:
# If the current pulse is completely enveloped by the previous pulse,
# these values won't change or else we are truncating the previous pulse.
new_pulse_len = max(
pulse_timing_us - last_timing_us + pulse_len_us, last_pulse_len_us
)
new_pulse_samps = max(
pulse_sample_start - last_sample_start + pulse_num_samps,
last_pulse_num_samps,
)
pulse_data["total_pulse_len"] = new_pulse_len
pulse_data["total_num_samps"] = new_pulse_samps
pulse_data["component_info"].append(pulse_time)
else: # pulses do not overlap
combined_pulses_metadata.append(pulse_data)
pulse_data = initialize_combined_pulse_dict(pulse_time)
combined_pulses_metadata.append(pulse_data)
# Store the overlapping antennas between all pairs of slices in this sequence. This will be
# used to determine the power divider for each slice in the sequence, if any two slices have
# overlapping pulses and use the same antennas.
slice_shared_antennas = dict()
for i in range(len(self.slice_ids)):
slice_1_id = self.slice_ids[i]
slice_1_antennas = set(self.slice_dict[slice_1_id].tx_antennas)
for j in range(i + 1, len(self.slice_ids)):
slice_2_id = self.slice_ids[j]
slice_2_antennas = set(self.slice_dict[slice_2_id].tx_antennas)
slice_shared_antennas[(slice_1_id, slice_2_id)] = (
slice_1_antennas.intersection(slice_2_antennas)
)
# Dictionary to keep track of which slices share antennas and transmit at the same time
slice_overlaps = {slice_id: set() for slice_id in self.slice_ids}
# Now we can figure out the power divider for each slice
for combined_pulse in combined_pulses_metadata:
num_pulses = len(combined_pulse["component_info"])
if num_pulses == 1:
# Only one pulse here, no need to check for overlap
continue
# Look at each possible pair of pulses in this combined pulse
for i in range(num_pulses):
pulse_1 = combined_pulse["component_info"][i]
for j in range(i + 1, num_pulses):
pulse_2 = combined_pulse["component_info"][j]
if pulse_1["slice_id"] == pulse_2["slice_id"]:
# This is possible if pulses overlap like 1 -> 2 -> 1, so that 1 doesn't
# overlap with itself but is still combined with itself.
continue
min_slice_id = min(pulse_1["slice_id"], pulse_2["slice_id"])
max_slice_id = max(pulse_1["slice_id"], pulse_2["slice_id"])
if len(slice_shared_antennas[(min_slice_id, max_slice_id)]) != 0:
# These two pulses share antennas, and are also combined in a pulse.
# Now we check if they actually transmit at the same time, or are just
# combined because they almost overlap.
if (
pulse_2["start_time_us"]
< pulse_1["start_time_us"] + pulse_1["pulse_len_us"]
):
slice_overlaps[pulse_1["slice_id"]].add(pulse_2["slice_id"])
slice_overlaps[pulse_2["slice_id"]].add(pulse_1["slice_id"])
# Get the naive power divider - total number slices which overlap with slice under consideration.
power_divider = {
slice_id: len(ids) + 1 for slice_id, ids in slice_overlaps.items()
}
# Now we iterate through each slice, and check if the slices it overlaps with overlap with each
# other. If they don't, we can subtract 1 from the power divider for the slice.
for ref_slice, overlaps in slice_overlaps.items():
overlap_list = list(overlaps)
for i in range(len(overlap_list)):
for j in range(i + 1, len(overlap_list)):
slice_1 = overlap_list[i]
slice_2 = overlap_list[j]
if slice_2 not in slice_overlaps[slice_1]:
# No overlap, so we decrement.
power_divider[ref_slice] -= 1
# Normalize all combined pulses to the max USRP DAC amplitude
all_antennas = []
for slice_id in self.slice_ids:
if not self.slice_dict[slice_id].rxonly:
self.basic_slice_pulses[slice_id] *= (
max_usrp_dac_amplitude / power_divider[slice_id]
)
slice_tx_antennas = self.slice_dict[slice_id].tx_antennas
all_antennas.extend(slice_tx_antennas)
sequence_antennas = list(set(all_antennas))
# predetermine some of the transmit metadata.
num_pulses = len(combined_pulses_metadata)
for i in range(num_pulses):
combined_pulses_metadata[i]["pulse_transmit_data"] = {}
pulse_transmit_data = combined_pulses_metadata[i]["pulse_transmit_data"]
pulse_transmit_data["startofburst"] = i == 0
pulse_transmit_data["endofburst"] = i == (num_pulses - 1)
pulse_transmit_data["pulse_antennas"] = sequence_antennas
# the samples array is populated as needed during operations
pulse_transmit_data["samples_array"] = None
pulse_transmit_data["timing"] = combined_pulses_metadata[i]["start_time_us"]
# isarepeat is set as needed during operations
pulse_transmit_data["isarepeat"] = False
# print out pulse information for logging.
for i, cpm in enumerate(combined_pulses_metadata):
# message = f"Pulse {i}: start time(us) {cpm['start_time_us']} start sample {cpm['pulse_sample_start']}"
# message += f" pulse length(us) {cpm['total_pulse_len']} pulse num samples {cpm['total_num_samps']}"
log.verbose("pulse information", **cpm)
self.combined_pulses_metadata = combined_pulses_metadata
# FIND the max scope sync time
# The gc214 receiver card in the old system required 19 us for sample delay and another 10
# us as empirically discovered. in that case delay = (num_ranges + 19 + 10) * pulse_len. Now
# we will remove those values. In the old design scope sync was used directly to determine
# how long to sample. Now we will calculate the number of samples to receive
# (numberofreceivesamples) using scope sync and send that to the driver to sample at a
# specific rxrate (given by the config).
# number ranges to the first range for all slice ids
range_as_samples = lambda x, y: int(math.ceil(x / y))
num_ranges_to_first_range = {
slice_id: range_as_samples(
self.slice_dict[slice_id].first_range,
self.slice_dict[slice_id].range_sep,
)
for slice_id in self.slice_ids
}
# time for number of ranges given, in us, taking into account first_range and num_ranges.
# pulse_len is the amount of time for any range.
self.ssdelay = max(
[
(
self.slice_dict[slice_id].num_ranges
+ num_ranges_to_first_range[slice_id]
)
* self.slice_dict[slice_id].pulse_len
for slice_id in self.slice_ids
]
)
# The delay is long enough for any slice's pulse length and num_ranges to be accounted for.
# Find the sequence time. Add some TR setup time before the first pulse. The
# timing to the last pulse is added, as well as its pulse length and the TR delay
# at the end of last pulse.
# tr_window_time is originally in seconds, convert to us.
self.seqtime = (
2 * tr_window_time * 1.0e6
+ self.combined_pulses_metadata[-1]["start_time_us"]
+ self.combined_pulses_metadata[-1]["total_pulse_len"]
)
# FIND the total scope sync time and number of samples to receive.
self.sstime = self.seqtime + self.ssdelay
# number of receive samples will round down
# This is the number of receive samples to receive for the entire duration of the sequence
# and afterwards. This starts before first pulse is sent and goes until the end of the scope
# sync delay which is there for the amount of time necessary to get the echoes from the
# specified number of ranges.
self.numberofreceivesamples = int(
self.transmit_metadata["rx_sample_rate"] * self.sstime * 1e-6
)
self.output_encodings = collections.defaultdict(list)
# create debug dict for tx samples.
debug_dict = {
"txrate": txrate,
"txctrfreq": self.txctrfreq,
"pulse_timing": [],
"pulse_sample_start": [],
"sequence_samples": {},
"decimated_samples": {},
"dmrate": dm_rate,
}
for i, cpm in enumerate(combined_pulses_metadata):
debug_dict["pulse_timing"].append(cpm["start_time_us"])
debug_dict["pulse_sample_start"].append(cpm["pulse_sample_start"])
for i in range(main_antenna_count):
debug_dict["sequence_samples"][i] = []
debug_dict["decimated_samples"][i] = []
self.debug_dict = debug_dict
first_slice_pulse_len = self.combined_pulses_metadata[0]["component_info"][0][
"pulse_num_samps"
]
full_pulse_samps = first_slice_pulse_len + 2 * tr_window_num_samps
offset_to_start = int(full_pulse_samps / 2)
self.first_rx_sample_start = offset_to_start
self.blanks = self.find_blanks()
self.align_sequences = reduce(
lambda a, b: a or b, [s.align_sequences for s in self.slice_dict.values()]
)
if self.align_sequences:
log.info("aligning sequences to 0.1 s boundaries.")
[docs] def make_sequence(self, beam_iter, sequence_num):
"""
Create the samples needed for each pulse in the sequence. This function is optimized to be
able to generate new samples every sequence if needed. Modifies the samples_array and
isarepeat fields of all pulse dictionaries needed for this sequence for radar_control to use
in operation.
:param beam_iter: The beam iterator
:type beam_iter: int
:param sequence_num: The sequence number in the ave period
:type sequence_num: int
:returns: Transmit data for each pulse where each pulse is a dict, including timing and
samples
:rtype: list
:returns: The transmit sequence and related data to use for debug.
:rtype: dict
"""
txrate = self.transmit_metadata["txrate"]
buffer_len = int(txrate * self.sstime * 1e-6)
# This is going to act as buffer for mixing pulses. It is the length of the receive samples
# since we know this will be large enough to hold samples at any pulse position. There will
# be a buffer for each antenna.
sequence = np.zeros(
[len(self.tx_main_antennas), buffer_len], dtype=np.complex64
)
for slice_id in self.slice_ids:
exp_slice = self.slice_dict[slice_id]
if exp_slice.rxonly:
continue
beam_num = exp_slice.tx_beam_order[beam_iter]
# basic_samples: [num_antennas, num_samps]
basic_samples = self.basic_slice_pulses[slice_id][beam_num]
num_pulses = len(exp_slice.pulse_sequence)
encode_fn = exp_slice.pulse_phase_offset
if encode_fn:
# Must return 1D array of length [pulses].
phase_encoding = encode_fn(beam_num, sequence_num, num_pulses)
# dimensions: [pulses]
# Append list of phase encodings for this sequence, one per pulse.
# output_encodings contains a list of lists for each slice id
self.output_encodings[slice_id].append(phase_encoding)
# phase_encoding: [pulses]
# basic_samples: [antennas, samples]
# samples: [pulses, antennas, samples]
phase_encoding = np.radians(phase_encoding)
phase_encoding = np.exp(1j * phase_encoding)
samples = np.einsum("i,jk->ijk", phase_encoding, basic_samples)
else: # no encodings, all pulses in the slice are all the same
samples = np.repeat(basic_samples[np.newaxis, :, :], num_pulses, axis=0)
# sum the samples into their position in the sequence buffer. Find where the relative
# timing of each pulse matches the sample number in the buffer. Directly sum the samples
# for each pulse into the buffer position. If any pulses overlap, this is how they will
# be mixed.
for i, pulse in enumerate(self.combined_pulses_metadata):
for component_info in pulse["component_info"]:
if component_info["slice_id"] == slice_id:
pulse_sample_start = component_info["pulse_sample_start"]
pulse_samples_len = component_info["pulse_num_samps"]
start = pulse["tr_window_num_samps"] + pulse_sample_start
end = start + pulse_samples_len
# samples: [pulses, tx_antenna_count, samples]
# sequence: [tx_antenna_count, buffer_len]
sequence[:, start:end] += samples[i, :, :]
# copy the encoded and combined samples into the metadata for the sequence.
pulse_data = []
for i, pulse in enumerate(self.combined_pulses_metadata):
pulse_sample_start = pulse["pulse_sample_start"]
num_samples = pulse["total_num_samps"]
start = pulse_sample_start
end = start + num_samples + 2 * pulse["tr_window_num_samps"]
samples = sequence[:, start:end]
new_pulse_info = copy.deepcopy(pulse["pulse_transmit_data"])
new_pulse_info["samples_array"] = samples
if i != 0:
last_pulse = pulse_data[i - 1]["samples_array"]
if samples.shape == last_pulse.shape:
if np.isclose(samples, last_pulse).all():
new_pulse_info["isarepeat"] = True
pulse_data.append(new_pulse_info)
if __debug__:
debug_dict = copy.deepcopy(self.debug_dict)
debug_dict["sequence_samples"] = sequence
debug_dict["decimated_samples"] = sequence[:, :: debug_dict["dmrate"]]
else:
debug_dict = None
return pulse_data, debug_dict
[docs] def find_blanks(self):
"""
Finds the blanked samples after all pulse positions are calculated.
"""
blanks = []
dm_rate = self.debug_dict["dmrate"]
for pulse in self.combined_pulses_metadata:
pulse_start = pulse["pulse_sample_start"]
num_samples = pulse["total_num_samps"] + 2 * pulse["tr_window_num_samps"]
rx_sample_start = int(pulse_start / dm_rate)
rx_num_samps = math.ceil(num_samples / dm_rate)
pulse_blanks = np.arange(rx_sample_start, rx_sample_start + rx_num_samps)
pulse_blanks += int(self.first_rx_sample_start / dm_rate)
blanks.extend(pulse_blanks)
return blanks
[docs] def get_rx_phases(self, beam_iter):
"""
Gets the receive phases for a given beam
:param beam_iter: The beam iter in a scan.
:type beam_iter: int
:returns: The receive phases for each possible beam for every main and intf antenna
:rtype: dict for both main and intf
"""
temp_dict = copy.deepcopy(self.rx_beam_phases)
for k, v in temp_dict.items():
beam_num = self.slice_dict[k].rx_beam_order[beam_iter]
if not isinstance(beam_num, list):
beam_num = [beam_num]
v["main"] = v["main"][beam_num, :]
v["intf"] = v["intf"][beam_num, :]
return temp_dict