#!/usr/bin/python3
"""
local_scd_server.py
~~~~~~~~~~~~~~~~~~~
Monitors for new SWG files and adds the SWG info to the scd if there is an update.
:copyright: 2022 SuperDARN Canada
"""
import subprocess as sp
import sys
import os
import datetime
import time
import argparse
BOREALISPATH = os.environ["BOREALISPATH"]
sys.path.append(f"{BOREALISPATH}/scheduler")
import scd_utils
SWG_GIT_REPO_DIR = "schedules"
SWG_GIT_REPO = "https://github.com/SuperDARN/schedules.git"
EXPERIMENTS = {
"sas": {
"common_time": "twofsound",
"discretionary_time": "twofsound",
"htr_common_time": "twofsound",
"themis_time": "themisscan",
"special_time_normal": "twofsound",
"rbsp_time": "rbspscan",
"no_switching_time": "normalscan",
"interleaved_time": "interleavedscan",
"normalsound_time": "normalsound",
},
"pgr": {
"common_time": "twofsound",
"discretionary_time": "twofsound",
"htr_common_time": "twofsound",
"themis_time": "themisscan",
"special_time_normal": "twofsound",
"rbsp_time": "rbspscan",
"no_switching_time": "normalscan",
"interleaved_time": "interleavedscan",
"normalsound_time": "normalsound",
},
"rkn": {
"common_time": "twofsound",
"discretionary_time": "twofsound",
"htr_common_time": "twofsound",
"themis_time": "themisscan",
"special_time_normal": "twofsound",
"rbsp_time": "rbspscan",
"no_switching_time": "normalscan",
"interleaved_time": "interleavedscan",
"normalsound_time": "normalsound",
},
"inv": {
"common_time": "twofsound",
"discretionary_time": "twofsound",
"htr_common_time": "twofsound",
"themis_time": "themisscan",
"special_time_normal": "twofsound",
"rbsp_time": "rbspscan",
"no_switching_time": "normalscan",
"interleaved_time": "interleavedscan",
"normalsound_time": "normalsound",
},
"cly": {
"common_time": "twofsound",
"discretionary_time": "twofsound",
"htr_common_time": "twofsound",
"themis_time": "themisscan",
"special_time_normal": "twofsound",
"rbsp_time": "rbspscan",
"no_switching_time": "normalscan",
"interleaved_time": "interleavedscan",
"normalsound_time": "normalsound",
},
"lab": {
"common_time": "twofsound",
"discretionary_time": "twofsound",
"htr_common_time": "twofsound",
"themis_time": "themisscan",
"special_time_normal": "twofsound",
"rbsp_time": "rbspscan",
"no_switching_time": "normalscan",
"interleaved_time": "interleavedscan",
"normalsound_time": "normalsound",
},
}
[docs]class SWG(object):
"""Holds the data needed for processing a SWG file."""
def __init__(self, scd_dir):
super().__init__()
self.scd_dir = scd_dir
# Determine if the git repo for schedules exists, and clone it if it doesn't
try:
cmd = f"git -C {self.scd_dir}/{SWG_GIT_REPO_DIR} rev-parse"
sp.check_output(cmd, shell=True)
except sp.CalledProcessError:
cmd = f"cd {self.scd_dir}; git clone {SWG_GIT_REPO}"
sp.call(cmd, shell=True)
[docs] def new_swg_file_available(self):
"""
Checks if a new swg file is uploaded via git.
:returns: True, if new git update is available.
:rtype: bool
"""
# This command will return the number of new commits available in main. This signals that
# there are new SWG files available.
cmd = f"cd {self.scd_dir}/{SWG_GIT_REPO_DIR}; git fetch; git log ..origin/main --oneline | wc -l"
shell_output = sp.check_output(cmd, shell=True)
return bool(int(shell_output))
[docs] def pull_new_swg_file(self):
"""Uses git to grab the new scd updates."""
cmd = f"cd {self.scd_dir}/{SWG_GIT_REPO_DIR}; git pull origin main"
print("Pulling schedule repository")
shell_output = sp.check_output(cmd, shell=True)
print(f"Result: {shell_output}")
[docs] def parse_swg_to_scd(self, modes, radar, first_run):
"""
Reads the new SWG file and parses into a set of parameters than can be used for borealis
scheduling.
:param modes: Holds the modes that correspond to the SWG requests.
:type modes: dict
:param radar: Radar acronym.
:type radar: str
:param first_run: Is this the first run? If so - start with current month, otherwise next month.
:type first_run: bool
:returns: List of all the parsed parameters.
:rtype: list
"""
print("Parsing schedule files")
if first_run:
month_to_use = datetime.datetime.utcnow()
else:
month_to_use = scd_utils.get_next_month_from_date()
year = month_to_use.strftime("%Y")
month = month_to_use.strftime("%m")
yearmonth = year + month
swg_file = f"{self.scd_dir}/{SWG_GIT_REPO_DIR}/{year}/{yearmonth}.swg"
print(f"Reading schedule file {swg_file}")
with open(swg_file, "r") as f:
swg_lines = f.readlines()
skip_line = False
parsed_params = []
for idx, line in enumerate(swg_lines):
# Init mode_to_use and mode_type to None every loop, so we can error check
mode_to_use = None
mode_type = None
# Skip line is used for special time radar lines
if skip_line:
skip_line = False
continue
# We've hit the SCD notes and no longer need to parse
if "# Notes:" in line:
break
# Skip only white space lines
if not line.strip():
continue
# Lines starting with '#' or whitespace and '#' are comments
if line.strip()[0] == "#":
continue
# First line is month and year
if idx == 0:
continue
items = line.split()
start_day = items[0][0:2]
start_hr = items[0][3:]
if "Common Time" in line:
mode_type = "common"
# 2018 11 23 no longer scheduling twofsound as common time during 'no switching'
if "no switching" in line:
mode_to_use = modes["no_switching_time"]
elif "normalsound" in line:
mode_to_use = modes["normalsound_time"]
elif "interleavescan" in line:
mode_to_use = modes["interleaved_time"]
else:
mode_to_use = modes["htr_common_time"]
if "Special Time" in line:
mode_type = "special"
if "ALL" in line or radar.upper() in line:
if "THEMIS" in line:
mode_to_use = modes["themis_time"]
elif "ST-APOG" in line or "RBSP" in line:
mode_to_use = modes["rbsp_time"]
elif "ARASE" in line or "interleavescan" in line:
mode_to_use = modes["interleaved_time"]
elif "normalsound" in line:
mode_to_use = modes["normalsound_time"]
else:
print("Unknown Special Time: using default common time")
mode_to_use = modes["htr_common_time"]
else:
mode_to_use = modes["special_time_normal"]
if "Discretionary Time" in line:
mode_type = "discretionary"
mode_to_use = modes["discretionary_time"]
if not mode_to_use or not mode_type:
raise ValueError(f"SWG line couldn't be parsed: {line}")
param = {
f"yyyymmdd": f"{year}{month}{start_day}",
f"hhmm": f"{start_hr}:00",
"experiment": mode_to_use,
"scheduling_mode": mode_type,
}
parsed_params.append(param)
print(f"Found schedule line: {param}")
return parsed_params
[docs]def main():
""" """
parser = argparse.ArgumentParser(
description="Automatically schedules new events from the SWG"
)
parser.add_argument("--scd-dir", required=True, help="The scd working directory")
parser.add_argument(
"--force",
action="store_true",
help="Force an update to the schedules for the next month",
)
parser.add_argument(
"--first-run",
action="store_true",
help="This will generate the first set of schedule files if running on a fresh directory. "
"If the next month schedule is available, you will need to roll back the SWG schedule "
"folder back to the last commit before running in continuous operation.",
)
args = parser.parse_args()
scd_dir = args.scd_dir
scd_logs = scd_dir + "/logs"
if not os.path.exists(scd_dir):
os.makedirs(scd_dir)
if not os.path.exists(scd_logs):
os.makedirs(scd_logs)
sites = list(EXPERIMENTS.keys())
site_scds = [scd_utils.SCDUtils(f"{scd_dir}/{s}.scd", s) for s in sites]
swg = SWG(scd_dir)
force_next_month = args.force
current_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"{current_time} - Starting local_scd_server.py...")
if args.first_run:
# Create the .scd files for each site if running for first time
for s in sites:
filename = f"{scd_dir}/{s}.scd"
with open(filename, "a"):
pass
while True:
if swg.new_swg_file_available() or args.first_run or force_next_month:
current_time = datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
print(f"{current_time} - New swg file available")
swg.pull_new_swg_file()
site_experiments = [
swg.parse_swg_to_scd(EXPERIMENTS[s], s, args.first_run) for s in sites
]
errors = False
today = datetime.datetime.utcnow()
scd_error_log = today.strftime("/scd_errors.%Y%m%d")
for se, site_scd in zip(site_experiments, site_scds):
for ex in se:
try:
print(
f"add_line date: {ex['yyyymmdd']}, "
f"with experiment: {ex['experiment']}, "
f"mode: {ex['scheduling_mode']}"
)
site_scd.add_line(
ex["yyyymmdd"],
ex["hhmm"],
ex["experiment"],
ex["scheduling_mode"],
)
except ValueError as err:
error_msg = (
f"{today.strftime('%c')} {site_scd.scd_filename}: Unable to add line:\n"
f"\t {ex['yyyymmdd']} {ex['hhmm']} {ex['experiment']} {ex['scheduling_mode']}\n"
f"\t Exception thrown:\n"
f"\t\t {str(err)}\n"
)
with open(scd_logs + scd_error_log, "a") as f:
f.write(error_msg)
errors = True
break
except FileNotFoundError:
error_msg = (
f"SCD filename: {site_scd.scd_filename} is missing!!!\n"
)
with open(scd_logs + scd_error_log, "a") as f:
f.write(error_msg)
errors = True
break
if not errors:
success_msg = "All swg lines successfully scheduled.\n"
for site, site_scd in zip(sites, site_scds):
yyyymmdd = today.strftime("%Y%m%d")
hhmm = today.strftime("%H:%M")
new_lines = site_scd.get_relevant_lines(yyyymmdd, hhmm)
text_lines = [site_scd.fmt_line(x) for x in new_lines]
success_msg += f"\t{site}\n"
for line in text_lines:
success_msg += f"\t\t{line}\n"
with open(scd_logs + scd_error_log, "a") as f:
f.write(success_msg)
if args.first_run:
break
force_next_month = False
else:
time.sleep(300)
if __name__ == "__main__":
main()