diff --git a/shinkai-bin/shinkai-node/src/network/network_manager/network_job_manager.rs b/shinkai-bin/shinkai-node/src/network/network_manager/network_job_manager.rs index 64c620222..4145369b9 100644 --- a/shinkai-bin/shinkai-node/src/network/network_manager/network_job_manager.rs +++ b/shinkai-bin/shinkai-node/src/network/network_manager/network_job_manager.rs @@ -4,8 +4,6 @@ use crate::network::agent_payments_manager::external_agent_offerings_manager::Ex use crate::network::agent_payments_manager::my_agent_offerings_manager::MyAgentOfferingsManager; use crate::network::node::ProxyConnectionInfo; -use shinkai_db::schemas::ws_types::WSUpdateHandler; -use shinkai_db::db::{ShinkaiDB, Topic}; use aes_gcm::aead::generic_array::GenericArray; use aes_gcm::aead::Aead; use aes_gcm::Aes256Gcm; @@ -14,6 +12,8 @@ use chrono::{DateTime, Utc}; use ed25519_dalek::SigningKey; use futures::Future; use serde::{Deserialize, Serialize}; +use shinkai_db::db::{ShinkaiDB, Topic}; +use shinkai_db::schemas::ws_types::WSUpdateHandler; use shinkai_job_queue_manager::job_queue_manager::JobQueueManager; use shinkai_message_primitives::schemas::shinkai_name::ShinkaiName; use shinkai_message_primitives::schemas::shinkai_network::NetworkMessageType; @@ -109,6 +109,7 @@ impl NetworkJobManager { let jobs_map = Arc::new(Mutex::new(HashMap::new())); { let shinkai_db = db.upgrade().ok_or("Failed to upgrade shinkai_db").unwrap(); + let all_jobs = shinkai_db.get_all_jobs().unwrap(); let mut jobs = jobs_map.lock().await; for job in all_jobs { diff --git a/shinkai-bin/shinkai-node/src/network/v2_api/api_v2_commands_jobs.rs b/shinkai-bin/shinkai-node/src/network/v2_api/api_v2_commands_jobs.rs index 19108c803..39e99c2a8 100644 --- a/shinkai-bin/shinkai-node/src/network/v2_api/api_v2_commands_jobs.rs +++ b/shinkai-bin/shinkai-node/src/network/v2_api/api_v2_commands_jobs.rs @@ -5,7 +5,7 @@ use ed25519_dalek::SigningKey; use reqwest::StatusCode; use serde_json::Value; use shinkai_db::db::ShinkaiDB; -use shinkai_http_api::node_api_router::{APIError, SendResponseBodyData}; +use shinkai_http_api::node_api_router::{APIError, SendResponseBody, SendResponseBodyData}; use shinkai_message_primitives::{ schemas::{ identity::Identity, @@ -1353,7 +1353,7 @@ impl Node { db: Arc, bearer: String, job_id: String, - res: Sender>, + res: Sender>, ) -> Result<(), NodeError> { // Validate the bearer token if Self::validate_bearer_token(&bearer, db.clone(), &res).await.is_err() { @@ -1363,7 +1363,13 @@ impl Node { // Remove the job match db.remove_job(&job_id) { Ok(_) => { - let _ = res.send(Ok(())).await; + let _ = res + .send(Ok(SendResponseBody { + status: "success".to_string(), + message: "Job removed successfully".to_string(), + data: None, + })) + .await; Ok(()) } Err(err) => { diff --git a/shinkai-bin/shinkai-node/tests/it/db_job_tests.rs b/shinkai-bin/shinkai-node/tests/it/db_job_tests.rs index c22063456..f90f27dc4 100644 --- a/shinkai-bin/shinkai-node/tests/it/db_job_tests.rs +++ b/shinkai-bin/shinkai-node/tests/it/db_job_tests.rs @@ -63,10 +63,16 @@ fn generate_message_with_text( mod tests { use std::collections::{HashMap, HashSet}; - use shinkai_db::db::db_errors::ShinkaiDBError; + use shinkai_db::{db::db_errors::ShinkaiDBError, schemas::inbox_permission::InboxPermission}; use shinkai_message_primitives::{ - schemas::{inbox_name::InboxName, job::ForkedJob, subprompts::SubPrompt}, - shinkai_message::shinkai_message_schemas::JobMessage, + schemas::{ + identity::{StandardIdentity, StandardIdentityType}, + inbox_name::InboxName, + job::ForkedJob, + shinkai_name::ShinkaiName, + subprompts::SubPrompt, + }, + shinkai_message::shinkai_message_schemas::{IdentityPermissions, JobMessage}, shinkai_utils::{ encryption::unsafe_deterministic_encryption_keypair, job_scope::JobScope, @@ -976,32 +982,83 @@ mod tests { assert_eq!(job.forked_jobs[1].message_id, forked_message2_id); } - #[test] - fn test_remove_job() { + #[tokio::test] + async fn test_remove_job() { setup(); - let job_id = "job1".to_string(); + let job1_id = "job1".to_string(); + let job2_id = "job2".to_string(); let agent_id = "agent1".to_string(); let scope = JobScope::new_default(); let db_path = format!("db_tests/{}", hash_string(&agent_id.clone().to_string())); let mut shinkai_db = ShinkaiDB::new(&db_path).unwrap(); - // Create a new job - create_new_job(&mut shinkai_db, job_id.clone(), agent_id.clone(), scope); + // Create new jobs + create_new_job(&mut shinkai_db, job1_id.clone(), agent_id.clone(), scope.clone()); + create_new_job(&mut shinkai_db, job2_id.clone(), agent_id.clone(), scope); - // Retrieve all jobs - let jobs = shinkai_db.get_all_jobs().unwrap(); + // Check smart_inboxes + let node1_identity_name = "@@node1.shinkai"; + let node1_subidentity_name = "main_profile_node1"; + let (_, node1_identity_pk) = unsafe_deterministic_signature_keypair(0); + let (_, node1_encryption_pk) = unsafe_deterministic_encryption_keypair(0); - // Check if the job exists - let job_ids: Vec = jobs.iter().map(|job| job.job_id().to_string()).collect(); - assert!(job_ids.contains(&job_id)); + let (_, node1_subidentity_pk) = unsafe_deterministic_signature_keypair(100); + let (_, node1_subencryption_pk) = unsafe_deterministic_encryption_keypair(100); - // Remove the job - shinkai_db.remove_job(&job_id).unwrap(); + let node1_profile_identity = StandardIdentity::new( + ShinkaiName::from_node_and_profile_names( + node1_identity_name.to_string(), + node1_subidentity_name.to_string(), + ) + .unwrap(), + None, + node1_encryption_pk.clone(), + node1_identity_pk.clone(), + Some(node1_subencryption_pk), + Some(node1_subidentity_pk), + StandardIdentityType::Profile, + IdentityPermissions::Standard, + ); + + let _ = shinkai_db.insert_profile(node1_profile_identity.clone()); + + let inbox1_name = InboxName::get_job_inbox_name_from_params(job1_id.clone()).unwrap(); + let inbox2_name = InboxName::get_job_inbox_name_from_params(job2_id.clone()).unwrap(); + + shinkai_db + .add_permission( + &inbox1_name.to_string(), + &node1_profile_identity, + InboxPermission::Admin, + ) + .unwrap(); + shinkai_db + .add_permission( + &inbox2_name.to_string(), + &node1_profile_identity, + InboxPermission::Admin, + ) + .unwrap(); + + let smart_inboxes = shinkai_db + .get_all_smart_inboxes_for_profile(node1_profile_identity.clone()) + .unwrap(); + assert_eq!(smart_inboxes.len(), 2); + + // Remove the first job + shinkai_db.remove_job(&job1_id).unwrap(); // Check if the job is removed - match shinkai_db.get_job(&job_id) { + match shinkai_db.get_job(&job1_id) { Ok(_) => panic!("Expected an error when getting a removed job"), Err(e) => assert_eq!(e, ShinkaiDBError::DataNotFound), } + + // Check if the smart_inbox is removed + let smart_inboxes = shinkai_db + .get_all_smart_inboxes_for_profile(node1_profile_identity.clone()) + .unwrap(); + assert_eq!(smart_inboxes.len(), 1); + assert!(smart_inboxes[0].inbox_id != inbox1_name.to_string()); } } diff --git a/shinkai-libs/shinkai-db/src/db/db_jobs.rs b/shinkai-libs/shinkai-db/src/db/db_jobs.rs index 3a1011573..a80f02205 100644 --- a/shinkai-libs/shinkai-db/src/db/db_jobs.rs +++ b/shinkai-libs/shinkai-db/src/db/db_jobs.rs @@ -737,13 +737,18 @@ impl ShinkaiDB { format!("jobinbox_agent_{}_{}", Self::llm_provider_id_to_hash(&job_id), job_id); let job_inbox_name = format!("jobinbox_{}_inboxname", job_id); let job_conversation_inbox_name_key = format!("jobinbox_{}_conversation_inbox_name", job_id); - let all_jobs_time_keyed = format!("all_jobs_time_keyed_placeholder_to_fit_prefix__{}", job_id); let job_smart_inbox_name_key = format!("{}_smart_inbox_name", job_id); let job_is_hidden_key = format!("jobinbox_{}_is_hidden", job_id); let job_read_list_key = format!("jobinbox_{}_read_list", job_id); let job_config_key = format!("jobinbox_{}_config", job_id); let job_associated_ui_key = format!("jobinbox_{}_associated_ui", job_id); + let job_inbox_name_content = format!("job_inbox::{}::false", job_id); + let inbox_searchable = format!( + "inbox_placeholder_value_to_match_prefix_abcdef_{}", + job_inbox_name_content + ); + // Start a write batch let mut batch = rocksdb::WriteBatch::default(); @@ -755,12 +760,25 @@ impl ShinkaiDB { batch.delete_cf(cf_inbox, job_parent_llm_provider_id_key.as_bytes()); batch.delete_cf(cf_inbox, job_inbox_name.as_bytes()); batch.delete_cf(cf_inbox, job_conversation_inbox_name_key.as_bytes()); - batch.delete_cf(cf_inbox, all_jobs_time_keyed.as_bytes()); batch.delete_cf(cf_inbox, job_smart_inbox_name_key.as_bytes()); batch.delete_cf(cf_inbox, job_is_hidden_key.as_bytes()); batch.delete_cf(cf_inbox, job_read_list_key.as_bytes()); batch.delete_cf(cf_inbox, job_config_key.as_bytes()); batch.delete_cf(cf_inbox, job_associated_ui_key.as_bytes()); + batch.delete_cf(cf_inbox, inbox_searchable.as_bytes()); + + let all_jobs_time_keyed_prefix = b"all_jobs_time_keyed_placeholder_to_fit_prefix__"; + let iter = self.db.prefix_iterator_cf(cf_inbox, all_jobs_time_keyed_prefix); + for item in iter { + let (key, value) = item.map_err(ShinkaiDBError::RocksDBError)?; + // The value is the job ID + let item_job_id = std::str::from_utf8(&value)?.to_string(); + + if item_job_id == job_id { + batch.delete_cf(cf_inbox, key); + break; + } + } // Remove step history let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.to_string())?; diff --git a/shinkai-libs/shinkai-http-api/src/api_v2/api_v2_handlers_jobs.rs b/shinkai-libs/shinkai-http-api/src/api_v2/api_v2_handlers_jobs.rs index aa638cafd..af424f210 100644 --- a/shinkai-libs/shinkai-http-api/src/api_v2/api_v2_handlers_jobs.rs +++ b/shinkai-libs/shinkai-http-api/src/api_v2/api_v2_handlers_jobs.rs @@ -1048,7 +1048,7 @@ pub async fn fork_job_messages_handler( path = "/v2/remove_job", request_body = RemoveJobRequest, responses( - (status = 200, description = "Successfully removed job", body = Value), + (status = 200, description = "Successfully removed job", body = SendResponseBody), (status = 400, description = "Bad request", body = APIError), (status = 500, description = "Internal server error", body = APIError) ) diff --git a/shinkai-libs/shinkai-http-api/src/node_commands.rs b/shinkai-libs/shinkai-http-api/src/node_commands.rs index 584377589..a9ef4034c 100644 --- a/shinkai-libs/shinkai-http-api/src/node_commands.rs +++ b/shinkai-libs/shinkai-http-api/src/node_commands.rs @@ -42,6 +42,8 @@ use shinkai_tools_primitives::tools::shinkai_tool::{ShinkaiTool, ShinkaiToolHead // }; use x25519_dalek::PublicKey as EncryptionPublicKey; +use crate::node_api_router::SendResponseBody; + use super::{ api_v1::api_v1_handlers::APIUseRegistrationCodeSuccessResponse, api_v2::api_v2_handlers_general::InitialRegistrationRequest, @@ -566,7 +568,7 @@ pub enum NodeCommand { V2ApiRemoveJob { bearer: String, job_id: String, - res: Sender>, + res: Sender>, }, V2ApiVecFSRetrievePathSimplifiedJson { bearer: String,