Skip to content

Commit

Permalink
Merge pull request #629 from dcSpark/bence/fix-remove-job
Browse files Browse the repository at this point in the history
Fix remove job
  • Loading branch information
nicarq authored Oct 30, 2024
2 parents 09b057b + 70b09e1 commit 18d0c7a
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use crate::network::agent_payments_manager::external_agent_offerings_manager::Ex
use crate::network::agent_payments_manager::my_agent_offerings_manager::MyAgentOfferingsManager;
use crate::network::node::ProxyConnectionInfo;

use shinkai_db::schemas::ws_types::WSUpdateHandler;
use shinkai_db::db::{ShinkaiDB, Topic};
use aes_gcm::aead::generic_array::GenericArray;
use aes_gcm::aead::Aead;
use aes_gcm::Aes256Gcm;
Expand All @@ -14,6 +12,8 @@ use chrono::{DateTime, Utc};
use ed25519_dalek::SigningKey;
use futures::Future;
use serde::{Deserialize, Serialize};
use shinkai_db::db::{ShinkaiDB, Topic};
use shinkai_db::schemas::ws_types::WSUpdateHandler;
use shinkai_job_queue_manager::job_queue_manager::JobQueueManager;
use shinkai_message_primitives::schemas::shinkai_name::ShinkaiName;
use shinkai_message_primitives::schemas::shinkai_network::NetworkMessageType;
Expand Down Expand Up @@ -109,6 +109,7 @@ impl NetworkJobManager {
let jobs_map = Arc::new(Mutex::new(HashMap::new()));
{
let shinkai_db = db.upgrade().ok_or("Failed to upgrade shinkai_db").unwrap();

let all_jobs = shinkai_db.get_all_jobs().unwrap();
let mut jobs = jobs_map.lock().await;
for job in all_jobs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ed25519_dalek::SigningKey;
use reqwest::StatusCode;
use serde_json::Value;
use shinkai_db::db::ShinkaiDB;
use shinkai_http_api::node_api_router::{APIError, SendResponseBodyData};
use shinkai_http_api::node_api_router::{APIError, SendResponseBody, SendResponseBodyData};
use shinkai_message_primitives::{
schemas::{
identity::Identity,
Expand Down Expand Up @@ -1353,7 +1353,7 @@ impl Node {
db: Arc<ShinkaiDB>,
bearer: String,
job_id: String,
res: Sender<Result<(), APIError>>,
res: Sender<Result<SendResponseBody, APIError>>,
) -> Result<(), NodeError> {
// Validate the bearer token
if Self::validate_bearer_token(&bearer, db.clone(), &res).await.is_err() {
Expand All @@ -1363,7 +1363,13 @@ impl Node {
// Remove the job
match db.remove_job(&job_id) {
Ok(_) => {
let _ = res.send(Ok(())).await;
let _ = res
.send(Ok(SendResponseBody {
status: "success".to_string(),
message: "Job removed successfully".to_string(),
data: None,
}))
.await;
Ok(())
}
Err(err) => {
Expand Down
89 changes: 73 additions & 16 deletions shinkai-bin/shinkai-node/tests/it/db_job_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,16 @@ fn generate_message_with_text(
mod tests {
use std::collections::{HashMap, HashSet};

use shinkai_db::db::db_errors::ShinkaiDBError;
use shinkai_db::{db::db_errors::ShinkaiDBError, schemas::inbox_permission::InboxPermission};
use shinkai_message_primitives::{
schemas::{inbox_name::InboxName, job::ForkedJob, subprompts::SubPrompt},
shinkai_message::shinkai_message_schemas::JobMessage,
schemas::{
identity::{StandardIdentity, StandardIdentityType},
inbox_name::InboxName,
job::ForkedJob,
shinkai_name::ShinkaiName,
subprompts::SubPrompt,
},
shinkai_message::shinkai_message_schemas::{IdentityPermissions, JobMessage},
shinkai_utils::{
encryption::unsafe_deterministic_encryption_keypair,
job_scope::JobScope,
Expand Down Expand Up @@ -976,32 +982,83 @@ mod tests {
assert_eq!(job.forked_jobs[1].message_id, forked_message2_id);
}

#[test]
fn test_remove_job() {
#[tokio::test]
async fn test_remove_job() {
setup();
let job_id = "job1".to_string();
let job1_id = "job1".to_string();
let job2_id = "job2".to_string();
let agent_id = "agent1".to_string();
let scope = JobScope::new_default();
let db_path = format!("db_tests/{}", hash_string(&agent_id.clone().to_string()));
let mut shinkai_db = ShinkaiDB::new(&db_path).unwrap();

// Create a new job
create_new_job(&mut shinkai_db, job_id.clone(), agent_id.clone(), scope);
// Create new jobs
create_new_job(&mut shinkai_db, job1_id.clone(), agent_id.clone(), scope.clone());
create_new_job(&mut shinkai_db, job2_id.clone(), agent_id.clone(), scope);

// Retrieve all jobs
let jobs = shinkai_db.get_all_jobs().unwrap();
// Check smart_inboxes
let node1_identity_name = "@@node1.shinkai";
let node1_subidentity_name = "main_profile_node1";
let (_, node1_identity_pk) = unsafe_deterministic_signature_keypair(0);
let (_, node1_encryption_pk) = unsafe_deterministic_encryption_keypair(0);

// Check if the job exists
let job_ids: Vec<String> = jobs.iter().map(|job| job.job_id().to_string()).collect();
assert!(job_ids.contains(&job_id));
let (_, node1_subidentity_pk) = unsafe_deterministic_signature_keypair(100);
let (_, node1_subencryption_pk) = unsafe_deterministic_encryption_keypair(100);

// Remove the job
shinkai_db.remove_job(&job_id).unwrap();
let node1_profile_identity = StandardIdentity::new(
ShinkaiName::from_node_and_profile_names(
node1_identity_name.to_string(),
node1_subidentity_name.to_string(),
)
.unwrap(),
None,
node1_encryption_pk.clone(),
node1_identity_pk.clone(),
Some(node1_subencryption_pk),
Some(node1_subidentity_pk),
StandardIdentityType::Profile,
IdentityPermissions::Standard,
);

let _ = shinkai_db.insert_profile(node1_profile_identity.clone());

let inbox1_name = InboxName::get_job_inbox_name_from_params(job1_id.clone()).unwrap();
let inbox2_name = InboxName::get_job_inbox_name_from_params(job2_id.clone()).unwrap();

shinkai_db
.add_permission(
&inbox1_name.to_string(),
&node1_profile_identity,
InboxPermission::Admin,
)
.unwrap();
shinkai_db
.add_permission(
&inbox2_name.to_string(),
&node1_profile_identity,
InboxPermission::Admin,
)
.unwrap();

let smart_inboxes = shinkai_db
.get_all_smart_inboxes_for_profile(node1_profile_identity.clone())
.unwrap();
assert_eq!(smart_inboxes.len(), 2);

// Remove the first job
shinkai_db.remove_job(&job1_id).unwrap();

// Check if the job is removed
match shinkai_db.get_job(&job_id) {
match shinkai_db.get_job(&job1_id) {
Ok(_) => panic!("Expected an error when getting a removed job"),
Err(e) => assert_eq!(e, ShinkaiDBError::DataNotFound),
}

// Check if the smart_inbox is removed
let smart_inboxes = shinkai_db
.get_all_smart_inboxes_for_profile(node1_profile_identity.clone())
.unwrap();
assert_eq!(smart_inboxes.len(), 1);
assert!(smart_inboxes[0].inbox_id != inbox1_name.to_string());
}
}
22 changes: 20 additions & 2 deletions shinkai-libs/shinkai-db/src/db/db_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,13 +737,18 @@ impl ShinkaiDB {
format!("jobinbox_agent_{}_{}", Self::llm_provider_id_to_hash(&job_id), job_id);
let job_inbox_name = format!("jobinbox_{}_inboxname", job_id);
let job_conversation_inbox_name_key = format!("jobinbox_{}_conversation_inbox_name", job_id);
let all_jobs_time_keyed = format!("all_jobs_time_keyed_placeholder_to_fit_prefix__{}", job_id);
let job_smart_inbox_name_key = format!("{}_smart_inbox_name", job_id);
let job_is_hidden_key = format!("jobinbox_{}_is_hidden", job_id);
let job_read_list_key = format!("jobinbox_{}_read_list", job_id);
let job_config_key = format!("jobinbox_{}_config", job_id);
let job_associated_ui_key = format!("jobinbox_{}_associated_ui", job_id);

let job_inbox_name_content = format!("job_inbox::{}::false", job_id);
let inbox_searchable = format!(
"inbox_placeholder_value_to_match_prefix_abcdef_{}",
job_inbox_name_content
);

// Start a write batch
let mut batch = rocksdb::WriteBatch::default();

Expand All @@ -755,12 +760,25 @@ impl ShinkaiDB {
batch.delete_cf(cf_inbox, job_parent_llm_provider_id_key.as_bytes());
batch.delete_cf(cf_inbox, job_inbox_name.as_bytes());
batch.delete_cf(cf_inbox, job_conversation_inbox_name_key.as_bytes());
batch.delete_cf(cf_inbox, all_jobs_time_keyed.as_bytes());
batch.delete_cf(cf_inbox, job_smart_inbox_name_key.as_bytes());
batch.delete_cf(cf_inbox, job_is_hidden_key.as_bytes());
batch.delete_cf(cf_inbox, job_read_list_key.as_bytes());
batch.delete_cf(cf_inbox, job_config_key.as_bytes());
batch.delete_cf(cf_inbox, job_associated_ui_key.as_bytes());
batch.delete_cf(cf_inbox, inbox_searchable.as_bytes());

let all_jobs_time_keyed_prefix = b"all_jobs_time_keyed_placeholder_to_fit_prefix__";
let iter = self.db.prefix_iterator_cf(cf_inbox, all_jobs_time_keyed_prefix);
for item in iter {
let (key, value) = item.map_err(ShinkaiDBError::RocksDBError)?;
// The value is the job ID
let item_job_id = std::str::from_utf8(&value)?.to_string();

if item_job_id == job_id {
batch.delete_cf(cf_inbox, key);
break;
}
}

// Remove step history
let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.to_string())?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ pub async fn fork_job_messages_handler(
path = "/v2/remove_job",
request_body = RemoveJobRequest,
responses(
(status = 200, description = "Successfully removed job", body = Value),
(status = 200, description = "Successfully removed job", body = SendResponseBody),
(status = 400, description = "Bad request", body = APIError),
(status = 500, description = "Internal server error", body = APIError)
)
Expand Down
4 changes: 3 additions & 1 deletion shinkai-libs/shinkai-http-api/src/node_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use shinkai_tools_primitives::tools::shinkai_tool::{ShinkaiTool, ShinkaiToolHead
// };
use x25519_dalek::PublicKey as EncryptionPublicKey;

use crate::node_api_router::SendResponseBody;

use super::{
api_v1::api_v1_handlers::APIUseRegistrationCodeSuccessResponse,
api_v2::api_v2_handlers_general::InitialRegistrationRequest,
Expand Down Expand Up @@ -566,7 +568,7 @@ pub enum NodeCommand {
V2ApiRemoveJob {
bearer: String,
job_id: String,
res: Sender<Result<(), APIError>>,
res: Sender<Result<SendResponseBody, APIError>>,
},
V2ApiVecFSRetrievePathSimplifiedJson {
bearer: String,
Expand Down

0 comments on commit 18d0c7a

Please sign in to comment.