Skip to content

Commit

Permalink
continuous deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcountryman committed Sep 21, 2023
1 parent 39d20a7 commit e610b7f
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 6 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ redis-store = ["fred"]
sqlx-store = ["sqlx"]
sqlite-store = ["sqlx/sqlite", "sqlx-store"]
postgres-store = ["sqlx/postgres", "sqlx-store"]
tokio = ["dep:tokio"]

[dependencies]
async-trait = "0.1"
Expand All @@ -32,12 +33,14 @@ sqlx = { optional = true, version = "0.7.1", features = [
"uuid",
"runtime-tokio",
] }
tokio = { optional = true, version = "1", features = ["full"] }

[dev-dependencies]
axum = "0.6"
hyper = "0.14"
tokio = { version = "1", features = ["full"] }
tower = "0.4"
tokio-test = "0.4"

[package.metadata.docs.rs]
all-features = true
Expand All @@ -57,8 +60,8 @@ required-features = ["axum-core", "redis-store"]

[[example]]
name = "sqlite-store"
required-features = ["axum-core", "sqlite-store"]
required-features = ["axum-core", "sqlite-store", "tokio"]

[[example]]
name = "postgres-store"
required-features = ["axum-core", "postgres-store"]
required-features = ["axum-core", "postgres-store", "tokio"]
7 changes: 7 additions & 0 deletions examples/postgres-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ async fn main() {
let pool = PgPool::connect(database_url).await.unwrap();
let session_store = PostgresStore::new(pool);
session_store.migrate().await.unwrap();

tokio::task::spawn(
session_store
.clone()
.continuously_delete_expired(tokio::time::Duration::from_secs(60)),
);

let session_service = ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_: BoxError| async {
StatusCode::BAD_REQUEST
Expand Down
7 changes: 7 additions & 0 deletions examples/sqlite-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ async fn main() {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
let session_store = SqliteStore::new(pool);
session_store.migrate().await.unwrap();

tokio::task::spawn(
session_store
.clone()
.continuously_delete_expired(tokio::time::Duration::from_secs(60)),
);

let session_service = ServiceBuilder::new()
.layer(HandleErrorLayer::new(|_: BoxError| async {
StatusCode::BAD_REQUEST
Expand Down
88 changes: 84 additions & 4 deletions src/sqlx_store/postgres_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
Session, SessionStore, SqlxStoreError,
};

/// A SQLite session store.
/// A PostgreSQL session store.
#[derive(Clone, Debug)]
pub struct PostgresStore {
pool: PgPool,
Expand All @@ -17,6 +17,18 @@ pub struct PostgresStore {

impl PostgresStore {
/// Create a new PostgreSQL store with the provided connection pool.
///
/// # Examples
///
/// ```rust,no_run
/// use tower_sessions::{sqlx::PgPool, PostgresStore};
///
/// # tokio_test::block_on(async {
/// let database_url = std::option_env!("DATABASE_URL").unwrap();
/// let pool = PgPool::connect(database_url).await.unwrap();
/// let session_store = PostgresStore::new(pool);
/// # })
/// ```
pub fn new(pool: PgPool) -> Self {
Self {
pool,
Expand All @@ -26,6 +38,19 @@ impl PostgresStore {
}

/// Migrate the session schema.
///
/// # Examples
///
/// ```rust,no_run
/// use tower_sessions::{sqlx::PgPool, PostgresStore};
///
/// # tokio_test::block_on(async {
/// let database_url = std::option_env!("DATABASE_URL").unwrap();
/// let pool = PgPool::connect(database_url).await.unwrap();
/// let session_store = PostgresStore::new(pool);
/// session_store.migrate().await.unwrap();
/// # })
/// ```
pub async fn migrate(&self) -> sqlx::Result<()> {
let mut tx = self.pool.begin().await?;

Expand Down Expand Up @@ -53,6 +78,63 @@ impl PostgresStore {

Ok(())
}

#[cfg(feature = "tokio")]
/// This function will keep running indefinitely, deleting expired rows and
/// then waiting for the specified period before deleting again.
///
/// Generally this will be used as a task, for example via
/// `tokio::task::spawn`.
///
/// # Arguments
///
/// * `period` - The interval at which expired rows should be deleted.
///
/// # Errors
///
/// This function returns a `Result` that contains an error of type
/// `sqlx::Error` if the deletion operation fails.
///
/// # Examples
///
/// ```rust,no_run
/// use tower_sessions::{sqlx::PgPool, PostgresStore};
///
/// # tokio_test::block_on(async {
/// let database_url = std::option_env!("DATABASE_URL").unwrap();
/// let pool = PgPool::connect(database_url).await.unwrap();
/// let session_store = PostgresStore::new(pool);
///
/// tokio::task::spawn(
/// session_store
/// .clone()
/// .continuously_delete_expired(tokio::time::Duration::from_secs(60)),
/// );
/// # })
/// ```
pub async fn continuously_delete_expired(
self,
period: tokio::time::Duration,
) -> Result<(), sqlx::Error> {
let mut interval = tokio::time::interval(period);
loop {
self.delete_expired().await?;
interval.tick().await;
}
}

async fn delete_expired(&self) -> sqlx::Result<()> {
let query = format!(
r#"
delete from {schema_name}.{table_name}
where expiration_time < (now() at time zone 'utc')
"#,
schema_name = self.schema_name,
table_name = self.table_name
);
sqlx::query(&query).execute(&self.pool).await?;
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -109,9 +191,7 @@ impl SessionStore for PostgresStore {

async fn delete(&self, session_id: &SessionId) -> Result<(), Self::Error> {
let query = format!(
r#"
delete from {schema_name}.{table_name} where id = $1
"#,
r#"delete from {schema_name}.{table_name} where id = $1"#,
schema_name = self.schema_name,
table_name = self.table_name
);
Expand Down
66 changes: 66 additions & 0 deletions src/sqlx_store/sqlite_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ pub struct SqliteStore {

impl SqliteStore {
/// Create a new SQLite store with the provided connection pool.
///
/// # Examples
///
/// ```rust,no_run
/// use tower_sessions::{sqlx::SqlitePool, SqliteStore};
///
/// # tokio_test::block_on(async {
/// let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
/// let session_store = SqliteStore::new(pool);
/// # })
/// ```
pub fn new(pool: SqlitePool) -> Self {
Self {
pool,
Expand Down Expand Up @@ -54,6 +65,61 @@ impl SqliteStore {
sqlx::query(&query).execute(&self.pool).await?;
Ok(())
}

async fn delete_expired(&self) -> sqlx::Result<()> {
let query = format!(
r#"
delete from {table_name}
where expiration_time < datetime('now', 'utc')
"#,
table_name = self.table_name
);
sqlx::query(&query).execute(&self.pool).await?;
Ok(())
}

#[cfg(feature = "tokio")]
/// This function will keep running indefinitely, deleting expired rows and
/// then waiting for the specified period before deleting again.
///
/// Generally this will be used as a task, for example via
/// `tokio::task::spawn`.
///
/// # Arguments
///
/// * `period` - The interval at which expired rows should be deleted.
///
/// # Errors
///
/// This function returns a `Result` that contains an error of type
/// `sqlx::Error` if the deletion operation fails.
///
/// # Examples
///
/// ```rust,no_run
/// use tower_sessions::{sqlx::SqlitePool, SqliteStore};
///
/// # tokio_test::block_on(async {
/// let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
/// let session_store = SqliteStore::new(pool);
///
/// tokio::task::spawn(
/// session_store
/// .clone()
/// .continuously_delete_expired(tokio::time::Duration::from_secs(60)),
/// );
/// # })
/// ```
pub async fn continuously_delete_expired(
self,
period: tokio::time::Duration,
) -> Result<(), sqlx::Error> {
let mut interval = tokio::time::interval(period);
loop {
self.delete_expired().await?;
interval.tick().await;
}
}
}

#[async_trait]
Expand Down

0 comments on commit e610b7f

Please sign in to comment.