Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB_ADDON: Bugfix #942

Merged
merged 6 commits into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 41 additions & 91 deletions db_addon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import time
import re
import queue
import threading
import logging
import pickle
import operator
from dateutil.relativedelta import relativedelta
Expand Down Expand Up @@ -62,7 +60,7 @@ class DatabaseAddOn(SmartPlugin):
Main class of the Plugin. Does all plugin specific stuff and provides the update functions for the items
"""

PLUGIN_VERSION = '1.2.8'
PLUGIN_VERSION = '1.2.9'

def __init__(self, sh):
"""
Expand All @@ -80,17 +78,15 @@ def __init__(self, sh):
self.plugins = Plugins.get_instance()

# define cache dicts
self.pickle_data_validity_time = 600 # seconds after which the data saved in pickle are not valid anymore
self.current_values = {} # Dict to hold min and max value of current day / week / month / year for items
self.previous_values = {} # Dict to hold value of end of last day / week / month / year for items
self.item_cache = {} # Dict to hold item_id, oldest_log_ts and oldest_entry for items
self.value_list_raw_data = {}
self.pickle_data_validity_time = 600 # seconds after which the data saved in pickle are not valid anymore
self.current_values = {} # Dict to hold min and max value of current day / week / month / year for items
self.previous_values = {} # Dict to hold value of end of last day / week / month / year for items
self.item_cache = {} # Dict to hold item_id, oldest_log_ts and oldest_entry for items
self.value_list_raw_data = {} # List to hold raw data

# define variables for database, database connection, working queue and status
self.item_queue = queue.Queue() # Queue containing all to be executed items
self.update_item_delay_deque = deque() # Deque for delay working of updated item values
# ToDo: Check if still needed
self.queue_consumer_thread = None # Queue consumer thread
self._db_plugin = None # object if database plugin
self._db = None # object of database
self.connection_data = None # connection data list of database
Expand All @@ -100,7 +96,8 @@ def __init__(self, sh):
self.last_connect_time = 0 # mechanism for limiting db connection requests
self.alive = None # Is plugin alive?
self.active_queue_item: str = '-' # String holding item path of currently executed item
self.onchange_delay_time = 30
self.onchange_delay_time = 30 # delay time in seconds between change of database item start of reevaluation of db_addon item
self.database_item_list = [] # list of needed database items

# define default mysql settings
self.default_connect_timeout = 60
Expand Down Expand Up @@ -168,25 +165,15 @@ def run(self):
# update database_items in item config, where path was given
self._update_database_items()

# create list if all relevant database items
self._create_list_of_relevant_database_items()

# set plugin to alive
self.alive = True

# work item queue
self.work_item_queue()

# ToDo: Check if still needed
"""
try:
self._queue_consumer_thread_startup()
except Exception as e:
self.logger.warning(f"During working item queue Exception '{e}' occurred.")
self.logger.debug(e, exc_info=True)
# self.logger.error("Thread for working item queue died. De-init plugin.")
# self.deinit()
self.logger.error("Suspend Plugin and clear Item-Queue.")
self.suspend(True)
"""

def stop(self):
"""
Stop method for the plugin
Expand All @@ -200,9 +187,6 @@ def stop(self):
self._db.close()
self.save_cache_data()

# ToDo: Check if still needed
# self._queue_consumer_thread_shutdown()

def parse_item(self, item: Item):
"""
Default plugin parse_item method. Is called when the plugin is initialized.
Expand Down Expand Up @@ -525,23 +509,6 @@ def get_database_item() -> Item:

return None, None

def has_db_addon_item() -> bool:
"""Returns item from shNG config which is item with db_addon attribut valid for database item"""

for child in item.return_children():
if check_db_addon_fct(child):
return True

for child_child in child.return_children():
if check_db_addon_fct(child_child):
return True

for child_child_child in child_child.return_children():
if check_db_addon_fct(child_child_child):
return True

return False

