Skip to content

Commit

Permalink
fix rpc issue (#529)
Browse files Browse the repository at this point in the history
* fix

* - fix: bump shinkai-node v0.7.29

---------

Co-authored-by: Alfredo Gallardo <[email protected]>
  • Loading branch information
nicarq and agallardol authored Aug 16, 2024
1 parent f9266ad commit 307d10f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 46 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion shinkai-bin/shinkai-node/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shinkai_node"
version = "0.7.28"
version = "0.7.29"
edition = "2021"
authors.workspace = true
# this causes `cargo run` in the workspace root to run this package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod tests {
use ethers::types::U256;
use shinkai_crypto_identities::{OnchainIdentity, ShinkaiRegistry};
use shinkai_message_primitives::shinkai_utils::shinkai_logging::init_default_tracing;
use tokio::{runtime::Runtime, time::sleep};
use tokio::runtime::Runtime;

#[test]
fn test_get_identity_record() {
Expand Down
126 changes: 83 additions & 43 deletions shinkai-libs/shinkai-crypto-identities/src/shinkai_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ impl OnchainIdentity {
let default_value = "localhost:9550";
let first_address = self
.address_or_proxy_nodes
.iter().filter(|addr| !addr.is_empty())
.iter()
.filter(|addr| !addr.is_empty())
.collect::<Vec<_>>()
.first()
.map_or(default_value, |addr| addr.as_str());
Expand Down Expand Up @@ -186,6 +187,7 @@ pub trait ShinkaiRegistryTrait {
pub struct ShinkaiRegistry {
pub contract: ContractInstance<Arc<Provider<Http>>, Provider<Http>>,
pub cache: Arc<DashMap<String, (SystemTime, OnchainIdentity)>>,
pub rpc_endpoints: Vec<String>, // TODO: needs to be updated for mainnet -- also depends on the network
}

impl ShinkaiRegistry {
Expand Down Expand Up @@ -219,9 +221,18 @@ impl ShinkaiRegistry {
let abi: Abi = serde_json::from_str(&abi_json).map_err(ShinkaiRegistryError::JsonError)?;

let contract = Contract::new(contract_address, abi, Arc::new(provider));

let rpc_endpoints = vec![
url.to_string(),
"https://arbitrum-sepolia.blockpi.network/v1/rpc/public".to_string(),
"https://endpoints.omniatech.io/v1/arbitrum/sepolia/public".to_string(),
"https://sepolia-rollup.arbitrum.io/rpc".to_string(),
];

Ok(Self {
contract,
cache: Arc::new(DashMap::new()),
rpc_endpoints,
})
}

Expand All @@ -245,8 +256,11 @@ impl ShinkaiRegistry {
let identity_clone = identity.clone();
let contract_clone = self.contract.clone();
let cache_clone = self.cache.clone();
let rpc_endpoints_clone = self.rpc_endpoints.clone();
task::spawn(async move {
if let Err(e) = Self::update_cache(&contract_clone, &cache_clone, identity_clone).await {
if let Err(e) =
Self::update_cache(&contract_clone, &cache_clone, identity_clone, rpc_endpoints_clone).await
{
// Log the error
shinkai_log(
ShinkaiLogOption::CryptoIdentity,
Expand All @@ -261,17 +275,24 @@ impl ShinkaiRegistry {
}

// Otherwise, update the cache
let record = Self::update_cache(&self.contract, &self.cache, identity.clone()).await?;
let record = Self::update_cache(
&self.contract,
&self.cache,
identity.clone(),
self.rpc_endpoints.clone(),
)
.await?;
Ok(record.clone())
}

async fn update_cache(
contract: &ContractInstance<Arc<Provider<Http>>, Provider<Http>>,
cache: &DashMap<String, (SystemTime, OnchainIdentity)>,
identity: String,
rpc_endpoints: Vec<String>,
) -> Result<OnchainIdentity, ShinkaiRegistryError> {
// Fetch the identity record from the contract
let record = Self::fetch_identity_record(contract, identity.clone()).await?;
let record = Self::fetch_identity_record(contract, identity.clone(), rpc_endpoints).await?;

// Update the cache and the timestamp
cache.insert(identity.clone(), (SystemTime::now(), record.clone()));
Expand All @@ -286,48 +307,67 @@ impl ShinkaiRegistry {
pub async fn fetch_identity_record(
contract: &ContractInstance<Arc<Provider<Http>>, Provider<Http>>,
identity: String,
rpc_endpoints: Vec<String>,
) -> Result<OnchainIdentity, ShinkaiRegistryError> {
let function_call = match contract.method::<_, (U256, U256, String, String, bool, Vec<String>, U256, U256)>(
"getIdentityData",
(identity.clone(),),
) {
Ok(call) => call,
Err(err) => {
shinkai_log(
ShinkaiLogOption::CryptoIdentity,
ShinkaiLogLevel::Error,
format!("Error creating function call: {}", err).as_str(),
);
return Err(ShinkaiRegistryError::ContractAbiError(err));
let mut last_error = None;

for rpc in rpc_endpoints {
let provider = match Provider::<Http>::try_from(rpc) {
Ok(provider) => provider,
Err(err) => {
last_error = Some(ShinkaiRegistryError::CustomError(err.to_string()));
continue;
}
};

let contract = Contract::new(contract.address(), contract.abi().clone(), Arc::new(provider));

let function_call = match contract.method::<_, (U256, U256, String, String, bool, Vec<String>, U256, U256)>(
"getIdentityData",
(identity.clone(),),
) {
Ok(call) => call,
Err(err) => {
shinkai_log(
ShinkaiLogOption::CryptoIdentity,
ShinkaiLogLevel::Error,
format!("Error creating function call: {}", err).as_str(),
);
last_error = Some(ShinkaiRegistryError::ContractAbiError(err));
continue;
}
};

match function_call.call().await {
Ok(result) => {
let last_updated = UNIX_EPOCH + Duration::from_secs(result.7.low_u64());
let last_updated = DateTime::<Utc>::from(last_updated);

return Ok(OnchainIdentity {
shinkai_identity: identity,
bound_nft: result.0,
staked_tokens: result.1,
encryption_key: result.2,
signature_key: result.3,
routing: result.4,
address_or_proxy_nodes: result.5,
delegated_tokens: result.6,
last_updated,
});
}
Err(e) => {
eprintln!("Error calling contract: {}", e);
shinkai_log(
ShinkaiLogOption::CryptoIdentity,
ShinkaiLogLevel::Error,
format!("Error calling contract: {}", e).as_str(),
);
last_error = Some(ShinkaiRegistryError::CustomError("Contract Error".to_string()));
}
}
};

let result: (U256, U256, String, String, bool, Vec<String>, U256, U256) = match function_call.call().await {
Ok(res) => res,
Err(e) => {
shinkai_log(
ShinkaiLogOption::CryptoIdentity,
ShinkaiLogLevel::Error,
format!("Error calling contract: {}", e).as_str(),
);
return Err(ShinkaiRegistryError::CustomError("Contract Error".to_string()));
}
};
}

let last_updated = UNIX_EPOCH + Duration::from_secs(result.7.low_u64());
let last_updated = DateTime::<Utc>::from(last_updated);

Ok(OnchainIdentity {
shinkai_identity: identity,
bound_nft: result.0,
staked_tokens: result.1,
encryption_key: result.2,
signature_key: result.3,
routing: result.4,
address_or_proxy_nodes: result.5,
delegated_tokens: result.6,
last_updated,
})
Err(last_error.unwrap_or_else(|| ShinkaiRegistryError::CustomError("All RPC endpoints failed".to_string())))
}
}

Expand Down

0 comments on commit 307d10f

Please sign in to comment.