diff --git a/servers/su/src/domain/clients/local_store.rs b/servers/su/src/domain/clients/local_store.rs
index a4a844271..0b6a33272 100644
--- a/servers/su/src/domain/clients/local_store.rs
+++ b/servers/su/src/domain/clients/local_store.rs
@@ -13,14 +13,16 @@ pub struct LocalStoreClient {
     logger: Arc,
     /*
       A RocksDB instance that is a key value store
-      of ANS-104 bundles
+      of ANS-104 bundles, only public for migration
+      purposes
     */
-    file_db: DB,
+    pub file_db: DB,
     /*
       A RocksDB instance that is an index for querying
-      and ordering of Process's and Messages
+      and ordering of Processes and Messages, only public
+      for migration purposes
     */
-    index_db: DB,
+    pub index_db: DB,
 }
 
 impl From for StoreErrorType {
@@ -98,6 +100,59 @@ impl LocalStoreClient {
         })
     }
 
+    // Helper method to generate process key
+    fn generate_process_key(&self, process_id: &str) -> String {
+        format!("process:{}", process_id)
+    }
+
+    // Helper method to generate message composite key
+    fn msg_composite_key(&self, message_id: &str, assignment_id: &str) -> String {
+        format!("messages:{}:{}", message_id, assignment_id)
+    }
+
+    // Helper method to generate message key prefix
+    fn msg_prefix_key(&self, message_id: &str) -> String {
+        format!("messages:{}:", message_id)
+    }
+
+    // Helper method to generate message assignment key
+    fn assignment_key(&self, assignment_id: &str) -> String {
+        format!("assignments:{}", assignment_id)
+    }
+
+    // Helper method to generate process assignment key
+    fn proc_assignment_key(&self, assignment_id: &str) -> String {
+        format!("process_assignments:{}", assignment_id)
+    }
+
+    // Helper method to generate process order key, taking the whole Process
+    fn proc_order_key(&self, process: &Process) -> Result<String, StoreErrorType> {
+        let process_id = &process.process.process_id;
+        let assignment_id = process.assignment_id()?;
+        let timestamp = process.timestamp()?;
+        let epoch = process.epoch()?;
+        let nonce = process.nonce()?;
+
+        Ok(format!(
+            "processes:{}:{:010}:{:010}:{:015}:{}",
+            process_id, epoch, nonce, timestamp, assignment_id
+        ))
+    }
+
+    // Helper method to generate message order key, taking the whole Message
+    fn msg_order_key(&self, message: &Message) -> Result<String, StoreErrorType> {
+        let process_id = message.process_id()?;
+        let assignment_id = message.assignment_id()?;
+        let timestamp = message.timestamp()?;
+        let epoch = message.epoch()?;
+        let nonce = message.nonce()?;
+
+        Ok(format!(
+            "process_messages:{}:{:010}:{:010}:{:015}:{}",
+            process_id, epoch, nonce, timestamp, assignment_id
+        ))
+    }
+
     fn fetch_message_range(
         &self,
         process_id: &String,
@@ -169,34 +224,47 @@ impl DataStore for LocalStoreClient {
     fn save_process(&self, process: &Process, bundle: &[u8]) -> Result<String, StoreErrorType> {
         let process_id = &process.process.process_id;
         let assignment_id = process.assignment_id()?;
-        let timestamp = process.timestamp()?;
-        let epoch = process.epoch()?;
-        let nonce = process.nonce()?;
 
-        // Save by process_id, but only store a reference to the assignment_id
-        let process_key = format!("process:{}", process_id);
+        let process_key = self.generate_process_key(process_id);
         self.index_db
             .put(process_key.as_bytes(), assignment_id.as_bytes())?;
 
-        // Store by process_id, epoch, nonce, and timestamp, storing a reference to assignment_id
-        let process_order_key = format!(
-            "processes:{}:{:010}:{:010}:{:015}:{}",
-            process_id, epoch, nonce, timestamp, assignment_id
-        );
-
+        let process_order_key = self.proc_order_key(process)?;
         self.index_db
             .put(process_order_key.as_bytes(), assignment_id.as_bytes())?;
 
-        // Store the binary bundle in file_db (by assignment_id)
- let assignment_key = format!("process_assignments:{}", assignment_id); + let assignment_key = self.proc_assignment_key(&assignment_id); self.file_db.put(assignment_key.as_bytes(), bundle)?; Ok("Process saved".to_string()) } + // Modify save_message to use the new msg_order_key method + async fn save_message( + &self, + message: &Message, + bundle_in: &[u8], + ) -> Result { + let message_id = message.message_id()?; + let assignment_id = message.assignment_id()?; + + let message_composite_key = self.msg_composite_key(&message_id, &assignment_id); + self.index_db + .put(message_composite_key.as_bytes(), assignment_id.as_bytes())?; + + let process_order_key = self.msg_order_key(message)?; + self.index_db + .put(process_order_key.as_bytes(), assignment_id.as_bytes())?; + + let assignment_key = self.assignment_key(&assignment_id); + self.file_db.put(assignment_key.as_bytes(), bundle_in)?; + + Ok("Message saved".to_string()) + } + async fn get_process(&self, tx_id: &str) -> Result { - // First, try to fetch the process by assignment_id directly - let assignment_key = format!("process_assignments:{}", tx_id); + // Use assignment_key for assignment_id + let assignment_key = self.assignment_key(tx_id); if let Some(process_bundle) = self.file_db.get(assignment_key.as_bytes())? { // Found the process by assignment_id, deserialize and return it let process: Process = Process::from_bytes(process_bundle)?; @@ -204,14 +272,13 @@ impl DataStore for LocalStoreClient { } // If not found by assignment_id, assume tx_id is a process_id - let process_key = format!("process:{}", tx_id); + let process_key = self.generate_process_key(tx_id); if let Some(assignment_id_bytes) = self.index_db.get(process_key.as_bytes())? { let assignment_id = String::from_utf8(assignment_id_bytes.to_vec())?; // Now fetch the process by assignment_id - let assignment_key = format!("process_assignments:{}", assignment_id); + let assignment_key = self.assignment_key(&assignment_id); if let Some(process_bundle) = self.file_db.get(assignment_key.as_bytes())? 
{ - // Found the process by assignment_id, deserialize and return it let process: Process = Process::from_bytes(process_bundle)?; return Ok(process); } @@ -230,48 +297,16 @@ impl DataStore for LocalStoreClient { } } - async fn save_message( - &self, - message: &Message, - bundle_in: &[u8], - ) -> Result { - let process_id = message.process_id()?; - let message_id = message.message_id()?; - let assignment_id = message.assignment_id()?; - let timestamp = message.timestamp()?; - let epoch = message.epoch()?; - let nonce = message.nonce()?; - - // Save by message_id, but only store a reference to the assignment_id - let message_composite_key = format!("messages:{}:{}", message_id, assignment_id); - self.index_db - .put(message_composite_key.as_bytes(), assignment_id.as_bytes())?; - - // Store by process_id, epoch, nonce, and timestamp, storing a reference to assignment_id - let process_order_key = format!( - "process_messages:{}:{:010}:{:010}:{:015}:{}", - process_id, epoch, nonce, timestamp, assignment_id - ); - self.index_db - .put(process_order_key.as_bytes(), assignment_id.as_bytes())?; - - // Store the binary bundle in file_db (by assignment_id) - let assignment_key = format!("assignments:{}", assignment_id); - self.file_db.put(assignment_key.as_bytes(), bundle_in)?; - - Ok("Message saved".to_string()) - } - fn get_message(&self, tx_id: &str) -> Result { - // Try to fetch the message directly by assignment_id - let assignment_key = format!("assignments:{}", tx_id); + // Use assignment_key for assignment_id + let assignment_key = self.assignment_key(tx_id); if let Some(message_bundle) = self.file_db.get(assignment_key.as_bytes())? { let message: Message = Message::from_bytes(message_bundle)?; return Ok(message); } // If not found by assignment_id, assume tx_id is a message_id - let message_key_prefix = format!("messages:{}:", tx_id); + let message_key_prefix = self.msg_prefix_key(tx_id); let mut iter = self.index_db.prefix_iterator(message_key_prefix.as_bytes()); // Look for the assignment_id for this message_id @@ -279,8 +314,8 @@ impl DataStore for LocalStoreClient { let (_key, assignment_id_bytes) = result?; let assignment_id = String::from_utf8(assignment_id_bytes.to_vec())?; - // Now fetch the message by assignment_id - let assignment_key = format!("assignments:{}", assignment_id); + // Fetch the message using the generated assignment key + let assignment_key = self.assignment_key(&assignment_id); if let Some(message_bundle) = self.file_db.get(assignment_key.as_bytes())? { let message: Message = Message::from_bytes(message_bundle)?; return Ok(message); @@ -322,12 +357,8 @@ impl DataStore for LocalStoreClient { // Fetch the messages for each paginated key for (_, assignment_id) in paginated_keys { - let assignment_key = format!("assignments:{}", assignment_id); - /* - This loop is necessary because it may be that the index - has been built but the message data hasnt finished writing - yet. - */ + let assignment_key = self.assignment_key(&assignment_id); + for _ in 0..10 { if let Some(message_data) = self.index_db.get(assignment_key.as_bytes())? { // Found the message by assignment_id, deserialize and return it @@ -344,12 +375,6 @@ impl DataStore for LocalStoreClient { Ok(PaginatedMessages::from_messages(messages, has_next_page)?) } - /* - Currently this is only running once for each process - that is written to, so it doesn't need to be that - efficient. So it is just pulling the index for the - process into memory and grabbing the last one. 
- */ fn get_latest_message(&self, process_id: &str) -> Result, StoreErrorType> { let (paginated_keys, _) = self.fetch_message_range(&process_id.to_string(), &None, &None, &None)?; @@ -369,10 +394,15 @@ impl DataStore for LocalStoreClient { } } +/* + This is a migration which moves all data + out of the old data store and into the + local store. +*/ pub mod migration { - use std::{env, io}; - use std::sync::Arc; + use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; use std::time::{Duration, Instant}; use futures::future::join_all; @@ -380,105 +410,216 @@ pub mod migration { use tokio::time::interval; use super::super::store::StoreClient; + use crate::domain::core::dal::{DataItem, Message, Process}; pub async fn migrate_to_local() -> io::Result<()> { let start = Instant::now(); - let data_store = Arc::new( - StoreClient::new_single_connection().expect("Failed to create StoreClient") - ); + let data_store = + Arc::new(StoreClient::new_single_connection().expect("Failed to create StoreClient")); data_store .bytestore .try_read_instance_connect() .expect("Failed to connect to bytestore"); - let local_data_store = Arc::new(super::LocalStoreClient::new().expect("Failed to create LocalStoreClient")); + let local_data_store = + Arc::new(super::LocalStoreClient::new().expect("Failed to create LocalStoreClient")); + let batch_size = 100; let total_count = data_store - .get_message_count() - .expect("Failed to get message count"); - - data_store.logger.log(format!("Total messages to process: {}", total_count)); + .get_message_count() + .expect("Failed to get message count"); - let processed_count = Arc::new(AtomicUsize::new(0)); + data_store + .logger + .log(format!("Total messages to process: {}", total_count)); + let processed_count = Arc::new(AtomicUsize::new(0)); let processed_count_clone = Arc::clone(&processed_count); let data_store_c = Arc::clone(&data_store); + tokio::spawn(async move { let mut interval = interval(Duration::from_secs(10)); - loop { + while processed_count_clone.load(Ordering::Relaxed) < total_count as usize { interval.tick().await; data_store_c.logger.log(format!( "Messages processed update: {}", - processed_count_clone.load(Ordering::SeqCst) + processed_count_clone.load(Ordering::Relaxed) )); - if processed_count_clone.load(Ordering::SeqCst) >= total_count as usize { - break; - } } }); - for batch_start in (0..total_count).step_by(20) { - let batch_end = batch_start + 20 as i64; - - let data_store = Arc::clone(&data_store); - let processed_count = Arc::clone(&processed_count); - - let result = data_store.get_all_messages_no_bundle(batch_start, Some(batch_end)); - - match result { - Ok(messages) => { - let mut save_handles: Vec> = Vec::new(); - for message in messages { - let msg_id = message.0; - let assignment_id = message.1; - let process_id = message.2; - let timestamp = message.3; - let epoch = message.4; - let nonce = message.5; - let hash_chain = message.6; - let data_store = Arc::clone(&data_store); - let processed_count = Arc::clone(&processed_count); - - let binary_key = ( - msg_id.clone(), - assignment_id.clone(), - process_id.clone(), - timestamp.to_string().clone(), - ); - - let handle = tokio::spawn(async move { - // data_store - // .bytestore - // .clone() - // .save_binary( - // msg_id.clone(), - // assignment_id.clone(), - // process_id.clone(), - // timestamp.clone(), - // bundle, - // ) - // .unwrap(); - processed_count.fetch_add(1, Ordering::SeqCst); - }); - - save_handles.push(handle); - } - join_all(save_handles).await; - } 
- Err(e) => { - data_store - .logger - .error(format!("Error fetching messages: {:?}", e)); - } + for batch_start in (0..total_count).step_by(batch_size) { + if let Ok(messages) = data_store + .get_all_messages_using_bytestore( + batch_start, + Some(batch_start + batch_size as i64), + ) + .await + { + let save_handles: Vec> = + messages + .into_iter() + .map(|message| { + let ( + msg_id, + assignment_id, + process_id, + timestamp, + epoch, + nonce, + _, + bundle, + ) = message; + let processed_count = Arc::clone(&processed_count); + let local_data_store_clone = Arc::clone(&local_data_store); + + let (assignment, composite_key, order_key, assignment_key) = + match assignment_id { + Some(a_id) => ( + a_id.clone(), + format!("messages:{}:{}", msg_id, a_id), + format!( + "process_messages:{}:{:010}:{:010}:{:015}:{}", + process_id, epoch, nonce, timestamp, a_id + ), + format!("assignments:{}", a_id), + ), + None => { + let parsed_message = + Message::from_bytes(bundle.clone()).unwrap(); + ( + parsed_message.assignment.id.clone(), + format!( + "messages:{}:{}", + msg_id, parsed_message.assignment.id + ), + format!( + "process_messages:{}:{:010}:{:010}:{:015}:{}", + process_id, + epoch, + nonce, + timestamp, + parsed_message.assignment.id + ), + format!("assignments:{}", parsed_message.assignment.id), + ) + } + }; + + tokio::task::spawn_blocking(move || { + local_data_store_clone + .index_db + .put(composite_key.as_bytes(), assignment.as_bytes()) + .unwrap(); + local_data_store_clone + .index_db + .put(order_key.as_bytes(), assignment.as_bytes()) + .unwrap(); + local_data_store_clone + .file_db + .put(assignment_key.as_bytes(), bundle) + .unwrap(); + processed_count.fetch_add(1, Ordering::Relaxed); + }) + }) + .collect(); + join_all(save_handles).await; + } else { + data_store.logger.error(format!("Error fetching messages")); } } - let duration = start.elapsed(); - data_store - .logger - .log(format!("Time elapsed in data migration is: {:?}", duration)); - + let total_process_count = data_store + .get_process_count() + .expect("Failed to get process count"); + data_store.logger.log(format!( + "Total processes to process: {}", + total_process_count + )); + + for batch_start in (0..total_process_count).step_by(batch_size) { + if let Ok(processes) = + data_store.get_all_processes(batch_start, Some(batch_start + batch_size as i64)) + { + let save_handles: Vec> = processes + .into_iter() + .map(|process| { + let parsed_process = Process::from_bytes(process.clone()).unwrap(); + let parsed_clone = parsed_process.clone(); + let local_data_store_clone = Arc::clone(&local_data_store); + + let (assignment_id, process_key, order_key, assignment_key) = + match parsed_process.assignment { + Some(assignment) => { + let timestamp = parsed_clone.timestamp().unwrap(); + ( + assignment.id.clone(), + format!("processes:{}", parsed_process.process.process_id), + format!( + "processes:{}:{:010}:{:010}:{:015}:{}", + parsed_process.process.process_id, + 0, + 0, + timestamp, + assignment.id + ), + format!("process_assignments:{}", assignment.id), + ) + } + None => { + let bundle_data_item = + DataItem::from_bytes(process.clone()).unwrap(); + let timestamp = bundle_data_item + .tags() + .iter() + .find(|tag| tag.name == "Timestamp") + .unwrap() + .value + .parse::() + .unwrap(); + ( + bundle_data_item.id().clone(), + format!("processes:{}", parsed_process.process.process_id), + format!( + "processes:{}:{:010}:{:010}:{:015}:{}", + parsed_process.process.process_id, + 0, + 0, + timestamp, + bundle_data_item.id() + ), + 
format!("process_assignments:{}", bundle_data_item.id()), + ) + } + }; + + tokio::task::spawn_blocking(move || { + local_data_store_clone + .index_db + .put(process_key.as_bytes(), assignment_id.as_bytes()) + .unwrap(); + local_data_store_clone + .index_db + .put(order_key.as_bytes(), assignment_id.as_bytes()) + .unwrap(); + local_data_store_clone + .file_db + .put(assignment_key.as_bytes(), process.clone()) + .unwrap(); + }) + }) + .collect(); + join_all(save_handles).await; + } else { + data_store.logger.error(format!("Error fetching processes")); + } + } + + data_store.logger.log(format!( + "Time elapsed in data migration is: {:?}", + start.elapsed() + )); Ok(()) } } diff --git a/servers/su/src/domain/clients/store.rs b/servers/su/src/domain/clients/store.rs index 0126beecd..4d78da2fd 100644 --- a/servers/su/src/domain/clients/store.rs +++ b/servers/su/src/domain/clients/store.rs @@ -249,6 +249,60 @@ impl StoreClient { } } + /* + Method to get the total number of processes + in the database, this is used by the mig_local migration. + */ + pub fn get_process_count(&self) -> Result { + use super::schema::processes::dsl::*; + let conn = &mut self.get_read_conn()?; + + let count_result: Result = processes.count().get_result(conn); + + match count_result { + Ok(count) => Ok(count), + Err(e) => Err(StoreErrorType::from(e)), + } + } + + /* + Get all processes in the database, within a + certain range. This is used for migrations. + */ + pub fn get_all_processes( + &self, + from: i64, + to: Option, + ) -> Result>, StoreErrorType> { + use super::schema::processes::dsl::*; + let conn = &mut self.get_read_conn()?; + let mut query = processes.into_boxed(); + + // Apply the offset + query = query.offset(from); + + // Apply the limit if `to` is provided + if let Some(to) = to { + let limit = to - from; + query = query.limit(limit); + } + + let db_processes_result: Result, DieselError> = query.load(conn); + + match db_processes_result { + Ok(db_processes) => { + let mut processes_mapped: Vec> = vec![]; + for db_process in db_processes.iter() { + let bytes: Vec = db_process.bundle.clone(); + processes_mapped.push(bytes); + } + + Ok(processes_mapped) + } + Err(e) => Err(StoreErrorType::from(e)), + } + } + /* Get all messages in the database, within a certain range. This is used for the migration. 
@@ -312,35 +366,37 @@ impl StoreClient { } } - pub fn get_all_messages_no_bundle( + // used by the mig_local migration + pub async fn get_all_messages_using_bytestore( &self, from: i64, to: Option, ) -> Result< Vec<( - String, // message_id - Option, // assignment_id - String, // process_id - i64, // timestamp - i32, // epoch - i32, // nonce - String, // hash_chain + String, // message_id + Option, // assignment_id + String, // process_id + i64, // timestamp + i32, // epoch + i32, // nonce + String, // hash_chain + Vec, // bundle )>, StoreErrorType, > { use super::schema::messages::dsl::*; let conn = &mut self.get_read_conn()?; let mut query = messages.into_boxed(); - + // Apply the offset query = query.offset(from); - + // Apply the limit if `to` is provided if let Some(to) = to { let limit = to - from; query = query.limit(limit); } - + // Select only the fields that you are using let selected_fields = query.select(( message_id, @@ -351,42 +407,108 @@ impl StoreClient { nonce, hash_chain, )); - + // Load only the selected fields let db_messages_result: Result< Vec<( - String, // message_id - Option, // assignment_id - String, // process_id - i64, // timestamp - i32, // epoch - i32, // nonce - String, // hash_chain - )>, - DieselError + String, // message_id + Option, // assignment_id + String, // process_id + i64, // timestamp + i32, // epoch + i32, // nonce + String, // hash_chain + )>, + DieselError, > = selected_fields.order(timestamp.asc()).load(conn); - + match db_messages_result { Ok(db_messages) => { // Map the result into the desired tuple with timestamp as String - let messages_mapped = db_messages.into_iter().map(|db_message| { - ( - db_message.0, // message_id - db_message.1, // assignment_id - db_message.2, // process_id - db_message.3, // timestamp as chrono::NaiveDateTime - db_message.4, // epoch - db_message.5, // nonce - db_message.6, // hash_chain - ) - }).collect::>(); - - Ok(messages_mapped) + let messages_mapped = db_messages + .into_iter() + .map(|db_message| { + ( + db_message.0, // message_id + db_message.1, // assignment_id + db_message.2, // process_id + db_message.3, // timestamp + db_message.4, // epoch + db_message.5, // nonce + db_message.6, // hash_chain + ) + }) + .collect::>(); + + let message_ids: Vec<(String, Option, String, String)> = messages_mapped + .iter() + .map(|msg| { + ( + msg.0.clone(), + msg.1.clone(), + msg.2.clone(), + msg.3.to_string().clone(), + ) + }) + .collect(); + + let binaries = self.bytestore.clone().read_binaries(message_ids).await?; + let mut messages_with_bundles = vec![]; + + for db_message in messages_mapped.iter() { + match binaries.get(&( + db_message.0.clone(), // message id + db_message.1.clone(), // assignment id + db_message.2.clone(), // process id + db_message.3.to_string().clone(), // timestamp + )) { + Some(bytes_result) => { + messages_with_bundles.push(( + db_message.0.clone(), // message_id + db_message.1.clone(), // assignment_id + db_message.2.clone(), // process_id + db_message.3.clone(), // timestamp + db_message.4.clone(), // epoch + db_message.5.clone(), // nonce + db_message.6.clone(), // hash_chain + bytes_result.clone(), // bundle + )); + } + None => { + // Fall back to the database if the binary isn't available + let db_message_with_bundle: DbMessage = match db_message.1.clone() { + Some(assignment_id_d) => messages + .filter( + message_id + .eq(db_message.0.clone()) + .and(assignment_id.eq(assignment_id_d)), + ) + .order(timestamp.asc()) + .first(conn)?, + None => messages + 
.filter(message_id.eq(db_message.0.clone())) + .order(timestamp.asc()) + .first(conn)?, + }; + messages_with_bundles.push(( + db_message.0.clone(), // message_id + db_message.1.clone(), // assignment_id + db_message.2.clone(), // process_id + db_message.3.clone(), // timestamp + db_message.4.clone(), // epoch + db_message.5.clone(), // nonce + db_message.6.clone(), // hash_chain + db_message_with_bundle.bundle.clone(), // bundle + )); + } + } + } + + Ok(messages_with_bundles) } Err(e) => Err(StoreErrorType::from(e)), } } - /* Used as a fallback when USE_DISK is true. If the @@ -1278,14 +1400,14 @@ mod bytestore { pub fn try_read_instance_connect(&self) -> Result<(), String> { let mut opts = Options::default(); opts.set_enable_blob_files(true); // Enable blob files - + // Open the database in read-only mode let new_db = DB::open_for_read_only(&opts, &self.config.su_data_dir, false) .map_err(|e| format!("Failed to open RocksDB in read-only mode: {:?}", e))?; - + let mut db_write = self.db.write().unwrap(); *db_write = Some(new_db); - + Ok(()) } diff --git a/servers/su/src/domain/core/builder.rs b/servers/su/src/domain/core/builder.rs index 4972f7d77..cefdcdc8b 100644 --- a/servers/su/src/domain/core/builder.rs +++ b/servers/su/src/domain/core/builder.rs @@ -17,6 +17,7 @@ pub struct Builder<'a> { pub struct BuildResult { pub binary: Vec, pub bundle: DataBundle, + pub bundle_data_item: DataItem, } #[derive(Debug, Clone)] @@ -137,6 +138,7 @@ impl<'a> Builder<'a> { Ok(BuildResult { binary: bundle_data_item.as_bytes()?, bundle: data_bundle, + bundle_data_item, }) } @@ -190,10 +192,11 @@ impl<'a> Builder<'a> { Ok(BuildResult { binary: new_data_item.as_bytes()?, bundle: data_bundle, + bundle_data_item: new_data_item, }) } - pub fn parse_data_item(&self, tx: Vec) -> Result { + pub fn parse_data_item(tx: Vec) -> Result { Ok(DataItem::from_bytes(tx)?) 
} diff --git a/servers/su/src/domain/core/bytes.rs b/servers/su/src/domain/core/bytes.rs index 99106eb87..4b4795304 100644 --- a/servers/su/src/domain/core/bytes.rs +++ b/servers/su/src/domain/core/bytes.rs @@ -41,15 +41,11 @@ impl From for ByteErrorType { #[derive(Clone)] pub struct DataBundle { pub items: Vec, - pub tags: Vec, } impl DataBundle { pub fn new(tags: Vec) -> Self { - DataBundle { - items: Vec::new(), - tags: tags, - } + DataBundle { items: Vec::new() } } pub fn add_item(&mut self, item: DataItem) { @@ -108,10 +104,7 @@ impl DataBundle { items.push(item); } - Ok(Self { - items, - tags: Vec::new(), // Assuming tags are not used in to_bytes - }) + Ok(Self { items }) } } diff --git a/servers/su/src/domain/core/dal.rs b/servers/su/src/domain/core/dal.rs index 4b38505a8..6a75c2b4d 100644 --- a/servers/su/src/domain/core/dal.rs +++ b/servers/su/src/domain/core/dal.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use serde::Deserialize; +pub use super::bytes::DataItem; pub use super::json::{JsonErrorType, Message, PaginatedMessages, Process}; pub use super::router::{ProcessScheduler, Scheduler}; diff --git a/servers/su/src/domain/core/flows.rs b/servers/su/src/domain/core/flows.rs index 29dc941fd..1ef04fee0 100644 --- a/servers/su/src/domain/core/flows.rs +++ b/servers/su/src/domain/core/flows.rs @@ -92,7 +92,7 @@ pub async fn write_item( let (target_id, data_item) = if let (Some(ref process_id), Some(_)) = (&process_id, &assign) { (process_id.clone(), None) } else { - let data_item = builder.parse_data_item(input.clone())?; + let data_item = Builder::parse_data_item(input.clone())?; match data_item.tags().iter().find(|tag| tag.name == "Type") { Some(type_tag) => match type_tag.value.as_str() { "Process" => (data_item.id(), Some(data_item)), @@ -247,7 +247,10 @@ pub async fn write_item( return id_res(&deps, process.process.process_id.clone(), start_top_level); } else { let build_result = builder.build_process(input, &next_schedule_info).await?; - let process = Process::from_bundle_no_assign(&build_result.bundle)?; + let process = Process::from_bundle_no_assign( + &build_result.bundle, + &build_result.bundle_data_item, + )?; deps.data_store .save_process(&process, &build_result.binary)?; deps.logger.log(format!("saved process - {:?}", &process)); diff --git a/servers/su/src/domain/core/json.rs b/servers/su/src/domain/core/json.rs index b1b008612..463f39562 100644 --- a/servers/su/src/domain/core/json.rs +++ b/servers/su/src/domain/core/json.rs @@ -1,3 +1,4 @@ +use actix_web::web::Json; use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; @@ -128,39 +129,36 @@ pub fn hash(data: &[u8]) -> Vec { impl Process { pub fn from_bytes(bytes: Vec) -> Result { let data_item = DataItem::from_bytes(bytes)?; - let top_level_tags = data_item.tags(); /* - using arbitrary tag at the top level to check if - old structure, old structure had protocol tags on - the top level DataItem + Parse bundle to determine if old or new + structure. The old structure had no assignment. */ - let epoch_tag = top_level_tags.iter().find(|tag| tag.name == "Block-Height"); + let bundle_data = DataBundle::from_bytes( + &data_item + .data_bytes() + .ok_or("Bundle data not present in DataItem")?, + )?; - match epoch_tag { - None => { + match bundle_data.items.len() { + 2 => { /* - Current process structure, because protocol tags are - not present at the top level data item. 
+ Current process structure, because it has an + assignment and a message */ - let bundle_data = DataBundle::from_bytes( - &data_item - .data_bytes() - .ok_or("Bundle data not present in DataItem")?, - )?; Ok(Process::from_bundle(&bundle_data)?) } - Some(_) => { + 1 => { /* This is an old message structure so we have to parse it differently. */ - let bundle_data = DataBundle::from_bytes( - &data_item - .data_bytes() - .ok_or("Bundle data not present in DataItem")?, - )?; - Ok(Process::from_bundle_no_assign(&bundle_data)?) + Ok(Process::from_bundle_no_assign(&bundle_data, &data_item)?) + } + _ => { + return Err(JsonErrorType::JsonError( + "Invalid Process Bundle".to_string(), + )) } } } @@ -259,7 +257,10 @@ impl Process { /* for Processes pre aop6 */ - pub fn from_bundle_no_assign(data_bundle: &DataBundle) -> Result { + pub fn from_bundle_no_assign( + data_bundle: &DataBundle, + bundle_data_item: &DataItem, + ) -> Result { let id = data_bundle.items[0].id().clone(); let tags = data_bundle.items[0].tags(); let owner = data_bundle.items[0].owner().clone(); @@ -278,7 +279,7 @@ impl Process { let address_hash = hash(&owner_bytes); let address = base64_url::encode(&address_hash); - let bundle_tags = data_bundle.tags.clone(); + let bundle_tags = bundle_data_item.tags().clone(); let block_tag = bundle_tags .iter() diff --git a/servers/su/src/domain/core/router.rs b/servers/su/src/domain/core/router.rs index d084cc239..e8553a34e 100644 --- a/servers/su/src/domain/core/router.rs +++ b/servers/su/src/domain/core/router.rs @@ -1,3 +1,4 @@ +use super::builder::Builder; use crate::domain::core::dal::StoreErrorType; use crate::domain::flows::{init_builder, Deps}; use serde::Deserialize; @@ -155,7 +156,7 @@ pub async fn redirect_data_item( } let builder = init_builder(&deps)?; - let item = builder.parse_data_item(input.clone())?; + let item = Builder::parse_data_item(input.clone())?; let tags = item.tags().clone(); let id = item.id().clone(); let target = item.target().clone(); diff --git a/servers/su/src/domain/mod.rs b/servers/su/src/domain/mod.rs index 1a5331552..4d5c33759 100644 --- a/servers/su/src/domain/mod.rs +++ b/servers/su/src/domain/mod.rs @@ -19,8 +19,8 @@ pub use clients::metrics::PromMetrics; pub use core::flows; pub use core::router; pub use flows::Deps; -pub use store::migrate_to_disk; pub use local_store::migration::migrate_to_local; +pub use store::migrate_to_disk; pub async fn init_deps(mode: Option, metrics_registry: prometheus::Registry) -> Arc { let logger: Arc = SuLog::init();
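The diff re-exports `migrate_to_local` from the domain module but does not show the binary that invokes it. A hypothetical wiring is sketched below; the crate path, binary name, and the `mig_local` argument are assumptions (the argument name follows the "mig_local migration" wording in the store.rs comments) and are not part of this patch:

```rust
// Hypothetical entry point for running the new migration; the actual su
// main/migration binary is not part of this diff.
use su::domain::migrate_to_local; // crate path assumed from the re-export in domain/mod.rs

#[tokio::main]
async fn main() -> std::io::Result<()> {
    match std::env::args().nth(1).as_deref() {
        // "mig_local" mirrors the migration name used in the store.rs comments.
        Some("mig_local") => migrate_to_local().await,
        _ => {
            eprintln!("usage: <binary> mig_local");
            Ok(())
        }
    }
}
```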