Skip to content

Commit

Permalink
Fix v2.2.1: Fix the database connection logic and prometheus thread m…
Browse files Browse the repository at this point in the history
…etrics (#94)

## v2.2.1 - 2024-08-24
### What's Changed
**Full Changelog**: v2.2.0...v2.2.1 by @obervinov in #94
#### 🐛 Bug Fixes
* #93
* #92
  • Loading branch information
obervinov authored Aug 24, 2024
1 parent 4d4335d commit b340a79
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 101 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/).


## v2.2.1 - 2024-08-24
### What's Changed
**Full Changelog**: https://github.com/obervinov/pyinstabot-downloader/compare/v2.2.0...v2.2.1 by @obervinov in https://github.com/obervinov/pyinstabot-downloader/pull/94
#### 🐛 Bug Fixes
* [Bug: Correct the exception Cursor already close](https://github.com/obervinov/pyinstabot-downloader/issues/93)
* [Bug: Display the state of all additional bot threads in prometheus metrics](https://github.com/obervinov/pyinstabot-downloader/issues/92)


## v2.2.0 - 2024-08-20
### What's Changed
**Full Changelog**: https://github.com/obervinov/pyinstabot-downloader/compare/v2.1.8...v2.2.0 by @obervinov in https://github.com/obervinov/pyinstabot-downloader/pull/90
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ _except for the part of the configuration that configures the connection to `Vau
"host": "postgresql.example.com",
"password": "qwerty123",
"port": "5432",
"user": "user1"
"user": "user1",
"connections": "10"
}
```
</br>
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pyinstabot-downloader"
version = "2.2.0"
version = "2.2.1"
description = "This project is a Telegram bot that allows you to upload posts from your Instagram profile to clouds like Dropbox, Mega or any WebDav compatible cloud storage."
authors = ["Bervinov Oleg <[email protected]>"]
maintainers = ["Bervinov Oleg <[email protected]>"]
Expand Down
8 changes: 4 additions & 4 deletions src/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -603,14 +603,14 @@ def main():
None
"""
# Thread for processing queue
thread_queue_handler = threading.Thread(target=queue_handler_thread, args=(), name="Thread-queue-handler")
thread_queue_handler = threading.Thread(target=queue_handler_thread, args=(), name="QueueHandlerThread")
thread_queue_handler.start()
# Thread for update status message
thread_status_message = threading.Thread(target=status_message_updater_thread, args=(), name="Thread-message-updater")
thread_status_message = threading.Thread(target=status_message_updater_thread, args=(), name="MessageUpdaterThread")
thread_status_message.start()
# Thread for export metrics
threads = [thread_queue_handler, thread_status_message]
thread_metrics = threading.Thread(target=metrics.run, args=(threads,), name="Thread-metrics")
threads = threading.enumerate()
thread_metrics = threading.Thread(target=metrics.run, args=(threads,), name="MetricsThread")
thread_metrics.start()
# Run bot
while True:
Expand Down
198 changes: 103 additions & 95 deletions src/modules/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import time
from typing import Union
import psycopg2
from psycopg2 import pool
from logger import log
from .tools import get_hash

Expand All @@ -19,22 +20,12 @@ def wrapper(self, *args, **kwargs):
return method(self, *args, **kwargs)
except psycopg2.Error as exception:
log.warning('[Database]: Connection to the database was lost: %s. Attempting to reconnect...', str(exception))
time.sleep(5)
try:
if self.database_connection:
self.database_connection.close()
db_configuration = self.vault.read_secret(path='configuration/database')
self.database_connection = psycopg2.connect(
host=db_configuration['host'],
port=db_configuration['port'],
user=db_configuration['user'],
password=db_configuration['password'],
database=db_configuration['database']
)
self.cursor = self.database_connection.cursor()
self.database_connections = self.create_connection_pool()
log.info('[Database]: Reconnection successful.')
return method(self, *args, **kwargs)
except psycopg2.Error as inner_exception:
time.sleep(10)
log.error('[Database]: Failed to reconnect to the database: %s', str(inner_exception))
raise inner_exception
return wrapper
Expand All @@ -43,6 +34,41 @@ def wrapper(self, *args, **kwargs):
class DatabaseClient:
"""
A class that represents a client for interacting with a PostgreSQL database.
Attributes:
database_connections (psycopg2.extensions.connection): A connection to the PostgreSQL database.
vault (object): An object representing a HashiCorp Vault client for retrieving secrets.
errors (psycopg2.errors): A collection of error classes for exceptions raised by the psycopg2 module.
Methods:
_create_connection_pool(): Create a connection pool for the PostgreSQL database.
_get_connection(): Get a connection from the connection pool.
_close_connection(connection): Close the connection and return it to the connection pool.
_prepare_db(): Prepare the database by creating and initializing the necessary tables.
_migrations(): Execute database migrations to update the database schema or data.
_is_migration_executed(migration_name): Check if a migration has already been executed.
_mark_migration_as_executed(migration_name, version): Inserts a migration into the migrations table to mark it as executed.
_create_table(table_name, columns): Create a new table in the database with the given name and columns if it does not already exist.
_insert(table_name, columns, values): Inserts a new row into the specified table with the given columns and values.
_select(table_name, columns, **kwargs): Selects rows from the specified table with the given columns based on the specified condition.
_update(table_name, values, condition): Update the specified table with the given values of values based on the specified condition.
_delete(table_name, condition): Delete rows from a table based on a condition.
_reset_stale_records(): Reset stale records in the database. To ensure that the bot is restored after a restart.
add_message_to_queue(data): Add a message to the queue table in the database.
get_message_from_queue(scheduled_time): Get a one message from the queue table that is scheduled to be sent at the specified time.
update_message_state_in_queue(post_id, state, **kwargs): Update the state of a message in the queue table and move it to the processed table
if the state is 'processed'.
update_schedule_time_in_queue(post_id, user_id, scheduled_time): Update the scheduled time of a message in the queue table.
get_user_queue(user_id): Get messages from the queue table for the specified user.
get_user_processed(user_id): Get last ten messages from the processed table for the specified user.
check_message_uniqueness(post_id, user_id): Check if a message with the given post ID and chat ID already exists in the queue.
keep_message(message_id, chat_id, message_content, **kwargs): Add a message to the messages table in the database.
add_user(user_id, chat_id): Add a user to the users table in the database.
get_users(): Get a list of all users in the database.
get_considered_message(message_type, chat_id): Get a message with specified type and
Rises:
psycopg2.Error: An error occurred while interacting with the PostgreSQL database.
"""
def __init__(
self,
Expand All @@ -54,63 +80,64 @@ def __init__(
Args:
vault (object): An object representing a HashiCorp Vault client for retrieving secrets with the database configuration.
Attributes:
database_connection (psycopg2.extensions.connection): A connection to the PostgreSQL database.
cursor (psycopg2.extensions.cursor): A cursor for executing SQL queries on the database.
vault (object): An object representing a HashiCorp Vault client for retrieving secrets.
Parameters:
host (str): The hostname of the database server.
port (int): The port number of the database server.
user (str): The username to use when connecting to the database.
password (str): The password to use when connecting to the database.
database (str): The name of the database to connect to.
log (object): An object representing a logger for logging messages.
Returns:
None
Examples:
To create a new instance of the Database class:
>>> from modules.database import Database
>>> from modules.vault import Vault
>>> vault = Vault()
>>> db = Database(vault=vault)
"""
db_configuration = vault.read_secret(path='configuration/database')
self.vault = vault
self.errors = psycopg2.errors
self.database_connections = self.create_connection_pool()

self._prepare_db()
self._migrations()
self._reset_stale_records()

def create_connection_pool(self) -> pool.SimpleConnectionPool:
"""
Create a connection pool for the PostgreSQL database.
self.database_connection = psycopg2.connect(
Returns:
pool.SimpleConnectionPool: A connection pool for the PostgreSQL database.
"""
db_configuration = self.vault.read_secret(path='configuration/database')
log.info(
'[Database]: Creating a connection pool for the %s:%s/%s',
db_configuration['host'], db_configuration['port'], db_configuration['database']
)
return pool.SimpleConnectionPool(
minconn=1,
maxconn=db_configuration['connections'],
host=db_configuration['host'],
port=db_configuration['port'],
user=db_configuration['user'],
password=db_configuration['password'],
database=db_configuration['database']
)
log.info(
'[Database]: initialized connection to %s:%s/%s',
db_configuration['host'], db_configuration['port'], db_configuration['database']
)

self.errors = psycopg2.errors
self.cursor = self.database_connection.cursor()
self.vault = vault
def _get_connection(self) -> psycopg2.extensions.connection:
"""
Get a connection from the connection pool.
self._prepare_db()
self._migrations()
self._reset_stale_records()
Returns:
psycopg2.extensions.connection: A connection to the PostgreSQL database.
"""
return self.database_connections.getconn()

def _prepare_db(self) -> None:
def _close_connection(self, connection: psycopg2.extensions.connection) -> None:
"""
Prepare the database by creating and initializing the necessary tables.
Close the cursor and return it to the connection pool.
Args:
None
Parameters:
None
connection (psycopg2.extensions.connection): A connection to the PostgreSQL database.
"""
self.database_connections.putconn(connection)

Returns:
None
def _prepare_db(self) -> None:
"""
Prepare the database by creating and initializing the necessary tables.
"""
# Read configuration file for database initialization
configuration_path = os.path.abspath(os.path.join(os.path.dirname(os.path.abspath(__file__)), '../configs/databases.json'))
Expand Down Expand Up @@ -140,15 +167,6 @@ def _prepare_db(self) -> None:
def _migrations(self) -> None:
"""
Execute database migrations to update the database schema or data.
Args:
None
Parameters:
None
Returns:
None
"""
log.info('[Database]: Migrations: Preparing to execute database migrations...')
# Migrations directory
Expand Down Expand Up @@ -181,8 +199,7 @@ def _is_migration_executed(
Returns:
bool: True if the migration has been executed, False otherwise.
"""
self.cursor.execute(f"SELECT id FROM migrations WHERE name = '{migration_name}'")
return self.cursor.fetchone() is not None
return self._select(table_name='migrations', columns=('id',), condition=f"name = '{migration_name}'")

def _mark_migration_as_executed(
self,
Expand All @@ -194,12 +211,8 @@ def _mark_migration_as_executed(
Args:
migration_name (str): The name of the migration to mark as executed.
Returns:
None
"""
self.cursor.execute(f"INSERT INTO migrations (name, version) VALUES ('{migration_name}', '{version}')")
self.database_connection.commit()
self._insert(table_name='migrations', columns=('name', 'version'), values=(migration_name, version))

def _create_table(
self,
Expand All @@ -213,15 +226,15 @@ def _create_table(
table_name (str): The name of the table to create.
columns (str): A string containing the column definitions for the table.
Returns:
None
Examples:
To create a new table called 'users' with columns 'id' and 'name', you can call the method like this:
>>> _create_table('users', 'id INTEGER PRIMARY KEY, name TEXT')
"""
self.cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})")
self.database_connection.commit()
conn = self._get_connection()
with conn.cursor() as cursor:
cursor.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns})")
conn.commit()
self._close_connection(conn)

@reconnect_on_exception
def _insert(
Expand All @@ -238,9 +251,6 @@ def _insert(
columns (tuple): A tuple containing the names of the columns to insert the values into.
values (tuple): A tuple containing the values to insert into the table.
Returns:
None
Examples:
>>> db_client._insert(
... table_name='users',
Expand All @@ -250,8 +260,11 @@ def _insert(
"""
try:
sql_query = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join(['%s'] * len(columns))})"
self.cursor.execute(sql_query, values)
self.database_connection.commit()
conn = self._get_connection()
with conn.cursor() as cursor:
cursor.execute(sql_query, values)
conn.commit()
self._close_connection(conn)
except (psycopg2.Error, IndexError) as error:
log.error(
'[Database]: An error occurred while inserting a row into the table %s: %s\nColumns: %s\nValues: %s\nQuery: %s',
Expand Down Expand Up @@ -296,8 +309,12 @@ def _select(
if kwargs.get('limit', None):
sql_query += f" LIMIT {kwargs.get('limit')}"

self.cursor.execute(sql_query)
return self.cursor.fetchall()
conn = self._get_connection()
with conn.cursor() as cursor:
cursor.execute(sql_query)
response = cursor.fetchall()
self._close_connection(conn)
return response if response else None

@reconnect_on_exception
def _update(
Expand All @@ -314,14 +331,14 @@ def _update(
values (str): The values of values to update in the table.
condition (str): The condition to use for updating the table.
Returns:
None
Examples:
>>> _update('users', "username='new_username', password='new_password'", "id=1")
"""
self.cursor.execute(f"UPDATE {table_name} SET {values} WHERE {condition}")
self.database_connection.commit()
conn = self._get_connection()
with conn.cursor() as cursor:
cursor.execute(f"UPDATE {table_name} SET {values} WHERE {condition}")
conn.commit()
self._close_connection(conn)

@reconnect_on_exception
def _delete(
Expand All @@ -336,26 +353,20 @@ def _delete(
table_name (str): The name of the table to delete rows from.
condition (str): The condition to use to determine which rows to delete.
Returns:
None
Examples:
To delete all rows from the 'users' table where the 'username' column is 'john':
>>> db._delete('users', "username='john'")
"""
self.cursor.execute(f"DELETE FROM {table_name} WHERE {condition}")
self.database_connection.commit()
conn = self._get_connection()
with conn.cursor() as cursor:
cursor.execute(f"DELETE FROM {table_name} WHERE {condition}")
conn.commit()
self._close_connection(conn)

def _reset_stale_records(self) -> None:
"""
Reset stale records in the database. To ensure that the bot is restored after a restart.
More: https://github.com/obervinov/pyinstabot-downloader/issues/84
Args:
None
Returns:
None
"""
# Reset stale status_message (can be only one status_message per chat)
log.info('[Database]: Resetting stale status messages...')
Expand Down Expand Up @@ -799,9 +810,6 @@ def get_users(self) -> list:
"""
Get a list of all users in the database.
Args:
None
Returns:
list: A list of all users from the messages table.
Expand Down

0 comments on commit b340a79

Please sign in to comment.