#!/usr/bin/env python3
"""
experiment_handler process
~~~~~~~~~~~~~~~~~~~~~~~~~~
This program runs a given experiment. It will use the experiment's build_scans method to
create the iterable ScanClassBase objects that will be used by the radar_control block,
then it will pass the experiment to the radar_control block to run.
It will be passed some data to use in its update method at the end of every integration time.
This has yet to be implemented but will allow experiment_prototype to modify itself
based on received data as feedback.
:copyright: 2018 SuperDARN Canada
:author: Marci Detwiller
"""
import zmq
import os
import sys
import argparse
import inspect
import importlib
import threading
import pickle
import zlib
BOREALISPATH = os.environ['BOREALISPATH']
sys.path.append(BOREALISPATH)
from utils.experiment_options.experimentoptions import ExperimentOptions
from utils.zmq_borealis_helpers import socket_operations
from experiment_prototype.experiment_exception import ExperimentException
from experiment_prototype.experiment_prototype import ExperimentPrototype
def printing(msg):
    """
    Write a message to stdout, tagged with this process's coloured label.

    :param msg: the text to print. A trailing newline is appended.
    """
    # ANSI blue for the label, then reset so the message itself is uncoloured.
    label = "\033[34m" + "EXPERIMENT HANDLER: " + "\033[0m"
    sys.stdout.write("".join((label, msg, "\n")))
def usage_msg():
    """
    Build the help text for this process.

    The text is handed to argparse as the usage string, so it is what gets
    shown for a -h flag or when invalid arguments are provided.

    :returns: the usage message
    """
    return """ experiment_handler.py [-h] experiment_module scheduling_mode_type
Pass the module containing the experiment to the experiment handler as a required
argument. It will search for the module in the BOREALISPATH/experiment_prototype
package. It will retrieve the class from within the module (your experiment).
It will use the experiment's build_scans method to create the iterable ScanClassBase
objects that will be used by the radar_control block, then it will pass the
experiment to the radar_control block to run.
It will be passed some data to use in its .update() method at the end of every
integration time. This has yet to be implemented but will allow experiments to
modify themselves based on received data as feedback. This is not a necessary method
for all experiments and if there is no update method experiment updates will not
occur."""
def experiment_parser():
    """
    Construct the command-line parser used by experiment_handler.

    Two positional arguments are defined: the experiment module name and the
    scheduling mode type.

    :returns: parser, the argument parser for the experiment_handler.
    """
    module_help = ("The name of the module in the experiment_prototype "
                   "package that contains your Experiment class, "
                   "e.g. normalscan")
    mode_help = ("The type of scheduling time for this experiment "
                 "run, e.g. common, special, or discretionary.")

    arg_parser = argparse.ArgumentParser(usage=usage_msg())
    arg_parser.add_argument("experiment_module", help=module_help)
    arg_parser.add_argument("scheduling_mode_type", help=mode_help)
    return arg_parser
def retrieve_experiment(experiment_module_name):
    """
    Retrieve the experiment class from the provided module given as an argument.

    The module is imported as ``experiments.<experiment_module_name>``. Only
    classes *defined* in that module (not merely imported into it) that have
    ExperimentPrototype in their MRO are considered; other utility classes in
    the file are ignored.

    :param experiment_module_name: The name of the experiment module to run
     from the Borealis project's experiments directory.
    :raise ExperimentException: if the experiment module provided as an argument
     does not contain exactly one class that inherits from ExperimentPrototype.
    :returns: Experiment, the experiment class, inherited from ExperimentPrototype.
    """
    if __debug__:
        printing("Running the experiment: " + experiment_module_name)
    experiment_mod = importlib.import_module("experiments." + experiment_module_name)

    # Build the filtered list in one pass. Removing entries from a list while
    # iterating over it skips the element that follows each removal, which
    # could let a non-experiment helper class slip through.
    experiment_classes = [
        (class_name, class_obj)
        for class_name, class_obj in inspect.getmembers(experiment_mod, inspect.isclass)
        if class_obj.__module__ == experiment_mod.__name__
        and ExperimentPrototype in inspect.getmro(class_obj)
    ]

    if not experiment_classes:
        errmsg = "No experiment classes are present that are built from"\
                 " parent class ExperimentPrototype - exiting"
        raise ExperimentException(errmsg)
    if len(experiment_classes) > 1:
        errmsg = "You have more than one experiment class in your " \
                 "experiment file - exiting"
        raise ExperimentException(errmsg)

    # Exactly one candidate remains: this is the experiment class to run.
    class_name, Experiment = experiment_classes[0]
    printing('Retrieving experiment: {} from module {}'.format(
        class_name, experiment_mod))
    return Experiment
def send_experiment(exp_handler_to_radar_control, iden, serialized_exp):
    """
    Send the experiment to radar_control module.

    The send is best-effort: a zmq.ZMQError is reported on stdout and then
    suppressed so the caller's loop keeps running.

    :param exp_handler_to_radar_control: socket to send the experiment on
    :param iden: ZMQ identity
    :param serialized_exp: Either a pickled experiment or a None.
    """
    try:
        socket_operations.send_exp(exp_handler_to_radar_control, iden, serialized_exp)
    except zmq.ZMQError as err:
        # Presumably the queue was full - radarcontrol not receiving.
        # TODO handle this. Shutdown and restart all modules.
        # Until then, at least make the dropped send visible instead of silent.
        printing("Failed to send experiment to radar_control: {}".format(err))
