Skip to content

Commit

Permalink
Merge pull request #474 from dcSpark/nico/fix_network_issues
Browse files Browse the repository at this point in the history
Nico/fix network issues
  • Loading branch information
nicarq authored Jul 10, 2024
2 parents 1994764 + 81ada6f commit fce14a5
Show file tree
Hide file tree
Showing 27 changed files with 2,570 additions and 1,673 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ streamer.key
shinkai-libs/shinkai-dsl/tmp
tmp
storage_debug_my_local_ai
storage_debug_proxied_my_local_ai
35 changes: 35 additions & 0 deletions scripts/run_local_ai_with_proxy.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/bash

export NODE_IP="0.0.0.0"
export NODE_PORT="9952"
export NODE_API_IP="0.0.0.0"
export NODE_API_PORT="9950"
export NODE_WS_PORT="9951"
export IDENTITY_SECRET_KEY="df3f619804a92fdb4057192dc43dd748ea778adc52bc498ce80524c014b81119"
export ENCRYPTION_SECRET_KEY="d83f619804a92fdb4057192dc43dd748ea778adc52bc498ce80524c014b81159"
export PING_INTERVAL_SECS="0"
export GLOBAL_IDENTITY_NAME="@@local_ai_with_proxy.arb-sep-shinkai"
export NODE_STORAGE_PATH="storage_debug_proxied_my_local_ai"
export RUST_LOG="error,info"
export STARTING_NUM_QR_PROFILES="1"
export STARTING_NUM_QR_DEVICES="1"
export FIRST_DEVICE_NEEDS_REGISTRATION_CODE="false"
export LOG_SIMPLE="true"
export NO_SECRET_FILE="true"
export EMBEDDINGS_SERVER_URL="http://localhost:11434/"
export UNSTRUCTURED_SERVER_URL="https://internal.shinkai.com/x-unstructured-api/"
export STATIC_SERVER_PORT="9554"
export STATIC_SERVER_IP="0.0.0.0"
export STATIC_SERVER_FOLDER="./static_server_example"
export INITIAL_AGENT_NAMES="my_gpt,llama3_8b"
export INITIAL_AGENT_URLS="https://api.openai.com,http://localhost:11434"
export INITIAL_AGENT_MODELS="openai:gpt-4o,ollama:llama3:8b-instruct-q4_1"
export RPC_URL="https://public.stackup.sh/api/v1/node/arbitrum-sepolia"
export CONTRACT_ADDRESS="0x1d2D57F78Bc3B878aF68c411a03AcF327c85e0D6"
export SUBSCRIPTION_HTTP_UPLOAD_INTERVAL_MINUTES="1"
export SUBSCRIPTION_UPDATE_CACHE_INTERVAL_MINUTES="1"
export LOG_ALL="1"
export DEBUG_VRKAI="1"
export PROXY_IDENTITY="@@kao_tcp_relayer.arb-sep-shinkai"

cargo run --bin shinkai_node --package shinkai_node
4 changes: 2 additions & 2 deletions shinkai-bin/shinkai-node/src/db/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ impl ShinkaiDB {
// If the database file does not exist, use the default list of column families
vec![
Topic::Inbox.as_str().to_string(),
Topic::ScheduledMessage.as_str().to_string(),
Topic::ScheduledMessage.as_str().to_string(), // I will merge this with something else
Topic::AllMessages.as_str().to_string(),
Topic::Toolkits.as_str().to_string(),
Topic::MessageBoxSymmetricKeys.as_str().to_string(),
Topic::MessagesToRetry.as_str().to_string(),
Topic::AnyQueuesPrefixed.as_str().to_string(),
Topic::CronQueues.as_str().to_string(),
Topic::CronQueues.as_str().to_string(), // I will merge this with something else
Topic::NodeAndUsers.as_str().to_string(),
]
};
Expand Down
2 changes: 0 additions & 2 deletions shinkai-bin/shinkai-node/src/db/db_files_transmission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use super::{db::Topic, db_errors::ShinkaiDBError, ShinkaiDB};
use chrono::Utc;
use rocksdb::{Error, WriteBatch};



