Source code for simulators.server

import os
import types
import socket
import logging
import importlib
import time
import threading
import multiprocessing as mp
from queue import Queue, Empty
from socketserver import (
    ThreadingTCPServer, ThreadingUDPServer, BaseRequestHandler
)


# Route every record (DEBUG and above) of the root logger to the file
# `sim-server.log`. The file lives in the directory named by the ACSDATA
# environment variable; when the variable is unset the path is relative to
# the current working directory.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s %(process)d %(message)s',
    filename=os.path.join(os.environ.get('ACSDATA', ''), 'sim-server.log'),
)


class BaseHandler(BaseRequestHandler):
    """This is the base handler class from which `ListenHandler` and
    `SendHandler` classes are inherited. It only defines the custom header
    and tail for accepting some commands not related to the system protocol,
    and the `_execute_custom_command` method to parse the received custom
    command.

    :Example:

    A $system_stop%%%%% command will stop the server, a $error%%%%% command
    will configure the system in order to respond with errors, etc."""

    # Delimiters that wrap a custom command: `$command_name:params%%%%%`
    custom_header, custom_tail = ('$', '%%%%%')

    def _execute_custom_command(self, msg_body):
        """This method accepts a custom command (without the custom header
        and tail) formatted as `command_name:par1,par2,...,parN`. It then
        parses the command and its parameters and tries to call the system's
        equivalent method, also handling unexpected exceptions.

        When the called method returns a string, the string is sent back to
        the client. If that string is `$server_shutdown%%%%%` the server is
        shut down and closed right after the reply has been sent.

        :param msg_body: the custom command message without the custom
            header and tail (`$` and `%%%%%` respectively)
        :type msg_body: string"""
        # Split on the FIRST colon only: everything after it is the
        # parameter list, so a parameter may itself contain a colon without
        # making the unpacking fail.
        name, _, params_str = msg_body.partition(':')
        params = params_str.split(',') if params_str else ()

        try:
            response = getattr(self.system, name)(*params)
            if isinstance(response, str):
                self.socket.sendto(
                    response.encode('latin-1'),
                    self.client_address
                )
                if response == '$server_shutdown%%%%%':
                    # NOTE(review): the short pause presumably lets the reply
                    # reach the client before the server goes down - confirm.
                    time.sleep(0.01)
                    self.server.shutdown()
                    self.server.server_close()
        except AttributeError:
            logging.debug('command %s not supported', name)
        except Exception as ex:
            logging.debug('unexpected exception %s', ex)
