diff --git a/broadcast_service/_core.py b/broadcast_service/_core.py index a240b4a..fcda19b 100644 --- a/broadcast_service/_core.py +++ b/broadcast_service/_core.py @@ -15,6 +15,7 @@ import logging from typing import Optional, List, Callable from concurrent.futures import ThreadPoolExecutor +from broadcast_service.singleton import Singleton __all__ = ['broadcast_service', 'BroadcastService', 'enable_log'] @@ -23,7 +24,7 @@ def enable_log(): logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') -class BroadcastService: +class BroadcastService(metaclass=Singleton): """ This class implements broadcast mode, you can import the instance by single class. By BroadcastService, you can send topic message,it will automatically execute the diff --git a/broadcast_service/singleton.py b/broadcast_service/singleton.py new file mode 100644 index 0000000..55b2aee --- /dev/null +++ b/broadcast_service/singleton.py @@ -0,0 +1,24 @@ +"""The singleton metaclass for ensuring only one instance of a class.""" +import abc + + +class Singleton(abc.ABCMeta, type): + """ + Singleton metaclass for ensuring only one instance of a class. + """ + + _instances = {} + + def __call__(cls, *args, **kwargs): + """Call method for the singleton metaclass.""" + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class AbstractSingleton(abc.ABC, metaclass=Singleton): + """ + Abstract singleton class for ensuring only one instance of a class. + """ + + pass diff --git a/setup.py b/setup.py index 2878693..ceadf1a 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ setuptools.setup( name="broadcast_service", - version="1.3.0", + version="1.3.1", author="Zeeland", author_email="zeeland@foxmail.com", description="A lightweight third-party broadcast/pubsub library",