impl ShinkaiDB {
pub fn write_symmetric_key(&self, hex_blake3_hash: &str, private_key: &[u8]) -> Result<(), ShinkaiDBError> {
// Get the ColumnFamily handle for MessageBoxSymmetricKeys
Expand Down
15 changes: 0 additions & 15 deletions shinkai-bin/shinkai-node/src/db/db_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,6 @@ impl ShinkaiDB {

let iter = self.db.prefix_iterator_cf(cf_inbox, prefix.as_bytes());
for item in iter {
// HERE
let (_key, value) = item.map_err(ShinkaiDBError::RocksDBError)?;
let message = std::str::from_utf8(&value)?.to_string();
unprocessed_messages.push(message);
Expand Down Expand Up @@ -586,20 +585,6 @@ impl ShinkaiDB {

for message_path in &messages {
if let Some(message) = message_path.first() {
{
// Use shared CFs
let cf_inbox = self.get_cf_handle(Topic::Inbox).unwrap();

// Use a full iterator to go through all keys in the cf_inbox column family
let iter = self.db.iterator_cf(cf_inbox, IteratorMode::Start);

for item in iter {
let (_, _value) = match item {
Ok(kv) => kv,
Err(e) => return Err(ShinkaiDBError::RocksDBError(e)),
};
}
}
let message_key = message.calculate_message_hash_for_pagination();
let hash_message_key = Self::message_key_to_hash(message_key);

Expand Down
290 changes: 290 additions & 0 deletions shinkai-bin/shinkai-node/src/db/db_network_notifications.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use shinkai_message_primitives::schemas::shinkai_name::ShinkaiName;

use super::{db_errors::ShinkaiDBError, ShinkaiDB, Topic};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserNetworkNotification {
pub message: String,
pub datetime: DateTime<Utc>,
}

impl ShinkaiDB {
/// Writes a notification to the Inbox with a specific prefix
pub fn write_notification(&self, user_profile: ShinkaiName, message: String) -> Result<(), ShinkaiDBError> {
// Get the profile name string
let profile_name = Self::get_profile_name_string(&user_profile)?;

// Calculate the half hash of the profile name
let half_hash = Self::hex_blake3_to_half_hash(&profile_name);

// Get the current timestamp
let datetime = Utc::now();

// Calculate reverse time key by subtracting from Unix time of 2420-01-01
let future_time = DateTime::parse_from_rfc3339("2420-01-01T00:00:00Z")
.unwrap()
.timestamp_millis();
let reverse_time_key = future_time - datetime.timestamp_millis();

// Create the composite key with the specified prefix and reverse time key
let composite_key = format!("network_notif_{}_{}", half_hash, reverse_time_key);

// Create the notification struct
let notification = UserNetworkNotification { message, datetime };

// Serialize the notification
let serialized_notification = serde_json::to_vec(&notification)?;

// Retrieve the handle to the "Inbox" column family
let inbox_cf = self.get_cf_handle(Topic::Inbox).unwrap();

// Insert the serialized notification into the "Inbox" column family using the composite key
self.db.put_cf(inbox_cf, composite_key, serialized_notification)?;

Ok(())
}

pub fn get_last_notifications(
&self,
user_profile: ShinkaiName,
count: usize,
timestamp: Option<String>,
) -> Result<Vec<UserNetworkNotification>, ShinkaiDBError> {
// Get the profile name string
let profile_name = Self::get_profile_name_string(&user_profile)?;

// Calculate the half hash of the profile name
let half_hash = Self::hex_blake3_to_half_hash(&profile_name);

// Retrieve the handle to the "Inbox" column family
let inbox_cf = self.get_cf_handle(Topic::Inbox).unwrap();

// Create the prefix to search for
let prefix = format!("network_notif_{}_", half_hash);

// Create an iterator to scan the "Inbox" column family with prefix
let iter = self.db.prefix_iterator_cf(inbox_cf, prefix.as_bytes());

// Calculate the future time for reverse time key calculation
let future_time = DateTime::parse_from_rfc3339("2420-01-01T00:00:00Z")
.unwrap()
.timestamp_millis();

// Collect the last `count` notifications after the specified timestamp
let mut notifications = Vec::new();
for item in iter {
let (key, value) = item.map_err(ShinkaiDBError::RocksDBError)?;
let key_str = String::from_utf8(key.to_vec()).unwrap();
if let Some(ref ts) = timestamp {
let ts_datetime = DateTime::parse_from_rfc3339(ts).unwrap().with_timezone(&Utc);
let ts_reverse_time_key = future_time - ts_datetime.timestamp_millis();

let ts_composite_key = format!("{}{}", prefix, ts_reverse_time_key);

if key_str > ts_composite_key {
continue;
}
}
let notification: UserNetworkNotification = serde_json::from_slice(&value)?;
notifications.push(notification);
if notifications.len() == count {
break;
}
}

Ok(notifications)
}

/// Retrieves the previous X notifications before a specified timestamp for a given user profile
pub fn get_notifications_before_timestamp(
&self,
user_profile: ShinkaiName,
timestamp: String,
count: usize,
) -> Result<Vec<UserNetworkNotification>, ShinkaiDBError> {
// Get the profile name string
let profile_name = Self::get_profile_name_string(&user_profile)?;

// Calculate the half hash of the profile name
let half_hash = Self::hex_blake3_to_half_hash(&profile_name);

// Retrieve the handle to the "Inbox" column family
let inbox_cf = self.get_cf_handle(Topic::Inbox).unwrap();

// Create the prefix to search for
let prefix = format!("network_notif_{}_", half_hash);

// Create an iterator to scan the "Inbox" column family with prefix
let iter = self.db.prefix_iterator_cf(inbox_cf, prefix.as_bytes());

// Calculate the future time for reverse time key calculation
let future_time = DateTime::parse_from_rfc3339("2420-01-01T00:00:00Z")
.unwrap()
.timestamp_millis();

// Convert timestamp to reverse time key
let ts_datetime = DateTime::parse_from_rfc3339(&timestamp).unwrap().with_timezone(&Utc);
let ts_reverse_time_key = future_time - ts_datetime.timestamp_millis();

// Collect notifications up to the specified timestamp
let mut notifications = Vec::new();
for item in iter {
let (key, value) = item.map_err(ShinkaiDBError::RocksDBError)?;
let key_str = String::from_utf8(key.to_vec()).unwrap();

if key_str < format!("{}{}", prefix, ts_reverse_time_key) {
continue;
}

let notification: UserNetworkNotification = serde_json::from_slice(&value)?;
notifications.push(notification);
if notifications.len() == count {
break;
}
}

// Reverse the order of the notifications
notifications.reverse();

Ok(notifications)
}
}

#[cfg(test)]
mod tests {
use super::*;
use shinkai_vector_resources::utils::hash_string;
use std::fs;
use std::path::Path;
use std::thread::sleep;

fn setup() -> ShinkaiDB {
let path = Path::new("db_tests/");
let _ = fs::remove_dir_all(path);

let node1_db_path = format!("db_tests/{}", hash_string("churrasco italiano"));
ShinkaiDB::new(node1_db_path.as_str()).unwrap()
}

fn test_user() -> ShinkaiName {
ShinkaiName::new("@@test_user.shinkai/main".to_string()).unwrap()
}

fn write_unprefixed_message(
db: &ShinkaiDB,
_user_profile: ShinkaiName,
message: String,
) -> Result<(), ShinkaiDBError> {
// Get the current timestamp in ISO 8601 format with timezone
let timestamp = Utc::now().to_rfc3339();

// Create the composite key without the specified prefix
let composite_key = format!("unprefixed_{}", timestamp);

// Retrieve the handle to the "Inbox" column family
let inbox_cf = db.get_cf_handle(Topic::Inbox).unwrap();

// Insert the message into the "Inbox" column family using the composite key
db.db.put_cf(inbox_cf, composite_key, message)?;

Ok(())
}

#[test]
fn test_write_notification() {
let db = setup();
let user_profile = test_user();
let message = "Test message".to_string();

let result = db.write_notification(user_profile, message);
assert!(result.is_ok());
}

#[test]
fn test_get_last_notifications() {
let db = setup();
let user_profile = test_user();
let messages = vec![
"Test message 1".to_string(),
"Test message 2".to_string(),
"Test message 3".to_string(),
"Test message 4".to_string(),
];

for message in &messages {
db.write_notification(user_profile.clone(), message.clone()).unwrap();
sleep(std::time::Duration::from_millis(1));
}

// Write an unprefixed message
write_unprefixed_message(&db, user_profile.clone(), "Unprefixed message".to_string()).unwrap();

let notifications = db.get_last_notifications(user_profile.clone(), 2, None).unwrap();
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].message, "Test message 4");
assert_eq!(notifications[1].message, "Test message 3");

let notifications = db.get_last_notifications(user_profile, 3, None).unwrap();
assert_eq!(notifications.len(), 3);
assert_eq!(notifications[0].message, "Test message 4");
assert_eq!(notifications[1].message, "Test message 3");
assert_eq!(notifications[2].message, "Test message 2");
}

#[test]
fn test_get_last_notifications_with_timestamp() {
let db = setup();
let user_profile = test_user();
let message1 = "Test message 1".to_string();
let message2 = "Test message 2".to_string();
let message3 = "Test message 3".to_string();
let message4 = "Test message 4".to_string();

db.write_notification(user_profile.clone(), message1).unwrap();
sleep(std::time::Duration::from_millis(1));
let timestamp = Utc::now().to_rfc3339();
sleep(std::time::Duration::from_millis(1));
db.write_notification(user_profile.clone(), message2).unwrap();
sleep(std::time::Duration::from_millis(1));
db.write_notification(user_profile.clone(), message3).unwrap();
sleep(std::time::Duration::from_millis(1));
db.write_notification(user_profile.clone(), message4).unwrap();

// Write an unprefixed message
write_unprefixed_message(&db, user_profile.clone(), "Unprefixed message".to_string()).unwrap();

let notifications = db.get_last_notifications(user_profile, 2, Some(timestamp)).unwrap();
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].message, "Test message 4");
assert_eq!(notifications[1].message, "Test message 3");
}

#[test]
fn test_get_notifications_before_timestamp() {
let db = setup();
let user_profile = test_user();
let message1 = "Test message 1".to_string();
let message2 = "Test message 2".to_string();
let message3 = "Test message 3".to_string();

db.write_notification(user_profile.clone(), message1).unwrap();
sleep(std::time::Duration::from_millis(1));
db.write_notification(user_profile.clone(), message2).unwrap();
sleep(std::time::Duration::from_millis(1));
let timestamp = Utc::now().to_rfc3339();
sleep(std::time::Duration::from_millis(1));
db.write_notification(user_profile.clone(), message3).unwrap();

// Write an unprefixed message
write_unprefixed_message(&db, user_profile.clone(), "Unprefixed message".to_string()).unwrap();

let notifications = db
.get_notifications_before_timestamp(user_profile, timestamp, 3)
.unwrap();
assert_eq!(notifications.len(), 2);
assert_eq!(notifications[0].message, "Test message 1");
assert_eq!(notifications[1].message, "Test message 2");
}
}
Loading

0 comments on commit fce14a5

Please sign in to comment.