Skip to content

Commit

Permalink
chore(su): readme change for local store #987
Browse files Browse the repository at this point in the history
  • Loading branch information
VinceJuliano committed Oct 15, 2024
1 parent 4f55670 commit 6990b96
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 20 deletions.
6 changes: 6 additions & 0 deletions servers/su/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ Create a .env file with the following variables, or set them in the OS:
- `ENABLE_PROCESS_ASSIGNMENT` enables AOP-6 boot loader, if enabled, the Process on a new spawn will become the first Message/Nonce in its message list. It will get an Assignment.
- `ARWEAVE_URL_LIST` list of arweave urls that have tx access aka url/txid returns the tx. Used by gateway calls for checking transactions etc...

## Experimental environment variables
To use the expirimental fully local storage system set the following evnironment variables.
- `USE_LOCAL_STORE` if true the SU will operate on purely RocksDB
- `SU_FILE_DB_DIR` a local RocksDB directory of bundles
- `SU_INDEX_DB_DIR` a local index of processes and messages

> You can also use a `.env` file to set environment variables when running in
> development mode, See the `.env.example` for an example `.env`
Expand Down
28 changes: 12 additions & 16 deletions servers/su/src/domain/clients/local_store/store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;
use rocksdb::{Options, DB};
use tokio::time::{sleep, Duration};

Expand All @@ -27,11 +26,6 @@ pub struct LocalStoreClient {
and Messages, only public for migration purposes
*/
pub index_db: DB,
/*
A cache of the latest messages for a process based
on the last from parameter that has been queried
*/
pub index_cache: DashMap<String, Vec<(String, String)>>
}

impl From<rocksdb::Error> for StoreErrorType {
Expand Down Expand Up @@ -68,13 +62,10 @@ impl LocalStoreClient {
Err(e) => panic!("failed to open cf with options: {}", e),
};

let index_cache = DashMap::new();

Ok(LocalStoreClient {
_logger: logger,
file_db,
index_db,
index_cache
})
}

Expand Down Expand Up @@ -176,7 +167,7 @@ impl LocalStoreClient {
for querying message ranges for the /processid
message list
*/
fn fetch_message_range(
async fn fetch_message_range(
&self,
process_id: &String,
from: &Option<String>,
Expand Down Expand Up @@ -235,7 +226,7 @@ impl LocalStoreClient {
}
}

paginated_keys.push((key_str, assignment_id));
paginated_keys.push((key_str.clone(), assignment_id));
count += 1;

match limit {
Expand Down Expand Up @@ -450,8 +441,9 @@ impl DataStore for LocalStoreClient {
actual_limit -= 1;
}

let (paginated_keys, has_next_page) =
self.fetch_message_range(process_id, from, to, &Some(actual_limit))?;
let (paginated_keys, has_next_page) = self
.fetch_message_range(process_id, from, to, &Some(actual_limit))
.await?;

/*
Fetch the messages for each paginated key. This
Expand Down Expand Up @@ -482,9 +474,13 @@ impl DataStore for LocalStoreClient {
were pulling all the message keys into memory and
picking the latest one.
*/
fn get_latest_message(&self, process_id: &str) -> Result<Option<Message>, StoreErrorType> {
let (paginated_keys, _) =
self.fetch_message_range(&process_id.to_string(), &None, &None, &None)?;
async fn get_latest_message(
&self,
process_id: &str,
) -> Result<Option<Message>, StoreErrorType> {
let (paginated_keys, _) = self
.fetch_message_range(&process_id.to_string(), &None, &None, &None)
.await?;

if paginated_keys.len() < 1 {
return Ok(None);
Expand Down
1 change: 0 additions & 1 deletion servers/su/src/domain/clients/local_store/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ mod tests {

let (process_bundle, message_bundles) = bundle_list();
let test_process = Process::from_bytes(process_bundle.clone())?;
println!("{:?}", test_process);
client.save_process(&test_process, &process_bundle)?;

for bundle in message_bundles.iter() {
Expand Down
5 changes: 4 additions & 1 deletion servers/su/src/domain/clients/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,10 @@ impl DataStore for StoreClient {
}
}

fn get_latest_message(&self, process_id_in: &str) -> Result<Option<Message>, StoreErrorType> {
async fn get_latest_message(
&self,
process_id_in: &str,
) -> Result<Option<Message>, StoreErrorType> {
use super::schema::messages::dsl::*;
/*
This must use get_conn because it needs
Expand Down
5 changes: 4 additions & 1 deletion servers/su/src/domain/core/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ pub trait DataStore: Send + Sync {
limit: &Option<i32>,
) -> Result<PaginatedMessages, StoreErrorType>;
fn get_message(&self, message_id_in: &str) -> Result<Message, StoreErrorType>;
fn get_latest_message(&self, process_id_in: &str) -> Result<Option<Message>, StoreErrorType>;
async fn get_latest_message(
&self,
process_id_in: &str,
) -> Result<Option<Message>, StoreErrorType>;
fn check_existing_message(&self, message_id: &String) -> Result<(), StoreErrorType>;
}

Expand Down
2 changes: 1 addition & 1 deletion servers/su/src/domain/core/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl ProcessScheduler {

(cached_info.schedule_info.epoch, new_nonce, new_hash_chain)
} else {
let latest_message = match self.deps.data_store.get_latest_message(&id) {
let latest_message = match self.deps.data_store.get_latest_message(&id).await {
Ok(m) => m,
Err(e) => return Err(format!("{:?}", e)),
};
Expand Down

0 comments on commit 6990b96

Please sign in to comment.