Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "563 20 beta federation service breaks after a while" #698

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion FreeTAKServer/controllers/services/FTS.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from FreeTAKServer.core.services.federation.federation import (
FederationServerService,
)
from FreeTAKServer.services.federation_client_service.federation_client_service_main import (
from FreeTAKServer.core.services.federation.FederationClientService import (
FederationClientServiceController,
)
from FreeTAKServer.core.services.SSLCoTServiceController import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@
import threading
from typing import Dict, List, Tuple

from digitalpy.core.service_management.digitalpy_service import DigitalPyService

from FreeTAKServer.core.serializers.protobuf_serializer import ProtobufSerializer
from FreeTAKServer.core.serializers.xml_serializer import XmlSerializer

from FreeTAKServer.core.parsers.XMLCoTController import XMLCoTController

from FreeTAKServer.core.configuration.CreateLoggerController import CreateLoggerController
from FreeTAKServer.core.configuration.LoggingConstants import LoggingConstants
from FreeTAKServer.core.configuration.MainConfig import MainConfig
Expand All @@ -38,9 +31,6 @@
from FreeTAKServer.model.protobufModel.fig_pb2 import FederatedEvent
from FreeTAKServer.model.SpecificCoT.SpecificCoTAbstract import SpecificCoTAbstract

from .controllers.client_connection_controller import ClientConnectionController
from .controllers.protobuf_controller import ProtobufController

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

Expand All @@ -49,7 +39,7 @@

loggingConstants = LoggingConstants()

class FederationClientServiceController(DigitalPyService):
class FederationClientServiceController(FederationServiceBase):
"""A service which controllers the connection too and transfer of data with
federated servers.
"""
Expand All @@ -64,8 +54,26 @@ def __init__(self):
self.pipe = None
self.federates: Dict[str, Federate] = {}
self.sel = selectors.DefaultSelector()
self.client_connection_controller = ClientConnectionController(self.sel, self.federates, self.db, self.logger)
self.protobuf_controller = ProtobufController()
self.user_dict = {}

def get_service_users(self) -> List[FederatedEvent]:
return self.user_dict.values()

def add_service_user(self, user: FederatedEvent) -> None:
""" add a service user to this services user persistence mechanism
Returns: None
"""
self.user_dict[user.contact.uid] = user

def remove_service_user(self, user: FederatedEvent):
""" remove a service user from this services user persistence mechanism
Returns: None
"""
del self.user_dict[user.contact.uid]

def define_responsibility_chain(self):
pass
Expand Down Expand Up @@ -191,7 +199,7 @@ def main(self):
inbound_data_thread.join()

def serialize_data(self, data_object: FederatedEvent):
specific_obj = self.protobuf_controller.process_protobuff_to_object(data_object)
specific_obj = self._process_protobuff_to_object(data_object)
return specific_obj

def outbound_data_handler(self):
Expand Down Expand Up @@ -257,56 +265,37 @@ def inbound_data_handler(self):
except Exception as e:
self.logger.error(str(e))

def _send_connected_clients(self, connection):
try:
clients = self.db.query_user()
except Exception as e:
self.logger.warning("error thrown in getting clients from DataBase to send to federates " + str(e))
return None
for client in clients:
try:
proto_obj = FederatedEvent()
proto_obj.contact.uid = str(client.uid) # pylint: disable=no-member; member does exist
proto_obj.contact.callsign = str(client.CoT.detail.contact.callsign) # pylint: disable=no-member; member does exist
proto_obj.contact.operation = 1 # pylint: disable=no-member; member does exist
proto_str = proto_obj.SerializeToString()
header = self.protobuf_controller.generate_header(len(proto_str))
connection.send(header + proto_str)
except Exception as e:
self.logger.warning("error thrown sending federate data to newly connected federate " + str(e))
continue

def connect_to_server(self, server_vars: Tuple[str, str]) -> None:
try:
federate_db_obj = self.db.query_Federation(f'id == "{server_vars[0]}"')[0]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
ssock = self.context.wrap_socket(sock, server_hostname=federate_db_obj.address)
ssock.settimeout(10)
ssock.connect((str(federate_db_obj.address), int(federate_db_obj.port)))
ssock.setblocking(False)
federate = Federate()
federate.uid = server_vars[0]
federate.addr = federate_db_obj.address
federate.conn = ssock
federate.name = federate_db_obj.name
events = selectors.EVENT_READ
self.sel.register(ssock, events, federate)
self.federates[server_vars[0]] = federate
self._send_connected_clients(ssock)
self.db.create_ActiveFederation(id = federate_db_obj.id, address = federate_db_obj.address,
port = federate_db_obj.port, initiator = "Self")
self.db.update_Federation({"lastError": None}, query=f'id == "{federate_db_obj.id}"')
return None
except Exception as ex_general:
try:
self.db.remove_ActiveFederation(f'id == "{server_vars[0]}"')
except Exception as ex:
self.logger.warning("exception thrown removing outgoing federation from DB %s", str(ex))
self.logger.warning("exception thrown creating new federation %s", str(ex_general))
try:
self.db.update_Federation({"status": "Disabled", "lastError": str(ex_general)}, query=f'id == "{server_vars[0]}"')
except Exception as ex:
self.logger.warning("exception thrown updating federate in db %s", str(ex))
federate_db_obj = self.db.query_Federation(f'id == "{server_vars[0]}"')[0]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
ssock = self.context.wrap_socket(sock, server_hostname=federate_db_obj.address)
ssock.settimeout(10)
ssock.connect((str(federate_db_obj.address), int(federate_db_obj.port)))
ssock.setblocking(False)
federate = Federate()
federate.uid = server_vars[0]
federate.addr = federate_db_obj.address
federate.conn = ssock
federate.name = federate_db_obj.name
events = selectors.EVENT_READ
self.sel.register(ssock, events, federate)
self.federates[server_vars[0]] = federate
self._send_connected_clients(ssock)
self.db.create_ActiveFederation(id = federate_db_obj.id, address = federate_db_obj.address,
port = federate_db_obj.port, initiator = "Self")
self.db.update_Federation({"lastError": None}, query=f'id == "{federate_db_obj.id}"')
return None
except Exception as e:
try:
self.db.remove_ActiveFederation(f'id == "{server_vars[0]}"')
except Exception as e:
self.logger.warning("exception thrown removing outgoing federation from DB "+str(e))
self.logger.warning("exception thrown creating new federation "+str(e))
try:
self.db.update_Federation({"status": "Disabled", "lastError": str(e)}, query=f'id == "{server_vars[0]}"')
except Exception as e:
self.logger.warning("exception thrown updating federate in db "+str(e))

def receive_data_from_federate(self, timeout):
"""called whenever data is available from any federate and immediately proceeds to
Expand All @@ -323,7 +312,7 @@ def receive_data_from_federate(self, timeout):
continue
if header:
try:
buffer = self.protobuf_controller.get_header_length(header)
buffer = self._get_header_length(header)
raw_protobuf_message = conn.recv(buffer)
print(raw_protobuf_message)
protobuf_object = FederatedEvent()
Expand All @@ -334,11 +323,11 @@ def receive_data_from_federate(self, timeout):
conn.recv(10000)
continue
else:
self.client_connection_controller.disconnect_client(key.data.uid)
self.disconnect_client(key.data.uid)
return dataarray
else:
return None

def start(self, pipe):
self.db = DatabaseController()
self.pipe = pipe
Expand Down
Empty file.
Empty file.

This file was deleted.

Loading
Loading