perf(su): data migration working for RocksDB upgrade #987
VinceJuliano committed Oct 15, 2024
1 parent df82ae6 commit ffae9eb
Showing 9 changed files with 486 additions and 221 deletions.
431 changes: 286 additions & 145 deletions servers/su/src/domain/clients/local_store.rs

Large diffs are not rendered by default.

200 changes: 161 additions & 39 deletions servers/su/src/domain/clients/store.rs
@@ -249,6 +249,60 @@ impl StoreClient {
}
}

/*
Returns the total number of processes in the
database. Used by the mig_local migration.
*/
pub fn get_process_count(&self) -> Result<i64, StoreErrorType> {
use super::schema::processes::dsl::*;
let conn = &mut self.get_read_conn()?;

let count_result: Result<i64, DieselError> = processes.count().get_result(conn);

match count_result {
Ok(count) => Ok(count),
Err(e) => Err(StoreErrorType::from(e)),
}
}

/*
Get all processes in the database, within a
certain range. This is used for migrations.
*/
pub fn get_all_processes(
&self,
from: i64,
to: Option<i64>,
) -> Result<Vec<Vec<u8>>, StoreErrorType> {
use super::schema::processes::dsl::*;
let conn = &mut self.get_read_conn()?;
let mut query = processes.into_boxed();

// Apply the offset
query = query.offset(from);

// Apply the limit if `to` is provided
if let Some(to) = to {
let limit = to - from;
query = query.limit(limit);
}

let db_processes_result: Result<Vec<DbProcess>, DieselError> = query.load(conn);

match db_processes_result {
Ok(db_processes) => {
let mut processes_mapped: Vec<Vec<u8>> = vec![];
for db_process in db_processes.iter() {
let bytes: Vec<u8> = db_process.bundle.clone();
processes_mapped.push(bytes);
}

Ok(processes_mapped)
}
Err(e) => Err(StoreErrorType::from(e)),
}
}
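
Taken together with get_process_count above, these two methods support a simple paging pattern for the migration. Below is a minimal driver sketch (not part of this diff), assuming a batch size of 1000 and a placeholder save_process_to_rocksdb function standing in for the destination write:

fn migrate_processes(store: &StoreClient) -> Result<(), StoreErrorType> {
    let total = store.get_process_count()?;
    let batch_size: i64 = 1000;
    let mut offset: i64 = 0;

    while offset < total {
        // Fetch bundles in the half-open range [offset, offset + batch_size).
        let bundles = store.get_all_processes(offset, Some(offset + batch_size))?;
        for bundle in bundles {
            save_process_to_rocksdb(&bundle); // placeholder for the local-store write
        }
        offset += batch_size;
    }
    Ok(())
}

fn save_process_to_rocksdb(_bundle: &[u8]) {
    // destination write goes here
}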

/*
Get all messages in the database, within a
certain range. This is used for the migration.
@@ -312,35 +366,37 @@ impl StoreClient {
}
}

pub fn get_all_messages_no_bundle(
// used by the mig_local migration
pub async fn get_all_messages_using_bytestore(
&self,
from: i64,
to: Option<i64>,
) -> Result<
Vec<(
String, // message_id
Option<String>, // assignment_id
String, // process_id
i64, // timestamp
i32, // epoch
i32, // nonce
String, // hash_chain
Vec<u8>, // bundle
)>,
StoreErrorType,
> {
use super::schema::messages::dsl::*;
let conn = &mut self.get_read_conn()?;
let mut query = messages.into_boxed();

// Apply the offset
query = query.offset(from);

// Apply the limit if `to` is provided
if let Some(to) = to {
let limit = to - from;
query = query.limit(limit);
}

// Select only the fields we need
let selected_fields = query.select((
message_id,
@@ -351,42 +407,108 @@ impl StoreClient {
nonce,
hash_chain,
));

// Load only the selected fields
let db_messages_result: Result<
Vec<(
String, // message_id
Option<String>, // assignment_id
String, // process_id
i64, // timestamp
i32, // epoch
i32, // nonce
String, // hash_chain
)>,
DieselError,
> = selected_fields.order(timestamp.asc()).load(conn);

match db_messages_result {
Ok(db_messages) => {
let messages_mapped = db_messages
.into_iter()
.map(|db_message| {
(
db_message.0, // message_id
db_message.1, // assignment_id
db_message.2, // process_id
db_message.3, // timestamp
db_message.4, // epoch
db_message.5, // nonce
db_message.6, // hash_chain
)
})
.collect::<Vec<_>>();

let message_ids: Vec<(String, Option<String>, String, String)> = messages_mapped
.iter()
.map(|msg| {
(
msg.0.clone(),
msg.1.clone(),
msg.2.clone(),
msg.3.to_string().clone(),
)
})
.collect();

let binaries = self.bytestore.clone().read_binaries(message_ids).await?;
let mut messages_with_bundles = vec![];

for db_message in messages_mapped.iter() {
match binaries.get(&(
db_message.0.clone(), // message id
db_message.1.clone(), // assignment id
db_message.2.clone(), // process id
db_message.3.to_string().clone(), // timestamp
)) {
Some(bytes_result) => {
messages_with_bundles.push((
db_message.0.clone(), // message_id
db_message.1.clone(), // assignment_id
db_message.2.clone(), // process_id
db_message.3.clone(), // timestamp
db_message.4.clone(), // epoch
db_message.5.clone(), // nonce
db_message.6.clone(), // hash_chain
bytes_result.clone(), // bundle
));
}
None => {
// Fall back to the database if the binary isn't available
let db_message_with_bundle: DbMessage = match db_message.1.clone() {
Some(assignment_id_d) => messages
.filter(
message_id
.eq(db_message.0.clone())
.and(assignment_id.eq(assignment_id_d)),
)
.order(timestamp.asc())
.first(conn)?,
None => messages
.filter(message_id.eq(db_message.0.clone()))
.order(timestamp.asc())
.first(conn)?,
};
messages_with_bundles.push((
db_message.0.clone(), // message_id
db_message.1.clone(), // assignment_id
db_message.2.clone(), // process_id
db_message.3.clone(), // timestamp
db_message.4.clone(), // epoch
db_message.5.clone(), // nonce
db_message.6.clone(), // hash_chain
db_message_with_bundle.bundle.clone(), // bundle
));
}
}
}

Ok(messages_with_bundles)
}
Err(e) => Err(StoreErrorType::from(e)),
}
}
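
A sketch of the corresponding message loop (again, not part of this diff): page through messages with an assumed batch size and hand each record to a placeholder destination write. The tuple layout matches the return type documented above.

async fn migrate_messages(store: &StoreClient) -> Result<(), StoreErrorType> {
    let batch_size: i64 = 1000;
    let mut offset: i64 = 0;

    loop {
        let batch = store
            .get_all_messages_using_bytestore(offset, Some(offset + batch_size))
            .await?;
        if batch.is_empty() {
            break; // no rows left in this range
        }
        for (message_id, _assignment_id, process_id, _timestamp, _epoch, _nonce, _hash_chain, bundle) in batch {
            // Placeholder write; the destination API is not shown in this diff.
            save_message_to_rocksdb(&message_id, &process_id, &bundle);
        }
        offset += batch_size;
    }
    Ok(())
}

fn save_message_to_rocksdb(_message_id: &str, _process_id: &str, _bundle: &[u8]) {
    // destination write goes here
}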


/*
Used as a fallback when USE_DISK is true. If the
@@ -1278,14 +1400,14 @@ mod bytestore {
pub fn try_read_instance_connect(&self) -> Result<(), String> {
let mut opts = Options::default();
opts.set_enable_blob_files(true); // Enable blob files

// Open the database in read-only mode
let new_db = DB::open_for_read_only(&opts, &self.config.su_data_dir, false)
.map_err(|e| format!("Failed to open RocksDB in read-only mode: {:?}", e))?;

let mut db_write = self.db.write().unwrap();
*db_write = Some(new_db);

Ok(())
}

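try_read_instance_connect opens the existing RocksDB data directory read-only, which lets the migration read stored binaries while another process still owns the primary handle. A standalone sketch of the same pattern with the rocksdb crate; the key is purely illustrative, since the bytestore's real key layout is not shown in this hunk:

use rocksdb::{Options, DB};

fn read_one(path: &str, key: &[u8]) -> Result<Option<Vec<u8>>, String> {
    let mut opts = Options::default();
    opts.set_enable_blob_files(true); // mirror the writer's blob-file setting

    // Third argument: whether to error if a log file exists in the data dir
    let db = DB::open_for_read_only(&opts, path, false)
        .map_err(|e| format!("Failed to open RocksDB in read-only mode: {:?}", e))?;

    // Point read; returns Ok(None) if the key is absent.
    db.get(key).map_err(|e| format!("read failed: {:?}", e))
}
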
5 changes: 4 additions & 1 deletion servers/su/src/domain/core/builder.rs
@@ -17,6 +17,7 @@ pub struct Builder<'a> {
pub struct BuildResult {
pub binary: Vec<u8>,
pub bundle: DataBundle,
pub bundle_data_item: DataItem,
}

#[derive(Debug, Clone)]
@@ -137,6 +138,7 @@ impl<'a> Builder<'a> {
Ok(BuildResult {
binary: bundle_data_item.as_bytes()?,
bundle: data_bundle,
bundle_data_item,
})
}

@@ -190,10 +192,11 @@ impl<'a> Builder<'a> {
Ok(BuildResult {
binary: new_data_item.as_bytes()?,
bundle: data_bundle,
bundle_data_item: new_data_item,
})
}
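
Carrying the outer DataItem on BuildResult means callers can construct a Process without re-parsing the binary. An illustrative fragment, mirroring the write_item path in the flows.rs hunk further down:

let build_result = builder.build_process(input, &next_schedule_info).await?;
let process = Process::from_bundle_no_assign(
    &build_result.bundle,
    &build_result.bundle_data_item,
)?;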

pub fn parse_data_item(&self, tx: Vec<u8>) -> Result<DataItem, BuilderErrorType> {
pub fn parse_data_item(tx: Vec<u8>) -> Result<DataItem, BuilderErrorType> {
Ok(DataItem::from_bytes(tx)?)
}

11 changes: 2 additions & 9 deletions servers/su/src/domain/core/bytes.rs
@@ -41,15 +41,11 @@ impl From<String> for ByteErrorType {
#[derive(Clone)]
pub struct DataBundle {
pub items: Vec<DataItem>,
pub tags: Vec<Tag>,
}

impl DataBundle {
pub fn new(tags: Vec<Tag>) -> Self {
DataBundle {
items: Vec::new(),
tags: tags,
}
DataBundle { items: Vec::new() }
}

pub fn add_item(&mut self, item: DataItem) {
@@ -108,10 +104,7 @@ impl DataBundle {
items.push(item);
}

Ok(Self {
items,
tags: Vec::new(), // Assuming tags are not used in to_bytes
})
Ok(Self { items })
}
}

1 change: 1 addition & 0 deletions servers/su/src/domain/core/dal.rs
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use serde::Deserialize;

pub use super::bytes::DataItem;
pub use super::json::{JsonErrorType, Message, PaginatedMessages, Process};
pub use super::router::{ProcessScheduler, Scheduler};

7 changes: 5 additions & 2 deletions servers/su/src/domain/core/flows.rs
@@ -92,7 +92,7 @@ pub async fn write_item(
let (target_id, data_item) = if let (Some(ref process_id), Some(_)) = (&process_id, &assign) {
(process_id.clone(), None)
} else {
let data_item = builder.parse_data_item(input.clone())?;
let data_item = Builder::parse_data_item(input.clone())?;
match data_item.tags().iter().find(|tag| tag.name == "Type") {
Some(type_tag) => match type_tag.value.as_str() {
"Process" => (data_item.id(), Some(data_item)),
@@ -247,7 +247,10 @@ pub async fn write_item(
return id_res(&deps, process.process.process_id.clone(), start_top_level);
} else {
let build_result = builder.build_process(input, &next_schedule_info).await?;
let process = Process::from_bundle_no_assign(&build_result.bundle)?;
let process = Process::from_bundle_no_assign(
&build_result.bundle,
&build_result.bundle_data_item,
)?;
deps.data_store
.save_process(&process, &build_result.binary)?;
deps.logger.log(format!("saved process - {:?}", &process));