Source code for src.utils.options

#!/usr/bin/python

"""
    options
    ~~~~~~~

    Parse all configuration options from the config.ini file for the current site.
    Additionally, parse the hdw.dat and restrict.dat files of the current site for other
    configuration information.

    See https://borealis.readthedocs.io/en/latest/source/config_options.html for
    detailed descriptions of each configuration option.

    :copyright: 2023 SuperDARN Canada
    :author: Theodore Kolkman
"""

import os
import json
from dataclasses import dataclass, field


[docs]@dataclass class Options: """ Parses all configuration options from the config.ini file for the current site. Additionally, parses the hdw.dat and restrict.dat files of the current site for other configuration information. """ # config.ini options data_directory: str = field(init=False) hdw_path: str = field(init=False) rx_intf_antennas: list[int] = field(init=False) intf_antenna_spacing: float = field(init=False) intf_antenna_count: int = field(init=False) log_aggregator_addr: str = field(init=False) log_aggregator_bool: bool = field(init=False) log_aggregator_port: int = field(init=False) log_console_bool: bool = field(init=False) log_directory: str = field(init=False) console_log_level: str = field(init=False) logfile_log_level: str = field(init=False) aggregator_log_level: str = field(init=False) log_logfile_bool: bool = field(init=False) rx_main_antennas: list[int] = field(init=False) tx_main_antennas: list[int] = field(init=False) main_antenna_spacing: float = field(init=False) main_antenna_count: int = field(init=False) max_filtering_stages: int = field(init=False) max_filter_taps_per_stage: int = field(init=False) max_freq: float = field(init=False) max_output_sample_rate: float = field(init=False) max_rx_sample_rate: float = field(init=False) max_tx_sample_rate: float = field(init=False) max_usrp_dac_amplitude: float = field(init=False) min_freq: float = field(init=False) min_pulse_length: float = field(init=False) min_pulse_separation: float = field(init=False) min_tau_spacing_length: float = field(init=False) n200_addrs: list[str] = field(init=False) n200_count: int = field(init=False) pulse_ramp_time: float = field(init=False) realtime_address: str = field(init=False) ringbuffer_name: str = field(init=False) router_address: str = field(init=False) site_id: str = field(init=False) tr_window_time: float = field(init=False) usrp_master_clock_rate: float = field(init=False) # hdw.dat options altitude: float = field(init=False) analog_atten_stages: int = field(init=False) analog_rx_attenuator: float = field(init=False) analog_rx_rise: float = field(init=False) beam_sep: float = field(init=False) boresight: float = field(init=False) boresight_shift: float = field(init=False) geo_lat: float = field(init=False) geo_long: float = field(init=False) intf_offset: list[float] = field(init=False) max_beams: int = field(init=False) max_range_gates: int = field(init=False) phase_sign: int = field(init=False) status: int = field(init=False) tdiff_a: float = field(init=False) tdiff_b: float = field(init=False) velocity_sign: int = field(init=False) # restrict.dat options default_freq: int = field(init=False) restricted_ranges: list[tuple[int]] = field(init=False) # ZMQ Identities brian_to_driver_identity: str = "BRIAN_DRIVER_IDEN" brian_to_dspbegin_identity: str = "BRIAN_DSPBEGIN_IDEN" brian_to_dspend_identity: str = "BRIAN_DSPEND_IDEN" brian_to_radctrl_identity: str = "BRIAN_RADCTRL_IDEN" driver_to_brian_identity: str = "DRIVER_BRIAN_IDEN" driver_to_dsp_identity: str = "DRIVER_DSP_IDEN" driver_to_mainaffinity_identity: str = "DRIVER_MAINAFFINITY_IDEN" driver_to_radctrl_identity: str = "DRIVER_RADCTRL_IDEN" driver_to_rxaffinity_identity: str = "DRIVER_RXAFFINITY_IDEN" driver_to_txaffinity_identity: str = "DRIVER_TXAFFINITY_IDEN" dspbegin_to_brian_identity: str = "DSPBEGIN_BRIAN_IDEN" dspend_to_brian_identity: str = "DSPEND_BRIAN_IDEN" dsp_to_driver_identity: str = "DSP_DRIVER_IDEN" dsp_to_dw_identity: str = "DSP_DW_IDEN" dsp_to_exphan_identity: str = "DSP_EXPHAN_IDEN" dsp_to_radctrl_identity: str = "DSP_RADCTRL_IDEN" dw_to_dsp_identity: str = "DW_DSP_IDEN" dw_to_radctrl_identity: str = "DW_RADCTRL_IDEN" dw_to_rt_identity: str = "DW_RT_IDEN" exphan_to_dsp_identity: str = "EXPHAN_DSP_IDEN" exphan_to_radctrl_identity: str = "EXPHAN_RADCTRL_IDEN" mainaffinity_to_driver_identity: str = "MAINAFFINITY_DRIVER_IDEN" radctrl_to_brian_identity: str = "RADCTRL_BRIAN_IDEN" radctrl_to_driver_identity: str = "RADCTRL_DRIVER_IDEN" radctrl_to_dsp_identity: str = "RADCTRL_DSP_IDEN" radctrl_to_dw_identity: str = "RADCTRL_DW_IDEN" radctrl_to_exphan_identity: str = "RADCTRL_EXPHAN_IDEN" rt_to_dw_identity: str = "RT_DW_IDEN" rxaffinity_to_driver_identity: str = "RXAFFINITY_DRIVER_IDEN" txaffinity_to_driver_identity: str = "TXAFFINITY_DRIVER_IDEN" def __post_init__(self): if not os.environ["BOREALISPATH"]: raise ValueError("BOREALISPATH env variable not set") if not os.environ["RADAR_ID"]: raise ValueError("RADAR_ID env variable not set") self.parse_config() # Parse info from config file self.parse_hdw() # Parse info from hdw file self.parse_restrict() # Parse info from restrict.dat file self.verify_options() # Check that all parsed values are valid
[docs] def parse_config(self): # Read in config.ini file for current site path = ( f'{os.environ["BOREALISPATH"]}/config/' f'{os.environ["RADAR_ID"]}/' f'{os.environ["RADAR_ID"]}_config.ini' ) try: with open(path, "r") as data: raw_config = json.load(data) except OSError: errmsg = f"Cannot open config file at {path}" raise ValueError(errmsg) # Initialize all options from config file self.site_id = raw_config["site_id"] self.main_antenna_count = int(raw_config["main_antenna_count"]) self.intf_antenna_count = int(raw_config["intf_antenna_count"]) # Parse N200 array and calculate which main and intf antennas operating self.n200_count = 0 self.n200_addrs = [] # Used for checking IPs of N200s self.rx_main_antennas = [] self.rx_intf_antennas = [] self.tx_main_antennas = [] def parse_channel(channel_str: str, rx: bool): """Parse the antenna number and which array it belongs to""" try: antenna_num = int(channel_str[1:]) except ValueError: problem = "channel[1:] must be an integer" raise ValueError(problem) if channel_str[0] == "m": if rx: self.rx_main_antennas.append(antenna_num) else: self.tx_main_antennas.append(antenna_num) elif channel_str[0] == "i": if rx: self.rx_intf_antennas.append(antenna_num) else: raise ValueError( "Cannot connect tx channel to interferometer array" ) else: problem = "channel must start with either 'm' or 'i' for main or interferometer array" raise ValueError(problem) for n200 in raw_config["n200s"]: rx_channel_0 = n200["rx_channel_0"] rx_channel_1 = n200["rx_channel_1"] tx_channel_0 = n200["tx_channel_0"] n200_in_use = False if rx_channel_0 != "": try: parse_channel(rx_channel_0, True) n200_in_use = True except ValueError as err: msg = f"; N200 {n200['addr']} rx_channel_0" raise ValueError(str(err) + msg) if rx_channel_1 != "": try: parse_channel(rx_channel_1, True) n200_in_use = True except ValueError as err: msg = f"; N200 {n200['addr']} rx_channel_1" raise ValueError(str(err) + msg) if tx_channel_0 != "": try: parse_channel(tx_channel_0, False) n200_in_use = True except ValueError as err: msg = f"; N200 {n200['addr']} tx_channel_0" raise ValueError(str(err) + msg) if n200_in_use: self.n200_addrs.append(n200["addr"]) self.n200_count += 1 self.rx_main_antennas.sort() self.rx_intf_antennas.sort() self.tx_main_antennas.sort() self.main_antenna_spacing = float(raw_config["main_antenna_spacing"]) # m self.intf_antenna_spacing = float(raw_config["intf_antenna_spacing"]) # m self.min_freq = float(raw_config["min_freq"]) # Hz self.max_freq = float(raw_config["max_freq"]) # Hz self.min_pulse_length = float(raw_config["min_pulse_length"]) # us self.min_tau_spacing_length = float(raw_config["min_tau_spacing_length"]) # us # Minimum pulse separation is the minimum before the experiment treats it as a single pulse # (transmitting zeroes or no receiving between the pulses) 125 us is approx two TX/RX times self.min_pulse_separation = float(raw_config["min_pulse_separation"]) # us self.max_tx_sample_rate = float(raw_config["max_tx_sample_rate"]) # sps self.max_rx_sample_rate = float(raw_config["max_rx_sample_rate"]) # sps self.max_usrp_dac_amplitude = float(raw_config["max_usrp_dac_amplitude"]) # V self.pulse_ramp_time = float(raw_config["pulse_ramp_time"]) # s self.tr_window_time = float(raw_config["tr_window_time"]) # s self.usrp_master_clock_rate = float(raw_config["usrp_master_clock_rate"]) # sps self.max_output_sample_rate = float(raw_config["max_output_sample_rate"]) # sps self.max_filtering_stages = int(raw_config["max_filtering_stages"]) self.max_filter_taps_per_stage = int(raw_config["max_filter_taps_per_stage"]) self.router_address = raw_config["router_address"] self.realtime_address = raw_config["realtime_address"] self.ringbuffer_name = raw_config["ringbuffer_name"] self.data_directory = raw_config["data_directory"] self.log_directory = raw_config["log_handlers"]["logfile"]["directory"] self.hdw_path = raw_config["hdw_path"] self.console_log_level = raw_config["log_handlers"]["console"]["level"] self.logfile_log_level = raw_config["log_handlers"]["logfile"]["level"] self.aggregator_log_level = raw_config["log_handlers"]["aggregator"]["level"] self.log_console_bool = raw_config["log_handlers"]["console"]["enable"] self.log_logfile_bool = raw_config["log_handlers"]["logfile"]["enable"] self.log_aggregator_bool = raw_config["log_handlers"]["aggregator"]["enable"] self.log_aggregator_addr = raw_config["log_handlers"]["aggregator"]["addr"] self.log_aggregator_port = int(raw_config["log_handlers"]["aggregator"]["port"])
[docs] def parse_hdw(self): # Load information from the hardware file hdw_dat_file = f'{self.hdw_path}/hdw.dat.{os.environ["RADAR_ID"]}' try: with open(hdw_dat_file) as hdwdata: lines = hdwdata.readlines() except OSError: errmsg = f"Cannot open hdw.dat file at {hdw_dat_file}" raise ValueError(errmsg) lines[:] = [line for line in lines if line[0] != "#"] # remove comments lines[:] = [line for line in lines if len(line.split()) != 0] # remove blanks # Take the final line try: hdw = lines[-1] except IndexError: errmsg = f"Cannot find any valid lines in the hardware file: {hdw_dat_file}" raise ValueError(errmsg) # we now have the correct line of data. params = hdw.split() if len(params) != 22: errmsg = f"Found {len(params)} parameters in hardware file, expected 22" raise ValueError(errmsg) self.status = int(params[1]) # 1 operational, -1 offline self.geo_lat = float(params[4]) # decimal degrees, S = negative self.geo_long = float(params[5]) # decimal degrees, W = negative self.altitude = float(params[6]) # metres self.boresight = float( params[7] ) # degrees from geographic north, CCW = negative. self.boresight_shift = float( params[8] ) # degrees from physical boresight. nominal 0.0 degrees self.beam_sep = float(params[9]) # degrees, nominal 3.24 degrees self.velocity_sign = int(params[10]) # +1 or -1 self.phase_sign = int( params[11] ) # +1 indicates correct interferometry phase, -1 indicates 180 self.tdiff_a = float(params[12]) # us for channel A. self.tdiff_b = float(params[13]) # us for channel B. # interferometer offset from midpoint of main, metres [x, y, z] where x is along line of # antennas, y is along array normal and z is altitude difference, in m. self.intf_offset = [float(params[14]), float(params[15]), float(params[16])] self.analog_rx_rise = float(params[17]) # us self.analog_rx_attenuator = float(params[18]) # dB self.analog_atten_stages = int(params[19]) # number of stages self.max_range_gates = int(params[20]) self.max_beams = int( params[21] ) # so a beam number always points in a certain direction
[docs] def parse_restrict(self): # Read in restrict.dat path = ( f'{os.environ["BOREALISPATH"]}/config/' f'{os.environ["RADAR_ID"]}/' f'restrict.dat.{os.environ["RADAR_ID"]}' ) try: with open(path) as data: restricted = data.readlines() except IOError: print(f"IOError on restrict.dat file at {path}") raise restricted[:] = [ line for line in restricted if line[0] != "#" ] # remove comments restricted[:] = [ line for line in restricted if len(line.split()) != 0 ] # remove blanks for line in restricted: splitup = line.split("=") if len(splitup) == 2: if splitup[0].strip() == "default": self.default_freq = int(splitup[1]) # kHz restricted.remove(line) break else: # no break raise ValueError("No default frequency found in restrict.dat") self.restricted_ranges = [] for line in restricted: splitup = line.split() if len(splitup) != 2: errmsg = "Error reading restricted frequency: more than two frequencies listed" raise ValueError(errmsg) try: splitup = [int(float(freq)) for freq in splitup] # convert to ints except ValueError: errmsg = "Error parsing restrict.dat: frequencies must be valid numbers" raise ValueError(errmsg) restricted_range = tuple(splitup) self.restricted_ranges.append(restricted_range)
[docs] def verify_options(self): if self.site_id != os.environ["RADAR_ID"]: errmsg = f'site_id {self.site_id} is different from RADAR_ID {os.environ["RADAR_ID"]}' raise ValueError(errmsg) if len(self.n200_addrs) != len(set(self.n200_addrs)): raise ValueError("Two or more n200s have identical IP addresses") if len(self.rx_main_antennas) > 0: if ( min(self.rx_main_antennas) < 0 or max(self.rx_main_antennas) >= self.main_antenna_count ): raise ValueError( "rx_main_antennas and main_antenna_count are not consistent" ) if len(self.rx_main_antennas) != len(set(self.rx_main_antennas)): raise ValueError("rx_main_antennas has duplicate values") if len(self.tx_main_antennas) > 0: if ( min(self.tx_main_antennas) < 0 or max(self.tx_main_antennas) >= self.main_antenna_count ): raise ValueError( "tx_main_antennas and main_antenna_count are not consistent" ) if len(self.tx_main_antennas) != len(set(self.tx_main_antennas)): raise ValueError("tx_main_antennas has duplicate values") if len(self.rx_intf_antennas) > 0: if ( min(self.rx_intf_antennas) < 0 or max(self.rx_intf_antennas) >= self.intf_antenna_count ): raise ValueError( "rx_intf_antennas and intf_antenna_count are not consistent" ) if len(self.rx_intf_antennas) != len(set(self.rx_intf_antennas)): raise ValueError("rx_intf_antennas has duplicate values") # TODO: Test that realtime_address and router_address are valid addresses if not os.path.exists(self.data_directory): raise ValueError(f"data_directory {self.data_directory} does not exist") if not os.path.exists(self.log_directory): raise ValueError(f"log_directory {self.log_directory} does not exist") if not os.path.exists(self.hdw_path): raise ValueError(f"hdw_path directory {self.hdw_path} does not exist")
def __str__(self): return_str = f""" site_id = {self.site_id} \ \n rx_main_antennas = {self.rx_main_antennas} \ \n tx_main_antennas = {self.tx_main_antennas} \ \n main_antenna_count = {self.main_antenna_count} \ \n rx_intf_antennas = {self.rx_intf_antennas} \ \n intf_antenna_count = {self.intf_antenna_count} \ \n main_antenna_spacing = {self.main_antenna_spacing} metres \ \n intf_antenna_spacing = {self.intf_antenna_spacing} metres \ \n min_freq = {self.min_freq} Hz\ \n max_freq = {self.max_freq} Hz\ \n min_pulse_length = {self.min_pulse_length} us \ \n min_tau_spacing_length = {self.min_tau_spacing_length} us \ \n min_pulse_separation = {self.min_pulse_separation} us \ \n max_tx_sample_rate = {self.max_tx_sample_rate} Hz (samples/sec)\ \n max_rx_sample_rate = {self.max_rx_sample_rate} Hz (samples/sec)\ \n max_usrp_dac_amplitude = {self.max_usrp_dac_amplitude} V\ \n pulse_ramp_time = {self.pulse_ramp_time} s\ \n tr_window_time = {self.tr_window_time} s\ \n usrp_master_clock_rate = {self.usrp_master_clock_rate} Hz (samples/sec)\ \n max_output_sample_rate = {self.max_output_sample_rate} Hz (samples/sec)\ \n max_filtering_stages = {self.max_filtering_stages} \ \n max_filter_taps_per_stage = {self.max_filter_taps_per_stage} \ \n hdw_path = {self.hdw_path} \ \n geo_lat = {self.geo_lat} degrees \ \n geo_long = {self.geo_long} degrees\ \n altitude = {self.altitude} metres \ \n boresight = {self.boresight} degrees from geographic north, CCW = negative. \ \n boresight_shift = {self.boresight_shift} degrees. \ \n beam_sep = {self.beam_sep} degrees\ \n velocity_sign = {self.velocity_sign} \ \n tdiff_a = {self.tdiff_a} us \ \n tdiff_b = {self.tdiff_b} us \ \n phase_sign = {self.phase_sign} \ \n intf_offset = {self.intf_offset} m \ \n analog_rx_rise = {self.analog_rx_rise} us \ \n analog_rx_attenuator = {self.analog_rx_attenuator} dB \ \n analog_atten_stages = {self.analog_atten_stages} \ \n max_range_gates = {self.max_range_gates} \ \n max_beams = {self.max_beams} \ \n default_freq = {self.default_freq} kHz \ \n restricted_ranges = {self.restricted_ranges} kHz \n""" return return_str