import os
import types
import socket
import logging
import importlib
import time
import threading
import multiprocessing as mp
from queue import Queue, Empty
from socketserver import (
ThreadingTCPServer, ThreadingUDPServer, BaseRequestHandler
)
# Root logger configuration: every record from DEBUG upwards is written to
# `sim-server.log`, placed in the directory named by the `ACSDATA` environment
# variable (a path relative to the working directory when it is not set).
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)d %(message)s',
    filename=os.path.join(os.getenv('ACSDATA', ''), 'sim-server.log'),
)
class BaseHandler(BaseRequestHandler):
    """This is the base handler class from which `ListenHandler` and
    `SendHandler` classes are inherited. It only defines the custom header
    and tail for accepting some commands not related to the system protocol,
    and the `_execute_custom_command` method to parse the received custom
    command.

    :Example:

    A $system_stop%%%%% command will stop the server, a $error%%%%% command
    will configure the system in order to respond with errors, etc."""

    custom_header, custom_tail = ('$', '%%%%%')

    def _execute_custom_command(self, msg_body):
        """This method accepts a custom command (without the custom header and
        tail) formatted as `command_name:par1,par2,...,parN`. It then parses
        the command and its parameters and tries to call the system's
        equivalent method, also handling unexpected exceptions. A string
        returned by the method is sent back to the client, and the
        `$server_shutdown%%%%%` answer also shuts the server down.

        :param msg_body: the custom command message without the custom header
            and tail (`$` and `%%%%%` respectively)
        :type msg_body: string"""
        # Split on the first colon only: everything after it belongs to the
        # parameters, so a parameter containing ':' no longer breaks the
        # unpacking with an uncaught ValueError.
        name, _, params_str = msg_body.partition(':')
        params = params_str.split(',') if params_str else ()

        # NOTE(review): `name` comes straight from the client, so any
        # attribute of the system object can be invoked this way. Acceptable
        # for a simulator, but do not expose it to untrusted networks.
        try:
            method = getattr(self.system, name)
        except AttributeError:
            logging.debug('command %s not supported', name)
            return

        # The lookup is kept apart from the call so that an AttributeError
        # raised inside a supported command is not reported as "unsupported".
        try:
            response = method(*params)
            if isinstance(response, str):
                self.socket.sendto(
                    response.encode('latin-1'),
                    self.client_address
                )
                if response == '$server_shutdown%%%%%':
                    # Presumably gives the client time to read the answer
                    # before the server goes away - TODO confirm.
                    time.sleep(0.01)
                    self.server.shutdown()
                    self.server.server_close()
        except Exception as ex:
            logging.debug('unexpected exception %s', ex)
