"""
signals
~~~~~~~
Digital signal processing functions.
Functionality handled includes pulse generation, determination of antenna phases for beamforming, filtering and
downsampling of received signals, beamforming of filtered signals, and extraction of lag profiles from beamformed
samples.
"""
from functools import reduce
import math
from multiprocessing import shared_memory
from typing import Optional, Union
from scipy.constants import speed_of_light
import numpy as np
import numpy.fft as fft
try:
import cupy as xp
except ImportError:
cupy_available = False
import numpy as xp
else:
cupy_available = True
from .experiment_slice import ExperimentSlice
[docs]
class DSP:
"""
Performs the digital signal processing functions of Borealis.
"""
[docs]
def __init__(
self,
rx_rate: float,
filter_taps: list[np.ndarray],
mixing_freqs: list[float],
dm_rates: list[int],
use_shared_mem: bool = True,
):
"""
Create the filters and initialize parameters for signal processing operations.
:param rx_rate: The sampling rate for the system [Hz]
:type rx_rate: float
:param filter_taps: The filter taps to use at each stage
:type filter_taps: list[np.ndarray]
:param mixing_freqs: The freqs used to mix to baseband [Hz]
:type mixing_freqs: list[float]
:param dm_rates: The decimation rates at each stage
:type dm_rates: list[int]
:param use_shared_mem: Flag to use shared memory for CPU arrays
:type use_shared_mem: bool
"""
self.filter_outputs: list[xp.ndarray] = []
"""
IQ data for each RX channel, with intermediate results of staged filters.
This field is populated by :func:`DSP.apply_filters()` and will be a list of either cupy or numpy ndarray's,
depending on the type of ``input_samples`` parameter to :func:`DSP.apply_filters()`. Entries are ordered by
the filter stage, and each has shape ``[num_slices, num_channels, num_samples]``, where ``num_samples`` gets
smaller with each successive filter stage.
"""
self.beamformed_samples: Optional[np.ndarray] = None
"""Beamformed IQ data.
This field is populated by :func:`DSP.beamform()` after :func:`DSP.apply_filters()` has been called.
The shape of the array is ``[num_slices, num_beams, num_samples]``.
"""
self.antennas_iq_samples = None
self.shared_mem = {}
self.use_shared_mem = use_shared_mem
self.rx_rate = rx_rate
self.mixing_freqs = mixing_freqs
self.dm_rates = dm_rates
self.filters = self.create_filters(filter_taps, mixing_freqs, rx_rate)
[docs]
def apply_filters(self, input_samples):
"""
Filter and downsample ``input_samples``.
Results are stored in ``self.filter_outputs``, on the same device (CPU/GPU) as ``input_samples``.
:param input_samples: The samples to operate on, shape [num_channels, num_samples]
:type input_samples: ndarray
"""
self.filter_outputs.append(
self.apply_bandpass_decimate(
input_samples,
self.filters[0],
self.mixing_freqs,
self.dm_rates[0],
self.rx_rate,
)
)
for i in range(1, len(self.filters)):
self.filter_outputs.append(
self.apply_lowpass_decimate(
self.filter_outputs[i - 1], self.filters[i], self.dm_rates[i]
)
)
[docs]
def move_filter_results(self):
"""
Move the final results of filtering (antennas_iq data) to the CPU, optionally in SharedMemory if
specified for this instance.
"""
# Create an array on the CPU for antennas_iq data
antennas_iq_samples = self.filter_outputs[-1]
if self.use_shared_mem:
ant_shm = shared_memory.SharedMemory(
create=True, size=antennas_iq_samples.nbytes
)
buffer = ant_shm.buf
self.shared_mem["antennas_iq"] = ant_shm
else:
buffer = None
self.antennas_iq_samples = np.ndarray(
antennas_iq_samples.shape, dtype=np.complex64, buffer=buffer
)
# Move the antennas_iq samples to the CPU for beamforming
if cupy_available:
self.antennas_iq_samples[...] = xp.asnumpy(antennas_iq_samples)[...]
else:
self.antennas_iq_samples[...] = antennas_iq_samples[...]
[docs]
def cfs_freq_analysis(self, metadata, num_fft_points):
"""
Performs decimation and frequency analysis on clear frequency search data. Data will not be
in shared memory.
:param metadata: Clear frequency search sequence metadata
:type metadata: dict
:param num_fft_points: Number of points used in the FFT. Determines the frequency resolution
of where df = (rx_rate / total decimation rate) / num_fft_points
:type num_fft_points: int
"""
fs = self.rx_rate / np.prod(self.dm_rates) # Sampling frequency in Hz
pulses = metadata["pulses"]
tau = round(metadata["tau_spacing"] * 1e-6 * fs) # puts into units of samples
pulses_in_samples = [int(round(p * tau)) for p in pulses]
pulse_dur = round(0.0006 * fs) # TODO: determine way to derive 0.0006
start_sample = int(round(pulses_in_samples[0] + pulse_dur / 2))
if len(pulses_in_samples) > 1:
end_sample = int(round(pulses_in_samples[1] - pulse_dur / 2))
else:
end_sample = self.antennas_iq_samples.shape[-1]
num_intervals = int((end_sample - start_sample) / num_fft_points)
end_sample = start_sample + num_intervals * num_fft_points
data = self.beamformed_samples[:, :, start_sample:end_sample]
data_chunks = np.reshape(
data, data.shape[:-1] + (num_intervals, num_fft_points)
)
fft_data = fft.fftshift(fft.fft(data_chunks, axis=-1), axes=-1)
cfs_data = 20 * np.log(np.sum(np.abs(np.average(fft_data, axis=2)), axis=1))
cfs_freqs = fft.fftshift(fft.fftfreq(num_fft_points, d=1 / fs))
return cfs_data, cfs_freqs
[docs]
@staticmethod
def create_filters(
filter_taps: list[np.ndarray], mixing_freqs: list[float], rx_rate: float
):
"""
Makes bandpass filters and moves taps to the GPU, if available.
The first stage filters are mixed to bandpass and the low pass filters are reshaped. ``mixing_freqs`` should
be given as offsets from the USRP center frequency. For example, with 12 MHz center frequency and a 10.5 MHz
operating frequency, the mixing frequency should be -1.5 MHz.
:param filter_taps: The filter taps from the experiment decimation scheme
:type filter_taps: list[np.ndarray]
:param mixing_freqs: The frequencies used to mix the first stage filter to make it a bandpass filter
:type mixing_freqs: list[float]
:param rx_rate: The rf rx rate.
:type rx_rate: float
:returns: List of stages of filter taps. First stage is bandpass with shape ``[num_freqs, num_taps]``,
subsequent stages are lowpass with shape ``[1, num_taps]``. Filter taps are moved to the GPU if
cupy is available.
:rtype: list[xp.ndarray]
"""
filters = []
num_freqs = len(mixing_freqs)
num_taps = filter_taps[0].shape[0]
bandpass = np.zeros((num_freqs, num_taps), dtype=np.complex64)
oscillator = np.arange(num_taps, dtype=np.complex64)
for idx, f in enumerate(mixing_freqs):
# Filtering is actually done via a correlation, not a convolution (the filter is not reversed).
# Thus, the mixing frequency should be the negative of the frequency that is actually being extracted.
bandpass[idx, :] = filter_taps[0] * np.exp(
oscillator * 2j * np.pi * (-f) / rx_rate
)
filters.append(xp.array(bandpass, dtype=xp.complex64))
for t in filter_taps[1:]:
filters.append(xp.array(t[np.newaxis, :], dtype=xp.complex64))
return filters
@staticmethod
def _windowed_view(ndarray, window_len: int, step: int):
"""
Creates a strided and windowed view of ``ndarray``. This allows us to skip samples that will
otherwise be dropped without missing samples needed for the convolutions windows. The strides
will also not extend out of bounds meaning we do not need to pad extra samples and then drop bad
samples after the fact.
:param ndarray: The input ndarray (cupy or numpy)
:type ndarray: ndarray
:param window_len: The window length(filter length)
:type window_len: int
:param step: The step(dm rate)
:type step: int
:returns: The array with a new view.
:rtype: ndarray (cupy or numpy)
"""
nrows = ((ndarray.shape[-1] - window_len) // step) + 1
last_dim_stride = ndarray.strides[-1]
new_shape = ndarray.shape[:-1] + (nrows, window_len)
new_strides = list(ndarray.strides + (last_dim_stride,))
new_strides[-2] *= step
return xp.lib.stride_tricks.as_strided(
ndarray, shape=new_shape, strides=new_strides
)
[docs]
@staticmethod
def apply_bandpass_decimate(
input_samples, bp_filters, mixing_freqs, dm_rate, rx_rate
):
"""
Apply a Frerking bandpass filter to the input samples.
Several different frequencies can be centered on simultaneously. Downsampling is done in simultaneously to
reduce the computational load.
:param input_samples: The input IQ samples for each channel.
:type input_samples: xp.ndarray [num_channels, num_samples]
:param bp_filters: The bandpass filter(s).
:type bp_filters: xp.ndarray [num_slices, num_taps]
:param mixing_freqs: The frequencies used to mix the first stage filter for bandpass
:type mixing_freqs: list[float]
:param dm_rate: The decimation rate of this stage
:type dm_rate: int
:param rx_rate: The sampling rate
:type rx_rate: float
:returns: Samples after bandpass filter and downsampling operations. Shape [num_slices, num_channels, samples]
:rtype: ndarray
"""
# We need to force the input into the GPU to be float16, float32, or complex64 so that the einsum result is
# complex64 and NOT complex128. The GPU is significantly slower (10x++) working with complex128 numbers.
# We do not require the additional precision.
bp_filters = xp.array(bp_filters, dtype=xp.complex64)
input_samples = DSP._windowed_view(input_samples, bp_filters.shape[-1], dm_rate)
# [num_slices, num_taps]
# [num_channels, num_output_samples, num_taps]
filtered = xp.einsum(
"ij,klj->ikl", bp_filters, input_samples, optimize="greedy"
)
# Apply the phase correction for the Frerking method.
ph = xp.arange(filtered.shape[-1], dtype=np.float32)[xp.newaxis, :]
freqs = xp.array(mixing_freqs)[:, xp.newaxis]
# [1, num_output_samples]
# [num_slices, 1]
# ph: [num_slices, num_output_samples]
ph = xp.fmod(ph * 2.0 * xp.pi * (-freqs) / rx_rate * dm_rate, 2.0 * xp.pi)
ph = xp.exp(1j * ph.astype(xp.float32))
# [num_slices, num_channels, num_output_samples]
# [num_slices, 1, num_output_samples]
# corrected: [num_slices, num_channels, num_output_samples]
corrected = filtered * ph[:, xp.newaxis, :]
return corrected
[docs]
@staticmethod
def apply_lowpass_decimate(input_samples, lp_filter, dm_rate):
"""
Apply a lowpass filter to the baseband input samples. Downsampling is done in parallel via a
strided window view of the input samples.
:param input_samples: Baseband input samples
:type input_samples: ndarray [num_slices, num_channels, num_samples]
:param lp_filter: Lowpass filter taps
:type lp_filter: xp.ndarray [1, num_taps]
:param dm_rate: The decimation rate of this stage
:type dm_rate: int
:returns: Samples after lowpass filter and downsampling operations. Shape [num_slices, num_channels, samples]
:rtype: ndarray
"""
# We need to force the input into the GPU to be float16, float32, or complex64 so that the einsum result is
# complex64 and NOT complex128. The GPU is significantly slower (10x++) working with complex128 numbers.
# We do not require the additional precision.
lp_filter = xp.array(lp_filter, dtype=xp.complex64)
input_samples = DSP._windowed_view(input_samples, lp_filter.shape[-1], dm_rate)
# [1, num_taps]
# [num_slices, num_channels, num_output_samples, num_taps]
# filtered: [num_slices, num_channels, num_output_samples]
filtered = xp.einsum("ij,klmj->klm", lp_filter, input_samples)
return filtered
[docs]
@staticmethod
def correlations_from_samples(
beamformed_samples_1: np.ndarray,
beamformed_samples_2: np.ndarray,
output_sample_rate: float,
slice_metadata: list[dict],
) -> list[np.ndarray]:
"""
Correlate two sets of beamformed samples together.
:param beamformed_samples_1: The first beamformed samples
:type beamformed_samples_1: np.ndarray [num_slices, num_beams, num_samples]
:param beamformed_samples_2: The second beamformed samples
:type beamformed_samples_2: np.ndarray [num_slices, num_beams, num_samples]
:param output_sample_rate: Sampling rate of data
:type output_sample_rate: float
:param slice_metadata: Details used to extract indices for each slice.
:type slice_metadata: list[dict]
:returns: Correlations for slices. List of length num_slices, with each entry having shape
[num_beams, num_range_gates, num_lags].
:rtype: list[np.ndarray]
"""
values = []
for slc, metadata in enumerate(slice_metadata):
if metadata.get("skip", False) or metadata["lags"].size == 0:
values.append(np.array([]))
continue
range_off = (
np.arange(metadata["num_range_gates"], dtype=np.int32)
+ metadata["first_range_off"]
)
tau_in_samples = np.int32(
round(metadata["tau_spacing"] * 1e-6 * output_sample_rate)
)
lag_pulses_as_samples = (
np.array(metadata["lags"], np.int32) * tau_in_samples
)
# [num_range_gates, 1, 1]
# [1, num_lags, 2]
samples_for_all_range_lags = (
range_off[..., np.newaxis, np.newaxis]
+ lag_pulses_as_samples[np.newaxis, :, :]
)
# [num_range_gates, num_lags, 2]
row = samples_for_all_range_lags[..., 1].astype(np.int32)
# [num_range_gates, num_lags, 2]
col = samples_for_all_range_lags[..., 0].astype(np.int32)
values_for_slice = np.empty(
(beamformed_samples_1.shape[1], row.shape[0], row.shape[1]),
dtype=np.complex64,
) # [num_beams, num_ranges, num_lags]
for lag in range(row.shape[1]):
values_for_slice[:, :, lag] = np.einsum(
"ij,ij->ji",
beamformed_samples_1[slc, :, row[:, lag]],
beamformed_samples_2[slc, :, col[:, lag]].conj(),
)
values.append(values_for_slice * metadata["lag_phase_offsets"])
return values
[docs]
def get_phase_shift(
beam_angle: list[float],
freq_khz: Union[float, list[float]],
antenna_locations: np.ndarray,
):
"""
Find the complex excitation for all antennas to make beams in all given directions.
:param beam_angle: list of azimuthal direction of the beam off boresight, in degrees,
positive beamdir being to the right of the boresight (looking along
boresight from ground).
:type beam_angle: list[float]
:param freq_khz: transmit frequency in kHz. Can be a single frequency or a list
:type freq_khz: Union[float, list[float]]
:param antenna_locations: x-coordinates of each antenna in the array, in meters. Shape [num_antennas]
:type antenna_locations: np.ndarray
:returns: phase_shift a 3D array of shape [freqs, beams, antennas] giving the complex excitation for each
antenna required to form each beam for each frequency.
:rtype: phase_shift ndarray
"""
if not isinstance(freq_khz, list):
freq_khz = [freq_khz]
# freq_khz: [num_freqs]
freq_khz = np.array(freq_khz, dtype=np.float32)
freq_hz = freq_khz * 1000.0 # convert to Hz.
k = 2 * np.pi * freq_hz / speed_of_light # 2pi / wavelength
beam_rads = np.deg2rad(np.array(beam_angle, dtype=np.float32))
# beam_rads: [num_angles]
# phase shift = 0 at array midpoint (by convention), so this is the displacement in x of each beam from the array
# midpoint after the wave traverses one wavelength. Essentially, the component along x of the beam, normalized by
# the wavelength.
beam_displacements = np.outer(k, -1 * np.sin(beam_rads))
# beam_displacements: [num_freqs, num_angles]
phase_shift = np.einsum("ij,k->ijk", beam_displacements, antenna_locations)
phase_shift = np.exp(1j * phase_shift)
return phase_shift
[docs]
def get_samples(
sampling_rate: float,
mixing_freq: float,
pulse_len: float,
ramp_time: float,
max_amplitude: float,
) -> np.ndarray:
"""
Get basic (not phase-shifted) samples for a given pulse.
Find the normalized sample array given the rate (Hz), frequency (Hz), pulse length (s).
:param sampling_rate: USRP tx sample rate [Hz].
:type sampling_rate: float
:param mixing_freq: frequency offset from the centre frequency on the USRP [Hz]. To be mixed with the
centre frequency before transmitting, e.g. centre = 12 MHz, wave_freq = + 1.2 MHz, output = 13.2 MHz.
:type mixing_freq: float
:param pulse_len: length of the pulse [s]
:type pulse_len: float
:param ramp_time: ramp up and ramp down time for the pulse [s]. Typical 1.0e-5 s from ``config.ini``
:type ramp_time: float
:param max_amplitude: USRP's max DAC amplitude. N200 = 0.707 max
:type max_amplitude: float
:returns samples: a numpy array of complex samples, representing all samples needed for a pulse of length
``pulse_len`` sampled at ``sampling_rate``.
:rtype: np.ndarray
"""
mixing_freq = float(mixing_freq)
sampling_rate = float(sampling_rate)
wave_freq = 2 * math.pi * mixing_freq / sampling_rate
# for linear we used the below:
ramp_samps = round(
sampling_rate * ramp_time
) # number of samples for ramp-up and ramp-down of pulse.
pulse_len_samps = round(sampling_rate * pulse_len)
wave_form = np.exp(1j * wave_freq * np.arange(pulse_len_samps))
amplitude_ramp_up = np.arange(ramp_samps) / ramp_samps
amplitude_ramp_down = np.flipud(amplitude_ramp_up)
ramp_up_piece = wave_form[:ramp_samps]
ramp_down_piece = wave_form[pulse_len_samps - ramp_samps :]
np.multiply(ramp_up_piece, amplitude_ramp_up, out=ramp_up_piece)
np.multiply(ramp_down_piece, amplitude_ramp_down, out=ramp_down_piece)
samples = wave_form * max_amplitude
return samples
[docs]
def basic_pulse_phase_offset(exp_slice: ExperimentSlice, beam_iter: iter):
"""
Calculate the phase difference of each pulse with respect to the first
pulse based on the transmit frequency and the pulse separation.
:param exp_slice: The experiment slice information
:type exp_slice: ExperimentSlice
:param beam_iter: beam index
:type beam_iter: int
:returns: Pulse phase offsets [radians]
:rtype: np.ndarray
"""
freq = exp_slice.freq
if isinstance(freq, list):
freq = freq[exp_slice.freq_order[beam_iter]]
freq_hz = freq * 1e3
tau_s = exp_slice.tau_spacing / 1e6
omega = 2 * np.pi * freq_hz
pulse_sequence = exp_slice.pulse_sequence
num_pulses = len(pulse_sequence)
pulse_phases = np.zeros(num_pulses)
for p in range(num_pulses):
pulse_time = pulse_sequence[p] * tau_s
pulse_phases[p] = np.angle(np.exp(1j * omega * pulse_time))
return pulse_phases