From e610b7f2f153fbcd8dbd33aa1da470588e161e0d Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Thu, 21 Sep 2023 15:44:17 -0700 Subject: [PATCH] continuous deletion --- Cargo.toml | 7 ++- examples/postgres-store.rs | 7 +++ examples/sqlite-store.rs | 7 +++ src/sqlx_store/postgres_store.rs | 88 ++++++++++++++++++++++++++++++-- src/sqlx_store/sqlite_store.rs | 66 ++++++++++++++++++++++++ 5 files changed, 169 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3b4286f..21b6d62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ redis-store = ["fred"] sqlx-store = ["sqlx"] sqlite-store = ["sqlx/sqlite", "sqlx-store"] postgres-store = ["sqlx/postgres", "sqlx-store"] +tokio = ["dep:tokio"] [dependencies] async-trait = "0.1" @@ -32,12 +33,14 @@ sqlx = { optional = true, version = "0.7.1", features = [ "uuid", "runtime-tokio", ] } +tokio = { optional = true, version = "1", features = ["full"] } [dev-dependencies] axum = "0.6" hyper = "0.14" tokio = { version = "1", features = ["full"] } tower = "0.4" +tokio-test = "0.4" [package.metadata.docs.rs] all-features = true @@ -57,8 +60,8 @@ required-features = ["axum-core", "redis-store"] [[example]] name = "sqlite-store" -required-features = ["axum-core", "sqlite-store"] +required-features = ["axum-core", "sqlite-store", "tokio"] [[example]] name = "postgres-store" -required-features = ["axum-core", "postgres-store"] +required-features = ["axum-core", "postgres-store", "tokio"] diff --git a/examples/postgres-store.rs b/examples/postgres-store.rs index 7a3a069..da5406c 100644 --- a/examples/postgres-store.rs +++ b/examples/postgres-store.rs @@ -17,6 +17,13 @@ async fn main() { let pool = PgPool::connect(database_url).await.unwrap(); let session_store = PostgresStore::new(pool); session_store.migrate().await.unwrap(); + + tokio::task::spawn( + session_store + .clone() + .continuously_delete_expired(tokio::time::Duration::from_secs(60)), + ); + let session_service = ServiceBuilder::new() .layer(HandleErrorLayer::new(|_: BoxError| async { StatusCode::BAD_REQUEST diff --git a/examples/sqlite-store.rs b/examples/sqlite-store.rs index d61f0f5..6e5d8d4 100644 --- a/examples/sqlite-store.rs +++ b/examples/sqlite-store.rs @@ -16,6 +16,13 @@ async fn main() { let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); let session_store = SqliteStore::new(pool); session_store.migrate().await.unwrap(); + + tokio::task::spawn( + session_store + .clone() + .continuously_delete_expired(tokio::time::Duration::from_secs(60)), + ); + let session_service = ServiceBuilder::new() .layer(HandleErrorLayer::new(|_: BoxError| async { StatusCode::BAD_REQUEST diff --git a/src/sqlx_store/postgres_store.rs b/src/sqlx_store/postgres_store.rs index da2b917..c52bfc2 100644 --- a/src/sqlx_store/postgres_store.rs +++ b/src/sqlx_store/postgres_store.rs @@ -7,7 +7,7 @@ use crate::{ Session, SessionStore, SqlxStoreError, }; -/// A SQLite session store. +/// A PostgreSQL session store. #[derive(Clone, Debug)] pub struct PostgresStore { pool: PgPool, @@ -17,6 +17,18 @@ pub struct PostgresStore { impl PostgresStore { /// Create a new PostgreSQL store with the provided connection pool. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tower_sessions::{sqlx::PgPool, PostgresStore}; + /// + /// # tokio_test::block_on(async { + /// let database_url = std::option_env!("DATABASE_URL").unwrap(); + /// let pool = PgPool::connect(database_url).await.unwrap(); + /// let session_store = PostgresStore::new(pool); + /// # }) + /// ``` pub fn new(pool: PgPool) -> Self { Self { pool, @@ -26,6 +38,19 @@ impl PostgresStore { } /// Migrate the session schema. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tower_sessions::{sqlx::PgPool, PostgresStore}; + /// + /// # tokio_test::block_on(async { + /// let database_url = std::option_env!("DATABASE_URL").unwrap(); + /// let pool = PgPool::connect(database_url).await.unwrap(); + /// let session_store = PostgresStore::new(pool); + /// session_store.migrate().await.unwrap(); + /// # }) + /// ``` pub async fn migrate(&self) -> sqlx::Result<()> { let mut tx = self.pool.begin().await?; @@ -53,6 +78,63 @@ impl PostgresStore { Ok(()) } + + #[cfg(feature = "tokio")] + /// This function will keep running indefinitely, deleting expired rows and + /// then waiting for the specified period before deleting again. + /// + /// Generally this will be used as a task, for example via + /// `tokio::task::spawn`. + /// + /// # Arguments + /// + /// * `period` - The interval at which expired rows should be deleted. + /// + /// # Errors + /// + /// This function returns a `Result` that contains an error of type + /// `sqlx::Error` if the deletion operation fails. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tower_sessions::{sqlx::PgPool, PostgresStore}; + /// + /// # tokio_test::block_on(async { + /// let database_url = std::option_env!("DATABASE_URL").unwrap(); + /// let pool = PgPool::connect(database_url).await.unwrap(); + /// let session_store = PostgresStore::new(pool); + /// + /// tokio::task::spawn( + /// session_store + /// .clone() + /// .continuously_delete_expired(tokio::time::Duration::from_secs(60)), + /// ); + /// # }) + /// ``` + pub async fn continuously_delete_expired( + self, + period: tokio::time::Duration, + ) -> Result<(), sqlx::Error> { + let mut interval = tokio::time::interval(period); + loop { + self.delete_expired().await?; + interval.tick().await; + } + } + + async fn delete_expired(&self) -> sqlx::Result<()> { + let query = format!( + r#" + delete from {schema_name}.{table_name} + where expiration_time < (now() at time zone 'utc') + "#, + schema_name = self.schema_name, + table_name = self.table_name + ); + sqlx::query(&query).execute(&self.pool).await?; + Ok(()) + } } #[async_trait] @@ -109,9 +191,7 @@ impl SessionStore for PostgresStore { async fn delete(&self, session_id: &SessionId) -> Result<(), Self::Error> { let query = format!( - r#" - delete from {schema_name}.{table_name} where id = $1 - "#, + r#"delete from {schema_name}.{table_name} where id = $1"#, schema_name = self.schema_name, table_name = self.table_name ); diff --git a/src/sqlx_store/sqlite_store.rs b/src/sqlx_store/sqlite_store.rs index 318526c..e006e20 100644 --- a/src/sqlx_store/sqlite_store.rs +++ b/src/sqlx_store/sqlite_store.rs @@ -16,6 +16,17 @@ pub struct SqliteStore { impl SqliteStore { /// Create a new SQLite store with the provided connection pool. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tower_sessions::{sqlx::SqlitePool, SqliteStore}; + /// + /// # tokio_test::block_on(async { + /// let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + /// let session_store = SqliteStore::new(pool); + /// # }) + /// ``` pub fn new(pool: SqlitePool) -> Self { Self { pool, @@ -54,6 +65,61 @@ impl SqliteStore { sqlx::query(&query).execute(&self.pool).await?; Ok(()) } + + async fn delete_expired(&self) -> sqlx::Result<()> { + let query = format!( + r#" + delete from {table_name} + where expiration_time < datetime('now', 'utc') + "#, + table_name = self.table_name + ); + sqlx::query(&query).execute(&self.pool).await?; + Ok(()) + } + + #[cfg(feature = "tokio")] + /// This function will keep running indefinitely, deleting expired rows and + /// then waiting for the specified period before deleting again. + /// + /// Generally this will be used as a task, for example via + /// `tokio::task::spawn`. + /// + /// # Arguments + /// + /// * `period` - The interval at which expired rows should be deleted. + /// + /// # Errors + /// + /// This function returns a `Result` that contains an error of type + /// `sqlx::Error` if the deletion operation fails. + /// + /// # Examples + /// + /// ```rust,no_run + /// use tower_sessions::{sqlx::SqlitePool, SqliteStore}; + /// + /// # tokio_test::block_on(async { + /// let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + /// let session_store = SqliteStore::new(pool); + /// + /// tokio::task::spawn( + /// session_store + /// .clone() + /// .continuously_delete_expired(tokio::time::Duration::from_secs(60)), + /// ); + /// # }) + /// ``` + pub async fn continuously_delete_expired( + self, + period: tokio::time::Duration, + ) -> Result<(), sqlx::Error> { + let mut interval = tokio::time::interval(period); + loop { + self.delete_expired().await?; + interval.tick().await; + } + } } #[async_trait]