class ListenHandler(BaseHandler):
    """Handler that passes the received bytes to `System.parse()` and sends
    the responses returned by the system back to the client. It works with
    both TCP and UDP requests."""

    def setup(self):
        """Prepares the handler before `handle` gets called. For a TCP
        client the connection is logged and the system greeting message, if
        any, is sent. For a UDP client (whose request is a tuple) the
        handler is only flagged as not connection oriented."""
        self.custom_msg = ''
        self.socket = self.request
        self.connection_oriented = True
        if not isinstance(self.socket, tuple):  # TCP client
            logging.info('Got connection from %s', self.client_address)
            greet_msg = self.system.system_greet()
            if greet_msg:
                self.socket.sendto(
                    greet_msg.encode('latin-1'),
                    self.client_address
                )
        else:  # UDP client
            self.connection_oriented = False

    def handle(self):
        """Method that gets called right after the `setup` method ends its
        execution. It handles incoming messages, whether they are received
        via a TCP or a UDP socket. It passes down the `System` class the
        received messages one byte at a time in order for the
        `System.parse()` method to work properly. It then returns the
        `System` response when received from the `System` class. It also
        constantly listens for custom commands that do not belong to a
        specific `System` class, but are useful additions to the framework
        with the purpose of reproducing a specific scenario (i.e. some error
        condition)."""
        if not self.connection_oriented:  # UDP client
            # The UDP request is a (data, socket) pair
            msg, self.socket = self.socket
            # NOTE(review): a newline is appended to the datagram, presumably
            # to terminate the message for line oriented systems - confirm.
            msg += b'\n'
            self._handle(msg)
        else:  # TCP client
            while True:
                try:
                    msg = self.socket.recv(1024)
                    if not msg:
                        # Empty read: the client closed the connection
                        break
                    self._handle(msg)
                except IOError:
                    break

    def _handle(self, msg):
        """Handles a single message.

        Every byte is passed to `System.parse()`. A boolean answer is
        ignored, a non empty string answer is sent to the client and any
        other answer is logged. The response is evaluated only when
        `parse()` did not raise, so a failing byte never re-processes the
        answer produced by a previous byte. Independently from the parse
        outcome, the bytes are also scanned for a custom command enclosed
        between the custom header and tail.

        :param msg: the received message. Usually, in case of a TCP socket,
            this is a single byte to be passed down to the `System.parse()`
            method. In case of a connection-less communication (UDP socket),
            this parameter contains a chunk of bytes, each one of them is
            then processed on its own.
        """
        for byte in msg:
            byte = chr(byte)
            try:
                response = self.system.parse(byte)
            except ValueError as ex:
                logging.debug(ex)
            except Exception:
                logging.debug('unexpected exception')
            else:
                if isinstance(response, bool):
                    pass
                elif response and isinstance(response, str):
                    try:
                        self.socket.sendto(
                            response.encode('latin-1'),
                            self.client_address
                        )
                    except IOError:  # skip coverage
                        # Something went wrong while sending the response,
                        # probably the client was stopped without closing
                        # the connection
                        break
                else:
                    logging.debug('unexpected response: %s', response)

            # Custom command detection: a header starts a new command, the
            # following bytes are accumulated until the tail is found.
            if byte == self.custom_header:
                self.custom_msg = byte
            elif self.custom_msg.startswith(self.custom_header):
                self.custom_msg += byte
                if self.custom_msg.endswith(self.custom_tail):
                    msg_body = self.custom_msg[1:-len(self.custom_tail)]
                    self.custom_msg = ''
                    self._execute_custom_command(msg_body)
class SendHandler(BaseHandler):
    """Handler that forwards to the client the messages the system puts in
    a subscribed queue, while still accepting custom commands."""

    def handle(self):
        """Method that gets called right after the `setup` method ends its
        execution. It handles messages that the server has to periodically
        send to its connected client(s). It also constantly listens for
        custom commands that do not belong to a specific `System` class, but
        are useful additions to the framework with the purpose of
        reproducing a specific scenario (i.e. some error condition).

        The queue subscribed to the system is always unsubscribed when the
        method ends, even if an unexpected exception interrupts the loop."""
        sampling_time = self.system.sampling_time
        message_queue = Queue(1)
        self.socket = self.request
        msg = None
        if isinstance(self.socket, tuple):
            # UDP request: a (data, socket) pair, the data is handled as the
            # first received message
            msg, self.socket = self.socket
        # Non blocking socket: `recv` raises when no data is available
        # instead of stalling the periodic sending
        self.socket.setblocking(False)
        self.system.subscribe(message_queue)

        try:
            while True:
                try:
                    if msg:
                        custom_msg = msg
                        msg = None
                    else:
                        custom_msg = self.socket.recv(1024)
                    # Check if the client is sending a custom command
                    if not custom_msg:
                        break
                    custom_msg = custom_msg.decode('latin-1')
                    if (
                        custom_msg.startswith(self.custom_header)
                        and custom_msg.endswith(self.custom_tail)
                    ):
                        self._execute_custom_command(
                            custom_msg[1:-len(self.custom_tail)]
                        )
                except IOError:
                    # No data received, just pass
                    pass

                try:
                    # Waiting on the queue also paces the loop
                    response = message_queue.get(timeout=sampling_time)
                    self.socket.sendto(response, self.client_address)
                    if self.socket.type == socket.SOCK_DGRAM:
                        # A datagram request is served with a single message
                        break
                except Empty:
                    pass
                except IOError:
                    # Something went wrong while sending the message,
                    # probably the client was stopped without closing the
                    # connection
                    break
        finally:
            self.system.unsubscribe(message_queue)
