diff --git a/Cargo.lock b/Cargo.lock index 8a498bdc7..477fe7549 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9182,7 +9182,6 @@ dependencies = [ "arrow-schema 52.2.0", "async-channel", "async-lock", - "async-recursion", "async-std", "async-trait", "aws-config", @@ -9268,7 +9267,6 @@ name = "shinkai_sheet" version = "0.1.0" dependencies = [ "async-channel", - "async-recursion", "blake3", "chrono", "dashmap", @@ -9362,7 +9360,6 @@ name = "shinkai_vector_resources" version = "0.1.0" dependencies = [ "anyhow", - "async-recursion", "async-trait", "base64 0.13.1", "blake3", diff --git a/shinkai-bin/shinkai-node/Cargo.toml b/shinkai-bin/shinkai-node/Cargo.toml index ccbc29bbe..142b20396 100644 --- a/shinkai-bin/shinkai-node/Cargo.toml +++ b/shinkai-bin/shinkai-node/Cargo.toml @@ -63,7 +63,6 @@ urlencoding = "2.1.0" hex = "=0.4.3" aes-gcm = "0.10.3" blake3 = "1.2.0" -async-recursion = "1.0.5" cron-parser = "0.8.1" thiserror = "1.0.50" base64 = "0.13.0" diff --git a/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/generic_chain/generic_inference_chain.rs b/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/generic_chain/generic_inference_chain.rs index 02173c26a..5f9c7dac1 100644 --- a/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/generic_chain/generic_inference_chain.rs +++ b/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/generic_chain/generic_inference_chain.rs @@ -14,7 +14,6 @@ use crate::network::agent_payments_manager::my_agent_offerings_manager::MyAgentO use crate::network::ws_manager::WSUpdateHandler; use crate::tools::tool_router::ToolRouter; use crate::vector_fs::vector_fs::VectorFS; -use async_recursion::async_recursion; use async_trait::async_trait; use shinkai_message_primitives::schemas::inbox_name::InboxName; use shinkai_message_primitives::schemas::llm_providers::serialized_llm_provider::{ @@ -95,7 +94,6 @@ impl GenericInferenceChain { } } - #[async_recursion] #[allow(clippy::too_many_arguments)] pub async fn start_chain( db: Arc, diff --git a/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/image_analysis_chain/image_analysis_chain.rs b/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/image_analysis_chain/image_analysis_chain.rs index e9786684d..fe5e629e1 100644 --- a/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/image_analysis_chain/image_analysis_chain.rs +++ b/shinkai-bin/shinkai-node/src/llm_provider/execution/chains/image_analysis_chain/image_analysis_chain.rs @@ -1,4 +1,3 @@ -use async_recursion::async_recursion; use shinkai_message_primitives::schemas::{ inbox_name::InboxName, llm_providers::serialized_llm_provider::SerializedLLMProvider, shinkai_name::ShinkaiName, }; @@ -30,7 +29,6 @@ pub struct CronExecutionState { } impl JobManager { - #[async_recursion] pub async fn image_analysis_chain( _db: Arc, full_job: Job, diff --git a/shinkai-bin/shinkai-node/src/network/subscription_manager/http_manager/subscription_file_uploader.rs b/shinkai-bin/shinkai-node/src/network/subscription_manager/http_manager/subscription_file_uploader.rs index 1c042ac48..f5771e44d 100644 --- a/shinkai-bin/shinkai-node/src/network/subscription_manager/http_manager/subscription_file_uploader.rs +++ b/shinkai-bin/shinkai-node/src/network/subscription_manager/http_manager/subscription_file_uploader.rs @@ -15,16 +15,15 @@ // - IPFS // - Arcweave -use std::env; +use std::{env, future::Future, pin::Pin}; use aws_config::timeout::TimeoutConfig; use aws_sdk_s3::presigning::PresigningConfig; use aws_sdk_s3::types::EncodingType; -use 
aws_sdk_s3::{Client as S3Client}; +use aws_sdk_s3::Client as S3Client; use reqwest::{Client, Error as ReqwestError}; use serde::{Deserialize, Serialize}; -use async_recursion::async_recursion; use aws_types::region::Region; use serde_json::{json, Error as JsonError, Value}; use shinkai_message_primitives::shinkai_message::shinkai_message_schemas::{ @@ -346,96 +345,96 @@ pub async fn download_file_http( } } -#[async_recursion] -pub async fn list_folder_contents( - destination: &FileDestination, - folder_path: &str, -) -> Result<Vec<FileDestinationPath>, FileTransferError> { - let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path); - match destination { - FileDestination::S3(client, credentials) | FileDestination::R2(client, credentials) => { - let mut folder_contents = Vec::new(); - let mut continuation_token: Option<String> = None; - - loop { - let mut request_builder = client - .list_objects_v2() - .bucket(credentials.bucket.clone()) - .prefix(folder_path) - .delimiter("/") - .encoding_type(EncodingType::Url); - - if let Some(token) = &continuation_token { - request_builder = request_builder.continuation_token(token); - } - - let response = request_builder.send().await.map_err(|sdk_error| { - FileTransferError::Other(format!("Failed to list folder contents: {:?}", sdk_error)) - })?; - - // Handle files and directories - if let Some(contents) = response.clone().contents { - for object in contents { - if let Some(key) = object.key { - let decoded_key = decode(&key).unwrap_or_default().to_string(); - let is_folder = decoded_key.ends_with('/'); - let clean_path = if is_folder { - decoded_key.trim_end_matches('/') - } else { - &decoded_key - }; - folder_contents.push(FileDestinationPath { - path: clean_path.to_string(), - is_folder, - }); - } +pub fn list_folder_contents<'a>( + destination: &'a FileDestination, + folder_path: &'a str, +) -> Pin<Box<dyn Future<Output = Result<Vec<FileDestinationPath>, FileTransferError>> + Send + 'a>> { + Box::pin(async move { + let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path); + match destination { + FileDestination::S3(client, credentials) | FileDestination::R2(client, credentials) => { + let mut folder_contents = Vec::new(); + let mut continuation_token: Option<String> = None; + + loop { + let mut request_builder = client + .list_objects_v2() + .bucket(credentials.bucket.clone()) + .prefix(folder_path) + .delimiter("/") + .encoding_type(EncodingType::Url); + + if let Some(token) = &continuation_token { + request_builder = request_builder.continuation_token(token); } - } - // Handle common prefixes (subdirectories) - if let Some(common_prefixes) = response.clone().common_prefixes { - for prefix in common_prefixes { - if let Some(prefix_key) = prefix.prefix { - let decoded_key = decode(&prefix_key).unwrap_or_default().to_string(); - let clean_path = decoded_key.trim_end_matches('/'); - if !clean_path.is_empty() { + let response = request_builder.send().await.map_err(|sdk_error| { + FileTransferError::Other(format!("Failed to list folder contents: {:?}", sdk_error)) + })?; + + // Handle files and directories + if let Some(contents) = response.clone().contents { + for object in contents { + if let Some(key) = object.key { + let decoded_key = decode(&key).unwrap_or_default().to_string(); + let is_folder = decoded_key.ends_with('/'); + let clean_path = if is_folder { + decoded_key.trim_end_matches('/') + } else { + &decoded_key + }; folder_contents.push(FileDestinationPath { path: clean_path.to_string(), - is_folder: true, + is_folder, }); } } } - } - if response.is_truncated().unwrap_or_default() { - continuation_token = 
response.next_continuation_token().map(|s| s.to_string()); - } else { - break; + // Handle common prefixes (subdirectories) + if let Some(common_prefixes) = response.clone().common_prefixes { + for prefix in common_prefixes { + if let Some(prefix_key) = prefix.prefix { + let decoded_key = decode(&prefix_key).unwrap_or_default().to_string(); + let clean_path = decoded_key.trim_end_matches('/'); + if !clean_path.is_empty() { + folder_contents.push(FileDestinationPath { + path: clean_path.to_string(), + is_folder: true, + }); + } + } + } + } + + if response.is_truncated().unwrap_or_default() { + continuation_token = response.next_continuation_token().map(|s| s.to_string()); + } else { + break; + } } - } - // Recursively list contents of subdirectories - let mut all_contents = Vec::new(); - for item in folder_contents { - eprintln!("Item: {:?}", item); - all_contents.push(item.clone()); - if item.is_folder { - let subfolder_contents = list_folder_contents(destination, &format!("{}/", item.path)).await?; - all_contents.extend(subfolder_contents); + // Recursively list contents of subdirectories + let mut all_contents = Vec::new(); + for item in folder_contents { + eprintln!("Item: {:?}", item); + all_contents.push(item.clone()); + if item.is_folder { + let subfolder_contents = list_folder_contents(destination, &format!("{}/", item.path)).await?; + all_contents.extend(subfolder_contents); + } } - } - Ok(all_contents) + Ok(all_contents) + } + FileDestination::Http { .. } => Err(FileTransferError::Other( + "Listing folder contents is not supported for HTTP destinations.".to_string(), + )), } - FileDestination::Http { .. } => Err(FileTransferError::Other( - "Listing folder contents is not supported for HTTP destinations.".to_string(), - )), - } + }) } /// Generates temporary shareable links for all files in a specified folder. -#[async_recursion] pub async fn generate_temporary_shareable_links_for_folder( folder_path: &str, destination: &FileDestination, @@ -502,7 +501,7 @@ pub async fn generate_temporary_shareable_link( Ok(presigned_req.uri().to_string()) } - FileDestination::Http { .. } => { + FileDestination::Http { .. } => { // For HTTP, we might need to handle this differently as HTTP servers do not typically support presigned URLs // This would depend on the specific server's capabilities or additional server-side logic to handle temporary links Err(FileTransferError::Other( @@ -553,25 +552,29 @@ pub async fn delete_file_or_folder(destination: &FileDestination, path: &str) -> } /// Deletes all files and folders recursively within a specified folder. 
-#[async_recursion] -pub async fn delete_all_in_folder(destination: &FileDestination, folder_path: &str) -> Result<(), FileTransferError> { - let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path); - let contents = list_folder_contents(destination, folder_path).await?; - // Start by deleting all files in the current folder - for item in &contents { - if !item.is_folder { - delete_file_or_folder(destination, &item.path).await?; +pub fn delete_all_in_folder<'a>( + destination: &'a FileDestination, + folder_path: &'a str, +) -> Pin<Box<dyn Future<Output = Result<(), FileTransferError>> + Send + 'a>> { + Box::pin(async move { + let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path); + let contents = list_folder_contents(destination, folder_path).await?; + // Start by deleting all files in the current folder + for item in &contents { + if !item.is_folder { + delete_file_or_folder(destination, &item.path).await?; + } } - } - // Then delete subfolders recursively - for item in contents { - if item.is_folder { - delete_all_in_folder(destination, &item.path).await?; - // After deleting all contents in the subfolder, delete the subfolder itself - delete_file_or_folder(destination, &item.path).await?; + // Then delete subfolders recursively + for item in contents { + if item.is_folder { + delete_all_in_folder(destination, &item.path).await?; + // After deleting all contents in the subfolder, delete the subfolder itself + delete_file_or_folder(destination, &item.path).await?; + } } - } - Ok(()) + Ok(()) + }) } #[cfg(test)] diff --git a/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_reader.rs b/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_reader.rs index 3c14356c3..c871d4458 100644 --- a/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_reader.rs +++ b/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_reader.rs @@ -1,9 +1,11 @@ +use std::future::Future; +use std::pin::Pin; + use super::vector_fs::VectorFS; use super::vector_fs_error::VectorFSError; use super::vector_fs_types::{FSEntry, FSFolder, FSItem, FSRoot}; use super::vector_fs_writer::VFSWriter; use crate::db::db_profile_bound::ProfileBoundWriteBatch; -use async_recursion::async_recursion; use serde::{Deserialize, Serialize}; use serde_json::Value; use shinkai_message_primitives::schemas::shinkai_name::ShinkaiName; @@ -189,75 +191,77 @@ impl VectorFS { let mut folder_merkle_hash_map = std::collections::HashMap::new(); // Recursive function to process each entry and populate the VRPack - #[async_recursion] - async fn process_entry( - entry: &FSEntry, - vrpack: &mut VRPack, + fn process_entry<'a>( + entry: &'a FSEntry, + vrpack: &'a mut VRPack, current_path: VRPath, - vector_fs: &VectorFS, - reader: &VFSReader, + vector_fs: &'a VectorFS, + reader: &'a VFSReader, vec_fs_base_path: VRPath, - folder_merkle_hash_map: &mut std::collections::HashMap<VRPath, String>, - ) -> Result<(), VectorFSError> { - match entry { - FSEntry::Root(folder) => { - for child in &folder.child_folders { - let entry = FSEntry::Folder(child.clone()); - process_entry( - &entry, - vrpack, - current_path.clone(), - vector_fs, - reader, - vec_fs_base_path.clone(), - folder_merkle_hash_map, - ) - .await?; - } - } - FSEntry::Folder(folder) => { - let inner_path = current_path.push_cloned(folder.name.clone()); - vrpack.create_folder(&folder.name, current_path.clone())?; - for child in &folder.child_folders { - let entry = FSEntry::Folder(child.clone()); - process_entry( - &entry, - vrpack, - inner_path.clone(), - vector_fs, - reader, - vec_fs_base_path.clone(), - folder_merkle_hash_map, - ) - .await?; + 
folder_merkle_hash_map: &'a mut std::collections::HashMap<VRPath, String>, + ) -> Pin<Box<dyn Future<Output = Result<(), VectorFSError>> + Send + 'a>> { + Box::pin(async move { + match entry { + FSEntry::Root(folder) => { + for child in &folder.child_folders { + let entry = FSEntry::Folder(child.clone()); + process_entry( + &entry, + vrpack, + current_path.clone(), + vector_fs, + reader, + vec_fs_base_path.clone(), + folder_merkle_hash_map, + ) + .await?; + } } - for child in &folder.child_items { - let entry = FSEntry::Item(child.clone()); - process_entry( - &entry, - vrpack, - inner_path.clone(), - vector_fs, - reader, - vec_fs_base_path.clone(), - folder_merkle_hash_map, - ) - .await?; + FSEntry::Folder(folder) => { + let inner_path = current_path.push_cloned(folder.name.clone()); + vrpack.create_folder(&folder.name, current_path.clone())?; + for child in &folder.child_folders { + let entry = FSEntry::Folder(child.clone()); + process_entry( + &entry, + vrpack, + inner_path.clone(), + vector_fs, + reader, + vec_fs_base_path.clone(), + folder_merkle_hash_map, + ) + .await?; + } + for child in &folder.child_items { + let entry = FSEntry::Item(child.clone()); + process_entry( + &entry, + vrpack, + inner_path.clone(), + vector_fs, + reader, + vec_fs_base_path.clone(), + folder_merkle_hash_map, + ) + .await?; + } + + folder_merkle_hash_map.insert(inner_path.clone(), folder.merkle_hash.to_string()); } - - folder_merkle_hash_map.insert(inner_path.clone(), folder.merkle_hash.to_string()); - } - FSEntry::Item(item) => { - // For each item, use retrieve_vrkai to get the VRKai object - let item_path = vec_fs_base_path.append_path_cloned(&current_path.push_cloned(item.name.clone())); - let item_reader = reader.new_reader_copied_data(item_path, vector_fs).await?; - match vector_fs.retrieve_vrkai(&item_reader).await { - Ok(vrkai) => vrpack.insert_vrkai(&vrkai, current_path.clone(), false)?, - Err(e) => return Err(e), + FSEntry::Item(item) => { + // For each item, use retrieve_vrkai to get the VRKai object + let item_path = + vec_fs_base_path.append_path_cloned(&current_path.push_cloned(item.name.clone())); + let item_reader = reader.new_reader_copied_data(item_path, vector_fs).await?; + match vector_fs.retrieve_vrkai(&item_reader).await { + Ok(vrkai) => vrpack.insert_vrkai(&vrkai, current_path.clone(), false)?, + Err(e) => return Err(e), + } } } - } - Ok(()) + Ok(()) + }) } // Start processing from the root or folder of the FSEntry diff --git a/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_writer.rs b/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_writer.rs index 460978b79..52d0b1ee2 100644 --- a/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_writer.rs +++ b/shinkai-bin/shinkai-node/src/vector_fs/vector_fs_writer.rs @@ -16,6 +16,8 @@ use shinkai_vector_resources::{ vector_resource::{BaseVectorResource, MapVectorResource, Node, VRHeader, VRPath, VRSourceReference}, }; use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; /// A struct that represents having rights to write to the VectorFS under a profile/at a specific path. /// If a VFSWriter struct is constructed, that means the `requester_name` has passed @@ -95,128 +97,129 @@ impl VectorFS { } /// Internal method to copy the FSFolder from the writer's path into being held underneath the destination_path. 
- #[async_recursion::async_recursion] - async fn internal_wb_copy_folder( - &self, - writer: &VFSWriter, + fn internal_wb_copy_folder<'a>( + &'a self, + writer: &'a VFSWriter, destination_path: VRPath, mut write_batch: ProfileBoundWriteBatch, is_recursive_call: bool, - ) -> Result<(ProfileBoundWriteBatch, FSFolder), VectorFSError> { - let current_datetime = ShinkaiTime::generate_time_now(); - let destination_writer = writer.new_writer_copied_data(destination_path.clone(), self).await?; + ) -> Pin<Box<dyn Future<Output = Result<(ProfileBoundWriteBatch, FSFolder), VectorFSError>> + Send + 'a>> { + Box::pin(async move { + let current_datetime = ShinkaiTime::generate_time_now(); + let destination_writer = writer.new_writer_copied_data(destination_path.clone(), self).await?; - // Ensure paths are valid before proceeding - self.validate_path_points_to_folder(writer.path.clone(), &writer.profile) - .await?; - if &destination_path != &VRPath::root() { - self.validate_path_points_to_folder(destination_path.clone(), &writer.profile) + // Ensure paths are valid before proceeding + self.validate_path_points_to_folder(writer.path.clone(), &writer.profile) .await?; - } - let destination_child_path = destination_path.push_cloned(writer.path.last_path_id()?); - if self - .validate_path_points_to_entry(destination_child_path.clone(), &writer.profile) - .await - .is_ok() - { - return Err(VectorFSError::CannotOverwriteFSEntry(destination_child_path.clone())); - } + if &destination_path != &VRPath::root() { + self.validate_path_points_to_folder(destination_path.clone(), &writer.profile) + .await?; + } + let destination_child_path = destination_path.push_cloned(writer.path.last_path_id()?); + if self + .validate_path_points_to_entry(destination_child_path.clone(), &writer.profile) + .await + .is_ok() + { + return Err(VectorFSError::CannotOverwriteFSEntry(destination_child_path.clone())); + } - // Get the existing folder - let (folder_ret_node, embedding) = self._get_node_from_core_resource(writer).await?; - let metadata = folder_ret_node.node.metadata.clone(); - let mut folder_resource = folder_ret_node.node.get_vector_resource_content()?.clone(); - // Backup tag index, remove nodes/embeddings, and then reapply tag index - let cloned_tag_index = folder_resource.as_trait_object().get_data_tag_index().clone(); - let nodes_embeddings = folder_resource.as_trait_object_mut().remove_root_nodes()?; - folder_resource - .as_trait_object_mut() - .set_data_tag_index(cloned_tag_index); - - // We insert the emptied folder resource into the destination path, and copy permissions - self._add_existing_vr_to_core_resource( - &destination_writer, - folder_resource, - embedding, - metadata, - current_datetime, - ) - .await?; - { - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - internals - .permissions_index - .copy_path_permission(writer.path.clone(), destination_path.clone()) - .await?; - self._update_fs_internals(writer.profile.clone(), internals).await?; - } + // Get the existing folder + let (folder_ret_node, embedding) = self._get_node_from_core_resource(writer).await?; + let metadata = folder_ret_node.node.metadata.clone(); + let mut folder_resource = folder_ret_node.node.get_vector_resource_content()?.clone(); + // Backup tag index, remove nodes/embeddings, and then reapply tag index + let cloned_tag_index = folder_resource.as_trait_object().get_data_tag_index().clone(); + let nodes_embeddings = folder_resource.as_trait_object_mut().remove_root_nodes()?; + folder_resource + .as_trait_object_mut() + .set_data_tag_index(cloned_tag_index); - // Determine and copy 
permissions from the parent of the new copied folder - let parent_path = destination_path.parent_path(); - let (read_permission, write_permission) = if parent_path == VRPath::root() { - (ReadPermission::Private, WritePermission::Private) - } else { - let parent_permissions = self - .get_profile_fs_internals_cloned(&writer.profile) - .await? - .permissions_index - .get_path_permission(&parent_path) - .await - .unwrap_or(PathPermission { - read_permission: ReadPermission::Private, - write_permission: WritePermission::Private, - whitelist: HashMap::new(), - }); - (parent_permissions.read_permission, parent_permissions.write_permission) - }; + // We insert the emptied folder resource into the destination path, and copy permissions + self._add_existing_vr_to_core_resource( + &destination_writer, + folder_resource, + embedding, + metadata, + current_datetime, + ) + .await?; + { + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + internals + .permissions_index + .copy_path_permission(writer.path.clone(), destination_path.clone()) + .await?; + self._update_fs_internals(writer.profile.clone(), internals).await?; + } - // Set permissions for the new copied folder - { - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - internals - .permissions_index - .insert_path_permission(destination_child_path.clone(), read_permission, write_permission) - .await?; - self._update_fs_internals(writer.profile.clone(), internals).await?; - } + // Determine and copy permissions from the parent of the new copied folder + let parent_path = destination_path.parent_path(); + let (read_permission, write_permission) = if parent_path == VRPath::root() { + (ReadPermission::Private, WritePermission::Private) + } else { + let parent_permissions = self + .get_profile_fs_internals_cloned(&writer.profile) + .await? 
+ .permissions_index + .get_path_permission(&parent_path) + .await + .unwrap_or(PathPermission { + read_permission: ReadPermission::Private, + write_permission: WritePermission::Private, + whitelist: HashMap::new(), + }); + (parent_permissions.read_permission, parent_permissions.write_permission) + }; - // Now we copy each of the folder's original child folders/items (nodes) and add them to their destination path - for (node, _) in nodes_embeddings { - let origin_writer = writer - .new_writer_copied_data(writer.path.push_cloned(node.id.clone()), self) - .await?; - let dest_path = destination_child_path.clone(); - match node.content { - NodeContent::Resource(_) => { - let (batch, _) = self - .internal_wb_copy_folder(&origin_writer, dest_path, write_batch, true) - .await?; - write_batch = batch; - } - NodeContent::VRHeader(_) => { - let (batch, _) = self.wb_copy_item(&origin_writer, dest_path, write_batch).await?; - write_batch = batch; + // Set permissions for the new copied folder + { + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + internals + .permissions_index + .insert_path_permission(destination_child_path.clone(), read_permission, write_permission) + .await?; + self._update_fs_internals(writer.profile.clone(), internals).await?; + } + + // Now we copy each of the folder's original child folders/items (nodes) and add them to their destination path + for (node, _) in nodes_embeddings { + let origin_writer = writer + .new_writer_copied_data(writer.path.push_cloned(node.id.clone()), self) + .await?; + let dest_path = destination_child_path.clone(); + match node.content { + NodeContent::Resource(_) => { + let (batch, _) = self + .internal_wb_copy_folder(&origin_writer, dest_path, write_batch, true) + .await?; + write_batch = batch; + } + NodeContent::VRHeader(_) => { + let (batch, _) = self.wb_copy_item(&origin_writer, dest_path, write_batch).await?; + write_batch = batch; + } + _ => continue, } - _ => continue, } - } - // Only commit updating the fs internals once at the top level, efficiency improvement - if !is_recursive_call { - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - self.db.wb_save_profile_fs_internals(&internals, &mut write_batch)?; - } + // Only commit updating the fs internals once at the top level, efficiency improvement + if !is_recursive_call { + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + self.db.wb_save_profile_fs_internals(&internals, &mut write_batch)?; + } - // Fetch the new FSFolder after everything has been copied over in fs internals - let reader = destination_writer - .new_reader_copied_data(destination_child_path.clone(), self) - .await?; - let fs_entry = self.retrieve_fs_entry(&reader).await?; + // Fetch the new FSFolder after everything has been copied over in fs internals + let reader = destination_writer + .new_reader_copied_data(destination_child_path.clone(), self) + .await?; + let fs_entry = self.retrieve_fs_entry(&reader).await?; - match fs_entry { - FSEntry::Folder(new_folder) => Ok((write_batch, new_folder)), - _ => Err(VectorFSError::PathDoesNotPointAtFolder(destination_child_path)), - } + match fs_entry { + FSEntry::Folder(new_folder) => Ok((write_batch, new_folder)), + _ => Err(VectorFSError::PathDoesNotPointAtFolder(destination_child_path)), + } + }) } /// Deletes the folder at writer's path, including all items and subfolders within. 
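
The hunks in this file repeat one mechanical conversion: a recursive `async fn` marked `#[async_recursion]` becomes a plain `fn` that returns `Pin<Box<dyn Future<Output = ...> + Send + 'a>>` and wraps its old body in `Box::pin(async move { ... })`. The boxing is required because an `async fn` cannot call itself directly — its anonymous future type would contain itself and have infinite size; boxing erases the type, the `'a` lifetime ties the future to the borrowed arguments, and `Send` keeps the future usable on a multi-threaded runtime. A minimal sketch of the pattern (hypothetical `Folder`, `WriteBatch`, and `FsError` stand-ins, not the crate's real types; tokio assumed as the executor, which this workspace already uses):

    use std::future::Future;
    use std::pin::Pin;

    // Hypothetical stand-ins for ProfileBoundWriteBatch / VectorFSError.
    struct Folder {
        name: String,
        children: Vec<Folder>,
    }
    struct WriteBatch(Vec<String>);
    #[derive(Debug)]
    struct FsError;

    // The recursive `async fn` becomes a plain `fn` returning a boxed future.
    fn delete_folder<'a>(
        folder: &'a Folder,
        mut batch: WriteBatch,
    ) -> Pin<Box<dyn Future<Output = Result<WriteBatch, FsError>> + Send + 'a>> {
        Box::pin(async move {
            // Children first; the batch is moved into each recursive call and
            // handed back, mirroring how `internal_wb_delete_folder` threads
            // its write batch through the recursion.
            for child in &folder.children {
                batch = delete_folder(child, batch).await?;
            }
            batch.0.push(format!("delete {}", folder.name));
            Ok(batch)
        })
    }

    #[tokio::main]
    async fn main() -> Result<(), FsError> {
        let tree = Folder {
            name: "root".into(),
            children: vec![Folder { name: "sub".into(), children: vec![] }],
        };
        let batch = delete_folder(&tree, WriteBatch(Vec::new())).await?;
        assert_eq!(batch.0, vec!["delete sub", "delete root"]);
        Ok(())
    }

Each recursive call performs one heap allocation for the boxed future, which is essentially the code the `async-recursion` macro generated behind the scenes; dropping the dependency just writes that boxing out by hand.
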
@@ -237,63 +240,64 @@ } /// Internal method that deletes the folder at writer's path, including all items and subfolders within, using a write batch. - #[async_recursion::async_recursion] - async fn internal_wb_delete_folder( - &self, - writer: &VFSWriter, + fn internal_wb_delete_folder<'a>( + &'a self, + writer: &'a VFSWriter, mut write_batch: ProfileBoundWriteBatch, is_recursive_call: bool, - ) -> Result<ProfileBoundWriteBatch, VectorFSError> { - self.validate_path_points_to_folder(writer.path.clone(), &writer.profile) - .await?; - - // Read the folder node first without removing it - let (folder_node, _) = self._get_node_from_core_resource(writer).await?; - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - let folder = - FSFolder::from_vector_resource_node(folder_node.node, writer.path.clone(), &internals.last_read_index)?; - - // Iterate over items in the folder and delete each - for item in folder.child_items { - let item_writer = VFSWriter { - requester_name: writer.requester_name.clone(), - path: writer.path.push_cloned(item.name.clone()), - profile: writer.profile.clone(), - }; - write_batch = self.wb_delete_item(&item_writer, write_batch).await?; - } - - // Recursively delete subfolders - for subfolder in folder.child_folders { - let folder_writer = VFSWriter { - requester_name: writer.requester_name.clone(), - path: writer.path.push_cloned(subfolder.name.clone()), - profile: writer.profile.clone(), - }; - write_batch = self - .internal_wb_delete_folder(&folder_writer, write_batch, true) + ) -> Pin<Box<dyn Future<Output = Result<ProfileBoundWriteBatch, VectorFSError>> + Send + 'a>> { + Box::pin(async move { + self.validate_path_points_to_folder(writer.path.clone(), &writer.profile) .await?; - } - // Now remove the folder node from the core resource and remove its path permissions - let (_removed_folder_node, _) = self._remove_node_from_core_resource(writer).await?; - { + // Read the folder node first without removing it + let (folder_node, _) = self._get_node_from_core_resource(writer).await?; let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - internals - .permissions_index - .remove_path_permission(folder.path.clone()) - .await; - self._update_fs_internals(writer.profile.clone(), internals).await?; - } + let folder = + FSFolder::from_vector_resource_node(folder_node.node, writer.path.clone(), &internals.last_read_index)?; + + // Iterate over items in the folder and delete each + for item in folder.child_items { + let item_writer = VFSWriter { + requester_name: writer.requester_name.clone(), + path: writer.path.push_cloned(item.name.clone()), + profile: writer.profile.clone(), + }; + write_batch = self.wb_delete_item(&item_writer, write_batch).await?; + } - // Only commit updating the fs internals once at the top level, efficiency improvement - // TODO: Efficiency, have each recursive call return the list of folder/item paths to delete in the permissions index, and do it all just once here - if !is_recursive_call { - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - self.db.wb_save_profile_fs_internals(&internals, &mut write_batch)?; - } + // Recursively delete subfolders + for subfolder in folder.child_folders { + let folder_writer = VFSWriter { + requester_name: writer.requester_name.clone(), + path: writer.path.push_cloned(subfolder.name.clone()), + profile: writer.profile.clone(), + }; + write_batch = self + .internal_wb_delete_folder(&folder_writer, write_batch, true) + .await?; + } - Ok(write_batch) + // Now remove the folder node from the core resource and remove its path 
permissions + let (_removed_folder_node, _) = self._remove_node_from_core_resource(writer).await?; + { + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + internals + .permissions_index + .remove_path_permission(folder.path.clone()) + .await; + self._update_fs_internals(writer.profile.clone(), internals).await?; + } + + // Only commit updating the fs internals once at the top level, efficiency improvement + // TODO: Efficiency, have each recursive call return the list of folder/item paths to delete in the permissions index, and do it all just once here + if !is_recursive_call { + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + self.db.wb_save_profile_fs_internals(&internals, &mut write_batch)?; + } + + Ok(write_batch) + }) } /// Deletes the FSItem at the writer's path. @@ -753,71 +757,72 @@ } /// Updates the permissions of a folder, all its subfolders, and subitems recursively. - #[async_recursion::async_recursion] - pub async fn update_permissions_recursively( - &self, - writer: &VFSWriter, + pub fn update_permissions_recursively<'a>( + &'a self, + writer: &'a VFSWriter, read_permission: ReadPermission, write_permission: WritePermission, - ) -> Result<(), VectorFSError> { - // Ensure the path points to a folder before proceeding - self.validate_path_points_to_folder(writer.path.clone(), &writer.profile) - .await?; - - // Retrieve the folder node - let (folder_node, _) = self._get_node_from_core_resource(writer).await?; - let mut folder_resource = folder_node.node.get_vector_resource_content()?.clone(); - - // Update permissions for the current folder - { - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - internals - .permissions_index - .insert_path_permission(writer.path.clone(), read_permission.clone(), write_permission.clone()) + ) -> Pin<Box<dyn Future<Output = Result<(), VectorFSError>> + Send + 'a>> { + Box::pin(async move { + // Ensure the path points to a folder before proceeding + self.validate_path_points_to_folder(writer.path.clone(), &writer.profile) .await?; - self._update_fs_internals(writer.profile.clone(), internals).await?; - } - // Recursively update permissions for all child nodes - if let NodeContent::Resource(_) = folder_node.node.content { - let nodes_embeddings = folder_resource.as_trait_object_mut().remove_root_nodes()?; - for (node, _) in nodes_embeddings { - let child_writer = writer - .new_writer_copied_data(writer.path.push_cloned(node.id.clone()), self) + // Retrieve the folder node + let (folder_node, _) = self._get_node_from_core_resource(writer).await?; + let mut folder_resource = folder_node.node.get_vector_resource_content()?.clone(); + + // Update permissions for the current folder + { + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + internals + .permissions_index + .insert_path_permission(writer.path.clone(), read_permission.clone(), write_permission.clone()) .await?; - match node.content { - NodeContent::Resource(_res) => { - self.update_permissions_recursively( - &child_writer, - read_permission.clone(), - write_permission.clone(), - ) + self._update_fs_internals(writer.profile.clone(), internals).await?; + } + + // Recursively update permissions for all child nodes + if let NodeContent::Resource(_) = folder_node.node.content { + let nodes_embeddings = folder_resource.as_trait_object_mut().remove_root_nodes()?; + for (node, _) in nodes_embeddings { + let child_writer = writer + .new_writer_copied_data(writer.path.push_cloned(node.id.clone()), self) .await?; - } 
- NodeContent::VRHeader(_) => { - let internals = self.get_profile_fs_internals_cloned(&child_writer.profile).await?; - internals - .permissions_index - .insert_path_permission( - child_writer.path.clone(), + match node.content { + NodeContent::Resource(_res) => { + self.update_permissions_recursively( + &child_writer, read_permission.clone(), write_permission.clone(), ) .await?; - self._update_fs_internals(writer.profile.clone(), internals).await?; + } + NodeContent::VRHeader(_) => { + let internals = self.get_profile_fs_internals_cloned(&child_writer.profile).await?; + internals + .permissions_index + .insert_path_permission( + child_writer.path.clone(), + read_permission.clone(), + write_permission.clone(), + ) + .await?; + self._update_fs_internals(writer.profile.clone(), internals).await?; + } + _ => continue, } - _ => continue, } } - } - // Save the FSInternals into the FSDB - let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; - let mut write_batch = writer.new_write_batch()?; - self.db.wb_save_profile_fs_internals(&internals, &mut write_batch)?; - self.db.write_pb(write_batch)?; + // Save the FSInternals into the FSDB + let internals = self.get_profile_fs_internals_cloned(&writer.profile).await?; + let mut write_batch = writer.new_write_batch()?; + self.db.wb_save_profile_fs_internals(&internals, &mut write_batch)?; + self.db.write_pb(write_batch)?; - Ok(()) + Ok(()) + }) } /// Extracts the VRPack into the VectorFS underneath the folder specified in the writer's path. Uses the VRPack's name diff --git a/shinkai-libs/shinkai-sheet/Cargo.toml b/shinkai-libs/shinkai-sheet/Cargo.toml index 57584be7f..663e0c491 100644 --- a/shinkai-libs/shinkai-sheet/Cargo.toml +++ b/shinkai-libs/shinkai-sheet/Cargo.toml @@ -14,7 +14,6 @@ regex = "1" shinkai_dsl = { path = "../shinkai-dsl" } shinkai_message_primitives = { path = "../shinkai-message-primitives" } async-channel = "1.6.1" -async-recursion = "1.0.5" blake3 = "1.2.0" [dependencies.serde] diff --git a/shinkai-libs/shinkai-sheet/src/sheet.rs b/shinkai-libs/shinkai-sheet/src/sheet.rs index fb27de67d..8747b7e11 100644 --- a/shinkai-libs/shinkai-sheet/src/sheet.rs +++ b/shinkai-libs/shinkai-sheet/src/sheet.rs @@ -1,5 +1,4 @@ use async_channel::Sender; -use async_recursion::async_recursion; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use shinkai_dsl::dsl_schemas::Workflow; @@ -7,7 +6,11 @@ use shinkai_message_primitives::schemas::sheet::{ Cell, CellId, CellStatus, ColumnBehavior, ColumnDefinition, ColumnIndex, ColumnUuid, RowIndex, RowUuid, UuidString, WorkflowSheetJobData, }; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + future::Future, + pin::Pin, +}; use uuid::Uuid; use crate::{cell_name_converter::CellNameConverter, column_dependency_manager::ColumnDependencyManager}; @@ -584,57 +587,120 @@ pub enum SheetAction { } // Implement the reducer function -#[async_recursion] -pub async fn sheet_reducer(mut state: Sheet, action: SheetAction) -> (Sheet, Vec<WorkflowSheetJobData>) { - // if std::env::var("LOG_REDUX").is_ok() { - println!(""); - println!("Dispatching action: {:?}", action); - println!("Current state: \n"); - state.print_as_ascii_table(); - // } - - let mut jobs = Vec::new(); - match action { - SheetAction::SetColumn(definition) => { - if let ColumnBehavior::Formula(ref formula) = definition.behavior { - let dependencies = state.parse_formula_dependencies(formula); - for dep in dependencies { - state - .column_dependency_manager - 
.add_dependency(definition.id.clone(), dep); +pub fn sheet_reducer( + mut state: Sheet, + action: SheetAction, +) -> Pin<Box<dyn Future<Output = (Sheet, Vec<WorkflowSheetJobData>)> + Send>> { + Box::pin(async move { + // if std::env::var("LOG_REDUX").is_ok() { + println!(""); + println!("Dispatching action: {:?}", action); + println!("Current state: \n"); + state.print_as_ascii_table(); + // } + + let mut jobs = Vec::new(); + match action { + SheetAction::SetColumn(definition) => { + if let ColumnBehavior::Formula(ref formula) = definition.behavior { + let dependencies = state.parse_formula_dependencies(formula); + for dep in dependencies { + state + .column_dependency_manager + .add_dependency(definition.id.clone(), dep); + } } - } - state.columns.insert(definition.clone().id, definition.clone()); + state.columns.insert(definition.clone().id, definition.clone()); - // Collect row UUIDs before mutable borrow - let row_uuids: Vec<RowUuid> = state.rows.keys().cloned().collect(); + // Collect row UUIDs before mutable borrow + let row_uuids: Vec<RowUuid> = state.rows.keys().cloned().collect(); - // Initialize new column cells with None for all existing rows - for row_uuid in &row_uuids { - if let Some(row) = state.rows.get_mut(row_uuid) { - let status = if let ColumnBehavior::Text = definition.behavior { - CellStatus::Ready - } else { - CellStatus::Pending + // Initialize new column cells with None for all existing rows + for row_uuid in &row_uuids { + if let Some(row) = state.rows.get_mut(row_uuid) { + let status = if let ColumnBehavior::Text = definition.behavior { + CellStatus::Ready + } else { + CellStatus::Pending + }; + + row.insert( + definition.id.clone(), + Cell { + value: None, + last_updated: Utc::now(), + status, + input_hash: None, + }, + ); + } + } + + // Send updates after initializing the cells + if let Some(sender) = &state.update_sender { + for row_uuid in &row_uuids { + if let Some(update_info) = + state.generate_cell_update_info(row_uuid.clone(), definition.id.clone()) + { + let sender_clone = sender.clone(); + tokio::spawn(async move { + if let Err(e) = sender_clone.send(SheetUpdate::CellUpdated(update_info)).await { + eprintln!("Failed to send update: {:?}", e); + } + }); + } + } + } + + // Create jobs for new cells in the added column + for row_uuid in state.rows.keys().cloned().collect::<Vec<_>>() { + let value = match &definition.clone().behavior { + ColumnBehavior::Formula(formula) => { + state.evaluate_formula(formula, row_uuid.clone(), definition.clone().id) + } + _ => None, }; - row.insert( - definition.id.clone(), - Cell { - value: None, - last_updated: Utc::now(), - status, - input_hash: None, - }, - ); + if let Some(value) = value { + let (new_state, mut new_jobs) = sheet_reducer( + state, + SheetAction::SetCellValue { + row: row_uuid.clone(), + col: definition.clone().id, + value, + input_hash: None, + }, + ) + .await; + state = new_state; + jobs.append(&mut new_jobs); + } } } + SheetAction::SetCellValue { + row, + col, + value, + input_hash, + } => { + if !state.columns.contains_key(&col) { + return (state, jobs); // Column index out of bounds + } - // Send updates after initializing the cells - if let Some(sender) = &state.update_sender { - for row_uuid in &row_uuids { - if let Some(update_info) = state.generate_cell_update_info(row_uuid.clone(), definition.id.clone()) - { + let row_cells = state.rows.entry(row.clone()).or_default(); + row_cells.insert( + col.clone(), + Cell { + value: Some(value), + last_updated: Utc::now(), + status: CellStatus::Ready, + input_hash, + }, + ); + + // Send update after setting the cell value + if let 
Some(sender) = &state.update_sender { + if let Some(update_info) = state.generate_cell_update_info(row.clone(), col.clone()) { let sender_clone = sender.clone(); tokio::spawn(async move { if let Err(e) = sender_clone.send(SheetUpdate::CellUpdated(update_info)).await { @@ -643,89 +709,42 @@ }); } } - } - // Create jobs for new cells in the added column - for row_uuid in state.rows.keys().cloned().collect::<Vec<_>>() { - let value = match &definition.clone().behavior { - ColumnBehavior::Formula(formula) => { - state.evaluate_formula(formula, row_uuid.clone(), definition.clone().id) - } - _ => None, - }; - - if let Some(value) = value { - let (new_state, mut new_jobs) = sheet_reducer( - state, - SheetAction::SetCellValue { - row: row_uuid.clone(), - col: definition.clone().id, - value, - input_hash: None, - }, - ) - .await; - state = new_state; - jobs.append(&mut new_jobs); - } - } - } - SheetAction::SetCellValue { - row, - col, - value, - input_hash, - } => { - if !state.columns.contains_key(&col) { - return (state, jobs); // Column index out of bounds + // Trigger updates for cells dependent on the updated cell + let changed_cell_id = CellId(format!("{}:{}", row, col)); + let (new_state, mut new_jobs) = sheet_reducer( + state, + SheetAction::PropagateUpdateToDependents { + changed_cell_id, + visited: HashSet::new(), + depth: 0, + }, + ) + .await; + state = new_state; + jobs.append(&mut new_jobs); } - - let row_cells = state.rows.entry(row.clone()).or_default(); - row_cells.insert( - col.clone(), - Cell { - value: Some(value), - last_updated: Utc::now(), - status: CellStatus::Ready, - input_hash, - }, - ); - - // Send update after setting the cell value - if let Some(sender) = &state.update_sender { - if let Some(update_info) = state.generate_cell_update_info(row.clone(), col.clone()) { - let sender_clone = sender.clone(); - tokio::spawn(async move { - if let Err(e) = sender_clone.send(SheetUpdate::CellUpdated(update_info)).await { - eprintln!("Failed to send update: {:?}", e); - } - }); + SheetAction::SetCellPending { row, col } => { + if !state.columns.contains_key(&col) { + return (state, jobs); // Column index out of bounds } - } - - // Trigger updates for cells dependent on the updated cell - let changed_cell_id = CellId(format!("{}:{}", row, col)); - let (new_state, mut new_jobs) = sheet_reducer( - state, - SheetAction::PropagateUpdateToDependents { - changed_cell_id, - visited: HashSet::new(), - depth: 0, - }, - ) - .await; - state = new_state; - jobs.append(&mut new_jobs); - } - SheetAction::SetCellPending { row, col } => { - if !state.columns.contains_key(&col) { - return (state, jobs); // Column index out of bounds - } - if let Some(row_cells) = state.rows.get_mut(&row) { - if let Some(cell) = row_cells.get_mut(&col) { - cell.status = CellStatus::Pending; + if let Some(row_cells) = state.rows.get_mut(&row) { + if let Some(cell) = row_cells.get_mut(&col) { + cell.status = CellStatus::Pending; + } else { + row_cells.insert( + col.clone(), + Cell { + value: None, + last_updated: Utc::now(), + status: CellStatus::Pending, + input_hash: None, + }, + ); + } } else { + let mut row_cells = HashMap::new(); row_cells.insert( col.clone(), Cell { - value: 
None, - last_updated: Utc::now(), - status: CellStatus::Pending, - input_hash: None, - }, - ); - state.rows.insert(row.clone(), row_cells); - } - if let Some(sender) = &state.update_sender { - if let Some(update_info) = state.generate_cell_update_info(row.clone(), col.clone()) { - let sender_clone = sender.clone(); - tokio::spawn(async move { - if let Err(e) = sender_clone.send(SheetUpdate::CellUpdated(update_info)).await { - eprintln!("Failed to send update: {:?}", e); - } - }); + if let Some(sender) = &state.update_sender { + if let Some(update_info) = state.generate_cell_update_info(row.clone(), col.clone()) { + let sender_clone = sender.clone(); + tokio::spawn(async move { + if let Err(e) = sender_clone.send(SheetUpdate::CellUpdated(update_info)).await { + eprintln!("Failed to send update: {:?}", e); + } + }); + } } } - } - SheetAction::PropagateUpdateToDependents { - changed_cell_id, - mut visited, - depth, - } => { - if depth >= MAX_DEPENDENCY_DEPTH { - eprintln!("Maximum dependency depth reached. Possible circular dependency detected."); - return (state, jobs); - } + SheetAction::PropagateUpdateToDependents { + changed_cell_id, + mut visited, + depth, + } => { + if depth >= MAX_DEPENDENCY_DEPTH { + eprintln!("Maximum dependency depth reached. Possible circular dependency detected."); + return (state, jobs); + } - let (row, col) = state.cell_id_to_indices(&changed_cell_id); - eprintln!("TriggerUpdateEvent: {:?}", changed_cell_id); + let (row, col) = state.cell_id_to_indices(&changed_cell_id); + eprintln!("TriggerUpdateEvent: {:?}", changed_cell_id); - if !visited.insert((row.clone(), col.clone())) { - eprintln!("Circular dependency detected at cell ({}, {})", row, col); - return (state, jobs); - } - - let reverse_dependents = state.column_dependency_manager.get_reverse_dependents(col.clone()); - eprintln!("Col: {:?} Dependents: {:?}", col, reverse_dependents); - for reverse_dependent_col in reverse_dependents { - if let Some(column_definition) = state.columns.get(&reverse_dependent_col).cloned() { - match &column_definition.behavior { - ColumnBehavior::Formula(formula) => { - if let Some(value) = - state.evaluate_formula(formula, row.clone(), reverse_dependent_col.clone()) - { - let (new_state, mut new_jobs) = sheet_reducer( - state, - SheetAction::SetCellValue { - row: row.clone(), - col: reverse_dependent_col.clone(), - value, - input_hash: None, - }, - ) - .await; - state = new_state; - jobs.append(&mut new_jobs); + if !visited.insert((row.clone(), col.clone())) { + eprintln!("Circular dependency detected at cell ({}, {})", row, col); + return (state, jobs); + } - eprintln!("row: {:?}, dep_col: {:?}, col: {:?}", row, reverse_dependent_col, col); - let new_cell_id = CellId(format!("{}:{}", row, reverse_dependent_col)); - eprintln!("TriggerUpdateEvent newcellid: {:?}", new_cell_id); - if changed_cell_id != new_cell_id { + let reverse_dependents = state.column_dependency_manager.get_reverse_dependents(col.clone()); + eprintln!("Col: {:?} Dependents: {:?}", col, reverse_dependents); + for reverse_dependent_col in reverse_dependents { + if let Some(column_definition) = state.columns.get(&reverse_dependent_col).cloned() { + match &column_definition.behavior { + ColumnBehavior::Formula(formula) => { + if let Some(value) = + state.evaluate_formula(formula, row.clone(), reverse_dependent_col.clone()) + { let (new_state, mut new_jobs) = sheet_reducer( state, - SheetAction::PropagateUpdateToDependents { - changed_cell_id: new_cell_id, - visited: visited.clone(), - depth: depth + 1, + 
SheetAction::SetCellValue { + row: row.clone(), + col: reverse_dependent_col.clone(), + value, + input_hash: None, + }, + ) + .await; state = new_state; jobs.append(&mut new_jobs); + + eprintln!("row: {:?}, dep_col: {:?}, col: {:?}", row, reverse_dependent_col, col); + let new_cell_id = CellId(format!("{}:{}", row, reverse_dependent_col)); + eprintln!("TriggerUpdateEvent newcellid: {:?}", new_cell_id); + if changed_cell_id != new_cell_id { + let (new_state, mut new_jobs) = sheet_reducer( + state, + SheetAction::PropagateUpdateToDependents { + changed_cell_id: new_cell_id, + visited: visited.clone(), + depth: depth + 1, + }, + ) + .await; + state = new_state; + jobs.append(&mut new_jobs); + } } } - } - ColumnBehavior::LLMCall { - input: _, // used under the hood with get_input_cells_for_column - workflow, - workflow_name, - llm_provider_name, - input_hash, - } => { - // Check if input_hash is present and matches the blake3 hash of the current input cells values - let input_cells = - state.get_input_cells_for_column(row.clone(), reverse_dependent_col.clone()); - let workflow_job_data = WorkflowSheetJobData { - sheet_id: state.uuid.clone(), - row: row.clone(), - col: reverse_dependent_col.clone(), - col_definition: column_definition.clone(), - workflow: workflow.clone(), - workflow_name: workflow_name.clone(), - input_cells, - llm_provider_name: llm_provider_name.clone(), - }; - - // Update the cell status to Pending - let (new_state, mut new_jobs) = sheet_reducer( - state, - SheetAction::SetCellPending { + ColumnBehavior::LLMCall { + input: _, // used under the hood with get_input_cells_for_column + workflow, + workflow_name, + llm_provider_name, + input_hash: _, + } => { + // Check if input_hash is present and matches the blake3 hash of the current input cells values + let input_cells = + state.get_input_cells_for_column(row.clone(), reverse_dependent_col.clone()); + let workflow_job_data = WorkflowSheetJobData { + sheet_id: state.uuid.clone(), row: row.clone(), col: reverse_dependent_col.clone(), - }, - ) - .await; - state = new_state; - jobs.append(&mut new_jobs); - jobs.push(workflow_job_data); - } - _ => {} - } - } - } - } - SheetAction::RemoveColumn(col_uuid) => { - // Get dependents before removing the column - let dependents = state.column_dependency_manager.get_reverse_dependents(col_uuid.clone()); - - // Remove the column - state.columns.remove(&col_uuid); - for row in state.rows.values_mut() { - row.remove(&col_uuid); - } - state.column_dependency_manager.remove_column(col_uuid.clone()); - - // Remove the column from display_columns - state.display_columns.retain(|uuid| uuid != &col_uuid); - - // Trigger updates for columns dependent on the removed column - for dependent_col in dependents { - for row_uuid in state.rows.keys().cloned().collect::<Vec<_>>() { - if let Some(column_definition) = state.columns.get(&dependent_col).cloned() { - if let ColumnBehavior::Formula(formula) = &column_definition.behavior { - if let Some(value) = - state.evaluate_formula(formula, row_uuid.clone(), dependent_col.clone()) - { + col_definition: column_definition.clone(), + workflow: workflow.clone(), + workflow_name: workflow_name.clone(), + input_cells, + llm_provider_name: llm_provider_name.clone(), + }; + + // Update the cell status to Pending let (new_state, mut new_jobs) = sheet_reducer( state, - SheetAction::SetCellValue { - row: row_uuid.clone(), - col: dependent_col.clone(), - value, - input_hash: None, + SheetAction::SetCellPending { + row: row.clone(), + col: reverse_dependent_col.clone(), }, ) .await; 
state = new_state; jobs.append(&mut new_jobs); + jobs.push(workflow_job_data); } + _ => {} } } } } - } - SheetAction::TriggerUpdateColumnValues(col_uuid) => { - let row_uuids: Vec<_> = state.rows.keys().cloned().collect(); - for row_uuid in row_uuids { - if let Some(column_definition) = state.columns.get(&col_uuid).cloned() { - if let ColumnBehavior::LLMCall { - input, // used under the hood with get_input_cells_for_column - workflow, - workflow_name, - llm_provider_name, - input_hash: _, - } = &column_definition.behavior - { - let dependencies = state.parse_formula_dependencies(input); - let all_dependencies_met = dependencies.iter().all(|dep_col| { - let cell_value = state.get_cell_value(row_uuid.clone(), dep_col.clone()); - cell_value.as_ref().map_or(false, |v| !v.is_empty()) - }); - - if all_dependencies_met { - let input_cells = state.get_input_cells_for_column(row_uuid.clone(), col_uuid.clone()); - let workflow_job_data = WorkflowSheetJobData { - sheet_id: state.uuid.clone(), - row: row_uuid.clone(), - col: col_uuid.clone(), - col_definition: column_definition.clone(), - workflow: workflow.clone(), - workflow_name: workflow_name.clone(), - input_cells, - llm_provider_name: llm_provider_name.clone(), - }; - - // Update the cell status to Pending - if let Some(row_cells) = state.rows.get_mut(&row_uuid) { - if let Some(cell) = row_cells.get_mut(&col_uuid) { - cell.status = CellStatus::Pending; + SheetAction::RemoveColumn(col_uuid) => { + // Get dependents before removing the column + let dependents = state.column_dependency_manager.get_reverse_dependents(col_uuid.clone()); + + // Remove the column + state.columns.remove(&col_uuid); + for row in state.rows.values_mut() { + row.remove(&col_uuid); + } + state.column_dependency_manager.remove_column(col_uuid.clone()); + + // Remove the column from display_columns + state.display_columns.retain(|uuid| uuid != &col_uuid); + + // Trigger updates for columns dependent on the removed column + for dependent_col in dependents { + for row_uuid in state.rows.keys().cloned().collect::<Vec<_>>() { + if let Some(column_definition) = state.columns.get(&dependent_col).cloned() { + if let ColumnBehavior::Formula(formula) = &column_definition.behavior { + if let Some(value) = + state.evaluate_formula(formula, row_uuid.clone(), dependent_col.clone()) + { + let (new_state, mut new_jobs) = sheet_reducer( + state, + SheetAction::SetCellValue { + row: row_uuid.clone(), + col: dependent_col.clone(), + value, + input_hash: None, + }, + ) + .await; + state = new_state; + jobs.append(&mut new_jobs); } } + } + } + } + } + SheetAction::TriggerUpdateColumnValues(col_uuid) => { + let row_uuids: Vec<_> = state.rows.keys().cloned().collect(); + for row_uuid in row_uuids { + if let Some(column_definition) = state.columns.get(&col_uuid).cloned() { + if let ColumnBehavior::LLMCall { + input, // used under the hood with get_input_cells_for_column + workflow, + workflow_name, + llm_provider_name, + input_hash: _, + } = &column_definition.behavior + { + let dependencies = state.parse_formula_dependencies(input); + let all_dependencies_met = dependencies.iter().all(|dep_col| { + let cell_value = state.get_cell_value(row_uuid.clone(), dep_col.clone()); + cell_value.as_ref().map_or(false, |v| !v.is_empty()) + }); + + if all_dependencies_met { + let input_cells = state.get_input_cells_for_column(row_uuid.clone(), col_uuid.clone()); + let workflow_job_data = WorkflowSheetJobData { + sheet_id: state.uuid.clone(), + row: row_uuid.clone(), + col: col_uuid.clone(), + col_definition: 
column_definition.clone(), + workflow: workflow.clone(), + workflow_name: workflow_name.clone(), + input_cells, + llm_provider_name: llm_provider_name.clone(), + }; + + // Update the cell status to Pending + if let Some(row_cells) = state.rows.get_mut(&row_uuid) { + if let Some(cell) = row_cells.get_mut(&col_uuid) { + cell.status = CellStatus::Pending; + } + } - jobs.push(workflow_job_data); + jobs.push(workflow_job_data); + } } } } } - } - SheetAction::RemoveRow(row_uuid) => { - state.rows.remove(&row_uuid); - state.display_rows.retain(|uuid| uuid != &row_uuid); - // Optionally, you can add logic to handle dependencies or other side effects - } - SheetAction::AddRow(row_uuid) => { - eprintln!("SheetAction::AddRow: {:?}", row_uuid); - if state.rows.contains_key(&row_uuid) { - return (state, jobs); // Row already exists, return current state + SheetAction::RemoveRow(row_uuid) => { + state.rows.remove(&row_uuid); + state.display_rows.retain(|uuid| uuid != &row_uuid); + // Optionally, you can add logic to handle dependencies or other side effects } + SheetAction::AddRow(row_uuid) => { + eprintln!("SheetAction::AddRow: {:?}", row_uuid); + if state.rows.contains_key(&row_uuid) { + return (state, jobs); // Row already exists, return current state + } - let mut row_cells = HashMap::new(); - for (col_uuid, col_def) in &state.columns { - if let ColumnBehavior::Text = col_def.behavior { - row_cells.insert( - col_uuid.clone(), - Cell { - value: Some("".to_string()), // Default empty value for text columns - last_updated: Utc::now(), - status: CellStatus::Ready, - input_hash: None, - }, - ); - } else { - // Check the states of dependent cells to determine the status of the new cell - let status = if let ColumnBehavior::Formula(formula) - | ColumnBehavior::LLMCall { input: formula, .. } = &col_def.behavior - { - let dependencies = state.parse_formula_dependencies(formula); - let any_dependency_missing = dependencies - .iter() - .any(|dep_col| state.get_cell_value(row_uuid.clone(), dep_col.clone()).is_none()); - if any_dependency_missing { - CellStatus::Waiting + let mut row_cells = HashMap::new(); + for (col_uuid, col_def) in &state.columns { + if let ColumnBehavior::Text = col_def.behavior { + row_cells.insert( + col_uuid.clone(), + Cell { + value: Some("".to_string()), // Default empty value for text columns + last_updated: Utc::now(), + status: CellStatus::Ready, + input_hash: None, + }, + ); + } else { + // Check the states of dependent cells to determine the status of the new cell + let status = if let ColumnBehavior::Formula(formula) + | ColumnBehavior::LLMCall { input: formula, .. 
} = &col_def.behavior + { + let dependencies = state.parse_formula_dependencies(formula); + let any_dependency_missing = dependencies + .iter() + .any(|dep_col| state.get_cell_value(row_uuid.clone(), dep_col.clone()).is_none()); + if any_dependency_missing { + CellStatus::Waiting + } else { + CellStatus::Pending + } } else { CellStatus::Pending - } - } else { - CellStatus::Pending - }; - - row_cells.insert( - col_uuid.clone(), - Cell { - value: None, - last_updated: Utc::now(), - status, - input_hash: None, - }, - ); - } - } - state.rows.insert(row_uuid.clone(), row_cells); - state.display_rows.push(row_uuid.clone()); - - // Collect update events for non-text columns - let mut update_events = Vec::new(); - for (col_uuid, col_def) in &state.columns { - if let ColumnBehavior::Text = col_def.behavior { - continue; // Skip text columns + }; + + row_cells.insert( + col_uuid.clone(), + Cell { + value: None, + last_updated: Utc::now(), + status, + input_hash: None, + }, + ); + } } + state.rows.insert(row_uuid.clone(), row_cells); + state.display_rows.push(row_uuid.clone()); + + // Collect update events for non-text columns + let mut update_events = Vec::new(); + for (col_uuid, col_def) in &state.columns { + if let ColumnBehavior::Text = col_def.behavior { + continue; // Skip text columns + } - let changed_cell_id = CellId(format!("{}:{}", row_uuid, col_uuid)); - eprintln!("\nAddRow TriggerUpdateEvent: {:?}", changed_cell_id); - update_events.push(SheetAction::PropagateUpdateToDependents { - changed_cell_id, - visited: HashSet::new(), - depth: 0, - }); - } + let changed_cell_id = CellId(format!("{}:{}", row_uuid, col_uuid)); + eprintln!("\nAddRow TriggerUpdateEvent: {:?}", changed_cell_id); + update_events.push(SheetAction::PropagateUpdateToDependents { + changed_cell_id, + visited: HashSet::new(), + depth: 0, + }); + } - // Apply update events - for event in update_events { - let (new_state, mut new_jobs) = sheet_reducer(state.clone(), event).await; - eprintln!("update_events New state: {:?}", new_state); - state = new_state; - jobs.append(&mut new_jobs); + // Apply update events + for event in update_events { + let (new_state, mut new_jobs) = sheet_reducer(state.clone(), event).await; + eprintln!("update_events New state: {:?}", new_state); + state = new_state; + jobs.append(&mut new_jobs); + } } } - } - println!("After state: \n"); - state.print_as_ascii_table(); - (state, jobs) + println!("After state: \n"); + state.print_as_ascii_table(); + (state, jobs) + }) } diff --git a/shinkai-libs/shinkai-vector-resources/Cargo.toml b/shinkai-libs/shinkai-vector-resources/Cargo.toml index 32992ebcd..e5b767385 100644 --- a/shinkai-libs/shinkai-vector-resources/Cargo.toml +++ b/shinkai-libs/shinkai-vector-resources/Cargo.toml @@ -15,7 +15,6 @@ ordered-float = "3.7.0" blake3 = "1.5.0" keyphrases = "0.3.2" async-trait = "0.1.74" -async-recursion = "1.0.5" scraper = "0.19.0" chrono = { version = "0.4", features = ["serde"] } chrono-tz = "0.5" diff --git a/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs b/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs index 02b1220df..3016f83da 100644 --- a/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs +++ b/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs @@ -11,7 +11,7 @@ use crate::source::TextChunkingStrategy; use crate::source::VRSourceReference; use crate::vector_resource::{BaseVectorResource, DocumentVectorResource, VectorResourceCore}; #[cfg(feature = "desktop-only")] -use 
diff --git a/shinkai-libs/shinkai-vector-resources/Cargo.toml b/shinkai-libs/shinkai-vector-resources/Cargo.toml
index 32992ebcd..e5b767385 100644
--- a/shinkai-libs/shinkai-vector-resources/Cargo.toml
+++ b/shinkai-libs/shinkai-vector-resources/Cargo.toml
@@ -15,7 +15,6 @@ ordered-float = "3.7.0"
 blake3 = "1.5.0"
 keyphrases = "0.3.2"
 async-trait = "0.1.74"
-async-recursion = "1.0.5"
 scraper = "0.19.0"
 chrono = { version = "0.4", features = ["serde"] }
 chrono-tz = "0.5"
diff --git a/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs b/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs
index 02b1220df..3016f83da 100644
--- a/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs
+++ b/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser.rs
@@ -11,7 +11,7 @@ use crate::source::TextChunkingStrategy;
 use crate::source::VRSourceReference;
 use crate::vector_resource::{BaseVectorResource, DocumentVectorResource, VectorResourceCore};
 #[cfg(feature = "desktop-only")]
-use async_recursion::async_recursion;
+use std::{future::Future, pin::Pin};
 
 #[cfg(feature = "desktop-only")]
 #[derive(Clone)]
@@ -207,7 +207,7 @@
         distribution_info: DistributionInfo,
     ) -> Result<BaseVectorResource, VRError> {
         let new_text_groups = ShinkaiFileParser::generate_text_group_embeddings(
-            &text_groups,
+            text_groups,
             generator.box_clone(),
             31,
             max_node_text_size,
@@ -269,72 +269,73 @@
         Ok(resource)
     }
 
-    #[async_recursion]
     #[cfg(feature = "desktop-only")]
     /// Recursively processes all text groups & their sub groups into DocumentResources.
     /// This method assumes your text groups already have embeddings generated for them.
-    async fn process_new_doc_resource_with_embeddings_already_generated(
+    fn process_new_doc_resource_with_embeddings_already_generated<'a>(
         text_groups: Vec<TextGroup>,
-        generator: &dyn EmbeddingGenerator,
-        name: &str,
+        generator: &'a dyn EmbeddingGenerator,
+        name: &'a str,
         desc: Option<String>,
         source: VRSourceReference,
-        parsing_tags: &Vec<DataTag>,
+        parsing_tags: &'a Vec<DataTag>,
         resource_embedding: Option<Embedding>,
-    ) -> Result<BaseVectorResource, VRError> {
-        let name = ShinkaiFileParser::clean_name(name);
-        let max_embedding_token_count = generator.model_type().max_input_token_count();
-        let resource_desc = Self::_setup_resource_description(
-            desc,
-            &text_groups,
-            max_embedding_token_count,
-            max_embedding_token_count.checked_div(2).unwrap_or(100),
-        );
-        let mut doc = DocumentVectorResource::new_empty(&name, resource_desc.as_deref(), source.clone(), true);
-        doc.set_embedding_model_used(generator.model_type());
+    ) -> Pin<Box<dyn Future<Output = Result<BaseVectorResource, VRError>> + Send + 'a>> {
+        Box::pin(async move {
+            let name = ShinkaiFileParser::clean_name(name);
+            let max_embedding_token_count = generator.model_type().max_input_token_count();
+            let resource_desc = Self::_setup_resource_description(
+                desc,
+                &text_groups,
+                max_embedding_token_count,
+                max_embedding_token_count.checked_div(2).unwrap_or(100),
+            );
+            let mut doc = DocumentVectorResource::new_empty(&name, resource_desc.as_deref(), source.clone(), true);
+            doc.set_embedding_model_used(generator.model_type());
 
-        // Sets the keywords
-        let keywords = Self::extract_keywords(&text_groups, 25);
-        doc.keywords_mut().set_keywords(keywords.clone());
-        doc.keywords_mut().update_keywords_embedding(generator).await?;
-        // Sets a Resource Embedding if none provided. Primarily only used at the root level as the rest should already have them.
-        match resource_embedding {
-            Some(embedding) => doc.set_resource_embedding(embedding),
-            None => {
-                doc.update_resource_embedding(generator, None).await?;
+            // Sets the keywords
+            let keywords = Self::extract_keywords(&text_groups, 25);
+            doc.keywords_mut().set_keywords(keywords.clone());
+            doc.keywords_mut().update_keywords_embedding(generator).await?;
+            // Sets a Resource Embedding if none provided. Primarily only used at the root level as the rest should already have them.
+            match resource_embedding {
+                Some(embedding) => doc.set_resource_embedding(embedding),
+                None => {
+                    doc.update_resource_embedding(generator, None).await?;
+                }
             }
-        }
 
-        // Add each text group as either Vector Resource Nodes,
-        // or data-holding Nodes depending on if each has any sub-groups
-        for grouped_text in &text_groups {
-            let (_, metadata, has_sub_groups, new_name) = Self::process_grouped_text(grouped_text);
-            if has_sub_groups {
-                let new_doc = Self::process_new_doc_resource_with_embeddings_already_generated(
-                    grouped_text.sub_groups.clone(),
-                    generator,
-                    &new_name,
-                    None,
-                    source.clone(),
-                    parsing_tags,
-                    grouped_text.embedding.clone(),
-                )
-                .await?;
-                doc.append_vector_resource_node_auto(new_doc, metadata)?;
-            } else {
-                if grouped_text.text.len() <= 2 {
-                    continue;
-                }
-                if let Some(embedding) = &grouped_text.embedding {
-                    doc.append_text_node(&grouped_text.text, metadata, embedding.clone(), parsing_tags)?;
+            // Add each text group as either Vector Resource Nodes,
+            // or data-holding Nodes depending on if each has any sub-groups
+            for grouped_text in &text_groups {
+                let (_, metadata, has_sub_groups, new_name) = Self::process_grouped_text(grouped_text);
+                if has_sub_groups {
+                    let new_doc = Self::process_new_doc_resource_with_embeddings_already_generated(
+                        grouped_text.sub_groups.clone(),
+                        generator,
+                        &new_name,
+                        None,
+                        source.clone(),
+                        parsing_tags,
+                        grouped_text.embedding.clone(),
+                    )
+                    .await?;
+                    doc.append_vector_resource_node_auto(new_doc, metadata)?;
                 } else {
-                    let embedding = generator.generate_embedding_default(&grouped_text.text).await?;
-                    doc.append_text_node(&grouped_text.text, metadata, embedding, parsing_tags)?;
+                    if grouped_text.text.len() <= 2 {
+                        continue;
+                    }
+                    if let Some(embedding) = &grouped_text.embedding {
+                        doc.append_text_node(&grouped_text.text, metadata, embedding.clone(), parsing_tags)?;
+                    } else {
+                        let embedding = generator.generate_embedding_default(&grouped_text.text).await?;
+                        doc.append_text_node(&grouped_text.text, metadata, embedding, parsing_tags)?;
+                    }
                 }
             }
-        }
 
-        Ok(BaseVectorResource::Document(doc))
+            Ok(BaseVectorResource::Document(doc))
+        })
     }
 
     #[cfg(feature = "desktop-only")]
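When the converted function borrows its inputs, the boxed future must carry a lifetime: in the hunk above, `process_new_doc_resource_with_embeddings_already_generated<'a>` ties `generator`, `name`, and `parsing_tags` to the returned `Pin<Box<dyn Future<...> + Send + 'a>>`. A minimal sketch of that shape, with hypothetical types in place of the crate's real `TextGroup`/`EmbeddingGenerator`:

```rust
use std::{future::Future, pin::Pin};

// Hypothetical stand-in for the recursive document tree being walked.
struct Group {
    text: String,
    sub_groups: Vec<Group>,
}

// Because `groups` and `prefix` are borrowed, the returned future borrows
// them too, so the lifetime 'a must appear on both the parameters and the
// boxed trait object.
fn collect_texts<'a>(
    groups: &'a [Group],
    prefix: &'a str,
) -> Pin<Box<dyn Future<Output = Vec<String>> + Send + 'a>> {
    Box::pin(async move {
        let mut out = Vec::new();
        for group in groups {
            out.push(format!("{}{}", prefix, group.text));
            // Recursive `.await` is allowed because the future is boxed.
            out.extend(collect_texts(&group.sub_groups, prefix).await);
        }
        out
    })
}

fn main() {
    let groups = vec![Group {
        text: "root".into(),
        sub_groups: vec![Group {
            text: "child".into(),
            sub_groups: Vec::new(),
        }],
    }];
    let texts = futures::executor::block_on(collect_texts(&groups, "doc/"));
    assert_eq!(texts, vec!["doc/root".to_string(), "doc/child".to_string()]);
}
```

Without the explicit `'a` on the trait object, `Box<dyn Future>` would default to `'static` and the borrowed parameters would be rejected.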
diff --git a/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser_grouping.rs b/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser_grouping.rs
index b5d55085c..a7ce5547e 100644
--- a/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser_grouping.rs
+++ b/shinkai-libs/shinkai-vector-resources/src/file_parser/file_parser_grouping.rs
@@ -3,11 +3,11 @@ use super::file_parser_types::TextGroup;
 use crate::embedding_generator::EmbeddingGenerator;
 use crate::embeddings::Embedding;
 use crate::resource_errors::VRError;
-#[cfg(feature = "desktop-only")]
-use async_recursion::async_recursion;
 use keyphrases::KeyPhraseExtractor;
 use regex::Regex;
 use std::collections::HashMap;
+#[cfg(feature = "desktop-only")]
+use std::{future::Future, pin::Pin};
 
 impl ShinkaiFileParser {
     /// Recursive function to collect all texts from the text groups and their subgroups
@@ -59,75 +59,77 @@
     }
 
     #[cfg(feature = "desktop-only")]
-    #[async_recursion]
     /// Recursively goes through all of the text groups and batch generates embeddings
     /// for all of them in parallel, processing up to 10 futures at a time.
-    pub async fn generate_text_group_embeddings(
-        text_groups: &Vec<TextGroup>,
+    pub fn generate_text_group_embeddings(
+        text_groups: Vec<TextGroup>,
         generator: Box<dyn EmbeddingGenerator>,
         mut max_batch_size: u64,
         max_node_text_size: u64,
         collect_texts_and_indices: fn(&[TextGroup], u64, Vec<usize>) -> (Vec<String>, Vec<(Vec<usize>, usize)>),
-    ) -> Result<Vec<TextGroup>, VRError> {
-        // Clone the input text_groups
-        let mut text_groups = text_groups.clone();
+    ) -> Pin<Box<dyn Future<Output = Result<Vec<TextGroup>, VRError>> + Send>> {
+        Box::pin(async move {
+            // Clone the input text_groups
-        // Collect all texts from the text groups and their subgroups
-        let (texts, indices) = collect_texts_and_indices(&text_groups, max_node_text_size, vec![]);
+            let mut text_groups = text_groups;
-        // Generate embeddings for all texts in batches
-        let ids: Vec<String> = vec!["".to_string(); texts.len()];
-        let mut all_futures = Vec::new();
-        let mut current_batch_futures = Vec::new();
-
-        for (index, batch) in texts.chunks(max_batch_size as usize).enumerate() {
-            let batch_texts = batch.to_vec();
-            let batch_ids = ids[..batch.len()].to_vec();
-            let generator_clone = generator.box_clone(); // Clone the generator for use in the future.
-
-            // Use the `move` keyword to take ownership of `generator_clone` inside the async block.
-            let future = async move { generator_clone.generate_embeddings(&batch_texts, &batch_ids).await };
-            current_batch_futures.push(future);
-
-            // If we've collected 10 futures or are at the last batch, add them to all_futures and start a new vector
-            if current_batch_futures.len() == 10 || index == texts.chunks(max_batch_size as usize).count() - 1 {
-                all_futures.push(current_batch_futures);
-                current_batch_futures = Vec::new();
+            // Collect all texts from the text groups and their subgroups
+            let (texts, indices) = collect_texts_and_indices(&text_groups, max_node_text_size, vec![]);
+
+            // Generate embeddings for all texts in batches
+            let ids: Vec<String> = vec!["".to_string(); texts.len()];
+            let mut all_futures = Vec::new();
+            let mut current_batch_futures = Vec::new();
+
+            for (index, batch) in texts.chunks(max_batch_size as usize).enumerate() {
+                let batch_texts = batch.to_vec();
+                let batch_ids = ids[..batch.len()].to_vec();
+                let generator_clone = generator.box_clone(); // Clone the generator for use in the future.
+
+                // Use the `move` keyword to take ownership of `generator_clone` inside the async block.
+                let future = async move { generator_clone.generate_embeddings(&batch_texts, &batch_ids).await };
+                current_batch_futures.push(future);
+
+                // If we've collected 10 futures or are at the last batch, add them to all_futures and start a new vector
+                if current_batch_futures.len() == 10 || index == texts.chunks(max_batch_size as usize).count() - 1 {
+                    all_futures.push(current_batch_futures);
+                    current_batch_futures = Vec::new();
+                }
             }
-        }
 
-        // Process each group of up to 10 futures in sequence
-        let mut embeddings = Vec::new();
-        for futures_group in all_futures {
-            let results = futures::future::join_all(futures_group).await;
-            for result in results {
-                match result {
-                    Ok(batch_embeddings) => {
-                        embeddings.extend(batch_embeddings);
-                    }
-                    Err(e) => {
-                        if max_batch_size > 5 {
-                            max_batch_size -= 5;
-                            return Self::generate_text_group_embeddings(
-                                &text_groups,
-                                generator,
-                                max_batch_size,
-                                max_node_text_size,
-                                collect_texts_and_indices,
-                            )
-                            .await;
-                        } else {
-                            return Err(e);
+            // Process each group of up to 10 futures in sequence
+            let mut embeddings = Vec::new();
+            for futures_group in all_futures {
+                let results = futures::future::join_all(futures_group).await;
+                for result in results {
+                    match result {
+                        Ok(batch_embeddings) => {
+                            embeddings.extend(batch_embeddings);
+                        }
+                        Err(e) => {
+                            if max_batch_size > 5 {
+                                max_batch_size -= 5;
+                                return Self::generate_text_group_embeddings(
+                                    text_groups,
+                                    generator,
+                                    max_batch_size,
+                                    max_node_text_size,
+                                    collect_texts_and_indices,
+                                )
+                                .await;
+                            } else {
+                                return Err(e);
+                            }
                         }
                     }
                 }
             }
-        }
 
-        // Assign the generated embeddings back to the text groups and their subgroups
-        Self::assign_embeddings(&mut text_groups, &mut embeddings, &indices);
+            // Assign the generated embeddings back to the text groups and their subgroups
+            Self::assign_embeddings(&mut text_groups, &mut embeddings, &indices);
 
-        Ok(text_groups)
+            Ok(text_groups)
+        })
     }
 
     #[cfg(feature = "desktop-only")]
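`generate_text_group_embeddings` goes the other way: it now takes `text_groups`, `generator`, and `collect_texts_and_indices` by value, so the boxed future needs no lifetime at all (`... + Send>>` is implicitly `'static`), and the retry branch can hand the owned inputs straight to the recursive call. Below is a sketch of that retry-with-smaller-batches recursion, with a hypothetical fallible `embed_batch` standing in for the real `EmbeddingGenerator` API:

```rust
use std::{future::Future, pin::Pin};

// Hypothetical embedder that rejects batches above a size threshold, standing
// in for a provider that fails on oversized requests.
async fn embed_batch(texts: &[String], batch_size: u64) -> Result<Vec<Vec<f32>>, String> {
    if batch_size > 10 {
        Err(format!("batch size {} too large", batch_size))
    } else {
        Ok(texts.iter().map(|t| vec![t.len() as f32]).collect())
    }
}

// Owned parameters mean the boxed future needs no lifetime parameter, which
// matches the `Pin<Box<dyn Future<...> + Send>>` signature used in the diff.
fn embed_with_backoff(
    texts: Vec<String>,
    batch_size: u64,
) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<f32>>, String>> + Send>> {
    Box::pin(async move {
        match embed_batch(&texts, batch_size).await {
            Ok(embeddings) => Ok(embeddings),
            // On failure, shrink the batch size and recurse, moving the owned
            // inputs into the next attempt, like the retry above.
            Err(_) if batch_size > 5 => embed_with_backoff(texts, batch_size - 5).await,
            Err(e) => Err(e),
        }
    })
}

fn main() {
    let texts = vec!["hello".to_string(), "world".to_string()];
    let result = futures::executor::block_on(embed_with_backoff(texts, 20));
    assert_eq!(result.unwrap().len(), 2);
}
```

This ownership change is also why the call site in file_parser.rs above now passes `text_groups` by value instead of `&text_groups`.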