diff --git a/compliant-reward-distribution/indexer/CHANGELOG.md b/compliant-reward-distribution/indexer/CHANGELOG.md index 2fc5b0c3..22bd8916 100644 --- a/compliant-reward-distribution/indexer/CHANGELOG.md +++ b/compliant-reward-distribution/indexer/CHANGELOG.md @@ -2,4 +2,4 @@ ## 0.1.0 -- Initial `indexer` and `server`. +- Initial `indexer`. diff --git a/compliant-reward-distribution/indexer/Cargo.lock b/compliant-reward-distribution/indexer/Cargo.lock index e43e4a6a..5fb986d9 100644 --- a/compliant-reward-distribution/indexer/Cargo.lock +++ b/compliant-reward-distribution/indexer/Cargo.lock @@ -1329,20 +1329,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "handlebars" -version = "4.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "faa67bab9ff362228eb3d00bd024a4965d8231bbb7921167f0cfa66c6626b225" -dependencies = [ - "log", - "pest", - "pest_derive", - "serde", - "serde_json", - "thiserror", -] - [[package]] name = "hashbrown" version = "0.11.2" @@ -1515,12 +1501,10 @@ version = "0.2.0" dependencies = [ "anyhow", "axum", - "chrono", "clap", "concordium-rust-sdk", "deadpool-postgres", "futures", - "handlebars", "http", "serde", "serde_json", @@ -1923,51 +1907,6 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" -[[package]] -name = "pest" -version = "2.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56f8023d0fb78c8e03784ea1c7f3fa36e68a723138990b8d5a47d916b651e7a8" -dependencies = [ - "memchr", - "thiserror", - "ucd-trie", -] - -[[package]] -name = "pest_derive" -version = "2.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0d24f72393fd16ab6ac5738bc33cdb6a9aa73f8b902e8fe29cf4e67d7dd1026" -dependencies = [ - "pest", - "pest_generator", -] - -[[package]] -name = "pest_generator" -version = "2.7.8" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdc17e2a6c7d0a492f0158d7a4bd66cc17280308bbaff78d5bef566dca35ab80" -dependencies = [ - "pest", - "pest_meta", - "proc-macro2", - "quote", - "syn 2.0.48", -] - -[[package]] -name = "pest_meta" -version = "2.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "934cd7631c050f4674352a6e835d5f6711ffbfb9345c2fc0107155ac495ae293" -dependencies = [ - "once_cell", - "pest", - "sha2", -] - [[package]] name = "phf" version = "0.11.2" @@ -3093,12 +3032,6 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" -[[package]] -name = "ucd-trie" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" - [[package]] name = "unicase" version = "2.7.0" diff --git a/compliant-reward-distribution/indexer/Cargo.toml b/compliant-reward-distribution/indexer/Cargo.toml index 57fa0f51..6091915d 100644 --- a/compliant-reward-distribution/indexer/Cargo.toml +++ b/compliant-reward-distribution/indexer/Cargo.toml @@ -22,7 +22,6 @@ tower-http = { version = "0.4", features = [ http = "0.2" tonic = { version = "0.10", features = ["tls-roots", "tls"] } thiserror = "1.0" -chrono = "0.4" concordium-rust-sdk = { version = "4.2"} tokio = { version = "1.35", features = ["rt-multi-thread", "macros", "sync", "signal"] } tokio-postgres = { version = "0.7", features = [ @@ -30,4 +29,3 @@ tokio-postgres = { version = "0.7", features = [ "with-chrono-0_4", ] } deadpool-postgres = "0.11" -handlebars = "4.5" diff --git a/compliant-reward-distribution/indexer/README.md b/compliant-reward-distribution/indexer/README.md index 81893e97..58665e5a 100644 --- a/compliant-reward-distribution/indexer/README.md +++ b/compliant-reward-distribution/indexer/README.md @@ -1,9 +1,5 @@ ## Compliant-Reward-Distribution 
Indexer -There are two binaries in this project. An `indexer` that indexes data into a database and a `server` that serves data from the database. - -The easiest way to run the `indexer` and `server` is to use [docker-compose](https://docs.docker.com/compose/) as described in the [README.md](../README.md) file. - - ## Prerequisites - `PostgreSQL` installed or running it in a docker container: https://www.postgresql.org/download/ @@ -24,7 +20,7 @@ Alternatively, you can run the Postgres database in a docker container. The comm docker run -p 5432:5432 -e POSTGRES_PASSWORD=password -e POSTGRES_DB="indexer" --rm postgres ``` -## Build the `indexer` and `server` +## Build the `indexer` To build the tools make sure you have the repository submodules initialized @@ -38,19 +34,17 @@ The tool can be built by running cargo build --release ``` -This will produce two binaries (`indexer` and `server`) in the `target/release` directory. +This will produce the binary (`indexer`) in the `target/release` directory. # The `indexer` binary -It is a tool for indexing event data from the track and trace contract into a postgres database. The database is configured with the tables from the file `../resources/schema.sql`. The monitored events `ItemStatusChangedEvent` and `ItemCreatedEvent` are indexed in their respective tables. A third table `settings` exists to store global configurations (e.g.: the contract address, latest block processed, and the genesis block hash). +It is a tool for indexing newly created accounts on Concordium into a postgres database. The database is configured with the tables from the file `../resources/schema.sql`. A table `settings` exists to store global configurations. The global configurations are set when the indexer is started for the first time. Re-starting the indexer will check if its current settings are compatible with the stored indexer settings to prevent corrupting the database. 
In addition, the settings can be queried by the front end to check compatibility. -When the indexer is started for the first time, it will look up when the smart contract instance was created and use that block as the starting block. When the indexer is re-started with the same database settings, it resumes indexing from the `latest_processed_block_height+1` as stored in the database. +When the indexer is started for the first time, it will look up the current block height and start indexing from that block. When the indexer is re-started with the same database settings, it resumes indexing from the `latest_processed_block_height+1` as stored in the database. -All monitored events in a block are atomically added in one database transaction to postgres. This ensures a simple recovery process since we always process the complete block or roll back the database to the beginning of the block. In addition, the indexer has a re-try logic and will try to re-connect to the database pool and re-submit any failed database transaction. - -Each event can be uniquely identified by the `transaction_hash` and `event_index`. The `event_index` is the index from the array of logged events in a transaction. +All newly created accounts in a block are atomically added in one database transaction to postgres. This ensures a simple recovery process since we always process the complete block or roll back the database to the beginning of the block. In addition, the indexer has a re-try logic and will try to re-connect to the database pool and re-submit any failed database transaction. ## Run the `indexer` @@ -67,47 +61,3 @@ There are a few options to configure the indexer: - `--db-connection` should specify your postgreSQL database connection. If not specified, the default value `host=localhost dbname=indexer user=postgres password=password port=5432` is used. - `--log-level` specifies the maximum log level. Possible values are: `trace`, `debug`, `info`, `warn`, and `error`. 
If not specified, the default value `info` is used. - -## The `server` binary - -You have to build the front end in the folder `../frontend` before running this command. - -## Run the `server` - -```console -cargo run --bin server -``` - -## Configure the `server` - -There are a few options to configure the server: - -- `--listen-address` is the listen address where the server will be listen on. If not specified, the default value `0.0.0.0:8080` is used. - -- `--frontend` is the path to the directory where the frontend assets are located. If not specified, the default value `../frontend/dist` is used. - -- `--db-connection` should specify your postgreSQL database connection. If not specified, the default value `host=localhost dbname=indexer user=postgres password=password port=5432` is used. - -- `--log-level` specifies the maximum log level. Possible values are: `trace`, `debug`, `info`, `warn`, and `error`. If not specified, the default value `info` is used. - -The following option are also available, which are forwarded to the frontend: - -- `--node` specifies the gRPC interface of a Concordium node. (Defaults to `https://grpc.testnet.concordium.com:20000`) - -- `--network` specifies the network to use, i.e., `mainnet` or `testnet`. Defaults to `testnet`. - -- `--contract-address` specifies the contract address of the track and trace contract (format is `<1234,0>`). - -- `--sponsored-transaction-backend` specifies the endpoint to the sponsored transaction backend. (Defaults to `http://localhost:8000`). 
- -An example of running the service with basic settings and testnet node would be: - -``` console -cargo run --bin server -- --contract-address -``` - -An example to run the service with some filled in example settings would be: - -``` console -cargo run --bin server -- --contract-address "<8901,0>" -``` diff --git a/compliant-reward-distribution/indexer/resources/schema.sql b/compliant-reward-distribution/indexer/resources/schema.sql index 14dfda6a..b2d83765 100644 --- a/compliant-reward-distribution/indexer/resources/schema.sql +++ b/compliant-reward-distribution/indexer/resources/schema.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS settings ( -- Re-starting the indexer will check if its settings are compatible with -- the stored indexer setting to prevent corrupting the database. genesis_block_hash BYTEA NOT NULL, - -- Start block that was indexed. + -- Start block height that was indexed. start_block_height INT8, -- The last block height that was processed. latest_processed_block_height INT8 @@ -37,53 +37,3 @@ CREATE TABLE IF NOT EXISTS accounts ( -- claiming with different accounts for the same identity. uniqueness_hash BYTEA ); - -CREATE TABLE IF NOT EXISTS item_status_changed_events ( - -- Primary key. - id INT8 PRIMARY KEY, - -- The timestamp of the block the event was included in. - block_time TIMESTAMP WITH TIME ZONE NOT NULL, - -- The transaction hash that the event was included in. - transaction_hash BYTEA NOT NULL, - -- The index from the array of logged events in a transaction. - event_index INT8 NOT NULL, - -- The item's id as logged in the event. - item_id INT8 NOT NULL -); - --- Table containing item_status_changed_events successfully submitted to the database from the contract monitored. -CREATE TABLE IF NOT EXISTS item_status_changed_events ( - -- Primary key. - id INT8 PRIMARY KEY, - -- The timestamp of the block the event was included in. - block_time TIMESTAMP WITH TIME ZONE NOT NULL, - -- The transaction hash that the event was included in. 
- transaction_hash BYTEA NOT NULL, - -- The index from the array of logged events in a transaction. - event_index INT8 NOT NULL, - -- The item's id as logged in the event. - item_id INT8 NOT NULL -); - --- Table containing item_created_events successfully submitted to the database from the contract monitored. -CREATE TABLE IF NOT EXISTS item_created_events ( - -- Primary key. - id INT8 PRIMARY KEY, - -- The timestamp of the block the event was included in. - block_time TIMESTAMP WITH TIME ZONE NOT NULL, - -- The transaction hash that the event was included in. - transaction_hash BYTEA NOT NULL, - -- The index from the array of logged events in a transaction. - event_index INT8 NOT NULL, - -- The item's id as logged in the event. - item_id INT8 NOT NULL, - -- The item's metadata_url as logged in the event. - metadata_url BYTEA NOT NULL -); - --- -- Improve performance on queries for events with given item_id. --- CREATE INDEX IF NOT EXISTS item_changed_index ON item_status_changed_events (item_id); --- -- Improve performance on queries for events with given current status. --- CREATE INDEX IF NOT EXISTS current_status_index ON item_status_changed_events (new_status); --- -- Improve performance on queries for events with given item_id. 
--- CREATE INDEX IF NOT EXISTS item_created_index ON item_created_events (item_id); diff --git a/compliant-reward-distribution/indexer/src/bin/server.rs b/compliant-reward-distribution/indexer/src/bin/server.rs deleted file mode 100644 index 7132226a..00000000 --- a/compliant-reward-distribution/indexer/src/bin/server.rs +++ /dev/null @@ -1,361 +0,0 @@ -use ::indexer::db::{DatabaseError, DatabasePool, StoredItemStatusChangedEvent}; -use anyhow::Context; -use axum::{ - extract::{rejection::JsonRejection, State}, - http, - response::Html, - routing::{get, post}, - Json, Router, -}; -use clap::Parser; -use concordium_rust_sdk::types::ContractAddress; -use handlebars::{no_escape, Handlebars}; -use http::StatusCode; -use indexer::db::StoredItemCreatedEvent; -use std::fs; -use tower_http::services::ServeDir; - -/// The maximum number of events allowed in a request to the database. -const MAX_REQUEST_LIMIT: u32 = 30; - -/// Server struct to store the db_pool. -#[derive(Clone, Debug)] -pub struct Server { - db_pool: DatabasePool, -} - -/// Errors that this server can produce. 
-#[derive(Debug, thiserror::Error)] -pub enum ServerError { - #[error("Database error from postgres: {0}")] - DatabaseErrorPostgres(tokio_postgres::Error), - #[error("Database error in type conversion: {0}")] - DatabaseErrorTypeConversion(String), - #[error("Database error in configuration: {0}")] - DatabaseErrorConfiguration(anyhow::Error), - #[error("Failed to extract json object: {0}")] - JsonRejection(#[from] JsonRejection), - #[error("The requested events to the database were above the limit {0}")] - MaxRequestLimit(u32), -} - -/// Mapping DatabaseError to ServerError -impl From for ServerError { - fn from(e: DatabaseError) -> Self { - match e { - DatabaseError::Postgres(e) => ServerError::DatabaseErrorPostgres(e), - DatabaseError::TypeConversion(e) => ServerError::DatabaseErrorTypeConversion(e), - DatabaseError::Configuration(e) => ServerError::DatabaseErrorConfiguration(e), - } - } -} - -impl axum::response::IntoResponse for ServerError { - fn into_response(self) -> axum::response::Response { - let r = match self { - ServerError::DatabaseErrorPostgres(error) => { - tracing::error!("Internal error: {error}."); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json("Internal error".to_string()), - ) - } - ServerError::DatabaseErrorTypeConversion(error) => { - tracing::error!("Internal error: {error}."); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json("Internal error".to_string()), - ) - } - ServerError::DatabaseErrorConfiguration(error) => { - tracing::error!("Internal error: {error}."); - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json("Internal error".to_string()), - ) - } - ServerError::JsonRejection(error) => { - tracing::debug!("Bad request: {error}."); - (StatusCode::BAD_REQUEST, Json(format!("{}", error))) - } - ServerError::MaxRequestLimit(error) => { - tracing::debug!("Bad request: {error}."); - (StatusCode::BAD_REQUEST, Json(format!("{}", error))) - } - }; - r.into_response() - } -} - -/// Command line configuration of the application. 
-#[derive(Debug, clap::Parser)] -#[command(author, version, about)] -struct Args { - /// The address to listen on. - #[clap( - long = "listen-address", - default_value = "0.0.0.0:8080", - help = "Address where the server will listen on.", - env = "CCD_SERVER_LISTEN_ADDRESS" - )] - listen_address: std::net::SocketAddr, - #[clap( - long = "frontend", - default_value = "../frontend/dist", - help = "Path to the directory where frontend assets are located.", - env = "CCD_SERVER_FRONTEND" - )] - frontend_assets: std::path::PathBuf, - /// Database connection string. - #[arg( - long = "db-connection", - default_value = "host=localhost dbname=indexer user=postgres password=password port=5432", - help = "A connection string detailing the connection to the database used by the \ - application.", - env = "CCD_SERVER_DB_CONNECTION" - )] - db_connection: tokio_postgres::config::Config, - /// Maximum log level. - #[clap( - long = "log-level", - default_value = "info", - help = "The maximum log level. Possible values are: `trace`, `debug`, `info`, `warn`, and \ - `error`.", - env = "CCD_SERVER_LOG_LEVEL" - )] - log_level: tracing_subscriber::filter::LevelFilter, - /// The node used for querying (passed to frontend). - #[arg( - long = "node", - default_value = "https://grpc.testnet.concordium.com:20000", - help = "The endpoint is expected to point to concordium node grpc v2 API's. The endpoint \ - is built into the frontend served, which means the node must enable grpc-web to \ - be used successfully.", - env = "CCD_SERVER_NODE" - )] - node_endpoint: concordium_rust_sdk::v2::Endpoint, - /// The network to connect users to (passed to frontend). - #[clap( - long = "network", - default_value_t = concordium_rust_sdk::web3id::did::Network::Testnet, - help = "The network to connect users to (passed to frontend). 
Possible values: testnet, mainnet", - env = "CCD_SERVER_NETWORK", - )] - network: concordium_rust_sdk::web3id::did::Network, - /// The contract address of the track and trace contract (passed to - /// frontend). - #[clap( - long = "contract-address", - help = "The contract address of the track and trace contract. Expected format '<123,0>'.", - env = "CCD_SERVER_CONTRACT_ADDRESS" - )] - contract_address: ContractAddress, - /// The sponsored transaction backend (passed to frontend). - #[arg( - long = "sponsored-transaction-backend", - default_value = "http://localhost:8000", - help = "The endpoint is expected to point to a sponsored transaction backend.", - env = "CCD_SERVER_SPONSORED_TRANSACTION_BACKEND" - )] - sponsored_transaction_backend: concordium_rust_sdk::v2::Endpoint, -} - -impl Args { - /// Creates the JSON object required by the frontend. - fn as_frontend_config(&self) -> serde_json::Value { - let config = serde_json::json!({ - "node": self.node_endpoint.uri().to_string(), - "network": self.network, - "contractAddress": self.contract_address, - "sponsoredTransactionBackend": self.sponsored_transaction_backend.uri().to_string(), - }); - let config_string = - serde_json::to_string(&config).expect("JSON serialization always succeeds"); - serde_json::json!({ "config": config_string }) - } -} - -/// The main function. -#[tokio::main] -async fn main() -> anyhow::Result<()> { - let app = Args::parse(); - - { - use tracing_subscriber::prelude::*; - let log_filter = tracing_subscriber::filter::Targets::new() - .with_target(module_path!(), app.log_level) - .with_target("tower_http", app.log_level); - - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer()) - .with(log_filter) - .init(); - } - - // Establish connection to the postgres database. 
- let db_pool = DatabasePool::create(app.db_connection.clone(), 1, true) - .await - .context("Could not create database pool")?; - - let state = Server { db_pool }; - - tracing::info!("Starting server..."); - - // Insert the frontend config into `index.html` using the handlebars - // placeholder. Then render the `index.html` and assets. Render `index.html` - // file and `assets` folder. - let index_template = fs::read_to_string(app.frontend_assets.join("index.html")) - .context("Frontend was not built or wrong path to the frontend files.")?; - let mut reg = Handlebars::new(); - // Prevent handlebars from escaping inserted objects. - reg.register_escape_fn(no_escape); - - let index_html = reg.render_template(&index_template, &app.as_frontend_config())?; - - let serve_dir_service = ServeDir::new(app.frontend_assets.join("assets")); - - let router = Router::new() - .route("/api/getItemStatusChangedEvents", post(get_item_status_changed_events)) - .route("/api/getItemCreatedEvent", post(get_item_created_event)) - .route("/health", get(health)) - .nest_service("/assets", serve_dir_service) - .fallback(get(|| async { Html(index_html) })) - .with_state(state) - .layer( - tower_http::trace::TraceLayer::new_for_http() - .make_span_with(tower_http::trace::DefaultMakeSpan::new()) - .on_response(tower_http::trace::DefaultOnResponse::new()), - ) - .layer(tower_http::limit::RequestBodyLimitLayer::new(1_000_000)) // at most 1000kB of data. - .layer(tower_http::compression::CompressionLayer::new()); - - tracing::info!("Listening at {}", app.listen_address); - - let shutdown_signal = set_shutdown()?; - - // Create the server. - axum::Server::bind(&app.listen_address) - .serve(router.into_make_service()) - .with_graceful_shutdown(shutdown_signal) - .await?; - - Ok(()) -} - -/// Construct a future for shutdown signals (for unix: SIGINT and SIGTERM) (for -/// windows: ctrl c and ctrl break). 
The signal handler is set when the future -/// is polled and until then the default signal handler. -fn set_shutdown() -> anyhow::Result> { - use futures::FutureExt; - - #[cfg(unix)] - { - use tokio::signal::unix as unix_signal; - - let mut terminate_stream = unix_signal::signal(unix_signal::SignalKind::terminate())?; - let mut interrupt_stream = unix_signal::signal(unix_signal::SignalKind::interrupt())?; - - Ok(async move { - futures::future::select( - Box::pin(terminate_stream.recv()), - Box::pin(interrupt_stream.recv()), - ) - .map(|_| ()) - .await - }) - } - - #[cfg(windows)] - { - use tokio::signal::windows as windows_signal; - - let mut ctrl_break_stream = windows_signal::ctrl_break()?; - let mut ctrl_c_stream = windows_signal::ctrl_c()?; - - Ok(async move { - futures::future::select( - Box::pin(ctrl_break_stream.recv()), - Box::pin(ctrl_c_stream.recv()), - ) - .map(|_| ()) - .await - }) - } -} - -/// Struct returned by the `health` endpoint. It returns the version of the -/// backend. -#[derive(serde::Serialize)] -struct Health { - version: &'static str, -} - -/// Handles the `health` endpoint, returning the version of the backend. -async fn health() -> Json { - Json(Health { - version: env!("CARGO_PKG_VERSION"), - }) -} - -/// Struct returned by the `getItemStatusChangedEvents` endpoint. It returns a -/// vector of ItemStatusChangedEvents from the database if present. -#[derive(serde::Serialize)] -struct StoredItemStatusChangedEventsReturnValue { - data: Vec, -} - -/// Parameter struct for the `getItemStatusChangedEvents` endpoint send in the -/// request body. -#[derive(serde::Deserialize)] -struct GetItemstatusChangedEventsParam { - item_id: u64, - limit: u32, - offset: u32, -} - -/// Handles the `getItemStatusChangedEvents` endpoint, returning a vector of -/// ItemStatusChangedEvents from the database if present. 
-async fn get_item_status_changed_events( - State(state): State, - request: Result, JsonRejection>, -) -> Result, ServerError> { - let db = state.db_pool.get().await?; - - let Json(param) = request?; - - if param.limit > MAX_REQUEST_LIMIT { - return Err(ServerError::MaxRequestLimit(MAX_REQUEST_LIMIT)); - } - - let database_result = db - .get_item_status_changed_events_submissions(param.item_id, param.limit, param.offset) - .await?; - - Ok(Json(StoredItemStatusChangedEventsReturnValue { - data: database_result, - })) -} - -/// Struct returned by the `getItemCreatedEvent` endpoint. It returns the -/// itemCreatedEvent from the database if present. -#[derive(serde::Serialize)] -struct StoredItemCreatedEventReturnValue { - data: Option, -} - -/// Handles the `getItemCreatedEvent` endpoint, returning the itemCreatedEvent -/// from the database if present. -async fn get_item_created_event( - State(state): State, - request: Result, JsonRejection>, -) -> Result, ServerError> { - let db = state.db_pool.get().await?; - - let Json(item_id) = request?; - - let database_result = db.get_item_created_event_submission(item_id).await?; - - Ok(Json(StoredItemCreatedEventReturnValue { - data: database_result, - })) -} diff --git a/compliant-reward-distribution/indexer/src/db.rs b/compliant-reward-distribution/indexer/src/db.rs index d049313b..98531cf5 100644 --- a/compliant-reward-distribution/indexer/src/db.rs +++ b/compliant-reward-distribution/indexer/src/db.rs @@ -1,13 +1,5 @@ use anyhow::Context; -use chrono::{DateTime, Utc}; -use concordium_rust_sdk::{ - cis2::MetadataUrl, - smart_contracts::common::from_bytes, - types::{ - hashes::{BlockHash, TransactionHash}, - AbsoluteBlockHeight, - }, -}; +use concordium_rust_sdk::types::{hashes::BlockHash, AbsoluteBlockHeight}; use deadpool_postgres::{GenericClient, Object}; use serde::Serialize; use tokio_postgres::{types::ToSql, NoTls}; @@ -62,78 +54,6 @@ impl TryFrom for StoredConfiguration { } } -/// A `StoredItemStatusChanged` event 
stored in the database. -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct StoredItemStatusChangedEvent { - /// The timestamp of the block the event was included in. - pub block_time: DateTime, - /// The transaction hash that the event was recorded in. - pub transaction_hash: TransactionHash, - /// The index from the array of logged events in a transaction. - pub event_index: u64, - /// The item's id as logged in the event. - pub item_id: u64, -} - -impl TryFrom for StoredItemStatusChangedEvent { - type Error = DatabaseError; - - // Conversion from the postgres row to the `StoredItemStatusChangedEvent` type. - fn try_from(value: tokio_postgres::Row) -> DatabaseResult { - let raw_transaction_hash: &[u8] = value.try_get("transaction_hash")?; - let raw_item_id: i64 = value.try_get("item_id")?; - let raw_event_index: i64 = value.try_get("event_index")?; - - let events = Self { - block_time: value.try_get("block_time")?, - transaction_hash: raw_transaction_hash - .try_into() - .map_err(|_| DatabaseError::TypeConversion("transaction_hash".to_string()))?, - event_index: raw_event_index as u64, - item_id: raw_item_id as u64, - }; - Ok(events) - } -} - -/// A `StoredItemCreated` event stored in the database. -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct StoredItemCreatedEvent { - /// The timestamp of the block the event was included in. - pub block_time: DateTime, - /// The transaction hash that the event was recorded in. - pub transaction_hash: TransactionHash, - /// The index from the array of logged events in a transaction. - pub event_index: u64, - /// The item's id as logged in the event. - pub item_id: u64, - /// The item's metadata_url as logged in the event. - pub metadata_url: Option, -} - -impl TryFrom for StoredItemCreatedEvent { - type Error = DatabaseError; - - // Conversion from the postgres row to the `StoredItemCreatedEvent` type. 
- fn try_from(value: tokio_postgres::Row) -> DatabaseResult { - let raw_transaction_hash: &[u8] = value.try_get("transaction_hash")?; - let raw_item_id: i64 = value.try_get("item_id")?; - let raw_event_index: i64 = value.try_get("event_index")?; - - let events = Self { - block_time: value.try_get("block_time")?, - transaction_hash: raw_transaction_hash - .try_into() - .map_err(|_| DatabaseError::TypeConversion("transaction_hash".to_string()))?, - event_index: raw_event_index as u64, - item_id: raw_item_id as u64, - metadata_url: from_bytes(value.try_get("metadata_url")?) - .map_err(|_| DatabaseError::TypeConversion("metadata_url".to_string()))?, - }; - Ok(events) - } -} - /// Database client wrapper pub struct Database { /// The database client @@ -186,68 +106,6 @@ impl Database { opt_row.map(StoredConfiguration::try_from).transpose() } - - /// Get all [`StoredItemStatusChangedEvents`] by item id. - /// The query enforces pagination with the `limit` and `offset` parameter. - /// Note: This function will be used by the http server and the - /// `#[allow(dead_code)]` is only temporary until the http server is - /// developed. - #[allow(dead_code)] - pub async fn get_item_status_changed_events_submissions( - &self, - item_id: u64, - limit: u32, - offset: u32, - ) -> DatabaseResult> { - let get_item_status_changed_event_submissions = self - .client - .prepare_cached( - "SELECT block_time, transaction_hash, event_index, item_id, new_status, from \ - item_status_changed_events WHERE item_id = $1 LIMIT $2 OFFSET $3", - ) - .await?; - let params: [&(dyn ToSql + Sync); 3] = - [&(item_id as i64), &(limit as i64), &(offset as i64)]; - - let rows = self - .client - .query(&get_item_status_changed_event_submissions, ¶ms) - .await?; - - let result: Vec = rows - .into_iter() - .map(StoredItemStatusChangedEvent::try_from) - .collect::, _>>()?; - - Ok(result) - } - - /// Get the [`StoredItemCreatedEvent`] by item id. 
- /// An error is returned if there are more than one row in the database - /// matching the query. Note: This function will be used by the http - /// server and the `#[allow(dead_code)]` is only temporary until the - /// http server is developed. - #[allow(dead_code)] - pub async fn get_item_created_event_submission( - &self, - item_id: u64, - ) -> DatabaseResult> { - let get_item_created_event_submissions = self - .client - .prepare_cached( - "SELECT block_time, transaction_hash, event_index, item_id, metadata_url, \ - initial_status from item_created_events WHERE item_id = $1", - ) - .await?; - let params: [&(dyn ToSql + Sync); 1] = [&(item_id as i64)]; - - let opt_row = self - .client - .query_opt(&get_item_created_event_submissions, ¶ms) - .await?; - - opt_row.map(StoredItemCreatedEvent::try_from).transpose() - } } /// Representation of a database pool