Skip to content

Commit

Permalink
Merge pull request #698 from FreeTAKTeam/revert-679-563-20-beta-feder…
Browse files Browse the repository at this point in the history
…ation-service-breaks-after-a-while

Revert "563 20 beta federation service breaks after a while"
  • Loading branch information
naman108 authored Apr 23, 2024
2 parents d58a495 + 7e7d9e4 commit 8660340
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 332 deletions.
2 changes: 1 addition & 1 deletion FreeTAKServer/controllers/services/FTS.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from FreeTAKServer.core.services.federation.federation import (
FederationServerService,
)
from FreeTAKServer.services.federation_client_service.federation_client_service_main import (
from FreeTAKServer.core.services.federation.FederationClientService import (
FederationClientServiceController,
)
from FreeTAKServer.core.services.SSLCoTServiceController import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@
import threading
from typing import Dict, List, Tuple

from digitalpy.core.service_management.digitalpy_service import DigitalPyService

from FreeTAKServer.core.serializers.protobuf_serializer import ProtobufSerializer
from FreeTAKServer.core.serializers.xml_serializer import XmlSerializer

from FreeTAKServer.core.parsers.XMLCoTController import XMLCoTController

from FreeTAKServer.core.configuration.CreateLoggerController import CreateLoggerController
from FreeTAKServer.core.configuration.LoggingConstants import LoggingConstants
from FreeTAKServer.core.configuration.MainConfig import MainConfig
Expand All @@ -38,9 +31,6 @@
from FreeTAKServer.model.protobufModel.fig_pb2 import FederatedEvent
from FreeTAKServer.model.SpecificCoT.SpecificCoTAbstract import SpecificCoTAbstract

from .controllers.client_connection_controller import ClientConnectionController
from .controllers.protobuf_controller import ProtobufController

# Make a connection to the MainConfig object for all routines below
config = MainConfig.instance()

Expand All @@ -49,7 +39,7 @@

loggingConstants = LoggingConstants()

class FederationClientServiceController(DigitalPyService):
class FederationClientServiceController(FederationServiceBase):
"""A service which controllers the connection too and transfer of data with
federated servers.
"""
Expand All @@ -64,8 +54,26 @@ def __init__(self):
self.pipe = None
self.federates: Dict[str, Federate] = {}
self.sel = selectors.DefaultSelector()
self.client_connection_controller = ClientConnectionController(self.sel, self.federates, self.db, self.logger)
self.protobuf_controller = ProtobufController()
self.user_dict = {}

def get_service_users(self) -> List[FederatedEvent]:
return self.user_dict.values()

def add_service_user(self, user: FederatedEvent) -> None:
""" add a service user to this services user persistence mechanism
Returns: None
"""
self.user_dict[user.contact.uid] = user

def remove_service_user(self, user: FederatedEvent):
""" remove a service user from this services user persistence mechanism
Returns: None
"""
del self.user_dict[user.contact.uid]

def define_responsibility_chain(self):
pass
Expand Down Expand Up @@ -191,7 +199,7 @@ def main(self):
inbound_data_thread.join()

def serialize_data(self, data_object: FederatedEvent):
specific_obj = self.protobuf_controller.process_protobuff_to_object(data_object)
specific_obj = self._process_protobuff_to_object(data_object)
return specific_obj

def outbound_data_handler(self):
Expand Down Expand Up @@ -257,56 +265,37 @@ def inbound_data_handler(self):
except Exception as e:
self.logger.error(str(e))

def _send_connected_clients(self, connection):
try:
clients = self.db.query_user()
except Exception as e:
self.logger.warning("error thrown in getting clients from DataBase to send to federates " + str(e))
return None
for client in clients:
try:
proto_obj = FederatedEvent()
proto_obj.contact.uid = str(client.uid) # pylint: disable=no-member; member does exist
proto_obj.contact.callsign = str(client.CoT.detail.contact.callsign) # pylint: disable=no-member; member does exist
proto_obj.contact.operation = 1 # pylint: disable=no-member; member does exist
proto_str = proto_obj.SerializeToString()
header = self.protobuf_controller.generate_header(len(proto_str))
connection.send(header + proto_str)
except Exception as e:
self.logger.warning("error thrown sending federate data to newly connected federate " + str(e))
continue

def connect_to_server(self, server_vars: Tuple[str, str]) -> None:
try:
federate_db_obj = self.db.query_Federation(f'id == "{server_vars[0]}"')[0]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
ssock = self.context.wrap_socket(sock, server_hostname=federate_db_obj.address)
ssock.settimeout(10)
ssock.connect((str(federate_db_obj.address), int(federate_db_obj.port)))
ssock.setblocking(False)
federate = Federate()
federate.uid = server_vars[0]
federate.addr = federate_db_obj.address
federate.conn = ssock
federate.name = federate_db_obj.name
events = selectors.EVENT_READ
self.sel.register(ssock, events, federate)
self.federates[server_vars[0]] = federate
self._send_connected_clients(ssock)
self.db.create_ActiveFederation(id = federate_db_obj.id, address = federate_db_obj.address,
port = federate_db_obj.port, initiator = "Self")
self.db.update_Federation({"lastError": None}, query=f'id == "{federate_db_obj.id}"')
return None
except Exception as ex_general:
try:
self.db.remove_ActiveFederation(f'id == "{server_vars[0]}"')
except Exception as ex:
self.logger.warning("exception thrown removing outgoing federation from DB %s", str(ex))
self.logger.warning("exception thrown creating new federation %s", str(ex_general))
try:
self.db.update_Federation({"status": "Disabled", "lastError": str(ex_general)}, query=f'id == "{server_vars[0]}"')
except Exception as ex:
self.logger.warning("exception thrown updating federate in db %s", str(ex))
federate_db_obj = self.db.query_Federation(f'id == "{server_vars[0]}"')[0]
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
ssock = self.context.wrap_socket(sock, server_hostname=federate_db_obj.address)
ssock.settimeout(10)
ssock.connect((str(federate_db_obj.address), int(federate_db_obj.port)))
ssock.setblocking(False)
federate = Federate()
federate.uid = server_vars[0]
federate.addr = federate_db_obj.address
federate.conn = ssock
federate.name = federate_db_obj.name
events = selectors.EVENT_READ
self.sel.register(ssock, events, federate)
self.federates[server_vars[0]] = federate
self._send_connected_clients(ssock)
self.db.create_ActiveFederation(id = federate_db_obj.id, address = federate_db_obj.address,
port = federate_db_obj.port, initiator = "Self")
self.db.update_Federation({"lastError": None}, query=f'id == "{federate_db_obj.id}"')
return None
except Exception as e:
try:
self.db.remove_ActiveFederation(f'id == "{server_vars[0]}"')
except Exception as e:
self.logger.warning("exception thrown removing outgoing federation from DB "+str(e))
self.logger.warning("exception thrown creating new federation "+str(e))
try:
self.db.update_Federation({"status": "Disabled", "lastError": str(e)}, query=f'id == "{server_vars[0]}"')
except Exception as e:
self.logger.warning("exception thrown updating federate in db "+str(e))

def receive_data_from_federate(self, timeout):
"""called whenever data is available from any federate and immediately proceeds to
Expand All @@ -323,7 +312,7 @@ def receive_data_from_federate(self, timeout):
continue
if header:
try:
buffer = self.protobuf_controller.get_header_length(header)
buffer = self._get_header_length(header)
raw_protobuf_message = conn.recv(buffer)
print(raw_protobuf_message)
protobuf_object = FederatedEvent()
Expand All @@ -334,11 +323,11 @@ def receive_data_from_federate(self, timeout):
conn.recv(10000)
continue
else:
self.client_connection_controller.disconnect_client(key.data.uid)
self.disconnect_client(key.data.uid)
return dataarray
else:
return None

def start(self, pipe):
self.db = DatabaseController()
self.pipe = pipe
Expand Down
Empty file.
Empty file.

This file was deleted.

Empty file.
Loading

0 comments on commit 8660340

Please sign in to comment.