Skip to content

Commit

Permalink
Merge pull request #539 from dcSpark/bence/drop-async-recursion
Browse files Browse the repository at this point in the history
Drop async-recursion
  • Loading branch information
nicarq authored Sep 10, 2024
2 parents cdcb8d1 + ce319dd commit 3e0bf84
Show file tree
Hide file tree
Showing 12 changed files with 865 additions and 852 deletions.
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion shinkai-bin/shinkai-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ urlencoding = "2.1.0"
hex = "=0.4.3"
aes-gcm = "0.10.3"
blake3 = "1.2.0"
async-recursion = "1.0.5"
cron-parser = "0.8.1"
thiserror = "1.0.50"
base64 = "0.13.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::network::agent_payments_manager::my_agent_offerings_manager::MyAgentO
use crate::network::ws_manager::WSUpdateHandler;
use crate::tools::tool_router::ToolRouter;
use crate::vector_fs::vector_fs::VectorFS;
use async_recursion::async_recursion;
use async_trait::async_trait;
use shinkai_message_primitives::schemas::inbox_name::InboxName;
use shinkai_message_primitives::schemas::llm_providers::serialized_llm_provider::{
Expand Down Expand Up @@ -95,7 +94,6 @@ impl GenericInferenceChain {
}
}

#[async_recursion]
#[allow(clippy::too_many_arguments)]
pub async fn start_chain(
db: Arc<ShinkaiDB>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_recursion::async_recursion;
use shinkai_message_primitives::schemas::{
inbox_name::InboxName, llm_providers::serialized_llm_provider::SerializedLLMProvider, shinkai_name::ShinkaiName,
};
Expand Down Expand Up @@ -30,7 +29,6 @@ pub struct CronExecutionState {
}

impl JobManager {
#[async_recursion]
pub async fn image_analysis_chain(
_db: Arc<ShinkaiDB>,
full_job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
// - IPFS
// - Arcweave

use std::env;
use std::{env, future::Future, pin::Pin};

use aws_config::timeout::TimeoutConfig;
use aws_sdk_s3::presigning::PresigningConfig;
use aws_sdk_s3::types::EncodingType;
use aws_sdk_s3::{Client as S3Client};
use aws_sdk_s3::Client as S3Client;
use reqwest::{Client, Error as ReqwestError};
use serde::{Deserialize, Serialize};

use async_recursion::async_recursion;
use aws_types::region::Region;
use serde_json::{json, Error as JsonError, Value};
use shinkai_message_primitives::shinkai_message::shinkai_message_schemas::{
Expand Down Expand Up @@ -346,96 +345,96 @@ pub async fn download_file_http(
}
}

#[async_recursion]
pub async fn list_folder_contents(
destination: &FileDestination,
folder_path: &str,
) -> Result<Vec<FileDestinationPath>, FileTransferError> {
let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path);
match destination {
FileDestination::S3(client, credentials) | FileDestination::R2(client, credentials) => {
let mut folder_contents = Vec::new();
let mut continuation_token: Option<String> = None;

loop {
let mut request_builder = client
.list_objects_v2()
.bucket(credentials.bucket.clone())
.prefix(folder_path)
.delimiter("/")
.encoding_type(EncodingType::Url);

if let Some(token) = &continuation_token {
request_builder = request_builder.continuation_token(token);
}

let response = request_builder.send().await.map_err(|sdk_error| {
FileTransferError::Other(format!("Failed to list folder contents: {:?}", sdk_error))
})?;

// Handle files and directories
if let Some(contents) = response.clone().contents {
for object in contents {
if let Some(key) = object.key {
let decoded_key = decode(&key).unwrap_or_default().to_string();
let is_folder = decoded_key.ends_with('/');
let clean_path = if is_folder {
decoded_key.trim_end_matches('/')
} else {
&decoded_key
};
folder_contents.push(FileDestinationPath {
path: clean_path.to_string(),
is_folder,
});
}
pub fn list_folder_contents<'a>(
destination: &'a FileDestination,
folder_path: &'a str,
) -> Pin<Box<dyn Future<Output = Result<Vec<FileDestinationPath>, FileTransferError>> + Send + 'a>> {
Box::pin(async move {
let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path);
match destination {
FileDestination::S3(client, credentials) | FileDestination::R2(client, credentials) => {
let mut folder_contents = Vec::new();
let mut continuation_token: Option<String> = None;

loop {
let mut request_builder = client
.list_objects_v2()
.bucket(credentials.bucket.clone())
.prefix(folder_path)
.delimiter("/")
.encoding_type(EncodingType::Url);

if let Some(token) = &continuation_token {
request_builder = request_builder.continuation_token(token);
}
}

// Handle common prefixes (subdirectories)
if let Some(common_prefixes) = response.clone().common_prefixes {
for prefix in common_prefixes {
if let Some(prefix_key) = prefix.prefix {
let decoded_key = decode(&prefix_key).unwrap_or_default().to_string();
let clean_path = decoded_key.trim_end_matches('/');
if !clean_path.is_empty() {
let response = request_builder.send().await.map_err(|sdk_error| {
FileTransferError::Other(format!("Failed to list folder contents: {:?}", sdk_error))
})?;

// Handle files and directories
if let Some(contents) = response.clone().contents {
for object in contents {
if let Some(key) = object.key {
let decoded_key = decode(&key).unwrap_or_default().to_string();
let is_folder = decoded_key.ends_with('/');
let clean_path = if is_folder {
decoded_key.trim_end_matches('/')
} else {
&decoded_key
};
folder_contents.push(FileDestinationPath {
path: clean_path.to_string(),
is_folder: true,
is_folder,
});
}
}
}
}

if response.is_truncated().unwrap_or_default() {
continuation_token = response.next_continuation_token().map(|s| s.to_string());
} else {
break;
// Handle common prefixes (subdirectories)
if let Some(common_prefixes) = response.clone().common_prefixes {
for prefix in common_prefixes {
if let Some(prefix_key) = prefix.prefix {
let decoded_key = decode(&prefix_key).unwrap_or_default().to_string();
let clean_path = decoded_key.trim_end_matches('/');
if !clean_path.is_empty() {
folder_contents.push(FileDestinationPath {
path: clean_path.to_string(),
is_folder: true,
});
}
}
}
}

if response.is_truncated().unwrap_or_default() {
continuation_token = response.next_continuation_token().map(|s| s.to_string());
} else {
break;
}
}
}

// Recursively list contents of subdirectories
let mut all_contents = Vec::new();
for item in folder_contents {
eprintln!("Item: {:?}", item);
all_contents.push(item.clone());
if item.is_folder {
let subfolder_contents = list_folder_contents(destination, &format!("{}/", item.path)).await?;
all_contents.extend(subfolder_contents);
// Recursively list contents of subdirectories
let mut all_contents = Vec::new();
for item in folder_contents {
eprintln!("Item: {:?}", item);
all_contents.push(item.clone());
if item.is_folder {
let subfolder_contents = list_folder_contents(destination, &format!("{}/", item.path)).await?;
all_contents.extend(subfolder_contents);
}
}
}

Ok(all_contents)
Ok(all_contents)
}
FileDestination::Http { .. } => Err(FileTransferError::Other(
"Listing folder contents is not supported for HTTP destinations.".to_string(),
)),
}
FileDestination::Http { .. } => Err(FileTransferError::Other(
"Listing folder contents is not supported for HTTP destinations.".to_string(),
)),
}
})
}

/// Generates temporary shareable links for all files in a specified folder.
#[async_recursion]
pub async fn generate_temporary_shareable_links_for_folder(
folder_path: &str,
destination: &FileDestination,
Expand Down Expand Up @@ -502,7 +501,7 @@ pub async fn generate_temporary_shareable_link(

Ok(presigned_req.uri().to_string())
}
FileDestination::Http { .. } => {
FileDestination::Http { .. } => {
// For HTTP, we might need to handle this differently as HTTP servers do not typically support presigned URLs
// This would depend on the specific server's capabilities or additional server-side logic to handle temporary links
Err(FileTransferError::Other(
Expand Down Expand Up @@ -553,25 +552,29 @@ pub async fn delete_file_or_folder(destination: &FileDestination, path: &str) ->
}

/// Deletes all files and folders recursively within a specified folder.
#[async_recursion]
pub async fn delete_all_in_folder(destination: &FileDestination, folder_path: &str) -> Result<(), FileTransferError> {
let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path);
let contents = list_folder_contents(destination, folder_path).await?;
// Start by deleting all files in the current folder
for item in &contents {
if !item.is_folder {
delete_file_or_folder(destination, &item.path).await?;
pub fn delete_all_in_folder<'a>(
destination: &'a FileDestination,
folder_path: &'a str,
) -> Pin<Box<dyn Future<Output = Result<(), FileTransferError>> + Send + 'a>> {
Box::pin(async move {
let folder_path = folder_path.strip_prefix('/').unwrap_or(folder_path);
let contents = list_folder_contents(destination, folder_path).await?;
// Start by deleting all files in the current folder
for item in &contents {
if !item.is_folder {
delete_file_or_folder(destination, &item.path).await?;
}
}
}
// Then delete subfolders recursively
for item in contents {
if item.is_folder {
delete_all_in_folder(destination, &item.path).await?;
// After deleting all contents in the subfolder, delete the subfolder itself
delete_file_or_folder(destination, &item.path).await?;
// Then delete subfolders recursively
for item in contents {
if item.is_folder {
delete_all_in_folder(destination, &item.path).await?;
// After deleting all contents in the subfolder, delete the subfolder itself
delete_file_or_folder(destination, &item.path).await?;
}
}
}
Ok(())
Ok(())
})
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 3e0bf84

Please sign in to comment.