class Server:
    """This class can instance a server for the given address(es).
    The server can either be a TCP or a UDP server, depending on which
    `server_type` argument is provided. Also, the server could be a
    listening server, if param `l_address` is provided, or a sending server,
    if param `s_address` is provided. If both addresses are provided, the
    server acts as both as a listening server and a sending server. Be aware
    that if the server both listens and sends to its clients, `l_address`
    and `s_address` must not share the same endpoint (IP address and/or port
    should be different).

    :param system_cls: the desired simulator system class
    :param server_type: the type of threading server to be used
    :param kwargs: the arguments to pass to the system instance constructor
        method
    :param l_address: the address of the server that exposes the
        `System.parse()` method
    :param s_address: the address of the server that exposes the
        `System.subscribe()` and `System.unsubscribe()` methods
    :type system_cls: System class that inherits from ListeningServer or/and
        SendingServer
    :type server_type: ThreadingTCPServer or ThreadingUDPServer
    :type kwargs: dict
    :type l_address: (ip, port)
    :type s_address: (ip, port)
    :raises ValueError: if `server_type` is not one of the two supported
        classes or if no address is provided
    :raises OSError: if one of the given addresses cannot be bound
    """

    def __init__(
            self,
            system_cls,
            server_type,
            kwargs,
            l_address=None,
            s_address=None
    ):
        if server_type not in (ThreadingTCPServer, ThreadingUDPServer):
            raise ValueError(
                'Provide either the `ThreadingTCPServer` class '
                + 'or the `ThreadingUDPServer` class!'
            )
        if not l_address and not s_address:
            raise ValueError('You must specify at least one server.')

        # Check right away that the addresses are available. The probe uses
        # the same transport the real server will use, otherwise a busy UDP
        # port would go unnoticed. The socket is closed by the `with` block.
        if server_type is ThreadingTCPServer:
            probe_type = socket.SOCK_STREAM
        else:
            probe_type = socket.SOCK_DGRAM
        for address in (l_address, s_address):
            if not address:
                continue
            with socket.socket(socket.AF_INET, probe_type) as probe:
                probe.bind(address)

        self.system_cls = system_cls
        self.system_kwargs = kwargs
        self.system = None
        self.server_type = server_type
        # Set on the server class itself, so it applies to every server of
        # this type created in the process
        self.server_type.allow_reuse_address = True
        self.l_address = l_address
        self.s_address = s_address
        self.servers = []
        self.threads = []
        self.main_thread = None

    def _setup(self):
        """Creates the listening and/or sending servers, instances the
        system and makes it available to the handlers as the `system`
        attribute of their handler class."""
        if self.l_address:
            self.servers.append(
                self.server_type(self.l_address, ListenHandler)
            )
        if self.s_address:
            self.servers.append(
                self.server_type(self.s_address, SendHandler)
            )
        self.system = self.system_cls(**self.system_kwargs)
        for server in self.servers:
            server.RequestHandlerClass.system = self.system

    def serve_forever(self, serving=None):
        """This method starts the System and then cycle for incoming
        requests.

        :param serving: optional event-like object, its `set` method is
            called once all the server threads have been started
        """
        self._setup()
        for server in self.servers:
            t = threading.Thread(target=server.serve_forever)
            t.daemon = True
            t.start()
            self.threads.append(t)
        if serving is not None:
            serving.set()
        try:
            for t in self.threads:
                t.join()
        except KeyboardInterrupt:
            pass

    def start(self):
        """Starts a daemon thread which calls the `serve_forever` method.
        The server is therefore started as a daemon. The call returns once
        the server threads have been started."""
        serving = mp.Event()
        self.main_thread = threading.Thread(
            target=self.serve_forever,
            args=(serving,),
            daemon=True
        )
        self.main_thread.start()
        serving.wait()

    def stop(self):
        """Stops the server."""
        for server in self.servers:
            server.shutdown()
            server.server_close()