class ListenHandler(BaseHandler):
    """Handler that passes the received bytes down to `System.parse()` and
    sends the answers it returns back to the client. It serves both TCP and
    UDP servers."""

    def setup(self):
        """Method that gets called before `handle`. It tells connection
        oriented (TCP) clients from datagram (UDP) ones by checking whether
        the request is a tuple, and sends TCP clients the message returned by
        `System.system_greet()`, when there is one."""
        self.custom_msg = ''
        self.socket = self.request
        # A tuple request is unpacked by `handle` into (message, socket),
        # which is how datagram requests are delivered.
        self.connection_oriented = not isinstance(self.socket, tuple)
        if self.connection_oriented:  # TCP client
            logging.info('Got connection from %s', self.client_address)
            greet_msg = self.system.system_greet()
            if greet_msg:
                self.socket.sendto(
                    greet_msg.encode('latin-1'),
                    self.client_address
                )

    def handle(self):
        """Method that gets called right after the `setup` method ends its
        execution. It handles incoming messages, whether they are received via
        a TCP or a UDP socket. It passes down the `System` class the received
        messages one byte at a time in order for the `System.parse()` method to
        work properly. It then returns the `System` response when received
        from the `System` class. It also constantly listens for custom commands
        that do not belong to a specific `System` class, but are useful
        additions to the framework with the purpose of reproducing a specific
        scenario (i.e. some error condition)."""
        if not self.connection_oriented:  # UDP client
            msg, self.socket = self.socket
            # A newline is appended to every datagram before parsing it.
            msg += b'\n'
            self._handle(msg)
            return

        # TCP client: keep reading until the peer closes the connection
        # (empty read) or the socket fails.
        while True:
            try:
                msg = self.socket.recv(1024)
                if not msg:
                    break
                self._handle(msg)
            except IOError:
                break

    def _handle(self, msg):
        """Handles a single message.

        :param msg: the received message, a chunk of bytes. Each byte is
            passed on its own down to the `System.parse()` method and is
            also checked against the custom command header and tail.
        :type msg: bytes
        """
        for code in msg:
            byte = chr(code)
            # Start from a clean slate for every byte: a response left over
            # from a previous byte must never be sent again when `parse`
            # fails on the current one.
            response = None
            try:
                response = self.system.parse(byte)
            except ValueError as ex:
                logging.debug(ex)
            except Exception:
                logging.debug('unexpected exception')
            else:
                if isinstance(response, bool):
                    # Boolean answers carry nothing to send to the client.
                    pass
                elif response and isinstance(response, str):
                    try:
                        response = response.encode('latin-1')
                        self.socket.sendto(response, self.client_address)
                    except IOError:  # skip coverage
                        # Something went wrong while sending the response,
                        # probably the client was stopped without closing
                        # the connection
                        break
                else:
                    logging.debug('unexpected response: %s', response)

            # Custom commands are tracked for every byte, whatever the
            # system made of it.
            if byte == self.custom_header:
                self.custom_msg = byte
            elif self.custom_msg.startswith(self.custom_header):
                self.custom_msg += byte
                if self.custom_msg.endswith(self.custom_tail):
                    msg_body = self.custom_msg[1:-len(self.custom_tail)]
                    self.custom_msg = ''
                    self._execute_custom_command(msg_body)
class SendHandler(BaseHandler):
    """Handler that subscribes a queue to the system and forwards to the
    client every message the system puts into it."""

    def handle(self):
        """Method that gets called right after the `setup` method ends its
        execution. It handles messages that the server has to periodically send
        to its connected client(s). It also constantly listens for custom
        commands that do not belong to a specific `System` class, but are
        useful additions to the framework with the purpose of reproducing a
        specific scenario (i.e. some error condition)."""
        sampling_time = self.system.sampling_time
        message_queue = Queue(1)
        self.socket = self.request
        msg = None
        if isinstance(self.socket, tuple):
            # Datagram request: the first message comes with the socket.
            msg, self.socket = self.socket
        # Reads must not block, otherwise nothing could be sent while the
        # client stays silent.
        self.socket.setblocking(False)
        self.system.subscribe(message_queue)
        try:
            while True:
                try:
                    if msg:
                        custom_msg = msg
                        msg = None
                    else:
                        custom_msg = self.socket.recv(1024)
                    # Check if the client is sending a custom command
                    if not custom_msg:
                        break
                    custom_msg = custom_msg.decode('latin-1')
                    if (
                        custom_msg.startswith(self.custom_header)
                        and custom_msg.endswith(self.custom_tail)
                    ):
                        self._execute_custom_command(
                            custom_msg[1:-len(self.custom_tail)]
                        )
                except IOError:
                    # No data received, just pass
                    pass
                try:
                    response = message_queue.get(timeout=sampling_time)
                    self.socket.sendto(response, self.client_address)
                    if self.socket.type == socket.SOCK_DGRAM:
                        # A datagram request is answered only once.
                        break
                except Empty:
                    pass
                except IOError:
                    # Something went wrong while sending the message, probably
                    # the client was stopped without closing the connection
                    break
        finally:
            # Always release the subscription, even when the loop is left
            # because of an unexpected exception.
            self.system.unsubscribe(message_queue)
