Source code for src.utils.signals

"""
signals
~~~~~~~

Digital signal processing functions.

Functionality handled includes pulse generation, determination of antenna phases for beamforming, filtering and
downsampling of received signals, beamforming of filtered signals, and extraction of lag profiles from beamformed
samples.
"""

from functools import reduce
import math
from multiprocessing import shared_memory
from typing import Optional, Union

from scipy.constants import speed_of_light
import numpy as np
import numpy.fft as fft

try:
    import cupy as xp
except ImportError:
    cupy_available = False
    import numpy as xp
else:
    cupy_available = True

from .experiment_slice import ExperimentSlice


[docs] class DSP: """ Performs the digital signal processing functions of Borealis. """
[docs] def __init__( self, rx_rate: float, filter_taps: list[np.ndarray], mixing_freqs: list[float], dm_rates: list[int], use_shared_mem: bool = True, ): """ Create the filters and initialize parameters for signal processing operations. :param rx_rate: The sampling rate for the system [Hz] :type rx_rate: float :param filter_taps: The filter taps to use at each stage :type filter_taps: list[np.ndarray] :param mixing_freqs: The freqs used to mix to baseband [Hz] :type mixing_freqs: list[float] :param dm_rates: The decimation rates at each stage :type dm_rates: list[int] :param use_shared_mem: Flag to use shared memory for CPU arrays :type use_shared_mem: bool """ self.filter_outputs: list[xp.ndarray] = [] """ IQ data for each RX channel, with intermediate results of staged filters. This field is populated by :func:`DSP.apply_filters()` and will be a list of either cupy or numpy ndarray's, depending on the type of ``input_samples`` parameter to :func:`DSP.apply_filters()`. Entries are ordered by the filter stage, and each has shape ``[num_slices, num_channels, num_samples]``, where ``num_samples`` gets smaller with each successive filter stage. """ self.beamformed_samples: Optional[np.ndarray] = None """Beamformed IQ data. This field is populated by :func:`DSP.beamform()` after :func:`DSP.apply_filters()` has been called. The shape of the array is ``[num_slices, num_beams, num_samples]``. """ self.antennas_iq_samples = None self.shared_mem = {} self.use_shared_mem = use_shared_mem self.rx_rate = rx_rate self.mixing_freqs = mixing_freqs self.dm_rates = dm_rates self.filters = self.create_filters(filter_taps, mixing_freqs, rx_rate)
[docs] def apply_filters(self, input_samples): """ Filter and downsample ``input_samples``. Results are stored in ``self.filter_outputs``, on the same device (CPU/GPU) as ``input_samples``. :param input_samples: The samples to operate on, shape [num_channels, num_samples] :type input_samples: ndarray """ self.filter_outputs.append( self.apply_bandpass_decimate( input_samples, self.filters[0], self.mixing_freqs, self.dm_rates[0], self.rx_rate, ) ) for i in range(1, len(self.filters)): self.filter_outputs.append( self.apply_lowpass_decimate( self.filter_outputs[i - 1], self.filters[i], self.dm_rates[i] ) )
[docs] def move_filter_results(self): """ Move the final results of filtering (antennas_iq data) to the CPU, optionally in SharedMemory if specified for this instance. """ # Create an array on the CPU for antennas_iq data antennas_iq_samples = self.filter_outputs[-1] if self.use_shared_mem: ant_shm = shared_memory.SharedMemory( create=True, size=antennas_iq_samples.nbytes ) buffer = ant_shm.buf self.shared_mem["antennas_iq"] = ant_shm else: buffer = None self.antennas_iq_samples = np.ndarray( antennas_iq_samples.shape, dtype=np.complex64, buffer=buffer ) # Move the antennas_iq samples to the CPU for beamforming if cupy_available: self.antennas_iq_samples[...] = xp.asnumpy(antennas_iq_samples)[...] else: self.antennas_iq_samples[...] = antennas_iq_samples[...]
[docs] def beamform(self, beam_phases: np.ndarray): """ Applies the beamforming operation to the antennas_iq samples. Different beam directions are formed in parallel, and the results are stored in SharedMemory if so specified for this instance. :param beam_phases: The complex phases used to beamform the filtered samples. Expects shape [num_slices, num_beams, num_channels]. :type beam_phases: np.ndarray """ # beam_phases: [num_slices, num_beams, num_channels] beam_phases = beam_phases # Create shared memory for result of beamforming # final_shape: [num_slices, num_beams, num_samples] final_shape = ( self.antennas_iq_samples.shape[0], beam_phases.shape[1], self.antennas_iq_samples.shape[2], ) final_size = np.dtype(np.complex64).itemsize * reduce( lambda a, b: a * b, final_shape ) if self.use_shared_mem: bf_shm = shared_memory.SharedMemory(create=True, size=final_size) buffer = bf_shm.buf self.shared_mem["bfiq"] = bf_shm else: buffer = None self.beamformed_samples = np.ndarray( final_shape, dtype=np.complex64, buffer=buffer ) # Apply beamforming self.beamformed_samples[...] = self.beamform_samples( self.antennas_iq_samples, beam_phases )
[docs] def cfs_freq_analysis(self, metadata, num_fft_points): """ Performs decimation and frequency analysis on clear frequency search data. Data will not be in shared memory. :param metadata: Clear frequency search sequence metadata :type metadata: dict :param num_fft_points: Number of points used in the FFT. Determines the frequency resolution of where df = (rx_rate / total decimation rate) / num_fft_points :type num_fft_points: int """ fs = self.rx_rate / np.prod(self.dm_rates) # Sampling frequency in Hz pulses = metadata["pulses"] tau = round(metadata["tau_spacing"] * 1e-6 * fs) # puts into units of samples pulses_in_samples = [int(round(p * tau)) for p in pulses] pulse_dur = round(0.0006 * fs) # TODO: determine way to derive 0.0006 start_sample = int(round(pulses_in_samples[0] + pulse_dur / 2)) if len(pulses_in_samples) > 1: end_sample = int(round(pulses_in_samples[1] - pulse_dur / 2)) else: end_sample = self.antennas_iq_samples.shape[-1] num_intervals = int((end_sample - start_sample) / num_fft_points) end_sample = start_sample + num_intervals * num_fft_points data = self.beamformed_samples[:, :, start_sample:end_sample] data_chunks = np.reshape( data, data.shape[:-1] + (num_intervals, num_fft_points) ) fft_data = fft.fftshift(fft.fft(data_chunks, axis=-1), axes=-1) cfs_data = 20 * np.log(np.sum(np.abs(np.average(fft_data, axis=2)), axis=1)) cfs_freqs = fft.fftshift(fft.fftfreq(num_fft_points, d=1 / fs)) return cfs_data, cfs_freqs
[docs] @staticmethod def create_filters( filter_taps: list[np.ndarray], mixing_freqs: list[float], rx_rate: float ): """ Makes bandpass filters and moves taps to the GPU, if available. The first stage filters are mixed to bandpass and the low pass filters are reshaped. ``mixing_freqs`` should be given as offsets from the USRP center frequency. For example, with 12 MHz center frequency and a 10.5 MHz operating frequency, the mixing frequency should be -1.5 MHz. :param filter_taps: The filter taps from the experiment decimation scheme :type filter_taps: list[np.ndarray] :param mixing_freqs: The frequencies used to mix the first stage filter to make it a bandpass filter :type mixing_freqs: list[float] :param rx_rate: The rf rx rate. :type rx_rate: float :returns: List of stages of filter taps. First stage is bandpass with shape ``[num_freqs, num_taps]``, subsequent stages are lowpass with shape ``[1, num_taps]``. Filter taps are moved to the GPU if cupy is available. :rtype: list[xp.ndarray] """ filters = [] num_freqs = len(mixing_freqs) num_taps = filter_taps[0].shape[0] bandpass = np.zeros((num_freqs, num_taps), dtype=np.complex64) oscillator = np.arange(num_taps, dtype=np.complex64) for idx, f in enumerate(mixing_freqs): # Filtering is actually done via a correlation, not a convolution (the filter is not reversed). # Thus, the mixing frequency should be the negative of the frequency that is actually being extracted. bandpass[idx, :] = filter_taps[0] * np.exp( oscillator * 2j * np.pi * (-f) / rx_rate ) filters.append(xp.array(bandpass, dtype=xp.complex64)) for t in filter_taps[1:]: filters.append(xp.array(t[np.newaxis, :], dtype=xp.complex64)) return filters
@staticmethod def _windowed_view(ndarray, window_len: int, step: int): """ Creates a strided and windowed view of ``ndarray``. This allows us to skip samples that will otherwise be dropped without missing samples needed for the convolutions windows. The strides will also not extend out of bounds meaning we do not need to pad extra samples and then drop bad samples after the fact. :param ndarray: The input ndarray (cupy or numpy) :type ndarray: ndarray :param window_len: The window length(filter length) :type window_len: int :param step: The step(dm rate) :type step: int :returns: The array with a new view. :rtype: ndarray (cupy or numpy) """ nrows = ((ndarray.shape[-1] - window_len) // step) + 1 last_dim_stride = ndarray.strides[-1] new_shape = ndarray.shape[:-1] + (nrows, window_len) new_strides = list(ndarray.strides + (last_dim_stride,)) new_strides[-2] *= step return xp.lib.stride_tricks.as_strided( ndarray, shape=new_shape, strides=new_strides )
[docs] @staticmethod def apply_bandpass_decimate( input_samples, bp_filters, mixing_freqs, dm_rate, rx_rate ): """ Apply a Frerking bandpass filter to the input samples. Several different frequencies can be centered on simultaneously. Downsampling is done in simultaneously to reduce the computational load. :param input_samples: The input IQ samples for each channel. :type input_samples: xp.ndarray [num_channels, num_samples] :param bp_filters: The bandpass filter(s). :type bp_filters: xp.ndarray [num_slices, num_taps] :param mixing_freqs: The frequencies used to mix the first stage filter for bandpass :type mixing_freqs: list[float] :param dm_rate: The decimation rate of this stage :type dm_rate: int :param rx_rate: The sampling rate :type rx_rate: float :returns: Samples after bandpass filter and downsampling operations. Shape [num_slices, num_channels, samples] :rtype: ndarray """ # We need to force the input into the GPU to be float16, float32, or complex64 so that the einsum result is # complex64 and NOT complex128. The GPU is significantly slower (10x++) working with complex128 numbers. # We do not require the additional precision. bp_filters = xp.array(bp_filters, dtype=xp.complex64) input_samples = DSP._windowed_view(input_samples, bp_filters.shape[-1], dm_rate) # [num_slices, num_taps] # [num_channels, num_output_samples, num_taps] filtered = xp.einsum( "ij,klj->ikl", bp_filters, input_samples, optimize="greedy" ) # Apply the phase correction for the Frerking method. ph = xp.arange(filtered.shape[-1], dtype=np.float32)[xp.newaxis, :] freqs = xp.array(mixing_freqs)[:, xp.newaxis] # [1, num_output_samples] # [num_slices, 1] # ph: [num_slices, num_output_samples] ph = xp.fmod(ph * 2.0 * xp.pi * (-freqs) / rx_rate * dm_rate, 2.0 * xp.pi) ph = xp.exp(1j * ph.astype(xp.float32)) # [num_slices, num_channels, num_output_samples] # [num_slices, 1, num_output_samples] # corrected: [num_slices, num_channels, num_output_samples] corrected = filtered * ph[:, xp.newaxis, :] return corrected
[docs] @staticmethod def apply_lowpass_decimate(input_samples, lp_filter, dm_rate): """ Apply a lowpass filter to the baseband input samples. Downsampling is done in parallel via a strided window view of the input samples. :param input_samples: Baseband input samples :type input_samples: ndarray [num_slices, num_channels, num_samples] :param lp_filter: Lowpass filter taps :type lp_filter: xp.ndarray [1, num_taps] :param dm_rate: The decimation rate of this stage :type dm_rate: int :returns: Samples after lowpass filter and downsampling operations. Shape [num_slices, num_channels, samples] :rtype: ndarray """ # We need to force the input into the GPU to be float16, float32, or complex64 so that the einsum result is # complex64 and NOT complex128. The GPU is significantly slower (10x++) working with complex128 numbers. # We do not require the additional precision. lp_filter = xp.array(lp_filter, dtype=xp.complex64) input_samples = DSP._windowed_view(input_samples, lp_filter.shape[-1], dm_rate) # [1, num_taps] # [num_slices, num_channels, num_output_samples, num_taps] # filtered: [num_slices, num_channels, num_output_samples] filtered = xp.einsum("ij,klmj->klm", lp_filter, input_samples) return filtered
[docs] @staticmethod def beamform_samples(filtered_samples, beam_phases): """ Beamform the filtered samples for multiple beams simultaneously. :param filtered_samples: The filtered input samples. :type filtered_samples: ndarray [num_slices, num_channels, num_samples] :param beam_phases: The beam phases used to phase each channel's samples before combining. :type beam_phases: ndarray [num_slices, num_beams, num_channels] :returns: Beamformed samples of shape [num_slices, num_beams, num_samples] :rtype: np.ndarray """ # [num_slices, num_beams, num_channels] beam_phases = np.array(beam_phases) # result: [num_slices, num_beams, num_samples] return np.einsum("ijk,ilj->ilk", filtered_samples, beam_phases)
[docs] @staticmethod def correlations_from_samples( beamformed_samples_1: np.ndarray, beamformed_samples_2: np.ndarray, output_sample_rate: float, slice_metadata: list[dict], ) -> list[np.ndarray]: """ Correlate two sets of beamformed samples together. :param beamformed_samples_1: The first beamformed samples :type beamformed_samples_1: np.ndarray [num_slices, num_beams, num_samples] :param beamformed_samples_2: The second beamformed samples :type beamformed_samples_2: np.ndarray [num_slices, num_beams, num_samples] :param output_sample_rate: Sampling rate of data :type output_sample_rate: float :param slice_metadata: Details used to extract indices for each slice. :type slice_metadata: list[dict] :returns: Correlations for slices. List of length num_slices, with each entry having shape [num_beams, num_range_gates, num_lags]. :rtype: list[np.ndarray] """ values = [] for slc, metadata in enumerate(slice_metadata): if metadata.get("skip", False) or metadata["lags"].size == 0: values.append(np.array([])) continue range_off = ( np.arange(metadata["num_range_gates"], dtype=np.int32) + metadata["first_range_off"] ) tau_in_samples = np.int32( round(metadata["tau_spacing"] * 1e-6 * output_sample_rate) ) lag_pulses_as_samples = ( np.array(metadata["lags"], np.int32) * tau_in_samples ) # [num_range_gates, 1, 1] # [1, num_lags, 2] samples_for_all_range_lags = ( range_off[..., np.newaxis, np.newaxis] + lag_pulses_as_samples[np.newaxis, :, :] ) # [num_range_gates, num_lags, 2] row = samples_for_all_range_lags[..., 1].astype(np.int32) # [num_range_gates, num_lags, 2] col = samples_for_all_range_lags[..., 0].astype(np.int32) values_for_slice = np.empty( (beamformed_samples_1.shape[1], row.shape[0], row.shape[1]), dtype=np.complex64, ) # [num_beams, num_ranges, num_lags] for lag in range(row.shape[1]): values_for_slice[:, :, lag] = np.einsum( "ij,ij->ji", beamformed_samples_1[slc, :, row[:, lag]], beamformed_samples_2[slc, :, col[:, lag]].conj(), ) values.append(values_for_slice * metadata["lag_phase_offsets"]) return values
[docs] def get_phase_shift( beam_angle: list[float], freq_khz: Union[float, list[float]], antenna_locations: np.ndarray, ): """ Find the complex excitation for all antennas to make beams in all given directions. :param beam_angle: list of azimuthal direction of the beam off boresight, in degrees, positive beamdir being to the right of the boresight (looking along boresight from ground). :type beam_angle: list[float] :param freq_khz: transmit frequency in kHz. Can be a single frequency or a list :type freq_khz: Union[float, list[float]] :param antenna_locations: x-coordinates of each antenna in the array, in meters. Shape [num_antennas] :type antenna_locations: np.ndarray :returns: phase_shift a 3D array of shape [freqs, beams, antennas] giving the complex excitation for each antenna required to form each beam for each frequency. :rtype: phase_shift ndarray """ if not isinstance(freq_khz, list): freq_khz = [freq_khz] # freq_khz: [num_freqs] freq_khz = np.array(freq_khz, dtype=np.float32) freq_hz = freq_khz * 1000.0 # convert to Hz. k = 2 * np.pi * freq_hz / speed_of_light # 2pi / wavelength beam_rads = np.deg2rad(np.array(beam_angle, dtype=np.float32)) # beam_rads: [num_angles] # phase shift = 0 at array midpoint (by convention), so this is the displacement in x of each beam from the array # midpoint after the wave traverses one wavelength. Essentially, the component along x of the beam, normalized by # the wavelength. beam_displacements = np.outer(k, -1 * np.sin(beam_rads)) # beam_displacements: [num_freqs, num_angles] phase_shift = np.einsum("ij,k->ijk", beam_displacements, antenna_locations) phase_shift = np.exp(1j * phase_shift) return phase_shift
[docs] def get_samples( sampling_rate: float, mixing_freq: float, pulse_len: float, ramp_time: float, max_amplitude: float, ) -> np.ndarray: """ Get basic (not phase-shifted) samples for a given pulse. Find the normalized sample array given the rate (Hz), frequency (Hz), pulse length (s). :param sampling_rate: USRP tx sample rate [Hz]. :type sampling_rate: float :param mixing_freq: frequency offset from the centre frequency on the USRP [Hz]. To be mixed with the centre frequency before transmitting, e.g. centre = 12 MHz, wave_freq = + 1.2 MHz, output = 13.2 MHz. :type mixing_freq: float :param pulse_len: length of the pulse [s] :type pulse_len: float :param ramp_time: ramp up and ramp down time for the pulse [s]. Typical 1.0e-5 s from ``config.ini`` :type ramp_time: float :param max_amplitude: USRP's max DAC amplitude. N200 = 0.707 max :type max_amplitude: float :returns samples: a numpy array of complex samples, representing all samples needed for a pulse of length ``pulse_len`` sampled at ``sampling_rate``. :rtype: np.ndarray """ mixing_freq = float(mixing_freq) sampling_rate = float(sampling_rate) wave_freq = 2 * math.pi * mixing_freq / sampling_rate # for linear we used the below: ramp_samps = round( sampling_rate * ramp_time ) # number of samples for ramp-up and ramp-down of pulse. pulse_len_samps = round(sampling_rate * pulse_len) wave_form = np.exp(1j * wave_freq * np.arange(pulse_len_samps)) amplitude_ramp_up = np.arange(ramp_samps) / ramp_samps amplitude_ramp_down = np.flipud(amplitude_ramp_up) ramp_up_piece = wave_form[:ramp_samps] ramp_down_piece = wave_form[pulse_len_samps - ramp_samps :] np.multiply(ramp_up_piece, amplitude_ramp_up, out=ramp_up_piece) np.multiply(ramp_down_piece, amplitude_ramp_down, out=ramp_down_piece) samples = wave_form * max_amplitude return samples
[docs] def basic_pulse_phase_offset(exp_slice: ExperimentSlice, beam_iter: iter): """ Calculate the phase difference of each pulse with respect to the first pulse based on the transmit frequency and the pulse separation. :param exp_slice: The experiment slice information :type exp_slice: ExperimentSlice :param beam_iter: beam index :type beam_iter: int :returns: Pulse phase offsets [radians] :rtype: np.ndarray """ freq = exp_slice.freq if isinstance(freq, list): freq = freq[exp_slice.freq_order[beam_iter]] freq_hz = freq * 1e3 tau_s = exp_slice.tau_spacing / 1e6 omega = 2 * np.pi * freq_hz pulse_sequence = exp_slice.pulse_sequence num_pulses = len(pulse_sequence) pulse_phases = np.zeros(num_pulses) for p in range(num_pulses): pulse_time = pulse_sequence[p] * tau_s pulse_phases[p] = np.angle(np.exp(1j * omega * pulse_time)) return pulse_phases