class Simulator:
    """This class represents the whole simulator, composed of one or more
    servers.

    :param system_module: the module that implements the System class.
    :type system_module: module that implements the System class, string
    """

    def __init__(self, system_module, **kwargs):
        if not isinstance(system_module, types.ModuleType):
            # A name was given: load the module from the simulators package
            system_module = importlib.import_module(
                f'simulators.{system_module}'
            )
        self.system = system_module.System
        self.kwargs = kwargs
        self.servers = system_module.servers
        self.system_type = kwargs.get('system_type')  # From command line
        module_name = system_module.__name__.split('.')[-1]
        self.simulator_name = self.system_type or module_name
        self.processes = []

    def start(
            self,
            context="fork",
            daemon=False,
            has_started=None
    ):
        """Starts a simulator by instancing the servers listed in the given
        module. The first server runs in a thread of the current process,
        each of the remaining ones runs in its own process.

        :param context: the multiprocessing start method, handed to
            `multiprocessing.get_context` to obtain the process class.
        :param daemon: if true, the server processes are created as daemons,
            meaning that when this simulator object is destroyed, they get
            destroyed as well. Default value is false, meaning that the
            server processes will continue to run even if the simulator
            object gets destroyed. To stop these processes, method `stop`
            must be called.
        :param has_started: optional event-like object, its `set` method is
            called once every server has signalled it is serving.
        :type context: string
        :type daemon: bool
        """
        started_servers = []
        process_class = mp.get_context(context).Process
        executor = threading.Thread
        servers = []
        # Workers created by THIS call. Only these get started, so workers
        # left in `self.processes` by a previous run are never started twice.
        workers = []

        try:
            for l_addr, s_addr, s_type, kwargs in self.servers:
                # Merge into a new dict so the module level `servers`
                # description is not modified
                server_kwargs = {**kwargs, **self.kwargs}
                s = Server(
                    self.system, s_type, server_kwargs, l_addr, s_addr
                )
                servers.append(s)

            for s in servers:
                started = mp.Event()
                p = executor(
                    target=s.serve_forever,
                    args=(started,),
                    daemon=daemon
                )
                workers.append(p)
                started_servers.append(started)
                # Only the first server uses a thread
                executor = process_class

            for process in workers:
                process.start()
                self.processes.append(process)

            for started_server in started_servers:
                started_server.wait()

            print(f"Simulator '{self.simulator_name}' up and running.")
            if has_started is not None:
                has_started.set()

            if not daemon:
                for p in self.processes:
                    p.join()
                print(f"Simulator '{self.simulator_name}' stopped.")
        except KeyboardInterrupt:
            print('')  # Skip the line displaying the SIGINT character
            self.stop()
        except OSError:
            # Raised when a server address cannot be bound
            print(f"Simulator '{self.simulator_name}' already running.")

    def stop(self):
        """Stops a simulator by sending the custom `$system_stop%%%%%`
        command to all servers of the given simulator."""
        def _send_stop(entry):
            """Sends the stop command to every address of a server entry."""
            l_addr, s_addr, server_type, _ = entry
            if server_type == ThreadingTCPServer:
                socket_type = socket.SOCK_STREAM
            else:
                socket_type = socket.SOCK_DGRAM
            for address in (l_addr, s_addr):
                if not address:
                    continue
                with socket.socket(socket.AF_INET, socket_type) as sockobj:
                    try:
                        sockobj.settimeout(0.1)
                        sockobj.connect(address)
                        # Discard whatever the server sends first (e.g. a
                        # greeting) until it stays silent for the timeout
                        try:
                            while sockobj.recv(1024):
                                pass
                        except socket.timeout:
                            pass
                        sockobj.sendto(b'$system_stop%%%%%', address)
                        response = sockobj.recv(1024)
                        if response != b'$server_shutdown%%%%%':
                            # skip coverage
                            logging.warning(
                                '%s %s %s',
                                'The server did not answer with the',
                                '$server_shutdown%%%%% string!',
                                'The simulator might still be running!'
                            )
                    except (TimeoutError, socket.timeout):  # skip coverage
                        # We don't want to log this. `socket.timeout` is
                        # listed too because it is an alias of
                        # `TimeoutError` only from Python 3.10 on.
                        pass
                    except Exception as ex:  # skip coverage
                        logging.debug(ex)

        threads = []
        for entry in self.servers:
            t = threading.Thread(target=_send_stop, args=(entry,))
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

        for p in self.processes:
            p.join()

        if self.processes:
            print(f"Simulator '{self.simulator_name}' stopped.")
        else:
            print(f"Sent stop to '{self.simulator_name}' simulator.")