class Server:
    """This class can instance a server for the given address(es).
    The server can either be a TCP or a UDP server, depending on which
    `server_type` argument is provided. Also, the server could be a listening
    server, if param `l_address` is provided, or a sending server, if param
    `s_address` is provided. If both addresses are provided, the server acts as
    both as a listening server and a sending server. Be aware that if the
    server both listens and sends to its clients, `l_address` and `s_address`
    must not share the same endpoint (IP address and/or port should be
    different).

    :param system_cls: the desired simulator system class, instantiated with
        `kwargs` when the server is set up
    :param server_type: the type of threading server to be used
    :param kwargs: the arguments to pass to the system instance constructor
        method
    :param l_address: the address of the server that exposes the
        `System.parse()` method
    :param s_address: the address of the server that exposes the
        `System.subscribe()` and `System.unsubscribe()` methods
    :type system_cls: System class that inherits from ListeningServer or/and
        SendingServer
    :type server_type: ThreadingTCPServer or ThreadingUDPServer
    :type kwargs: dict
    :type l_address: (ip, port)
    :type s_address: (ip, port)
    :raises ValueError: if `server_type` is not one of the two supported
        classes, or if no address is given
    :raises OSError: if one of the given addresses cannot be bound
    """

    def __init__(
        self,
        system_cls,
        server_type,
        kwargs,
        l_address=None,
        s_address=None
    ):
        if server_type not in (ThreadingTCPServer, ThreadingUDPServer):
            raise ValueError(
                'Provide either the `ThreadingTCPServer` class '
                + 'or the `ThreadingUDPServer` class!'
            )
        if not l_address and not s_address:
            raise ValueError('You must specify at least one server.')
        # Fail early with OSError when an endpoint is not available. The
        # probe uses the same socket type the real server will use, so a
        # busy UDP port is detected and a TCP listener on the same port
        # number does not get in the way of a UDP server.
        for address in (l_address, s_address):
            if not address:
                continue
            probe = socket.socket(socket.AF_INET, server_type.socket_type)
            with probe:
                probe.bind(address)
        self.system_cls = system_cls
        self.system_kwargs = kwargs
        self.system = None
        self.server_type = server_type
        # NOTE: this is set on the server class itself, therefore it applies
        # to every server of that type created in this process.
        self.server_type.allow_reuse_address = True
        self.l_address = l_address
        self.s_address = s_address
        self.servers = []
        self.threads = []
        self.main_thread = None

    def _setup(self):
        """Creates the listening and/or sending servers, instances the
        system and hands it over to the request handler classes."""
        created = []
        try:
            if self.l_address:
                created.append(
                    self.server_type(self.l_address, ListenHandler)
                )
            if self.s_address:
                created.append(
                    self.server_type(self.s_address, SendHandler)
                )
        except Exception:
            # Do not keep an endpoint bound if its sibling could not be
            # created.
            for server in created:
                server.server_close()
            raise
        self.servers.extend(created)
        self.system = self.system_cls(**self.system_kwargs)
        for server in self.servers:
            # The system is stored on the handler class, so every handler
            # instance of this process reaches the same system object.
            server.RequestHandlerClass.system = self.system

    def serve_forever(self, serving=None):
        """This method starts the System and then cycle for incoming requests.

        :param serving: optional event that gets set as soon as all the
            server threads have been started
        :type serving: any object exposing a `set()` method, or None
        """
        self._setup()
        for server in self.servers:
            t = threading.Thread(target=server.serve_forever)
            t.daemon = True
            t.start()
            self.threads.append(t)
        if serving is not None:
            serving.set()
        try:
            for t in self.threads:
                t.join()
        except KeyboardInterrupt:
            pass

    def start(self):
        """Starts a daemon thread which calls the `serve_forever` method. The
        server is therefore started as a daemon. The method returns once the
        server threads have been started."""
        # Only threads are involved here, a threading event is enough.
        serving = threading.Event()
        self.main_thread = threading.Thread(
            target=self.serve_forever,
            args=(serving,),
            daemon=True
        )
        self.main_thread.start()
        serving.wait()

    def stop(self):
        """Stops the server."""
        for server in self.servers:
            server.shutdown()
            server.server_close()