def check_db_addon_fct(check_item) -> bool:
"""
Check if item has db_addon_fct and is onchange
Expand Down Expand Up @@ -625,7 +592,6 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte

# read item_attribute_dict aus item_attributes_master
item_attribute_dict = ITEM_ATTRIBUTES['db_addon_fct'].get(db_addon_fct)
self.logger.debug(f"{db_addon_fct}: {item_attribute_dict=}")

# get query parameters from db_addon_fct or db_addon_params
if item_attribute_dict['params']:
Expand Down Expand Up @@ -682,11 +648,9 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
if self.debug_log.parse:
self.logger.debug(f"Item={item.property.path} added with db_addon_fct={db_addon_fct} and database_item={database_item}")

# add type (onchange or ondemand) to item dict
item_config_data_dict.update({'on': item_attribute_dict['on']})

# add cycle for item groups
cycle = item_attribute_dict['calc']
cycle = item_attribute_dict['cycle']
on = 'demand'
if cycle == 'group':
cycle = item_config_data_dict['query_params'].get('group')
if not cycle:
Expand All @@ -695,13 +659,19 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
elif cycle == 'timeframe':
cycle = item_config_data_dict['query_params'].get('timeframe')
cycle = f"{timeframe_to_updatecyle(cycle)}"
elif cycle == 'None':
cycle = None
item_config_data_dict.update({'cycle': cycle})
elif not cycle:
on = 'change'
item_config_data_dict.update({'cycle': cycle, 'on': on})

# do logging
if self.debug_log.parse:
self.logger.debug(f"Item '{item.property.path}' added to be run {item_config_data_dict['cycle']}.")
if cycle:
self.logger.debug(f"Item '{item.property.path}' added to be run {item_config_data_dict['cycle']}.")
else:
self.logger.debug(f"Item '{item.property.path}' added but will not be run cyclic.")

if on == 'change':
self.logger.debug(f"Item '{item.property.path}' added and will be run on-change of {database_item}.")

# create item config for item to be run on startup
if db_addon_startup or item_attribute_dict['cat'] == 'gen':
Expand All @@ -710,6 +680,8 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
item_config_data_dict.update({'startup': False})

# add item to plugin item dict
if self.debug_log.parse:
self.logger.debug(f"Item '{item.property.path}' completely parsed: {item_config_data_dict=}.")
self.add_item(item, config_data_dict=item_config_data_dict)

# handle all items with db_addon_info
Expand All @@ -725,11 +697,8 @@ def format_db_addon_ignore_value_list(optimize: bool = self.optimize_value_filte
self.add_item(item, config_data_dict={'db_addon': 'admin', 'db_addon_fct': f"admin_{self.get_iattr_value(item.conf, 'db_addon_admin').lower()}", 'database_item': None})
return self.update_item

# Reference to 'update_item' für alle Items mit Attribut 'database', um die on_change Items zu berechnen
elif self.has_iattr(item.conf, self.item_attribute_search_str) and has_db_addon_item():
if self.debug_log.parse:
self.logger.debug(f"reference to update_item for item={item.property.path} will be set due to onchange")
self.add_item(item, config_data_dict={'db_addon': 'database'})
# Reference to 'update_item' for all database items to trigger calculation of on-change items
elif self.has_iattr(item.conf, self.item_attribute_search_str):
return self.update_item

def update_item(self, item, caller=None, source=None, dest=None):
Expand All @@ -747,8 +716,6 @@ def update_item(self, item, caller=None, source=None, dest=None):
if self.alive and caller != self.get_shortname():
# handle database items
if item in self._database_items():
# if not self.startup_finished:
# self.logger.info(f"Handling of 'onchange' is paused for startup. No updated will be processed.")
self.logger.debug(f" Updated Item {item.property.path} with value {item()} will be put to queue in approx. {self.onchange_delay_time}s resp. after startup.")
self.update_item_delay_deque.append([item, item(), int(time.time() + self.onchange_delay_time)])

Expand Down Expand Up @@ -1274,7 +1241,7 @@ def _update_database_items(self) -> None:
item_config.update({'startup': True})

def _suspend_item_calculation(self, item: Union[str, Item], suspended: bool = False) -> Union[bool, None]:
"""suspend calculation od decicated item"""
"""suspend calculation od dedicated item"""
if isinstance(item, str):
item = self.items.return_item(item)

Expand All @@ -1285,6 +1252,16 @@ def _suspend_item_calculation(self, item: Union[str, Item], suspended: bool = Fa
item_config['suspended'] = suspended
return suspended

def _create_list_of_relevant_database_items(self):
"""creates list of all relevant database items for further reference"""
_database_items = set()
for item in self.get_item_list('database_item'):
item_config = self.get_item_config(item)
database_item = item_config.get('database_item')
if database_item is not None:
_database_items.add(database_item)
self.database_item_list = list(_database_items)

@property
def log_level(self) -> int:
return self.logger.getEffectiveLevel()
Expand Down Expand Up @@ -1359,7 +1336,7 @@ def _info_items(self) -> list:
return self.get_item_list('db_addon', 'info')

def _database_items(self) -> list:
return self.get_item_list('db_addon', 'database')
return self.database_item_list

def _database_item_path_items(self) -> list:
return self.get_item_list('database_item_path', True)
Expand Down Expand Up @@ -2429,33 +2406,6 @@ def _clear_queue(self) -> None:
self.logger.info(f"Working queue will be cleared. Calculation run will end.")
self.item_queue.queue.clear()

# ToDo: Check if still needed
def _queue_consumer_thread_startup(self):
"""Start a thread to work item queue"""

self.logger = logging.getLogger(__name__)
_name = 'plugins.' + self.get_fullname() + '.work_item_queue'

try:
self.queue_consumer_thread = threading.Thread(target=self.work_item_queue, name=_name, daemon=False)
self.queue_consumer_thread.start()
self.logger.debug("Thread for 'queue_consumer_thread' has been started")
except threading.ThreadError:
self.logger.error("Unable to launch thread for 'queue_consumer_thread'.")
self.queue_consumer_thread = None

# ToDo: Check if still needed
def _queue_consumer_thread_shutdown(self):
"""Shut down the thread to work item queue"""

if self.queue_consumer_thread:
self.queue_consumer_thread.join()
if self.queue_consumer_thread.is_alive():
self.logger.error("Unable to shut down 'queue_consumer_thread' thread")
else:
self.logger.info("Thread 'queue_consumer_thread' has been shut down.")
self.queue_consumer_thread = None

def _get_start_end_as_timestamp(self, timeframe: str, start: Union[int, str, None], end: Union[int, str, None]) -> tuple:
"""
Provides start and end as timestamp in microseconds from timeframe with start and end
Expand Down Expand Up @@ -2605,7 +2555,7 @@ def _query_log_timestamp(self, func: str, item_id: int, ts_start: int, ts_end: i
'last': 'LIMIT 1 ',
}

_where = "item_id = :item_id AND time < :ts_end " if func == "next" else "item_id = :item_id AND time BETWEEN :ts_start AND :ts_end "
_where = "item_id = :item_id AND time <= :ts_start " if func == "next" else "item_id = :item_id AND time BETWEEN :ts_start AND :ts_end "

_db_table = 'log '

Expand Down Expand Up @@ -2657,7 +2607,7 @@ def _query_log_timestamp(self, func: str, item_id: int, ts_start: int, ts_end: i
# set params
params = {'item_id': item_id, 'ts_start': ts_start, 'ts_end': ts_end}
if func == "next":
params.pop('ts_start', None)
params.pop('ts_end', None)

# assemble query
query = f"SELECT {_select[func]}FROM {_db_table}WHERE {_where}{_group_by.get(group, '')}{_order.get(func, '')}{_limit.get(func, '')}{_table_alias.get(func, '')}{_group_by.get(group2, '')}".strip()
Expand Down
Loading
Loading