def experiment_handler(semaphore):
    """
    Run the experiment. This is the main process when this program is called.

    This process runs the experiment from the module that was passed in as an argument. It
    currently does not exit unless killed. It may be updated in the future to exit if provided
    with an error flag.

    This process begins with setup of sockets and retrieving the experiment class from the
    module. It then loops forever: it prepares a pickled payload (the experiment if it was
    just built or changed, otherwise None), waits for a request from the radar_control block,
    and replies with that payload when the request is 'EXPNEEDED' or 'NOERROR'. Other
    statuses will be implemented in the future.

    If the experiment class defines an ``update`` method, it is run in a background daemon
    thread after each request. When ``update`` returns a truthy value the scans are rebuilt
    and the experiment is re-sent on a later request. The data passed to ``update`` is
    currently always None (TODO).

    :param semaphore: a threading.Semaphore (or other acquire/release context manager) used
     as a mutex between this loop and the update thread.
    """
    options = ExperimentOptions()
    ids = [options.exphan_to_radctrl_identity, options.exphan_to_dsp_identity]
    sockets_list = socket_operations.create_sockets(ids, options.router_address)
    exp_handler_to_radar_control = sockets_list[0]
    # Not used yet: reserved for the TODO data request inside update_experiment.
    exp_handler_to_dsp = sockets_list[1]

    parser = experiment_parser()
    args = parser.parse_args()
    experiment_name = args.experiment_module
    scheduling_mode_type = args.scheduling_mode_type

    Experiment = retrieve_experiment(experiment_name)

    # Updates only happen for experiments that define an update method.
    experiment_update = any(
        method_name == 'update'
        for method_name, _ in inspect.getmembers(Experiment, inspect.isfunction))
    if __debug__:
        if experiment_update:
            printing("Experiment has an updated method.")

    exp = Experiment()
    exp._set_scheduling_mode(scheduling_mode_type)

    # True whenever the experiment must be (re)built and sent in full.
    # Shared with update_experiment; only touched while holding the semaphore.
    change_flag = True

    def update_experiment():
        # Without nonlocal the assignment below would create a new local and the
        # main loop would never learn that the experiment changed.
        nonlocal change_flag

        # Recv complete processed data from DSP or datawrite? TODO
        #socket_operations.send_request(exp_handler_to_dsp,
        #                               options.dsp_to_exphan_identity,
        #                               "Need completed data")
        #data = socket_operations.recv_data(exp_handler_to_dsp,
        #                             options.dsp_to_exphan_identity, printing)
        some_data = None  # TODO get the data from data socket and pass to update

        # 'with' guarantees the release even if update() or build_scans() raises,
        # so a failing update cannot leave the main loop blocked forever.
        with semaphore:
            # Only ever raise the flag here; the main loop clears it once the
            # changed experiment has been serialized, so no update is lost.
            if exp.update(some_data):
                change_flag = True
                if __debug__:
                    printing("Building an updated experiment.")
                exp.build_scans()
                printing("Experiment {exp} with CPID {cp} successfully updated"
                         .format(exp=exp.__class__.__name__, cp=exp.cpid))

    update_thread = threading.Thread(target=update_experiment)

    while True:
        # Hold the semaphore so the update thread cannot modify exp or the flag
        # while the experiment is being built and pickled.
        with semaphore:
            if change_flag:
                exp.build_scans()
                printing("Sucessful experiment {exp} built with CPID {cp}".format(
                    exp=exp.__class__.__name__, cp=exp.cpid))
                # use the highest pickle protocol available
                serialized_exp = pickle.dumps(exp, protocol=pickle.HIGHEST_PROTOCOL)
                change_flag = False
            else:
                # Nothing changed: radar_control is sent a pickled None.
                serialized_exp = pickle.dumps(None,
                                              protocol=pickle.HIGHEST_PROTOCOL)

        # WAIT until radar_control is ready to receive a changed experiment
        message = socket_operations.recv_request(exp_handler_to_radar_control,
                                                 options.radctrl_to_exphan_identity,
                                                 printing)
        if __debug__:
            request_msg = "Radar control made request -> {}.".format(message)
            printing(request_msg)

        with semaphore:
            if message == 'EXPNEEDED':
                # starting anew
                printing("Sending new experiment from beginning")
            if message in ('EXPNEEDED', 'NOERROR'):
                send_experiment(exp_handler_to_radar_control,
                                options.radctrl_to_exphan_identity, serialized_exp)
            # TODO: handle errors with revert back to original experiment. requires another
            # message

        if experiment_update:
            # Only start a new update if the previous one has finished.
            # Thread.isAlive() was removed in Python 3.9; is_alive() is the
            # supported spelling.
            if not update_thread.is_alive():
                if __debug__:
                    printing("Updating experiment")
                update_thread = threading.Thread(target=update_experiment)
                update_thread.daemon = True
                update_thread.start()
if __name__ == "__main__":
    # A default Semaphore starts at 1, so experiment_handler can use it as a
    # mutex between its main loop and its update thread.
    experiment_handler(threading.Semaphore())