class Simulator:
    """This class represents the whole simulator, composed of one
    or more servers.

    :param system_module: the module that implements the System class. A
        string is taken as the name of a module of the `simulators` package
        and imported.
    :param kwargs: the arguments forwarded to the System constructor of
        every server. The optional `system_type` entry is also used as the
        simulator name.
    :type system_module: module that implements the System class, string
    """

    def __init__(self, system_module, **kwargs):
        if not isinstance(system_module, types.ModuleType):
            system_module = importlib.import_module(
                f'simulators.{system_module}'
            )
        self.system = system_module.System
        self.kwargs = kwargs
        self.servers = system_module.servers
        self.system_type = kwargs.get('system_type')  # From command line
        module_name = system_module.__name__.split('.')[-1]
        self.simulator_name = self.system_type or module_name
        self.processes = []

    def start(
        self,
        context="fork",
        daemon=False,
        has_started=None
    ):
        """Starts a simulator by instancing the servers listed in the given
        module. The first server runs in a thread of the calling process,
        the following ones in separate processes.

        :param context: the multiprocessing start method used to create the
            server processes
        :param daemon: the daemon flag given to the server thread and
            processes. When false (default), this method blocks until all
            of them end. To stop them, method `stop` must be called.
        :param has_started: optional event that gets set as soon as all the
            servers are up and running
        :type context: string
        :type daemon: bool
        :type has_started: any object exposing a `set()` method, or None
        """
        started_servers = []
        mp_context = mp.get_context(context)
        # Only the first server is run by a thread, see the loop below.
        executor = threading.Thread
        servers = []
        try:
            for l_addr, s_addr, s_type, kwargs in self.servers:
                # Merge into a new dict: the entries of `self.servers`
                # belong to the system module and must not be altered.
                system_kwargs = {**kwargs, **self.kwargs}
                servers.append(
                    Server(
                        self.system, s_type, system_kwargs, l_addr, s_addr
                    )
                )

            for s in servers:
                # The event comes from the same context as the processes,
                # synchronization primitives cannot be shared between
                # different multiprocessing contexts.
                started = mp_context.Event()
                p = executor(
                    target=s.serve_forever,
                    args=(started,),
                    daemon=daemon
                )
                self.processes.append(p)
                started_servers.append(started)
                executor = mp_context.Process

            for process in self.processes:
                process.start()
            for started_server in started_servers:
                started_server.wait()
            print(f"Simulator '{self.simulator_name}' up and running.")
            if has_started is not None:
                has_started.set()

            if not daemon:
                for p in self.processes:
                    p.join()
                print(f"Simulator '{self.simulator_name}' stopped.")
        except KeyboardInterrupt:
            print('')  # Skip the line displaying the SIGINT character
            self.stop()
        except OSError:
            # Raised by `Server` when one of the addresses cannot be bound.
            print(f"Simulator '{self.simulator_name}' already running.")

    def stop(self):
        """Stops a simulator by sending the custom `$system_stop%%%%%` command
        to all servers of the given simulator."""

        def _send_stop(entry):
            """Sends the stop command to every address of a servers entry."""
            l_addr, s_addr, server_type, _ = entry
            if server_type == ThreadingTCPServer:
                socket_type = socket.SOCK_STREAM
            else:
                socket_type = socket.SOCK_DGRAM
            for address in (l_addr, s_addr):
                if not address:
                    continue
                sockobj = socket.socket(socket.AF_INET, socket_type)
                try:
                    sockobj.settimeout(0.1)
                    sockobj.connect(address)
                    try:
                        # Throw away whatever the server sends on its own
                        # before the stop command is issued.
                        while sockobj.recv(1024):
                            pass
                    except socket.timeout:
                        pass
                    sockobj.sendto(b'$system_stop%%%%%', address)
                    response = sockobj.recv(1024)
                    if response != b'$server_shutdown%%%%%':  # skip coverage
                        logging.warning(
                            '%s %s %s',
                            'The server did not answer with the',
                            '$server_shutdown%%%%% string!',
                            'The simulator might still be running!'
                        )
                except (socket.timeout, TimeoutError):  # skip coverage
                    # We don't want to log this. `socket.timeout` is listed
                    # too since it is a distinct class before Python 3.10.
                    pass
                except Exception as ex:  # skip coverage
                    logging.debug(ex)
                finally:
                    sockobj.close()

        threads = []
        for entry in self.servers:
            t = threading.Thread(target=_send_stop, args=(entry,))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

        for p in self.processes:
            p.join()

        if self.processes:
            print(f"Simulator '{self.simulator_name}' stopped.")
        else:
            print(f"Sent stop to '{self.simulator_name}' simulator.")