diff --git a/FreeTAKServer/controllers/services/FTS.py b/FreeTAKServer/controllers/services/FTS.py index b0775a87..0c7fadbf 100644 --- a/FreeTAKServer/controllers/services/FTS.py +++ b/FreeTAKServer/controllers/services/FTS.py @@ -39,7 +39,7 @@ from FreeTAKServer.core.services.federation.federation import ( FederationServerService, ) -from FreeTAKServer.services.federation_client_service.federation_client_service_main import ( +from FreeTAKServer.core.services.federation.FederationClientService import ( FederationClientServiceController, ) from FreeTAKServer.core.services.SSLCoTServiceController import ( diff --git a/FreeTAKServer/services/federation_client_service/federation_client_service_main.py b/FreeTAKServer/core/services/federation/FederationClientService.py similarity index 76% rename from FreeTAKServer/services/federation_client_service/federation_client_service_main.py rename to FreeTAKServer/core/services/federation/FederationClientService.py index cec4da4c..c39e06e0 100644 --- a/FreeTAKServer/services/federation_client_service/federation_client_service_main.py +++ b/FreeTAKServer/core/services/federation/FederationClientService.py @@ -13,13 +13,6 @@ import threading from typing import Dict, List, Tuple -from digitalpy.core.service_management.digitalpy_service import DigitalPyService - -from FreeTAKServer.core.serializers.protobuf_serializer import ProtobufSerializer -from FreeTAKServer.core.serializers.xml_serializer import XmlSerializer - -from FreeTAKServer.core.parsers.XMLCoTController import XMLCoTController - from FreeTAKServer.core.configuration.CreateLoggerController import CreateLoggerController from FreeTAKServer.core.configuration.LoggingConstants import LoggingConstants from FreeTAKServer.core.configuration.MainConfig import MainConfig @@ -38,9 +31,6 @@ from FreeTAKServer.model.protobufModel.fig_pb2 import FederatedEvent from FreeTAKServer.model.SpecificCoT.SpecificCoTAbstract import SpecificCoTAbstract -from .controllers.client_connection_controller import ClientConnectionController -from .controllers.protobuf_controller import ProtobufController - # Make a connection to the MainConfig object for all routines below config = MainConfig.instance() @@ -49,7 +39,7 @@ loggingConstants = LoggingConstants() -class FederationClientServiceController(DigitalPyService): +class FederationClientServiceController(FederationServiceBase): """A service which controllers the connection too and transfer of data with federated servers. """ @@ -64,8 +54,26 @@ def __init__(self): self.pipe = None self.federates: Dict[str, Federate] = {} self.sel = selectors.DefaultSelector() - self.client_connection_controller = ClientConnectionController(self.sel, self.federates, self.db, self.logger) - self.protobuf_controller = ProtobufController() + self.user_dict = {} + + def get_service_users(self) -> List[FederatedEvent]: + return self.user_dict.values() + + def add_service_user(self, user: FederatedEvent) -> None: + """ add a service user to this services user persistence mechanism + + Returns: None + + """ + self.user_dict[user.contact.uid] = user + + def remove_service_user(self, user: FederatedEvent): + """ remove a service user from this services user persistence mechanism + + Returns: None + + """ + del self.user_dict[user.contact.uid] def define_responsibility_chain(self): pass @@ -191,7 +199,7 @@ def main(self): inbound_data_thread.join() def serialize_data(self, data_object: FederatedEvent): - specific_obj = self.protobuf_controller.process_protobuff_to_object(data_object) + specific_obj = self._process_protobuff_to_object(data_object) return specific_obj def outbound_data_handler(self): @@ -257,56 +265,37 @@ def inbound_data_handler(self): except Exception as e: self.logger.error(str(e)) - def _send_connected_clients(self, connection): - try: - clients = self.db.query_user() - except Exception as e: - self.logger.warning("error thrown in getting clients from DataBase to send to federates " + str(e)) - return None - for client in clients: - try: - proto_obj = FederatedEvent() - proto_obj.contact.uid = str(client.uid) # pylint: disable=no-member; member does exist - proto_obj.contact.callsign = str(client.CoT.detail.contact.callsign) # pylint: disable=no-member; member does exist - proto_obj.contact.operation = 1 # pylint: disable=no-member; member does exist - proto_str = proto_obj.SerializeToString() - header = self.protobuf_controller.generate_header(len(proto_str)) - connection.send(header + proto_str) - except Exception as e: - self.logger.warning("error thrown sending federate data to newly connected federate " + str(e)) - continue - def connect_to_server(self, server_vars: Tuple[str, str]) -> None: - try: - federate_db_obj = self.db.query_Federation(f'id == "{server_vars[0]}"')[0] - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - ssock = self.context.wrap_socket(sock, server_hostname=federate_db_obj.address) - ssock.settimeout(10) - ssock.connect((str(federate_db_obj.address), int(federate_db_obj.port))) - ssock.setblocking(False) - federate = Federate() - federate.uid = server_vars[0] - federate.addr = federate_db_obj.address - federate.conn = ssock - federate.name = federate_db_obj.name - events = selectors.EVENT_READ - self.sel.register(ssock, events, federate) - self.federates[server_vars[0]] = federate - self._send_connected_clients(ssock) - self.db.create_ActiveFederation(id = federate_db_obj.id, address = federate_db_obj.address, - port = federate_db_obj.port, initiator = "Self") - self.db.update_Federation({"lastError": None}, query=f'id == "{federate_db_obj.id}"') - return None - except Exception as ex_general: - try: - self.db.remove_ActiveFederation(f'id == "{server_vars[0]}"') - except Exception as ex: - self.logger.warning("exception thrown removing outgoing federation from DB %s", str(ex)) - self.logger.warning("exception thrown creating new federation %s", str(ex_general)) try: - self.db.update_Federation({"status": "Disabled", "lastError": str(ex_general)}, query=f'id == "{server_vars[0]}"') - except Exception as ex: - self.logger.warning("exception thrown updating federate in db %s", str(ex)) + federate_db_obj = self.db.query_Federation(f'id == "{server_vars[0]}"')[0] + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + ssock = self.context.wrap_socket(sock, server_hostname=federate_db_obj.address) + ssock.settimeout(10) + ssock.connect((str(federate_db_obj.address), int(federate_db_obj.port))) + ssock.setblocking(False) + federate = Federate() + federate.uid = server_vars[0] + federate.addr = federate_db_obj.address + federate.conn = ssock + federate.name = federate_db_obj.name + events = selectors.EVENT_READ + self.sel.register(ssock, events, federate) + self.federates[server_vars[0]] = federate + self._send_connected_clients(ssock) + self.db.create_ActiveFederation(id = federate_db_obj.id, address = federate_db_obj.address, + port = federate_db_obj.port, initiator = "Self") + self.db.update_Federation({"lastError": None}, query=f'id == "{federate_db_obj.id}"') + return None + except Exception as e: + try: + self.db.remove_ActiveFederation(f'id == "{server_vars[0]}"') + except Exception as e: + self.logger.warning("exception thrown removing outgoing federation from DB "+str(e)) + self.logger.warning("exception thrown creating new federation "+str(e)) + try: + self.db.update_Federation({"status": "Disabled", "lastError": str(e)}, query=f'id == "{server_vars[0]}"') + except Exception as e: + self.logger.warning("exception thrown updating federate in db "+str(e)) def receive_data_from_federate(self, timeout): """called whenever data is available from any federate and immediately proceeds to @@ -323,7 +312,7 @@ def receive_data_from_federate(self, timeout): continue if header: try: - buffer = self.protobuf_controller.get_header_length(header) + buffer = self._get_header_length(header) raw_protobuf_message = conn.recv(buffer) print(raw_protobuf_message) protobuf_object = FederatedEvent() @@ -334,11 +323,11 @@ def receive_data_from_federate(self, timeout): conn.recv(10000) continue else: - self.client_connection_controller.disconnect_client(key.data.uid) + self.disconnect_client(key.data.uid) return dataarray else: return None - + def start(self, pipe): self.db = DatabaseController() self.pipe = pipe diff --git a/FreeTAKServer/services/federation_client_service/__init__.py b/FreeTAKServer/services/federation_client_service/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/FreeTAKServer/services/federation_client_service/controllers/__init__.py b/FreeTAKServer/services/federation_client_service/controllers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/FreeTAKServer/services/federation_client_service/controllers/client_connection_controller.py b/FreeTAKServer/services/federation_client_service/controllers/client_connection_controller.py deleted file mode 100644 index 884385db..00000000 --- a/FreeTAKServer/services/federation_client_service/controllers/client_connection_controller.py +++ /dev/null @@ -1,30 +0,0 @@ -class ClientConnectionController(): - def __init__(self, selector, federates, db, logger) -> None: - self.selector = selector - self.federates = federates - self.db = db - self.logger = logger - - def disconnect_client(self, id: str) -> None: - try: - self.logger.info("disconnecting client") - try: - federate = self.federates[id] - except Exception as e: - self.logger.warning("federate array has no item with uid " + str(id) + " federates array is len " + str( - len(self.federates))) - return None - try: - federate.conn.close() - self.selector.unregister(federate.conn) - del (self.federates[federate.uid]) - except Exception as e: - self.logger.warning("exception thrown disconnecting client " + str(e)) - - try: - self.db.remove_ActiveFederation(f'id == "{federate.uid}"') - except Exception as e: - self.logger.warning("exception thrown removing outgoing federation from DB " + str(e)) - return None - except Exception as e: - self.logger.warning("exception thrown accessing client for disconnecting client " + str(e)) diff --git a/FreeTAKServer/services/federation_client_service/controllers/handlers/__init__.py b/FreeTAKServer/services/federation_client_service/controllers/handlers/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/FreeTAKServer/services/federation_client_service/controllers/handlers/handlers.py b/FreeTAKServer/services/federation_client_service/controllers/handlers/handlers.py deleted file mode 100644 index b5be7df4..00000000 --- a/FreeTAKServer/services/federation_client_service/controllers/handlers/handlers.py +++ /dev/null @@ -1,182 +0,0 @@ -from abc import ABC - -from FreeTAKServer.model.ClientInformation import ClientInformation -from FreeTAKServer.model.federate import Federate -from FreeTAKServer.model.clients import ClientAbstract -from FreeTAKServer.model.FTSModel.fts_protocol_object import FTSProtocolObject -from FreeTAKServer.model.SpecificCoT.SpecificCoTAbstract import SpecificCoTAbstract -from FreeTAKServer.model.SpecificCoT.SendOther import SendOther - -from FreeTAKServer.core.services.service_abstracts import ServerServiceInterface, ServiceInterface -from FreeTAKServer.core.services.federation.federation_service_base import FederationServiceBase -from FreeTAKServer.core.configuration.types import Types -from FreeTAKServer.core.SpecificCoTControllers.SendCoTAbstractController import SendCoTAbstractController - - -class HandlerInterface(ABC): - - def Handle(self, obj, command): - raise NotImplementedError - - def setNextHandler(self, handler): - raise NotImplementedError - - -class HandlerBase(HandlerInterface): - """ implements basic functionality required for all handlers - - this class implements the setNextHandler method from the HandlerInterface - as well as creating a callNextHandler method which will either call the next - handler in the chain of responsibility or raise an exception if there are no - remaining handlers. - """ - - def __init__(self): - self.nextHandler = None - - def Handle(self, obj: FederationServiceBase, command): - raise NotImplementedError - - def setNextHandler(self, handler: HandlerInterface): - """ set next handler to be called in chain of responsibility - - :params handler: the handler which should be next in chain of responsibility - """ - - self.nextHandler = handler - - def callNextHandler(self, obj: FederationServiceBase, command): - if self.nextHandler is None: - raise Exception("no further handlers, object not supported by responsibility chain") - else: - self.nextHandler.Handle(obj, command) - -# service level command handlers below - -class StopHandler(HandlerBase): - """ Handler for command stop service - - """ - - def Handle(self, obj: FederationServiceBase, command): - - if isinstance(obj, FederationServiceBase) and command == "STOP": - obj.stop() - - else: - self.callNextHandler(obj, command) - -# connection level command handlers below - -class ConnectHandler(HandlerBase): - """ Handler for command to connect to new server - - """ - - def Handle(self, obj: FederationServiceBase, command): - if isinstance(command, tuple) and command[1] == "CREATE": - obj.connect_to_server(command) - - else: - self.callNextHandler(obj, command) - - -class DisconnectHandler(HandlerBase): - """ Handler for command to disconnect client - - """ - - def Handle(self, obj: FederationServiceBase, command): - if isinstance(obj, FederationServiceBase) and isinstance(command, tuple) and command[1] == "DELETE": - obj.disconnect_client(command[0]) - - else: - self.callNextHandler(obj, command) - - -# Data level command handlers below - -class DestinationValidationHandler(HandlerBase): - """ - this handler is responsible for validating that the desired destination of a CoT - is present before being sent - """ - - def Handle(self, obj: FederationServiceBase, command: SpecificCoTAbstract) -> None: - user_list = obj.get_service_users() - if isinstance(command.modelObject, SendOther): - if command.modelObject.martiPresent: - - user_callsigns = [user.contact.callsign for user in user_list] - cot_dest_callsigns = [dest.callsign for dest in command.modelObject.detail.marti.dest] - - if bool(set(user_callsigns) & set(cot_dest_callsigns)): - self.callNextHandler(obj, command) - else: - raise Exception("this service has none of the desired clients for this CoT") - - elif hasattr(command.modelObject.detail, 'marti') and hasattr(command.modelObject.detail.marti.dest[0], 'callsign'): - user_callsigns = [user.contact.callsign for user in user_list] - cot_dest_callsigns = [dest.callsign for dest in command.modelObject.detail.marti.dest] - - if bool(set(user_callsigns) & set(cot_dest_callsigns)) or cot_dest_callsigns==[None]: - self.callNextHandler(obj, command) - else: - raise Exception("this service has none of the desired clients for this CoT") - else: - self.callNextHandler(obj, command) - - -class DataValidationHandler(HandlerBase): - """this handler is responsible for validating that the contents of a data object - are of proper type and contain basic required attributes - """ - - def Handle(self, obj, command): - # validate data - if isinstance(command, SpecificCoTAbstract) or isinstance(command, ClientInformation): - self.callNextHandler(obj, command) - else: - raise TypeError('invalid command content for data level handler') - -class SendDataHandler(HandlerBase): - """ Handler for command send data to client, should always be the final handler in the data_handler_chain of any class - - """ - - def Handle(self, obj: FederationServiceBase, command: any): - - if isinstance(obj, FederationServiceBase) and isinstance(command, SpecificCoTAbstract): - obj.send_data_to_clients(command) - - else: - self.callNextHandler(obj, command) - - -class SendConnectionDataHandler(HandlerBase): - """ handler for command send client connection data to federate - - """ - - def Handle(self, obj: FederationServiceBase, command: any): - - if isinstance(obj, FederationServiceBase) and isinstance(command, ClientInformation): - obj.send_connection_data(command) - return None - else: - self.callNextHandler(obj, command) - - -class SendDisconnectionDataHandler(HandlerBase): - """ handler for command send client disconnection data to federate - - """ - - def Handle(self, obj: ServiceInterface, command): - from FreeTAKServer.model.SpecificCoT.SendDisconnect import SendDisconnect - - if isinstance(obj, FederationServiceBase) and isinstance(command, SendDisconnect): - obj.send_disconnection_data(command) - - else: - self.callNextHandler(obj, command) diff --git a/FreeTAKServer/services/federation_client_service/controllers/protobuf_controller.py b/FreeTAKServer/services/federation_client_service/controllers/protobuf_controller.py deleted file mode 100644 index 7e3f1855..00000000 --- a/FreeTAKServer/services/federation_client_service/controllers/protobuf_controller.py +++ /dev/null @@ -1,29 +0,0 @@ -class ProtobufController: - - def process_protobuff_to_object(self, protobuf_object: FederatedEvent): - """ this method will convert the protobuf object to a FTS model object and xml string - it will also add the remarks to indicate that the client or cot is comming from a federate - - Args: - protobuf_object: - - Returns: - - """ - model_object, fts_object = XMLCoTController().determine_model_object_type(protobuf_object.event.type) # pylint: disable=no-member; member does exist - fts_object = fts_object() - model_object = ProtobufSerializer().from_format_to_fts_object(protobuf_object, model_object()) - xml_object = XmlSerializer().from_fts_object_to_format(model_object) - fts_object.setModelObject(model_object) - fts_object.setXmlString(etree.tostring(xml_object)) - """xmlstring = event - if xmlstring.find('detail') and xmlstring.find('detail'). - xmlstring.find('detail').remove(xmlstring.find('detail').find('remarks')) - xmlstring.find('detail').extend([child for child in xmlstring.find('detail')])""" - return fts_object - - def get_header_length(self, header): - return int.from_bytes(header, 'big') - - def generate_header(self, contentlength): - return contentlength.to_bytes(4, byteorder="big") diff --git a/FreeTAKServer/services/federation_client_service/controllers/user_controller.py b/FreeTAKServer/services/federation_client_service/controllers/user_controller.py deleted file mode 100644 index 3f60cf82..00000000 --- a/FreeTAKServer/services/federation_client_service/controllers/user_controller.py +++ /dev/null @@ -1,25 +0,0 @@ -from typing import List - - -class UserController: - def __init__(self) -> None: - self.user_dict = {} - - def get_service_users(self) -> List[FederatedEvent]: - return self.user_dict.values() - - def add_service_user(self, user: FederatedEvent) -> None: - """ add a service user to this services user persistence mechanism - - Returns: None - - """ - self.user_dict[user.contact.uid] = user - - def remove_service_user(self, user: FederatedEvent): - """ remove a service user from this services user persistence mechanism - - Returns: None - - """ - del self.user_dict[user.contact.uid] \ No newline at end of file