From bec213a7a944c662a108dba525bb9012b8fe4e85 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Thu, 26 Oct 2023 17:40:38 +0800
Subject: [PATCH 01/26] trace scan requests

Signed-off-by: Ping Yu
---
 Cargo.toml                     |  1 +
 src/request/plan.rs            |  2 ++
 src/transaction/transaction.rs | 23 ++++++++++++++++++++++-
 3 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/Cargo.toml b/Cargo.toml
index 9535a6e4..ffa0446c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,6 +43,7 @@ serde_derive = "1.0"
 thiserror = "1"
 tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
 tonic = { version = "0.9", features = ["tls"] }
+tracing = "0.1"
 
 [dev-dependencies]
 clap = "2"
diff --git a/src/request/plan.rs b/src/request/plan.rs
index ab72e8aa..51d8782e 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -11,6 +11,7 @@ use log::debug;
 use log::info;
 use tokio::sync::Semaphore;
 use tokio::time::sleep;
+use tracing::instrument;
 
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
@@ -104,6 +105,7 @@ where
 {
     // A plan may involve multiple shards
     #[async_recursion]
+    #[instrument(skip_all)]
     async fn single_plan_handler(
         pd_client: Arc<PdC>,
         current_plan: P,
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 671d6140..389cdb4d 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -1,5 +1,6 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
+use std::fmt;
 use std::iter;
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -12,6 +13,7 @@ use log::debug;
 use log::warn;
 use tokio::sync::RwLock;
 use tokio::time::Duration;
+use tracing::{instrument, Span};
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -87,6 +89,17 @@ pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod>>
     phantom: PhantomData<Cod>,
 }
 
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> fmt::Debug for Transaction<Cod, PdC> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("Transaction")
+            .field("timestamp", &self.timestamp)
+            .field("options", &self.options)
+            .field("is_heartbeat_started", &self.is_heartbeat_started)
+            .field("start_instant", &self.start_instant)
+            .finish()
+    }
+}
+
 impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     pub(crate) fn new(
         timestamp: Timestamp,
@@ -353,6 +366,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// txn.commit().await.unwrap();
     /// # });
     /// ```
+    #[instrument(skip_all)]
     pub async fn scan(
         &mut self,
         range: impl Into<BoundRange>,
@@ -389,6 +403,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// txn.commit().await.unwrap();
     /// # });
     /// ```
+    #[instrument(skip_all)]
     pub async fn scan_keys(
         &mut self,
         range: impl Into<BoundRange>,
@@ -404,6 +419,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// Create a 'scan_reverse' request.
     ///
     /// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
+    #[instrument(skip_all)]
     pub async fn scan_reverse(
         &mut self,
         range: impl Into<BoundRange>,
@@ -416,6 +432,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// Create a 'scan_keys_reverse' request.
     ///
     /// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
+    #[instrument(skip_all)]
     pub async fn scan_keys_reverse(
         &mut self,
         range: impl Into<BoundRange>,
@@ -758,6 +775,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         plan.execute().await
     }
 
+    #[instrument(skip(range), fields(range))]
     async fn scan_inner(
         &mut self,
         range: impl Into<BoundRange>,
@@ -770,9 +788,12 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         let rpc = self.rpc.clone();
         let retry_options = self.options.retry_options.clone();
 
+        let range = range.into();
+        Span::current().record("range", &tracing::field::debug(&range));
+
         self.buffer
             .scan_and_fetch(
-                range.into(),
+                range,
                 limit,
                 !key_only,
                 reverse,
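None of the `#[instrument]` spans added above produce output on their own; they are only emitted once the embedding application installs a `tracing` subscriber. A minimal sketch of such a setup, assuming the separate `tracing-subscriber` crate on the application side (it is not a dependency added by this series):

    // Hypothetical application-side setup; `tracing-subscriber` is an
    // assumption here, not something this patch introduces.
    use tracing_subscriber::EnvFilter;

    fn main() {
        tracing_subscriber::fmt()
            // e.g. RUST_LOG=tikv_client=debug to surface the scan spans
            .with_env_filter(EnvFilter::from_default_env())
            .init();
        // ... run the client as usual ...
    }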
From e35f479c7643ef81a3d45c236c78fa6d2e843803 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Thu, 26 Oct 2023 21:01:27 +0800
Subject: [PATCH 02/26] wip

Signed-off-by: Ping Yu
---
 src/transaction/transaction.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 389cdb4d..5239e482 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -791,6 +791,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         let range = range.into();
         Span::current().record("range", &tracing::field::debug(&range));
 
+        tracing::info!("scan_inner xxx");
+
         self.buffer
             .scan_and_fetch(
                 range,

From 547eabbecb437baff0ba378d4278d712d3b7b5cb Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Thu, 26 Oct 2023 21:35:46 +0800
Subject: [PATCH 03/26] wip

Signed-off-by: Ping Yu
---
 src/raw/requests.rs            |  9 +++++++++
 src/request/mod.rs             |  2 +-
 src/request/plan.rs            | 17 ++++++++++++++---
 src/store/request.rs           |  3 ++-
 src/transaction/transaction.rs | 12 ++++++------
 5 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index 23bfce73..2e4bfc8e 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use std::any::Any;
+use std::fmt::Formatter;
 use std::ops::Range;
 use std::sync::Arc;
 use std::time::Duration;
@@ -404,6 +405,14 @@ impl Request for RawCoprocessorRequest {
     }
 }
 
+impl std::fmt::Debug for RawCoprocessorRequest {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RawCoprocessorRequest")
+            .field("inner", &self.inner)
+            .finish()
+    }
+}
+
 impl KvRequest for RawCoprocessorRequest {
     type Response = kvrpcpb::RawCoprocessorResponse;
 }
diff --git a/src/request/mod.rs b/src/request/mod.rs
index aecaf26d..1f2db6d4 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -129,7 +129,7 @@ mod test {
 
     impl HasLocks for MockRpcResponse {}
 
-    #[derive(Clone)]
+    #[derive(Clone, Debug)]
     struct MockKvRequest {
         test_invoking_count: Arc<AtomicUsize>,
     }
diff --git a/src/request/plan.rs b/src/request/plan.rs
index 51d8782e..4f4d5693 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -7,10 +7,10 @@ use async_recursion::async_recursion;
 use async_trait::async_trait;
 use futures::future::try_join_all;
 use futures::prelude::*;
-use log::debug;
-use log::info;
 use tokio::sync::Semaphore;
 use tokio::time::sleep;
+use tracing::debug;
+use tracing::info;
 use tracing::instrument;
 
 use crate::backoff::Backoff;
@@ -58,7 +58,9 @@ pub struct Dispatch<Req: KvRequest> {
 impl<Req: KvRequest> Plan for Dispatch<Req> {
     type Result = Req::Response;
 
+    #[instrument(skip_all, fields(label = self.request.label(), request = ?self.request))]
     async fn execute(&self) -> Result<Self::Result> {
+        debug!("execute");
         let stats = tikv_stats(self.request.label());
@@ -113,6 +115,7 @@ where
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
+        debug!("single_plan_handler");
         let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
         let mut handles = Vec::new();
         for shard in shards {
@@ -151,6 +154,7 @@ where
     }
 
     #[async_recursion]
+    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
         plan: P,
@@ -159,6 +163,7 @@ where
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
+        debug!("single_shard_handler");
         // limit concurrent requests
         let permit = permits.acquire().await.unwrap();
         let res = plan.execute().await;
@@ -212,11 +217,13 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
     // 3. Err(Error): can't be resolved, return the error to upper level
+    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
     async fn handle_region_error(
         pd_client: Arc<PdC>,
         e: errorpb::Error,
         region_store: RegionStore,
     ) -> Result<bool> {
+        debug!("handle_region_error: {:?}", e);
         let ver_id = region_store.region_with_leader.ver_id();
         if let Some(not_leader) = e.not_leader {
             if let Some(leader) = not_leader.leader {
@@ -268,11 +275,13 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
    // 3. Err(Error): can't be resolved, return the error to upper level
+    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
     async fn on_region_epoch_not_match(
         pd_client: Arc<PdC>,
         region_store: RegionStore,
         error: EpochNotMatch,
     ) -> Result<bool> {
+        debug!("on_region_epoch_not_match: {:?}", error);
         let ver_id = region_store.region_with_leader.ver_id();
         if error.current_regions.is_empty() {
             pd_client.invalidate_region_cache(ver_id).await;
@@ -304,6 +313,7 @@ where
         Ok(false)
     }
 
+    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
     async fn handle_grpc_error(
         pd_client: Arc<PdC>,
         plan: P,
@@ -313,7 +323,7 @@ where
         preserve_region_results: bool,
         e: Error,
     ) -> Result<<Self as Plan>::Result> {
-        debug!("handle grpc error: {:?}", e);
+        debug!("handle_grpc_error: {:?}", e);
         let ver_id = region_store.region_with_leader.ver_id();
         pd_client.invalidate_region_cache(ver_id).await;
         match backoff.next_delay_duration() {
@@ -351,6 +361,7 @@ where
 {
     type Result = Vec<Result<P::Result>>;
 
+    #[instrument(skip_all)]
     async fn execute(&self) -> Result<Self::Result> {
         // Limit the maximum concurrency of multi-region request. If there are
         // too many concurrent requests, TiKV is more likely to return a "TiKV
diff --git a/src/store/request.rs b/src/store/request.rs
index ec7e08a4..d8fc7533 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -1,6 +1,7 @@
 // Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
 
 use std::any::Any;
+use std::fmt::Debug;
 use std::time::Duration;
 
 use async_trait::async_trait;
@@ -13,7 +14,7 @@ use crate::Error;
 use crate::Result;
 
 #[async_trait]
-pub trait Request: Any + Sync + Send + 'static {
+pub trait Request: Any + Sync + Send + Debug + 'static {
     async fn dispatch(
         &self,
         client: &TikvClient<Channel>,
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 5239e482..6866b4da 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -9,11 +9,12 @@ use std::time::Instant;
 use derive_new::new;
 use fail::fail_point;
 use futures::prelude::*;
-use log::debug;
-use log::warn;
 use tokio::sync::RwLock;
 use tokio::time::Duration;
-use tracing::{instrument, Span};
+use tracing::debug;
+use tracing::instrument;
+use tracing::warn;
+use tracing::Span;
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -775,7 +776,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         plan.execute().await
     }
 
-    #[instrument(skip(range), fields(range))]
+    #[instrument(skip(self, range), fields(range))]
     async fn scan_inner(
         &mut self,
         range: impl Into<BoundRange>,
@@ -783,6 +784,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         key_only: bool,
         reverse: bool,
     ) -> Result<impl Iterator<Item = KvPair>> {
+        debug!("scan_inner");
         self.check_allow_operation().await?;
         let timestamp = self.timestamp.clone();
         let rpc = self.rpc.clone();
@@ -791,8 +793,6 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         let range = range.into();
         Span::current().record("range", &tracing::field::debug(&range));
 
-        tracing::info!("scan_inner xxx");
-
         self.buffer
             .scan_and_fetch(
                 range,

From 74df9f3fed9fbb4931bb19c6f6deb8ddd3dc2379 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Fri, 27 Oct 2023 13:28:05 +0800
Subject: [PATCH 04/26] wip

Signed-off-by: Ping Yu
---
 src/request/plan.rs            | 39 +++++++++++++++++++---------------
 src/request/plan_builder.rs    |  2 ++
 src/store/mod.rs               | 28 ++++++++++++++++++++++++
 src/transaction/snapshot.rs    |  2 ++
 src/transaction/transaction.rs | 10 ++++-----
 5 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/src/request/plan.rs b/src/request/plan.rs
index 4f4d5693..abbf72ce 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -9,9 +9,9 @@ use futures::future::try_join_all;
 use futures::prelude::*;
 use tokio::sync::Semaphore;
 use tokio::time::sleep;
-use tracing::debug;
-use tracing::info;
 use tracing::instrument;
+use tracing::{debug, span};
+use tracing::{info, Instrument};
 
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
@@ -58,9 +58,9 @@ pub struct Dispatch<Req: KvRequest> {
 impl<Req: KvRequest> Plan for Dispatch<Req> {
     type Result = Req::Response;
 
-    #[instrument(skip_all, fields(label = self.request.label(), request = ?self.request))]
+    #[instrument(name = "Dispatch::execute", skip_all, fields(label = self.request.label(), request = ?self.request))]
     async fn execute(&self) -> Result<Self::Result> {
-        debug!("execute");
+        debug!("Dispatch::execute");
         let stats = tikv_stats(self.request.label());
@@ -120,16 +120,21 @@ where
         let mut handles = Vec::new();
         for shard in shards {
             let (shard, region_store) = shard?;
+            let span = span!(tracing::Level::INFO, "shard", ?region_store);
+
             let mut clone = current_plan.clone();
             clone.apply_shard(shard, &region_store)?;
-            let handle = tokio::spawn(Self::single_shard_handler(
-                pd_client.clone(),
-                clone,
-                region_store,
-                backoff.clone(),
-                permits.clone(),
-                preserve_region_results,
-            ));
+            let handle = tokio::spawn(
+                Self::single_shard_handler(
+                    pd_client.clone(),
+                    clone,
+                    region_store,
+                    backoff.clone(),
+                    permits.clone(),
+                    preserve_region_results,
+                )
+                .instrument(span),
+            );
             handles.push(handle);
         }
 
@@ -154,7 +159,7 @@ where
     }
 
     #[async_recursion]
-    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
+    #[instrument(skip_all, fields(region_store = ?region_store))]
     async fn single_shard_handler(
         pd_client: Arc<PdC>,
         plan: P,
@@ -217,7 +222,7 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
     // 3. Err(Error): can't be resolved, return the error to upper level
-    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
+    #[instrument(skip_all, fields(region_store = ?region_store))]
     async fn handle_region_error(
         pd_client: Arc<PdC>,
         e: errorpb::Error,
@@ -275,7 +280,7 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
     // 3. Err(Error): can't be resolved, return the error to upper level
-    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
+    #[instrument(skip_all, fields(region_store = ?region_store))]
     async fn on_region_epoch_not_match(
         pd_client: Arc<PdC>,
         region_store: RegionStore,
@@ -313,7 +318,7 @@ where
         Ok(false)
     }
 
-    #[instrument(skip_all, fields(region = ?region_store.region_with_leader))]
+    #[instrument(skip_all, fields(region_store = ?region_store))]
     async fn handle_grpc_error(
         pd_client: Arc<PdC>,
         plan: P,
@@ -361,7 +366,7 @@ where
 {
     type Result = Vec<Result<P::Result>>;
 
-    #[instrument(skip_all)]
+    #[instrument(name = "RetryableMultiRegion::execute", skip_all)]
     async fn execute(&self) -> Result<Self::Result> {
         // Limit the maximum concurrency of multi-region request. If there are
         // too many concurrent requests, TiKV is more likely to return a "TiKV
diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs
index 8e2329e7..519ff5b8 100644
--- a/src/request/plan_builder.rs
+++ b/src/request/plan_builder.rs
@@ -2,6 +2,7 @@
 
 use std::marker::PhantomData;
 use std::sync::Arc;
+use tracing::instrument;
 
 use super::plan::PreserveShard;
 use crate::backoff::Backoff;
@@ -159,6 +160,7 @@ where
     /// Preserve all results, even some of them are Err.
     /// To pass all responses to merge, and handle partial successful results correctly.
+    #[instrument(skip_all)]
     pub fn retry_multi_region_preserve_results(
         self,
         backoff: Backoff,
diff --git a/src/store/mod.rs b/src/store/mod.rs
index a244a1bc..fdfb232b 100644
--- a/src/store/mod.rs
+++ b/src/store/mod.rs
@@ -6,6 +6,7 @@ mod request;
 
 use std::cmp::max;
 use std::cmp::min;
+use std::fmt;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -33,6 +34,33 @@ pub struct RegionStore {
     pub client: Arc<dyn KvClient + Send + Sync>,
 }
 
+impl fmt::Debug for RegionStore {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("RegionStore")
+            .field("region_id", &self.region_with_leader.region.id)
+            .field(
+                "region_version",
+                &self
+                    .region_with_leader
+                    .region
+                    .region_epoch
+                    .as_ref()
+                    .map(|e| e.version)
+                    .unwrap_or_default(),
+            )
+            .field(
+                "leader_store_id",
+                &self
+                    .region_with_leader
+                    .leader
+                    .as_ref()
+                    .map(|l| l.store_id)
+                    .unwrap_or_default(),
+            )
+            .finish()
+    }
+}
+
 #[derive(new, Clone)]
 pub struct Store {
     pub client: Arc<dyn KvClient + Send + Sync>,
diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index a8aa9464..93f3220e 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -3,6 +3,7 @@
 use derive_new::new;
 use log::debug;
 use std::marker::PhantomData;
+use tracing::instrument;
 
 use crate::codec::ApiV1TxnCodec;
 use crate::pd::{PdClient, PdRpcClient};
@@ -50,6 +51,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<Cod, PdC> {
     }
 
     /// Scan a range, return at most `limit` key-value pairs that lying in the range.
+    #[instrument(name = "Snapshot::scan", skip_all)]
     pub async fn scan(
         &mut self,
         range: impl Into<BoundRange>,
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 6866b4da..f55c6486 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -367,7 +367,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// txn.commit().await.unwrap();
     /// # });
     /// ```
-    #[instrument(skip_all)]
+    #[instrument(name = "Transaction::scan", skip_all)]
     pub async fn scan(
         &mut self,
         range: impl Into<BoundRange>,
@@ -404,7 +404,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// txn.commit().await.unwrap();
     /// # });
     /// ```
-    #[instrument(skip_all)]
+    #[instrument(name = "Transaction::scan_keys", skip_all)]
     pub async fn scan_keys(
         &mut self,
         range: impl Into<BoundRange>,
@@ -420,7 +420,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// Create a 'scan_reverse' request.
     ///
     /// Similar to [`scan`](Transaction::scan), but scans in the reverse direction.
-    #[instrument(skip_all)]
+    #[instrument(name = "Transaction::scan_reverse", skip_all)]
     pub async fn scan_reverse(
         &mut self,
         range: impl Into<BoundRange>,
@@ -433,7 +433,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// Create a 'scan_keys_reverse' request.
     ///
     /// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction.
-    #[instrument(skip_all)]
+    #[instrument(name = "Transaction::scan_keys_reverse", skip_all)]
     pub async fn scan_keys_reverse(
         &mut self,
         range: impl Into<BoundRange>,
@@ -784,7 +784,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         key_only: bool,
         reverse: bool,
     ) -> Result<impl Iterator<Item = KvPair>> {
-        debug!("scan_inner");
+        debug!("Transaction::scan_inner");
         self.check_allow_operation().await?;
         let timestamp = self.timestamp.clone();
         let rpc = self.rpc.clone();

From 942dbfc67d9005bd9d3cdabba5c4fe59d7387153 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Fri, 27 Oct 2023 15:56:55 +0800
Subject: [PATCH 05/26] wip

Signed-off-by: Ping Yu
---
 Cargo.toml                     |  2 +-
 src/kv/mod.rs                  |  2 +-
 src/request/plan.rs            | 25 ++++++++++++++++++++-----
 src/transaction/lock.rs        | 14 ++++++++++++--
 src/transaction/requests.rs    |  9 +++++++++
 src/transaction/transaction.rs |  2 +-
 6 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index ffa0446c..f9bc6395 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -43,7 +43,7 @@ serde_derive = "1.0"
 thiserror = "1"
 tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }
 tonic = { version = "0.9", features = ["tls"] }
-tracing = "0.1"
+tracing = "0.1.40"
 
 [dev-dependencies]
 clap = "2"
diff --git a/src/kv/mod.rs b/src/kv/mod.rs
index 489110e6..9e9cc6b1 100644
--- a/src/kv/mod.rs
+++ b/src/kv/mod.rs
@@ -14,7 +14,7 @@ pub use key::Key;
 pub use kvpair::KvPair;
 pub use value::Value;
 
-struct HexRepr<'a>(pub &'a [u8]);
+pub struct HexRepr<'a>(pub &'a [u8]);
 
 impl<'a> fmt::Display for HexRepr<'a> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
diff --git a/src/request/plan.rs b/src/request/plan.rs
index abbf72ce..ba92b821 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -9,9 +9,9 @@ use futures::future::try_join_all;
 use futures::prelude::*;
 use tokio::sync::Semaphore;
 use tokio::time::sleep;
-use tracing::instrument;
 use tracing::{debug, span};
 use tracing::{info, Instrument};
+use tracing::{info_span, instrument};
 
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
@@ -58,7 +58,7 @@ pub struct Dispatch<Req: KvRequest> {
 impl<Req: KvRequest> Plan for Dispatch<Req> {
     type Result = Req::Response;
 
-    #[instrument(name = "Dispatch::execute", skip_all, fields(label = self.request.label(), request = ?self.request))]
+    #[instrument(name = "Dispatch::execute", skip_all, fields(label = self.request.label()))]
     async fn execute(&self) -> Result<Self::Result> {
         debug!("Dispatch::execute");
         let stats = tikv_stats(self.request.label());
@@ -69,6 +69,7 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
             .dispatch(&self.request)
             .await;
         let result = stats.done(result);
+        debug!("Dispatch::execute done");
         result.map(|r| {
             *r.downcast()
                 .expect("Downcast failed: request and response type mismatch")
@@ -139,6 +140,7 @@ where
         }
 
         let results = try_join_all(handles).await?;
+        debug!("single_plan_handler done");
         if preserve_region_results {
             Ok(results
                 .into_iter()
@@ -172,6 +174,7 @@ where
         let permit = permits.acquire().await.unwrap();
         let res = plan.execute().await;
         drop(permit);
+        debug!("single_shard_handler execute done");
 
         let mut resp = match res {
             Ok(resp) => resp,
@@ -331,7 +334,7 @@ where
         debug!("handle_grpc_error: {:?}", e);
         let ver_id = region_store.region_with_leader.ver_id();
         pd_client.invalidate_region_cache(ver_id).await;
-        match backoff.next_delay_duration() {
+        let res = match backoff.next_delay_duration() {
             Some(duration) => {
                 sleep(duration).await;
                 Self::single_plan_handler(
@@ -344,7 +347,9 @@ where
                 .await
             }
             None => Err(e),
-        }
+        };
+        debug!("handle_grpc_error done");
+        res
     }
 }
 
@@ -487,6 +492,7 @@ impl<In: Clone + Send + Sync + 'static, P: Plan<Result = Vec<Result<In>>>, M: Me
 {
     type Result = M::Out;
 
+    #[instrument(name = "MergeResponse::execute", skip_all)]
     async fn execute(&self) -> Result<Self::Result> {
         self.merge.merge(self.inner.execute().await?)
     }
@@ -584,15 +590,21 @@ where
     type Result = P::Result;
 
     async fn execute(&self) -> Result<Self::Result> {
+        let span = info_span!("ResolveLock::execute");
+        let _enter = span.enter();
+        debug!("ResolveLock::execute");
+
         let mut result = self.inner.execute().await?;
         let mut clone = self.clone();
         loop {
             let locks = result.take_locks();
             if locks.is_empty() {
+                debug!("ResolveLock::execute ok");
                 return Ok(result);
             }
 
             if self.backoff.is_none() {
+                debug!("ResolveLock::execute lock error");
                 return Err(Error::ResolveLockError(locks));
             }
 
@@ -602,7 +614,10 @@ where
                 result = self.inner.execute().await?;
             } else {
                 match clone.backoff.next_delay_duration() {
-                    None => return Err(Error::ResolveLockError(live_locks)),
+                    None => {
+                        debug!("ResolveLock::execute lock error");
+                        return Err(Error::ResolveLockError(live_locks));
+                    }
                     Some(delay_duration) => {
                         sleep(delay_duration).await;
                         result = clone.inner.execute().await?;
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index 0793c4f1..4f046811 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -5,13 +5,15 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use fail::fail_point;
-use log::debug;
-use log::error;
 use tokio::sync::RwLock;
+use tracing::error;
+use tracing::instrument;
+use tracing::{debug, info_span};
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
 use crate::backoff::OPTIMISTIC_BACKOFF;
+use crate::kv::HexRepr;
 use crate::pd::PdClient;
 use crate::proto::kvrpcpb;
 use crate::proto::kvrpcpb::TxnInfo;
@@ -41,6 +43,7 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10;
 /// the key. We first use `CleanupRequest` to let the status of the primary lock converge and get
 /// its status (committed or rolled back). Then, we use the status of its primary lock to determine
 /// the status of the other keys in the same transaction.
+#[instrument(skip_all)]
 pub async fn resolve_locks(
     locks: Vec<kvrpcpb::LockInfo>,
     pd_client: Arc<impl PdClient>,
@@ -59,6 +62,9 @@ pub async fn resolve_locks(
     let mut commit_versions: HashMap<u64, u64> = HashMap::new();
     let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new();
     for lock in expired_locks {
+        let span = info_span!("cleanup_expired_lock", lock.lock_version, lock.primary_lock = %HexRepr(&lock.primary_lock));
+        let _enter = span.enter();
+
         let region_ver_id = pd_client
             .region_for_key(&lock.primary_lock.clone().into())
             .await?
@@ -75,6 +81,7 @@ pub async fn resolve_locks(
         let commit_version = match commit_versions.get(&lock.lock_version) {
             Some(&commit_version) => commit_version,
             None => {
+                debug!("cleanup lock");
                 let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
                 let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
                 let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
@@ -84,6 +91,7 @@ pub async fn resolve_locks(
                     .post_process_default()
                     .plan();
                 let commit_version = plan.execute().await?;
+                debug!("cleanup lock done: commit_version {}", commit_version);
                 commit_versions.insert(lock.lock_version, commit_version);
                 commit_version
             }
@@ -104,6 +112,7 @@ pub async fn resolve_locks(
     Ok(live_locks)
 }
 
+#[instrument(skip(key, pd_client), fields(key = %HexRepr(key)))]
 async fn resolve_lock_with_retry(
     #[allow(clippy::ptr_arg)] key: &Vec<u8>,
     start_version: u64,
@@ -134,6 +143,7 @@ async fn resolve_lock_with_retry(
         // ResolveLockResponse can have at most 1 error
         match errors.pop() {
             e @ Some(Error::RegionError(_)) => {
+                pd_client.invalidate_region_cache(ver_id).await;
                 error = e;
                 continue;
             }
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index 5e5a0514..b0753f0c 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -8,6 +8,8 @@ use either::Either;
 use futures::stream::BoxStream;
 use futures::stream::{self};
 use futures::StreamExt;
+use tracing::debug;
+use tracing::instrument;
 
 use super::transaction::TXN_COMMIT_BATCH_SIZE;
 use crate::collect_first;
@@ -175,7 +177,14 @@ shardable_range!(kvrpcpb::ScanRequest);
 impl Merge<kvrpcpb::ScanResponse> for Collect {
     type Out = Vec<KvPair>;
 
+    #[instrument(name = "Collect::merge", skip_all)]
     fn merge(&self, input: Vec<Result<kvrpcpb::ScanResponse>>) -> Result<Self::Out> {
+        let length: usize = input
+            .iter()
+            .map(|r| r.as_ref().map(|r| r.pairs.len()).unwrap_or_default())
+            .sum();
+        debug!("Collect::merge: result length {}", length);
+
         input
             .into_iter()
             .flat_map_ok(|resp| resp.pairs.into_iter().map(Into::into))
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index f55c6486..5407cf72 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -776,7 +776,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         plan.execute().await
     }
 
-    #[instrument(skip(self, range), fields(range))]
+    #[instrument(skip(self, range), fields(range, version=self.timestamp.version()))]
     async fn scan_inner(
         &mut self,
         range: impl Into<BoundRange>,
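Two span-attachment styles now coexist in the series: patch 04 attaches spans to spawned futures with `Instrument::instrument`, while `ResolveLock::execute` and the `cleanup_expired_lock` loop above hold a `span.enter()` guard across `.await` points. The `tracing` documentation recommends the former in async code, because an entered guard is not released while the task is suspended and can attribute unrelated work to the span. A guard-free sketch of the same idea, reusing the span name from this patch:

    use tracing::{info_span, Instrument};

    // Instead of:
    //     let span = info_span!("ResolveLock::execute");
    //     let _enter = span.enter(); // guard stays active across .await
    // attach the span to the future, so it is entered on each poll only:
    async fn execute_resolve_lock() {
        async {
            // ... body containing .await calls ...
        }
        .instrument(info_span!("ResolveLock::execute"))
        .await;
    }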
From 56e8f4de206d1e14a589bea2d6c2c495e6c5d7e6 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Mon, 30 Oct 2023 18:53:34 +0800
Subject: [PATCH 06/26] fix blocking_write

Signed-off-by: Ping Yu
---
 src/transaction/transaction.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 5407cf72..f30d0e68 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -1008,7 +1008,9 @@ impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
         if std::thread::panicking() {
             return;
         }
-        let mut status = futures::executor::block_on(self.status.write());
+        // Don't use `futures::executor::block_on`.
+        // See https://github.com/tokio-rs/tokio/issues/2376.
+        let mut status = self.status.blocking_write();
         if *status == TransactionStatus::Active {
             match self.options.check_level {
                 CheckLevel::Panic => {

From 75ecb2ca27ef33d29140e9b725ba3fd6f5d24ce6 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Mon, 30 Oct 2023 21:00:52 +0800
Subject: [PATCH 07/26] comment out block_on

Signed-off-by: Ping Yu
---
 src/transaction/snapshot.rs    |  2 +-
 src/transaction/transaction.rs | 45 +++++++++++++++++-----------------
 2 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index 93f3220e..a50df7d1 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -23,7 +23,7 @@ use crate::Value;
 ///
 /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
 #[derive(new)]
-pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod>> {
+pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
     transaction: Transaction<Cod, PdC>,
     phantom: PhantomData<Cod>,
 }
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index f30d0e68..f2449aa9 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -13,7 +13,6 @@ use tokio::sync::RwLock;
 use tokio::time::Duration;
 use tracing::debug;
 use tracing::instrument;
-use tracing::warn;
 use tracing::Span;
 
 use crate::backoff::Backoff;
@@ -79,7 +78,7 @@ use crate::Value;
 /// txn.commit().await.unwrap();
 /// # });
 /// ```
-pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod>> {
+pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
     status: Arc<RwLock<TransactionStatus>>,
     timestamp: Timestamp,
     buffer: Buffer,
@@ -1002,28 +1001,29 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     }
 }
 
-impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
+    #[instrument(skip_all, fields(version=self.timestamp.version()))]
     fn drop(&mut self) {
         debug!("dropping transaction");
-        if std::thread::panicking() {
-            return;
-        }
-        // Don't use `futures::executor::block_on`.
-        // See https://github.com/tokio-rs/tokio/issues/2376.
-        let mut status = self.status.blocking_write();
-        if *status == TransactionStatus::Active {
-            match self.options.check_level {
-                CheckLevel::Panic => {
-                    panic!("Dropping an active transaction. Consider commit or rollback it.")
-                }
-                CheckLevel::Warn => {
-                    warn!("Dropping an active transaction. Consider commit or rollback it.")
-                }
-
-                CheckLevel::None => {}
-            }
-        }
-        *status = TransactionStatus::Dropped;
+        // if std::thread::panicking() {
+        //     return;
+        // }
+        // // Don't use `futures::executor::block_on`.
+        // // See https://github.com/tokio-rs/tokio/issues/2376.
+        // let mut status = tokio::task::spawn_blocking(self.status.write());
+        // if *status == TransactionStatus::Active {
+        //     match self.options.check_level {
+        //         CheckLevel::Panic => {
+        //             panic!("Dropping an active transaction. Consider commit or rollback it.")
+        //         }
+        //         CheckLevel::Warn => {
+        //             warn!("Dropping an active transaction. Consider commit or rollback it.")
+        //         }
+        //
+        //         CheckLevel::None => {}
+        //     }
+        // }
+        // *status = TransactionStatus::Dropped;
     }
 }
 
@@ -1472,6 +1472,7 @@ enum TransactionStatus {
     /// The transaction has tried to rollback. Only `rollback` is allowed.
     StartedRollback,
     /// The transaction has been dropped.
+    #[allow(dead_code)]
     Dropped,
 }

From 83ae139680742fb4eaeb9c5f5ac14309fdf31379 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Tue, 31 Oct 2023 20:46:02 +0800
Subject: [PATCH 08/26] use AtomicU8 for status

Signed-off-by: Ping Yu
---
 src/transaction/snapshot.rs    |   2 +-
 src/transaction/transaction.rs | 134 +++++++++++++++++++++------------
 2 files changed, 87 insertions(+), 49 deletions(-)

diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index a8aa9464..0d1e4803 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -22,7 +22,7 @@ use crate::Value;
 ///
 /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
 #[derive(new)]
-pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod>> {
+pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
     transaction: Transaction<Cod, PdC>,
     phantom: PhantomData<Cod>,
 }
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 671d6140..10a7f6e7 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -2,6 +2,8 @@
 
 use std::iter;
 use std::marker::PhantomData;
+use std::sync::atomic;
+use std::sync::atomic::AtomicU8;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -10,7 +12,6 @@ use derive_new::new;
 use fail::fail_point;
 use futures::prelude::*;
 use log::debug;
 use log::warn;
-use tokio::sync::RwLock;
 use tokio::time::Duration;
 
 use crate::backoff::Backoff;
@@ -76,8 +77,8 @@ use crate::Value;
 /// txn.commit().await.unwrap();
 /// # });
 /// ```
-pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod>> {
-    status: Arc<RwLock<TransactionStatus>>,
+pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient<Codec = Cod> = PdRpcClient<Cod>> {
+    status: Arc<AtomicU8>,
     timestamp: Timestamp,
     buffer: Buffer,
     rpc: Arc<PdC>,
@@ -99,7 +100,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
             TransactionStatus::Active
         };
         Transaction {
-            status: Arc::new(RwLock::new(status)),
+            status: Arc::new(AtomicU8::new(status as u8)),
            timestamp,
             buffer: Buffer::new(options.is_pessimistic()),
             rpc,
@@ -632,15 +633,16 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// ```
     pub async fn commit(&mut self) -> Result<Option<Timestamp>> {
         debug!("commiting transaction");
-        {
-            let mut status = self.status.write().await;
-            if !matches!(
-                *status,
-                TransactionStatus::StartedCommit | TransactionStatus::Active
-            ) {
-                return Err(Error::OperationAfterCommitError);
-            }
-            *status = TransactionStatus::StartedCommit;
+        if !self.transit_status(
+            |status| {
+                matches!(
+                    status,
+                    TransactionStatus::StartedCommit | TransactionStatus::Active
+                )
+            },
+            TransactionStatus::StartedCommit,
+        ) {
+            return Err(Error::OperationAfterCommitError);
         }
 
         let primary_key = self.buffer.get_primary_key();
@@ -665,8 +667,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
             .await;
 
         if res.is_ok() {
-            let mut status = self.status.write().await;
-            *status = TransactionStatus::Committed;
+            self.set_status(TransactionStatus::Committed);
         }
         res
     }
@@ -689,21 +690,18 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     /// ```
     pub async fn rollback(&mut self) -> Result<()> {
         debug!("rolling back transaction");
-        {
-            let status = self.status.read().await;
-            if !matches!(
-                *status,
-                TransactionStatus::StartedRollback
-                    | TransactionStatus::Active
-                    | TransactionStatus::StartedCommit
-            ) {
-                return Err(Error::OperationAfterCommitError);
-            }
-        }
-
-        {
-            let mut status = self.status.write().await;
-            *status = TransactionStatus::StartedRollback;
+        if !self.transit_status(
+            |status| {
+                matches!(
+                    status,
+                    TransactionStatus::StartedRollback
+                        | TransactionStatus::Active
+                        | TransactionStatus::StartedCommit
+                )
+            },
+            TransactionStatus::StartedRollback,
+        ) {
+            return Err(Error::OperationAfterCommitError);
         }
 
         let primary_key = self.buffer.get_primary_key();
@@ -721,8 +719,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
             .await;
 
         if res.is_ok() {
-            let mut status = self.status.write().await;
-            *status = TransactionStatus::Rolledback;
+            self.set_status(TransactionStatus::Rolledback);
         }
         res
     }
@@ -906,8 +903,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
 
     /// Checks if the transaction can perform arbitrary operations.
     async fn check_allow_operation(&self) -> Result<()> {
-        let status = self.status.read().await;
-        match *status {
+        match self.get_status() {
             TransactionStatus::ReadOnly | TransactionStatus::Active => Ok(()),
             TransactionStatus::Committed
             | TransactionStatus::Rolledback
@@ -946,9 +942,9 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
             loop {
                 tokio::time::sleep(heartbeat_interval).await;
                 {
-                    let status = status.read().await;
+                    let status: TransactionStatus = status.load(atomic::Ordering::Acquire).into();
                     if matches!(
-                        *status,
+                        status,
                         TransactionStatus::Rolledback
                             | TransactionStatus::Committed
                             | TransactionStatus::Dropped
@@ -977,16 +973,42 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
             }
         });
     }
+
+    fn get_status(&self) -> TransactionStatus {
+        self.status.load(atomic::Ordering::Acquire).into()
+    }
+
+    fn set_status(&self, status: TransactionStatus) {
+        self.status.store(status as u8, atomic::Ordering::Release);
+    }
+
+    fn transit_status<F>(&self, check_status: F, next: TransactionStatus) -> bool
+    where
+        F: Fn(TransactionStatus) -> bool,
+    {
+        let mut current = self.get_status();
+        while check_status(current) {
+            match self.status.compare_exchange_weak(
+                current as u8,
+                next as u8,
+                atomic::Ordering::AcqRel,
+                atomic::Ordering::Acquire,
+            ) {
+                Ok(_) => return true,
+                Err(x) => current = x.into(),
+            }
+        }
+        false
+    }
 }
 
-impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
     fn drop(&mut self) {
         debug!("dropping transaction");
         if std::thread::panicking() {
             return;
         }
-        let mut status = futures::executor::block_on(self.status.write());
-        if *status == TransactionStatus::Active {
+        if self.get_status() == TransactionStatus::Active {
             match self.options.check_level {
                 CheckLevel::Panic => {
                     panic!("Dropping an active transaction. Consider commit or rollback it.")
@@ -998,7 +1020,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
                 CheckLevel::None => {}
             }
         }
-        *status = TransactionStatus::Dropped;
+        self.set_status(TransactionStatus::Dropped);
     }
 }
 
@@ -1432,22 +1454,38 @@ impl Committer {
     }
 }
 
-#[derive(PartialEq, Eq)]
+#[derive(PartialEq, Eq, Clone, Copy)]
+#[repr(u8)]
 enum TransactionStatus {
     /// The transaction is read-only [`Snapshot`](super::Snapshot), no need to commit or rollback or panic on drop.
-    ReadOnly,
+    ReadOnly = 0,
     /// The transaction have not been committed or rolled back.
-    Active,
+    Active = 1,
     /// The transaction has committed.
-    Committed,
+    Committed = 2,
     /// The transaction has tried to commit. Only `commit` is allowed.
-    StartedCommit,
+    StartedCommit = 3,
     /// The transaction has rolled back.
-    Rolledback,
+    Rolledback = 4,
     /// The transaction has tried to rollback. Only `rollback` is allowed.
-    StartedRollback,
+    StartedRollback = 5,
     /// The transaction has been dropped.
-    Dropped,
+    Dropped = 6,
+}
+
+impl From<u8> for TransactionStatus {
+    fn from(num: u8) -> Self {
+        match num {
+            0 => TransactionStatus::ReadOnly,
+            1 => TransactionStatus::Active,
+            2 => TransactionStatus::Committed,
+            3 => TransactionStatus::StartedCommit,
+            4 => TransactionStatus::Rolledback,
+            5 => TransactionStatus::StartedRollback,
+            6 => TransactionStatus::Dropped,
+            _ => panic!("Unknown transaction status {}", num),
+        }
+    }
 }
 
 #[cfg(test)]
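The `compare_exchange_weak` loop introduced above is the usual lock-free state-machine transition; `compare_exchange_weak` may fail spuriously even when the current value matches, which is why it must sit in a loop. A condensed, self-contained version of the same pattern (the two-variant `State` enum is hypothetical, for illustration only):

    use std::sync::atomic::{AtomicU8, Ordering};

    #[repr(u8)]
    #[derive(Clone, Copy, PartialEq)]
    enum State {
        Active = 0,
        Committed = 1,
    }

    // Returns true iff the status was moved from `from` to `to`.
    fn transit(status: &AtomicU8, from: State, to: State) -> bool {
        let mut cur = status.load(Ordering::Acquire);
        while cur == from as u8 {
            match status.compare_exchange_weak(cur, to as u8, Ordering::AcqRel, Ordering::Acquire) {
                Ok(_) => return true,
                Err(actual) => cur = actual, // raced or spurious failure; re-check
            }
        }
        false
    }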
From 34a79367270e5d92a10dc598a3df1bd8de874797 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Tue, 31 Oct 2023 21:06:29 +0800
Subject: [PATCH 09/26] simplify

Signed-off-by: Ping Yu
---
 src/request/plan.rs            | 22 +++++--------------
 src/transaction/transaction.rs | 40 +++++++++++++++-------------------
 2 files changed, 23 insertions(+), 39 deletions(-)

diff --git a/src/request/plan.rs b/src/request/plan.rs
index ba92b821..2e1fdc2d 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -60,7 +60,6 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
 
     #[instrument(name = "Dispatch::execute", skip_all, fields(label = self.request.label()))]
     async fn execute(&self) -> Result<Self::Result> {
-        debug!("Dispatch::execute");
         let stats = tikv_stats(self.request.label());
         let result = self
             .kv_client
@@ -69,7 +68,6 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
             .dispatch(&self.request)
             .await;
         let result = stats.done(result);
-        debug!("Dispatch::execute done");
         result.map(|r| {
             *r.downcast()
                 .expect("Downcast failed: request and response type mismatch")
@@ -116,7 +114,6 @@ where
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
-        debug!("single_plan_handler");
         let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
         let mut handles = Vec::new();
         for shard in shards {
@@ -140,7 +137,6 @@ where
         }
 
         let results = try_join_all(handles).await?;
-        debug!("single_plan_handler done");
         if preserve_region_results {
             Ok(results
                 .into_iter()
@@ -170,12 +166,10 @@ where
         permits: Arc<Semaphore>,
         preserve_region_results: bool,
     ) -> Result<<Self as Plan>::Result> {
-        debug!("single_shard_handler");
         // limit concurrent requests
         let permit = permits.acquire().await.unwrap();
         let res = plan.execute().await;
         drop(permit);
-        debug!("single_shard_handler execute done");
 
         let mut resp = match res {
             Ok(resp) => resp,
@@ -225,13 +219,12 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
     // 3. Err(Error): can't be resolved, return the error to upper level
-    #[instrument(skip_all, fields(region_store = ?region_store))]
+    #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))]
     async fn handle_region_error(
         pd_client: Arc<PdC>,
         e: errorpb::Error,
         region_store: RegionStore,
     ) -> Result<bool> {
-        debug!("handle_region_error: {:?}", e);
         let ver_id = region_store.region_with_leader.ver_id();
         if let Some(not_leader) = e.not_leader {
             if let Some(leader) = not_leader.leader {
@@ -283,13 +276,12 @@ where
     // 1. Ok(true): error has been resolved, retry immediately
     // 2. Ok(false): backoff, and then retry
    // 3. Err(Error): can't be resolved, return the error to upper level
-    #[instrument(skip_all, fields(region_store = ?region_store))]
+    #[instrument(skip_all, fields(region_store = ?region_store, error = ?error))]
     async fn on_region_epoch_not_match(
         pd_client: Arc<PdC>,
         region_store: RegionStore,
         error: EpochNotMatch,
     ) -> Result<bool> {
-        debug!("on_region_epoch_not_match: {:?}", error);
         let ver_id = region_store.region_with_leader.ver_id();
         if error.current_regions.is_empty() {
             pd_client.invalidate_region_cache(ver_id).await;
@@ -321,7 +313,7 @@ where
         Ok(false)
     }
 
-    #[instrument(skip_all, fields(region_store = ?region_store))]
+    #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))]
     async fn handle_grpc_error(
         pd_client: Arc<PdC>,
         plan: P,
@@ -331,10 +323,9 @@ where
         preserve_region_results: bool,
         e: Error,
     ) -> Result<<Self as Plan>::Result> {
-        debug!("handle_grpc_error: {:?}", e);
         let ver_id = region_store.region_with_leader.ver_id();
         pd_client.invalidate_region_cache(ver_id).await;
-        let res = match backoff.next_delay_duration() {
+        match backoff.next_delay_duration() {
             Some(duration) => {
                 sleep(duration).await;
                 Self::single_plan_handler(
@@ -347,9 +338,7 @@ where
                 .await
             }
             None => Err(e),
-        };
-        debug!("handle_grpc_error done");
-        res
+        }
     }
 }
 
@@ -592,7 +581,6 @@ where
     async fn execute(&self) -> Result<Self::Result> {
         let span = info_span!("ResolveLock::execute");
         let _enter = span.enter();
-        debug!("ResolveLock::execute");
 
         let mut result = self.inner.execute().await?;
         let mut clone = self.clone();
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index f2449aa9..bd8a3a67 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -11,9 +11,9 @@ use fail::fail_point;
 use futures::prelude::*;
 use tokio::sync::RwLock;
 use tokio::time::Duration;
-use tracing::debug;
 use tracing::instrument;
-use tracing::warn;
 use tracing::Span;
+use tracing::{debug, warn};
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
@@ -783,7 +783,6 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
         key_only: bool,
         reverse: bool,
     ) -> Result<impl Iterator<Item = KvPair>> {
-        debug!("Transaction::scan_inner");
         self.check_allow_operation().await?;
         let timestamp = self.timestamp.clone();
         let rpc = self.rpc.clone();
@@ -1005,25 +1004,23 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Drop for Transaction<Cod, PdC> {
     #[instrument(skip_all, fields(version=self.timestamp.version()))]
     fn drop(&mut self) {
         debug!("dropping transaction");
-        // if std::thread::panicking() {
-        //     return;
-        // }
-        // // Don't use `futures::executor::block_on`.
-        // // See https://github.com/tokio-rs/tokio/issues/2376.
-        // let mut status = tokio::task::spawn_blocking(self.status.write());
-        // if *status == TransactionStatus::Active {
-        //     match self.options.check_level {
-        //         CheckLevel::Panic => {
-        //             panic!("Dropping an active transaction. Consider commit or rollback it.")
-        //         }
-        //         CheckLevel::Warn => {
-        //             warn!("Dropping an active transaction. Consider commit or rollback it.")
-        //         }
-        //
-        //         CheckLevel::None => {}
-        //     }
-        // }
-        // *status = TransactionStatus::Dropped;
+        if std::thread::panicking() {
+            return;
+        }
+        let mut status = futures::executor::block_on(self.status.write());
+        if *status == TransactionStatus::Active {
+            match self.options.check_level {
+                CheckLevel::Panic => {
+                    panic!("Dropping an active transaction. Consider commit or rollback it.")
+                }
+                CheckLevel::Warn => {
+                    warn!("Dropping an active transaction. Consider commit or rollback it.")
+                }
+
+                CheckLevel::None => {}
+            }
+        }
+        *status = TransactionStatus::Dropped;
     }
 }
 
@@ -1472,7 +1469,6 @@ enum TransactionStatus {
     /// The transaction has tried to rollback. Only `rollback` is allowed.
     StartedRollback,
     /// The transaction has been dropped.
-    #[allow(dead_code)]
     Dropped,
 }

From fdec18162e21cc5bc7bbae7022ad432603e07270 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Tue, 31 Oct 2023 21:08:01 +0800
Subject: [PATCH 10/26] fix check

Signed-off-by: Ping Yu
---
 src/transaction/lock.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index 0793c4f1..afb1d6c4 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -98,7 +98,7 @@ pub async fn resolve_locks(
             .await?;
         clean_regions
             .entry(lock.lock_version)
-            .or_insert_with(HashSet::new)
+            .or_default()
             .insert(cleaned_region);
     }
     Ok(live_locks)

From a1afcc1fe1db573079799494fd81232f2174986b Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Tue, 31 Oct 2023 21:16:49 +0800
Subject: [PATCH 11/26] skip exchange on equal

Signed-off-by: Ping Yu
---
 src/transaction/transaction.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 10a7f6e7..e984f153 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -988,6 +988,9 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     {
         let mut current = self.get_status();
         while check_status(current) {
+            if current == next {
+                return true;
+            }
             match self.status.compare_exchange_weak(
                 current as u8,
                 next as u8,

From 74df9f3fed9fbb4931bb19c6f6deb8ddd3dc2379 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Tue, 31 Oct 2023 21:22:33 +0800
Subject: [PATCH 12/26] fix check

Signed-off-by: Ping Yu
---
 src/transaction/transaction.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 3dd84ea3..466390bd 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -6,6 +6,7 @@ use std::marker::PhantomData;
 use std::sync::atomic;
 use std::sync::atomic::AtomicU8;
 use std::sync::Arc;
+use std::time::Duration;
 use std::time::Instant;
 
 use derive_new::new;

From bab9d01b2882209a7bb7143556fd1c07939afbb8 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Wed, 1 Nov 2023 18:20:33 +0800
Subject: [PATCH 13/26] polish

Signed-off-by: Ping Yu
---
 src/request/plan.rs            | 9 ++++++---
 src/transaction/lock.rs        | 3 ++-
 src/transaction/transaction.rs | 3 ++-
 3 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/src/request/plan.rs b/src/request/plan.rs
index 2e1fdc2d..f102473c 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -9,9 +9,12 @@ use futures::future::try_join_all;
 use futures::prelude::*;
 use tokio::sync::Semaphore;
 use tokio::time::sleep;
-use tracing::{debug, span};
-use tracing::{info, Instrument};
-use tracing::{info_span, instrument};
+use tracing::debug;
+use tracing::info;
+use tracing::info_span;
+use tracing::instrument;
+use tracing::span;
+use tracing::Instrument;
 
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index d870b994..9c183a22 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -6,9 +6,10 @@ use std::sync::Arc;
 
 use fail::fail_point;
 use tokio::sync::RwLock;
+use tracing::debug;
 use tracing::error;
+use tracing::info_span;
 use tracing::instrument;
-use tracing::{debug, info_span};
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 466390bd..bcc1b1ab 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -12,9 +12,10 @@ use std::time::Instant;
 use derive_new::new;
 use fail::fail_point;
 use futures::prelude::*;
+use tracing::debug;
 use tracing::instrument;
+use tracing::warn;
 use tracing::Span;
-use tracing::{debug, warn};
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;

From 7d8b777c84aeaadfb652407b5cecf17d8b058d31 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Thu, 2 Nov 2023 20:30:49 +0800
Subject: [PATCH 14/26] trace tso

Signed-off-by: Ping Yu
---
 src/pd/timestamp.rs     |  6 ++++--
 src/request/plan.rs     | 10 ++++++++++
 src/transaction/lock.rs |  5 +++++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs
index 672b587b..f1730bba 100644
--- a/src/pd/timestamp.rs
+++ b/src/pd/timestamp.rs
@@ -20,13 +20,14 @@ use futures::prelude::*;
 use futures::task::AtomicWaker;
 use futures::task::Context;
 use futures::task::Poll;
-use log::debug;
-use log::info;
 use pin_project::pin_project;
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
 use tokio::sync::Mutex;
 use tonic::transport::Channel;
+use tracing::debug;
+use tracing::info;
+use tracing::instrument;
 
 use crate::internal_err;
 use crate::proto::pdpb::pd_client::PdClient;
@@ -63,6 +64,7 @@ impl TimestampOracle {
         Ok(TimestampOracle { request_tx })
     }
 
+    #[instrument(name = "TimestampOracle::get_timestamp", skip_all)]
     pub(crate) async fn get_timestamp(self) -> Result<Timestamp> {
         debug!("getting current timestamp");
         let (request, response) = oneshot::channel();
diff --git a/src/request/plan.rs b/src/request/plan.rs
index f102473c..6d6941ad 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -587,7 +587,12 @@ where
 
         let mut result = self.inner.execute().await?;
         let mut clone = self.clone();
+        let mut retry_cnt = 0;
         loop {
+            retry_cnt += 1;
+            let span = info_span!("ResolveLock::execute::retry", retry_cnt);
+            let _enter = span.enter();
+
             let locks = result.take_locks();
             if locks.is_empty() {
                 debug!("ResolveLock::execute ok");
@@ -602,6 +607,7 @@ where
             let pd_client = self.pd_client.clone();
             let live_locks = resolve_locks(locks, pd_client.clone()).await?;
             if live_locks.is_empty() {
+                debug!("ResolveLock::execute lock error retry (resolved)",);
                 result = self.inner.execute().await?;
             } else {
                 match clone.backoff.next_delay_duration() {
@@ -610,6 +616,10 @@ where
                         return Err(Error::ResolveLockError(live_locks));
                     }
                     Some(delay_duration) => {
+                        debug!(
+                            "ResolveLock::execute lock error retry (delay {:?})",
+                            delay_duration
+                        );
                         sleep(delay_duration).await;
                         result = clone.inner.execute().await?;
                     }
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index 9c183a22..1d32c54c 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -58,6 +58,11 @@ pub async fn resolve_locks(
         ts.physical - Timestamp::from_version(lock.lock_version).physical
             >= lock.lock_ttl as i64
     });
+    debug!(
+        "resolving locks: expired_locks {}, live_locks {}",
+        expired_locks.len(),
+        live_locks.len()
+    );
 
     // records the commit version of each primary lock (representing the status of the transaction)
     let mut commit_versions: HashMap<u64, u64> = HashMap::new();

From aabe8e5e45d5fa7796210237733b0b1169310321 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Fri, 3 Nov 2023 16:31:20 +0800
Subject: [PATCH 15/26] fix get tso hang

Signed-off-by: Ping Yu
---
 src/pd/retry.rs     | 52 +++++++++++++++++++++++++++++----------------
 src/pd/timestamp.rs |  4 ++--
 2 files changed, 36 insertions(+), 20 deletions(-)

diff --git a/src/pd/retry.rs b/src/pd/retry.rs
index 0e65e13b..2dd54487 100644
--- a/src/pd/retry.rs
+++ b/src/pd/retry.rs
@@ -70,18 +70,12 @@ impl RetryClient<Cluster> {
     }
 }
 
-macro_rules! retry {
-    ($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
+macro_rules! retry_core {
+    ($self: ident, $tag: literal, $call: expr) => {{
         let stats = pd_stats($tag);
         let mut last_err = Ok(());
         for _ in 0..LEADER_CHANGE_RETRY {
-            // use the block here to drop the guard of the read lock,
-            // otherwise `reconnect` will try to acquire the write lock and results in a deadlock
-            let res = {
-                let $cluster = &mut $self.cluster.write().await.0;
-                let res = $call.await;
-                res
-            };
+            let res = $call;
 
             match stats.done(res) {
                 Ok(r) => return Ok(r),
@@ -103,6 +97,28 @@ macro_rules! retry {
     }};
}

+macro_rules! retry_mut {
+    ($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
+        retry_core!($self, $tag, {
+            // use the block here to drop the guard of the lock,
+            // otherwise `reconnect` will try to acquire the write lock and results in a deadlock
+            let $cluster = &mut $self.cluster.write().await.0;
+            $call.await
+        })
+    }};
+}
+
+macro_rules! retry {
+    ($self: ident, $tag: literal, |$cluster: ident| $call: expr) => {{
+        retry_core!($self, $tag, {
+            // use the block here to drop the guard of the lock,
+            // otherwise `reconnect` will try to acquire the write lock and results in a deadlock
+            let $cluster = &$self.cluster.read().await.0;
+            $call.await
+        })
+    }};
+}
+
 impl RetryClient<Cluster> {
     pub async fn connect(
         endpoints: &[String],
@@ -127,7 +143,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
     // These get_* functions will try multiple times to make a request, reconnecting as necessary.
     // It does not know about encoding. Caller should take care of it.
     async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
-        retry!(self, "get_region", |cluster| {
+        retry_mut!(self, "get_region", |cluster| {
            let key = key.clone();
             async {
                 cluster
@@ -141,7 +157,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
     }
 
     async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
-        retry!(self, "get_region_by_id", |cluster| async {
+        retry_mut!(self, "get_region_by_id", |cluster| async {
             cluster
                 .get_region_by_id(region_id, self.timeout)
                 .await
@@ -152,7 +168,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
     }
 
     async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
-        retry!(self, "get_store", |cluster| async {
+        retry_mut!(self, "get_store", |cluster| async {
             cluster
                 .get_store(id, self.timeout)
                 .await
@@ -161,7 +177,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
     }
 
     async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
-        retry!(self, "get_all_stores", |cluster| async {
+        retry_mut!(self, "get_all_stores", |cluster| async {
             cluster
                 .get_all_stores(self.timeout)
                 .await
@@ -174,7 +190,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
     }
 
     async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
-        retry!(self, "update_gc_safepoint", |cluster| async {
+        retry_mut!(self, "update_gc_safepoint", |cluster| async {
             cluster
                 .update_safepoint(safepoint, self.timeout)
                 .await
@@ -257,11 +273,11 @@ mod test {
         }
 
         async fn retry_err(client: Arc<MockClient>) -> Result<()> {
-            retry!(client, "test", |_c| ready(Err(internal_err!("whoops"))))
+            retry_mut!(client, "test", |_c| ready(Err(internal_err!("whoops"))))
         }
 
         async fn retry_ok(client: Arc<MockClient>) -> Result<()> {
-            retry!(client, "test", |_c| ready(Ok::<_, Error>(())))
+            retry_mut!(client, "test", |_c| ready(Ok::<_, Error>(())))
         }
 
         executor::block_on(async {
@@ -310,7 +326,7 @@ mod test {
             client: Arc<MockClient>,
             max_retries: Arc<AtomicUsize>,
         ) -> Result<()> {
-            retry!(client, "test", |c| {
+            retry_mut!(client, "test", |c| {
                 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
 
                 let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;
@@ -326,7 +342,7 @@ mod test {
             client: Arc<MockClient>,
             max_retries: Arc<AtomicUsize>,
         ) -> Result<()> {
-            retry!(client, "test", |c| {
+            retry_mut!(client, "test", |c| {
                 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
 
                 let max_retries = max_retries.fetch_sub(1, Ordering::SeqCst) - 1;
diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs
index 672b587b..ac99e052 100644
--- a/src/pd/timestamp.rs
+++ b/src/pd/timestamp.rs
@@ -148,8 +148,8 @@ impl Stream for TsoRequestStream {
                 Poll::Ready(Some(sender)) => {
                     requests.push(sender);
                 }
-                Poll::Ready(None) => return Poll::Ready(None),
-                Poll::Pending => break,
+                Poll::Ready(None) if requests.is_empty() => return Poll::Ready(None),
+                _ => break,
             }
         }
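The hang fixed above follows from the `Stream` contract: returning `Poll::Ready(None)` terminates the stream, which closes the gRPC request side even though responses for already-sent TSO requests are still expected. The corrected arm only terminates once the request channel is closed and nothing was batched in this poll. A reduced sketch of that termination rule (simplified; the real `poll_next` flushes a non-empty batch as `Poll::Ready(Some(req))` and registers a waker before returning `Pending`):

    use std::task::Poll;

    // channel_closed: the mpsc of incoming TSO requests returned None.
    // batched_is_empty: no requests were collected in this poll.
    fn termination(channel_closed: bool, batched_is_empty: bool) -> Poll<Option<()>> {
        if channel_closed && batched_is_empty {
            Poll::Ready(None) // nothing left to send; safe to end the stream
        } else {
            Poll::Pending // keep the stream open so replies can still arrive
        }
    }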
From 55887f4093d99d00639969bc7dfe7a573e9bd3c0 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Fri, 3 Nov 2023 23:45:27 +0800
Subject: [PATCH 16/26] change all log to tracing

Signed-off-by: Ping Yu
---
 src/common/security.rs      | 2 +-
 src/pd/client.rs            | 2 +-
 src/pd/cluster.rs           | 6 +++---
 src/raw/client.rs           | 2 +-
 src/transaction/client.rs   | 4 ++--
 src/transaction/snapshot.rs | 2 +-
 tests/common/mod.rs         | 4 ++--
 tests/failpoint_tests.rs    | 2 +-
 8 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/common/security.rs b/src/common/security.rs
index 483759cf..8b7e4801 100644
--- a/src/common/security.rs
+++ b/src/common/security.rs
@@ -6,12 +6,12 @@ use std::path::Path;
 use std::path::PathBuf;
 use std::time::Duration;
 
-use log::info;
 use regex::Regex;
 use tonic::transport::Certificate;
 use tonic::transport::Channel;
 use tonic::transport::ClientTlsConfig;
 use tonic::transport::Identity;
+use tracing::info;
 
 use crate::internal_err;
 use crate::Result;
diff --git a/src/pd/client.rs b/src/pd/client.rs
index 5461cb57..34e2e42b 100644
--- a/src/pd/client.rs
+++ b/src/pd/client.rs
@@ -6,8 +6,8 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use futures::prelude::*;
 use futures::stream::BoxStream;
-use log::info;
 use tokio::sync::RwLock;
+use tracing::info;
 
 use crate::compat::stream_fn;
 use crate::kv::codec;
diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs
index 3df4d255..668ec0b3 100644
--- a/src/pd/cluster.rs
+++ b/src/pd/cluster.rs
@@ -6,12 +6,12 @@ use std::time::Duration;
 use std::time::Instant;
 
 use async_trait::async_trait;
-use log::error;
-use log::info;
-use log::warn;
 use tonic::transport::Channel;
 use tonic::IntoRequest;
 use tonic::Request;
+use tracing::error;
+use tracing::info;
+use tracing::warn;
 
 use super::timestamp::TimestampOracle;
 use crate::internal_err;
diff --git a/src/raw/client.rs b/src/raw/client.rs
index fc733015..df276a69 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use std::u32;
 
 use futures::StreamExt;
-use log::debug;
+use tracing::debug;
 
 use crate::backoff::DEFAULT_REGION_BACKOFF;
 use crate::common::Error;
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 4bcb16d9..cd50e37c 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -2,8 +2,8 @@
 
 use std::sync::Arc;
 
-use log::debug;
-use log::info;
+use tracing::debug;
+use tracing::info;
 
 use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF};
 use crate::config::Config;
diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index a50df7d1..452e2dd4 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -1,8 +1,8 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use derive_new::new;
-use log::debug;
 use std::marker::PhantomData;
+use tracing::debug;
 use tracing::instrument;
 
 use crate::codec::ApiV1TxnCodec;
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 4d63dd56..2566bf6f 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -7,8 +7,6 @@ use std::convert::TryInto;
 use std::env;
 use std::time::Duration;
 
-use log::info;
-use log::warn;
 use rand::Rng;
 use tikv_client::Key;
 use tikv_client::RawClient;
@@ -17,6 +15,8 @@ use tikv_client::Transaction;
 use tikv_client::TransactionClient;
 use tikv_client::{ColumnFamily, Snapshot, TransactionOptions};
 use tokio::time::sleep;
+use tracing::info;
+use tracing::warn;
 
 const ENV_PD_ADDRS: &str = "PD_ADDRS";
 const ENV_ENABLE_MULIT_REGION: &str = "MULTI_REGION";
diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs
index f34dff48..6906b0ff 100644
--- a/tests/failpoint_tests.rs
+++ b/tests/failpoint_tests.rs
@@ -9,7 +9,6 @@ use std::time::Duration;
 
 use common::*;
 use fail::FailScenario;
-use log::info;
 use rand::thread_rng;
 use serial_test::serial;
 use tikv_client::transaction::Client;
@@ -21,6 +20,7 @@ use tikv_client::Result;
 use tikv_client::RetryOptions;
 use tikv_client::TransactionClient;
 use tikv_client::TransactionOptions;
+use tracing::info;
 
 #[tokio::test]
 #[serial]

From 1d3d0744dc2e4ca8317fa8589910113b41d06d58 Mon Sep 17 00:00:00 2001
From: Ping Yu
Date: Sat, 4 Nov 2023 16:08:08 +0800
Subject: [PATCH 17/26] more trace for tso

Signed-off-by: Ping Yu
---
 src/pd/cluster.rs   |  3 +++
 src/pd/retry.rs     | 15 ++++++++++++++-
 src/pd/timestamp.rs | 19 +++++++++++++++++++
 3 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs
index 668ec0b3..3961e108 100644
--- a/src/pd/cluster.rs
+++ b/src/pd/cluster.rs
@@ -11,6 +11,7 @@ use tonic::transport::Channel;
 use tonic::IntoRequest;
 use tonic::Request;
 use tracing::error;
 use tracing::info;
+use tracing::instrument;
 use tracing::warn;
 
 use super::timestamp::TimestampOracle;
@@ -103,6 +104,7 @@ impl Connection {
         Connection { security_mgr }
     }
 
+    #[instrument(name = "pd::Connection::connect_cluster", skip_all)]
     pub async fn connect_cluster(
         &self,
         endpoints: &[String],
@@ -122,6 +124,7 @@ impl Connection {
     }
 
     // Re-establish connection with PD leader in asynchronous fashion.
+    #[instrument(name = "pd::Connection::reconnect", skip_all)]
     pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> {
         warn!("updating pd client");
         let start = Instant::now();
diff --git a/src/pd/retry.rs b/src/pd/retry.rs
index 2dd54487..5cf173c7 100644
--- a/src/pd/retry.rs
+++ b/src/pd/retry.rs
@@ -10,6 +10,9 @@ use std::time::Instant;
 use async_trait::async_trait;
 use tokio::sync::RwLock;
 use tokio::time::sleep;
+use tracing::debug;
+use tracing::info_span;
+use tracing::instrument;
 
 use crate::pd::Cluster;
 use crate::pd::Connection;
@@ -74,7 +77,10 @@ macro_rules! retry_core {
     ($self: ident, $tag: literal, $call: expr) => {{
         let stats = pd_stats($tag);
         let mut last_err = Ok(());
-        for _ in 0..LEADER_CHANGE_RETRY {
+        for retry in 0..LEADER_CHANGE_RETRY {
+            let span = info_span!("RetryClient::retry", retry);
+            let _enter = span.enter();
+
             let res = $call;
 
             match stats.done(res) {
@@ -82,6 +88,7 @@ macro_rules! retry_core {
                 Err(e) => last_err = Err(e),
             }
 
+            debug!("retry on last_err: {:?}", last_err);
             let mut reconnect_count = MAX_REQUEST_COUNT;
             while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await {
                 reconnect_count -= 1;
@@ -142,6 +149,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
     // These get_* functions will try multiple times to make a request, reconnecting as necessary.
     // It does not know about encoding. Caller should take care of it.
+    #[instrument(name = "RetryClient::get_region", skip_all)]
     async fn get_region(self: Arc<Self>, key: Vec<u8>) -> Result<RegionWithLeader> {
         retry_mut!(self, "get_region", |cluster| {
             let key = key.clone();
@@ -156,6 +164,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_region_by_id", skip(self))]
     async fn get_region_by_id(self: Arc<Self>, region_id: RegionId) -> Result<RegionWithLeader> {
         retry_mut!(self, "get_region_by_id", |cluster| async {
             cluster
@@ -167,6 +176,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_store", skip(self))]
     async fn get_store(self: Arc<Self>, id: StoreId) -> Result<metapb::Store> {
         retry_mut!(self, "get_store", |cluster| async {
             cluster
@@ -176,6 +186,7 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_all_stores", skip(self))]
     async fn get_all_stores(self: Arc<Self>) -> Result<Vec<metapb::Store>> {
         retry_mut!(self, "get_all_stores", |cluster| async {
             cluster
@@ -185,10 +196,12 @@ impl RetryClientTrait for RetryClient<Cluster> {
         })
     }
 
+    #[instrument(name = "RetryClient::get_timestamp", skip(self))]
     async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
         retry!(self, "get_timestamp", |cluster| cluster.get_timestamp())
     }
 
+    #[instrument(name = "RetryClient::update_safepoint", skip(self))]
     async fn update_safepoint(self: Arc<Self>, safepoint: u64) -> Result<bool> {
         retry_mut!(self, "update_gc_safepoint", |cluster| async {
             cluster
diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs
index b7e25fea..5fe16a12 100644
--- a/src/pd/timestamp.rs
+++ b/src/pd/timestamp.rs
@@ -27,6 +27,7 @@ use tokio::sync::Mutex;
 use tonic::transport::Channel;
 use tracing::debug;
 use tracing::info;
+use tracing::info_span;
 use tracing::instrument;
 
 use crate::internal_err;
@@ -76,6 +77,7 @@ impl TimestampOracle {
     }
 }
 
+#[instrument(name = "TimestampOracle::run_tso", skip_all)]
 async fn run_tso(
     cluster_id: u64,
     mut pd_client: PdClient<Channel>,
@@ -100,6 +102,10 @@ async fn run_tso(
     let mut responses = pd_client.tso(request_stream).await?.into_inner();
 
     while let Some(Ok(resp)) = responses.next().await {
+        let span = info_span!("handle_response");
+        let _enter = span.enter();
+        debug!("got response: {:?}", resp);
+
         let mut pending_requests = pending_requests.lock().await;
 
         // Wake up the sending future blocked by too many pending requests as we are consuming
@@ -132,6 +138,7 @@ struct TsoRequestStream {
 impl Stream for TsoRequestStream {
     type Item = TsoRequest;
 
+    #[instrument(name = "TsoRequestStream::poll_next", skip_all)]
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
 
@@ -155,6 +162,12 @@ impl Stream for TsoRequestStream {
             }
         }
 
+        debug!(
+            "got requests: len {}, pending_requests {}",
+            requests.len(),
+            pending_requests.len()
+        );
+
         if !requests.is_empty() {
             let req = TsoRequest {
                 header: Some(RequestHeader {
@@ -171,6 +184,12 @@ impl Stream for TsoRequestStream {
             };
             pending_requests.push_back(request_group);
 
+            debug!(
+                "sending request to PD: {:?}, pending_requests {}",
+                req,
+                pending_requests.len()
+            );
+
             Poll::Ready(Some(req))
         } else {
            // Set the waker to the context, then the stream can be waked up
after the pending queue From 7ba5f5502d723c614c7f50e5b8f32bb7a51ae3eb Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 6 Nov 2023 19:06:51 +0800 Subject: [PATCH 18/26] wake Signed-off-by: Ping Yu --- src/pd/timestamp.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs index ac99e052..a1cc7fbd 100644 --- a/src/pd/timestamp.rs +++ b/src/pd/timestamp.rs @@ -98,15 +98,13 @@ async fn run_tso( let mut responses = pd_client.tso(request_stream).await?.into_inner(); while let Some(Ok(resp)) = responses.next().await { - let mut pending_requests = pending_requests.lock().await; - - // Wake up the sending future blocked by too many pending requests as we are consuming - // some of them here. - if pending_requests.len() == MAX_PENDING_COUNT { - sending_future_waker.wake(); + { + let mut pending_requests = pending_requests.lock().await; + allocate_timestamps(&resp, &mut pending_requests)?; } - allocate_timestamps(&resp, &mut pending_requests)?; + // Wake up the sending future blocked by too many pending requests or by the lock. + sending_future_waker.wake(); } // TODO: distinguish between unexpected stream termination and expected end of test info!("TSO stream terminated"); @@ -139,6 +137,7 @@ impl Stream for TsoRequestStream { { pending_requests } else { + this.self_waker.register(cx.waker()); return Poll::Pending; }; let mut requests = Vec::new(); From faf135be99fe03d2b4c4393d596010c9ace6c09f Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 13 Nov 2023 18:23:17 +0800 Subject: [PATCH 19/26] print locks Signed-off-by: Ping Yu --- src/transaction/lock.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 1d32c54c..16b8e509 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -59,9 +59,9 @@ pub async fn resolve_locks( >= lock.lock_ttl as i64 }); debug!( - "resolving locks: expired_locks {}, live_locks {}", - expired_locks.len(), - live_locks.len() + "resolving locks: expired_locks {:?}, live_locks {:?}", + expired_locks, + live_locks ); // records the commit version of each primary lock (representing the status of the transaction) From c2325e2ab5e0cbbda7021c25d218a3c820ff0995 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 Nov 2023 12:18:49 +0800 Subject: [PATCH 20/26] tracing for gc Signed-off-by: Ping Yu --- src/request/plan.rs | 1 + src/transaction/client.rs | 10 ++++++++-- src/transaction/lock.rs | 11 +++++++++-- src/transaction/requests.rs | 1 + 4 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 6d6941ad..7eb85d31 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -701,6 +701,7 @@ where { type Result = CleanupLocksResult; + #[instrument(name = "CleanupLocks::execute", skip_all)] async fn execute(&self) -> Result<Self::Result> { let mut result = CleanupLocksResult::default(); let mut inner = self.inner.clone(); diff --git a/src/transaction/client.rs b/src/transaction/client.rs index cd50e37c..f25503ed 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -4,6 +4,8 @@ use std::sync::Arc; use tracing::debug; use tracing::info; +use tracing::instrument; +use tracing::Span; use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::config::Config; @@ -248,6 +250,7 @@ impl Client { /// /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview).
/// We skip the second step "delete ranges" which is an optimization for TiDB. + #[instrument(skip(self))] pub async fn gc(&self, safepoint: Timestamp) -> Result { debug!("invoking transactional gc request"); @@ -269,17 +272,20 @@ impl Client { Ok(res) } + #[instrument(skip(self, range), fields(range))] pub async fn cleanup_locks( &self, range: impl Into, safepoint: &Timestamp, options: ResolveLocksOptions, ) -> Result { - debug!("invoking cleanup async commit locks"); + debug!("invoking cleanup locks"); // scan all locks with ts <= safepoint let ctx = ResolveLocksContext::default(); let backoff = Backoff::equal_jitter_backoff(100, 10000, 50); - let req = new_scan_lock_request(range.into(), safepoint, options.batch_size); + let range = range.into(); + Span::current().record("range", &tracing::field::debug(&range)); + let req = new_scan_lock_request(range, safepoint, options.batch_size); let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) .cleanup_locks(ctx.clone(), options, backoff) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 16b8e509..22310b07 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -10,6 +10,7 @@ use tracing::debug; use tracing::error; use tracing::info_span; use tracing::instrument; +use tracing::Span; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; @@ -60,8 +61,7 @@ pub async fn resolve_locks( }); debug!( "resolving locks: expired_locks {:?}, live_locks {:?}", - expired_locks, - live_locks + expired_locks, live_locks ); // records the commit version of each primary lock (representing the status of the transaction) @@ -225,6 +225,7 @@ impl LockResolver { /// _Cleanup_ the given locks. Returns whether all the given locks are resolved. /// /// Note: Will rollback RUNNING transactions. ONLY use in GC. 
+ #[instrument(skip_all, fields(store = ?store, locks = ?locks))] pub async fn cleanup_locks( &mut self, store: RegionStore, @@ -343,6 +344,7 @@ impl LockResolver { } #[allow(clippy::too_many_arguments)] + #[instrument(skip(self, pd_client, primary), fields(primary), ret(Debug))] pub async fn check_txn_status( &mut self, pd_client: Arc, @@ -354,6 +356,8 @@ impl LockResolver { force_sync_commit: bool, resolving_pessimistic_lock: bool, ) -> Result> { + Span::current().record("primary", &tracing::field::display(HexRepr(&primary))); + if let Some(txn_status) = self.ctx.get_resolved(txn_id).await { return Ok(txn_status); } @@ -386,6 +390,7 @@ impl LockResolver { let current = pd_client.clone().get_timestamp().await?; res.check_ttl(current); + debug!("check_txn_status: status:{:?}", res); let res = Arc::new(res); if res.is_cacheable() { self.ctx.save_resolved(txn_id, res.clone()).await; @@ -393,6 +398,7 @@ impl LockResolver { Ok(res) } + #[instrument(skip(self, pd_client), fields(keys = ?keys), ret(Debug))] async fn check_all_secondaries( &mut self, pd_client: Arc, @@ -409,6 +415,7 @@ impl LockResolver { plan.execute().await } + #[instrument(skip(self, pd_client, txn_infos), fields(store = ?store, txn_infos = ?txn_infos), ret(Debug))] async fn batch_resolve_locks( &mut self, pd_client: Arc, diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index d1eff10e..491002d4 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -825,6 +825,7 @@ impl Merge for Collect { } } +#[derive(Debug)] pub struct SecondaryLocksStatus { pub commit_ts: Option, pub min_commit_ts: u64, From e067c34a177342adbf89b0ad07682a58ae36fb59 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 Nov 2023 16:51:26 +0800 Subject: [PATCH 21/26] do not trace single shard Signed-off-by: Ping Yu --- src/request/plan.rs | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 7eb85d31..4824b79a 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -125,18 +125,15 @@ where let mut clone = current_plan.clone(); clone.apply_shard(shard, ®ion_store)?; - let handle = tokio::spawn( - Self::single_shard_handler( - pd_client.clone(), - clone, - region_store, - backoff.clone(), - permits.clone(), - preserve_region_results, - ) - .instrument(span), - ); - handles.push(handle); + let handle = tokio::spawn(Self::single_shard_handler( + pd_client.clone(), + clone, + region_store, + backoff.clone(), + permits.clone(), + preserve_region_results, + )); + handles.push(handle.instrument(span)); } let results = try_join_all(handles).await?; From 36be2d0e7cb9d5b9bac3804f54a91f650b730f0c Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 Nov 2023 17:09:05 +0800 Subject: [PATCH 22/26] gc with range Signed-off-by: Ping Yu --- src/request/plan.rs | 2 +- src/transaction/client.rs | 11 +++++++---- tests/integration_tests.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 4824b79a..72835cd4 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -626,7 +626,7 @@ where } } -#[derive(Default)] +#[derive(Default, Debug)] pub struct CleanupLocksResult { pub region_error: Option, pub key_error: Option>, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index f25503ed..f3c0c3ef 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -250,15 +250,18 @@ impl Client { /// /// This is a simplified version of [GC in 
TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). /// We skip the second step "delete ranges" which is an optimization for TiDB. - #[instrument(skip(self))] - pub async fn gc(&self, safepoint: Timestamp) -> Result { + #[instrument(skip(self, range), fields(range), ret(Debug))] + pub async fn gc(&self, range: impl Into, safepoint: Timestamp) -> Result { debug!("invoking transactional gc request"); let options = ResolveLocksOptions { batch_size: SCAN_LOCK_BATCH_SIZE, ..Default::default() }; - self.cleanup_locks(.., &safepoint, options).await?; + let range = range.into(); + Span::current().record("range", &tracing::field::debug(&range)); + + self.cleanup_locks(range, &safepoint, options).await?; // update safepoint to PD let res: bool = self @@ -272,7 +275,7 @@ impl Client { Ok(res) } - #[instrument(skip(self, range), fields(range))] + #[instrument(skip(self, range), fields(range), ret(Debug))] pub async fn cleanup_locks( &self, range: impl Into, diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 71d48283..a56569ec 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -554,7 +554,7 @@ async fn raw_req() -> Result<()> { async fn txn_update_safepoint() -> Result<()> { init().await?; let client = TransactionClient::new(pd_addrs()).await?; - let res = client.gc(client.current_timestamp().await?).await?; + let res = client.gc(.., client.current_timestamp().await?).await?; assert!(res); Ok(()) } From 20f51be448dd618a54b06fa4862ed5f0c159f28a Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 Nov 2023 19:47:45 +0800 Subject: [PATCH 23/26] polish trace Signed-off-by: Ping Yu --- src/request/plan.rs | 19 ++++++++++--------- src/transaction/lock.rs | 14 ++++++++++++-- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 72835cd4..28574ce0 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -13,8 +13,8 @@ use tracing::debug; use tracing::info; use tracing::info_span; use tracing::instrument; -use tracing::span; -use tracing::Instrument; +// use tracing::span; +// use tracing::Instrument; use crate::backoff::Backoff; use crate::pd::PdClient; @@ -109,7 +109,7 @@ where { // A plan may involve multiple shards #[async_recursion] - #[instrument(skip_all)] + // #[instrument(skip_all)] async fn single_plan_handler( pd_client: Arc, current_plan: P, @@ -121,7 +121,7 @@ where let mut handles = Vec::new(); for shard in shards { let (shard, region_store) = shard?; - let span = span!(tracing::Level::INFO, "shard", ?region_store); + // let span = span!(tracing::Level::INFO, "shard", ?region_store); let mut clone = current_plan.clone(); clone.apply_shard(shard, ®ion_store)?; @@ -133,7 +133,8 @@ where permits.clone(), preserve_region_results, )); - handles.push(handle.instrument(span)); + handles.push(handle); + // handles.push(handle.instrument(span)); } let results = try_join_all(handles).await?; @@ -157,7 +158,7 @@ where } #[async_recursion] - #[instrument(skip_all, fields(region_store = ?region_store))] + // #[instrument(skip_all, fields(region_store = ?region_store))] async fn single_shard_handler( pd_client: Arc, plan: P, @@ -219,7 +220,7 @@ where // 1. Ok(true): error has been resolved, retry immediately // 2. Ok(false): backoff, and then retry // 3. 
Err(Error): can't be resolved, return the error to upper level - #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))] + // #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))] async fn handle_region_error( pd_client: Arc<PdC>, e: errorpb::Error, region_store: RegionStore, ) -> Result<bool> { @@ -276,7 +277,7 @@ where // 1. Ok(true): error has been resolved, retry immediately // 2. Ok(false): backoff, and then retry // 3. Err(Error): can't be resolved, return the error to upper level - #[instrument(skip_all, fields(region_store = ?region_store, error = ?error))] + // #[instrument(skip_all, fields(region_store = ?region_store, error = ?error))] async fn on_region_epoch_not_match( pd_client: Arc<PdC>, region_store: RegionStore, error: EpochNotMatch, ) -> Result<bool> { @@ -313,7 +314,7 @@ where Ok(false) } - #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))] + // #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))] async fn handle_grpc_error( pd_client: Arc<PdC>, plan: P, diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 22310b07..2b466d59 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -261,6 +261,12 @@ impl LockResolver { l.lock_type == kvrpcpb::Op::PessimisticLock as i32, ) .await?; + debug!( + "cleanup_locks: txn_id:{}, primary:{}, status:{:?}", + txn_id, + HexRepr(&l.primary_lock), + status + ); // If the transaction uses async commit, check_txn_status will reject rolling back the primary lock. // Then we need to check the secondary locks to determine the final status of the transaction. @@ -307,7 +313,7 @@ impl LockResolver { match &status.kind { TransactionStatusKind::Locked(_, lock_info) => { error!( - "cleanup_locks fail to clean locks, this result is not expected. txn_id:{}", + "cleanup_locks: failed to clean locks, this result is not expected. 
txn_id:{}", txn_id ); return Err(Error::ResolveLockError(vec![lock_info.clone()])); @@ -318,7 +324,7 @@ impl LockResolver { } debug!( - "batch resolve locks, region:{:?}, txn:{:?}", + "cleanup_locks: batch resolve locks, region:{:?}, txn:{:?}", store.region_with_leader.ver_id(), txn_infos ); @@ -334,6 +340,10 @@ impl LockResolver { let cleaned_region = self .batch_resolve_locks(pd_client.clone(), store.clone(), txn_info_vec) .await?; + debug!( + "cleanup_locks: batch resolve locks, cleaned_region:{:?}", + cleaned_region + ); for txn_id in txn_ids { self.ctx .save_cleaned_region(txn_id, cleaned_region.clone()) From 2fb5c228e5c2e76141b92fbd6a1bc0f0a85ff461 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 Nov 2023 20:17:29 +0800 Subject: [PATCH 24/26] trace handle_region_error Signed-off-by: Ping Yu --- src/request/plan.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/request/plan.rs b/src/request/plan.rs index 28574ce0..3a20286a 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -226,6 +226,11 @@ where e: errorpb::Error, region_store: RegionStore, ) -> Result { + debug!( + "handle_region_error, error:{:?}, region_store:{:?}", + e, region_store + ); + let ver_id = region_store.region_with_leader.ver_id(); if let Some(not_leader) = e.not_leader { if let Some(leader) = not_leader.leader { From 87f5c5c7f5fb60373e49127c04a2da5dbca987b0 Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Wed, 22 Nov 2023 20:35:59 +0800 Subject: [PATCH 25/26] no trace ResolveLock::execute Signed-off-by: Ping Yu --- src/request/plan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 3a20286a..6fa82e03 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -585,8 +585,8 @@ where type Result = P::Result; async fn execute(&self) -> Result { - let span = info_span!("ResolveLock::execute"); - let _enter = span.enter(); + // let span = info_span!("ResolveLock::execute"); + // let _enter = span.enter(); let mut result = self.inner.execute().await?; let mut clone = self.clone(); From 5695e2ebc70d0f688cb3ad258b028d5c577abb36 Mon Sep 17 00:00:00 2001 From: Andy Lok Date: Mon, 18 Dec 2023 17:24:47 +0800 Subject: [PATCH 26/26] migrate to tikv/minitrace-rust Signed-off-by: Andy Lok --- Cargo.toml | 4 +-- src/common/security.rs | 2 +- src/pd/client.rs | 2 +- src/pd/cluster.rs | 11 +++---- src/pd/retry.rs | 22 ++++++------- src/pd/timestamp.rs | 16 ++++----- src/raw/client.rs | 2 +- src/request/mod.rs | 2 +- src/request/plan.rs | 59 ++++++++++++++++------------------ src/request/plan_builder.rs | 3 +- src/transaction/client.rs | 21 ++++++------ src/transaction/lock.rs | 33 +++++++++++-------- src/transaction/requests.rs | 5 ++- src/transaction/snapshot.rs | 5 ++- src/transaction/transaction.rs | 24 +++++++------- tests/common/mod.rs | 4 +-- tests/failpoint_tests.rs | 2 +- 17 files changed, 104 insertions(+), 113 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index eb27b4f2..d18622a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,8 @@ either = "1.6" fail = "0.4" futures = { version = "0.3" } lazy_static = "1" -log = "0.4" +log = { version = "0.4", features = ["kv_unstable"] } +minitrace = "0.6.2" pin-project = "1" prometheus = { version = "0.13", default-features = false } prost = "0.12" @@ -44,7 +45,6 @@ serde_derive = "1.0" thiserror = "1" tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] } tonic = { version = "0.10", features = ["tls"] } -tracing = "0.1.40" [dev-dependencies] clap = "2" diff --git 
a/src/common/security.rs b/src/common/security.rs index 8b7e4801..483759cf 100644 --- a/src/common/security.rs +++ b/src/common/security.rs @@ -6,12 +6,12 @@ use std::path::Path; use std::path::PathBuf; use std::time::Duration; +use log::info; use regex::Regex; use tonic::transport::Certificate; use tonic::transport::Channel; use tonic::transport::ClientTlsConfig; use tonic::transport::Identity; -use tracing::info; use crate::internal_err; use crate::Result; diff --git a/src/pd/client.rs b/src/pd/client.rs index 34e2e42b..5461cb57 100644 --- a/src/pd/client.rs +++ b/src/pd/client.rs @@ -6,8 +6,8 @@ use std::sync::Arc; use async_trait::async_trait; use futures::prelude::*; use futures::stream::BoxStream; +use log::info; use tokio::sync::RwLock; -use tracing::info; use crate::compat::stream_fn; use crate::kv::codec; diff --git a/src/pd/cluster.rs b/src/pd/cluster.rs index 3961e108..2176d7ff 100644 --- a/src/pd/cluster.rs +++ b/src/pd/cluster.rs @@ -6,13 +6,12 @@ use std::time::Duration; use std::time::Instant; use async_trait::async_trait; +use log::error; +use log::info; +use log::warn; use tonic::transport::Channel; use tonic::IntoRequest; use tonic::Request; -use tracing::error; -use tracing::info; -use tracing::instrument; -use tracing::warn; use super::timestamp::TimestampOracle; use crate::internal_err; @@ -104,7 +103,7 @@ impl Connection { Connection { security_mgr } } - #[instrument(name = "pd::Connection::connect_cluster", skip_all)] + #[minitrace::trace] pub async fn connect_cluster( &self, endpoints: &[String], @@ -124,7 +123,7 @@ impl Connection { } // Re-establish connection with PD leader in asynchronous fashion. - #[instrument(name = "pd::Connection::reconnect", skip_all)] + #[minitrace::trace] pub async fn reconnect(&self, cluster: &mut Cluster, timeout: Duration) -> Result<()> { warn!("updating pd client"); let start = Instant::now(); diff --git a/src/pd/retry.rs b/src/pd/retry.rs index 5cf173c7..ffe2a7df 100644 --- a/src/pd/retry.rs +++ b/src/pd/retry.rs @@ -8,11 +8,10 @@ use std::time::Duration; use std::time::Instant; use async_trait::async_trait; +use log::debug; +use minitrace::prelude::*; use tokio::sync::RwLock; use tokio::time::sleep; -use tracing::debug; -use tracing::info_span; -use tracing::instrument; use crate::pd::Cluster; use crate::pd::Connection; @@ -78,8 +77,7 @@ macro_rules! retry_core { let stats = pd_stats($tag); let mut last_err = Ok(()); for retry in 0..LEADER_CHANGE_RETRY { - let span = info_span!("RetryClient::retry", retry); - let _enter = span.enter(); + let _span = LocalSpan::enter_with_local_parent("RetryClient::retry"); let res = $call; @@ -88,7 +86,7 @@ macro_rules! retry_core { Err(e) => last_err = Err(e), } - debug!("retry on last_err: {:?}", last_err); + debug!("retry {} on last_err: {:?}", retry, last_err); let mut reconnect_count = MAX_REQUEST_COUNT; while let Err(e) = $self.reconnect(RECONNECT_INTERVAL_SEC).await { reconnect_count -= 1; @@ -149,7 +147,7 @@ impl RetryClient { impl RetryClientTrait for RetryClient { // These get_* functions will try multiple times to make a request, reconnecting as necessary. // It does not know about encoding. Caller should take care of it. 
- #[instrument(name = "RetryClient::get_region", skip_all)] + #[minitrace::trace] async fn get_region(self: Arc, key: Vec) -> Result { retry_mut!(self, "get_region", |cluster| { let key = key.clone(); @@ -164,7 +162,7 @@ impl RetryClientTrait for RetryClient { }) } - #[instrument(name = "RetryClient::get_region_by_id", skip(self))] + #[minitrace::trace] async fn get_region_by_id(self: Arc, region_id: RegionId) -> Result { retry_mut!(self, "get_region_by_id", |cluster| async { cluster @@ -176,7 +174,7 @@ impl RetryClientTrait for RetryClient { }) } - #[instrument(name = "RetryClient::get_store", skip(self))] + #[minitrace::trace] async fn get_store(self: Arc, id: StoreId) -> Result { retry_mut!(self, "get_store", |cluster| async { cluster @@ -186,7 +184,7 @@ impl RetryClientTrait for RetryClient { }) } - #[instrument(name = "RetryClient::get_all_stores", skip(self))] + #[minitrace::trace] async fn get_all_stores(self: Arc) -> Result> { retry_mut!(self, "get_all_stores", |cluster| async { cluster @@ -196,12 +194,12 @@ impl RetryClientTrait for RetryClient { }) } - #[instrument(name = "RetryClient::get_timestamp", skip(self))] + #[minitrace::trace] async fn get_timestamp(self: Arc) -> Result { retry!(self, "get_timestamp", |cluster| cluster.get_timestamp()) } - #[instrument(name = "RetryClient::update_safepoint", skip(self))] + #[minitrace::trace] async fn update_safepoint(self: Arc, safepoint: u64) -> Result { retry_mut!(self, "update_gc_safepoint", |cluster| async { cluster diff --git a/src/pd/timestamp.rs b/src/pd/timestamp.rs index 20243990..bddb613a 100644 --- a/src/pd/timestamp.rs +++ b/src/pd/timestamp.rs @@ -20,15 +20,14 @@ use futures::prelude::*; use futures::task::AtomicWaker; use futures::task::Context; use futures::task::Poll; +use log::debug; +use log::info; +use minitrace::prelude::*; use pin_project::pin_project; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::Mutex; use tonic::transport::Channel; -use tracing::debug; -use tracing::info; -use tracing::info_span; -use tracing::instrument; use crate::internal_err; use crate::proto::pdpb::pd_client::PdClient; @@ -65,7 +64,7 @@ impl TimestampOracle { Ok(TimestampOracle { request_tx }) } - #[instrument(name = "TimestampOracle::get_timestamp", skip_all)] + #[minitrace::trace] pub(crate) async fn get_timestamp(self) -> Result { debug!("getting current timestamp"); let (request, response) = oneshot::channel(); @@ -77,7 +76,7 @@ impl TimestampOracle { } } -#[instrument(name = "TimestampOracle::run_tso", skip_all)] +#[minitrace::trace] async fn run_tso( cluster_id: u64, mut pd_client: PdClient, @@ -102,8 +101,7 @@ async fn run_tso( let mut responses = pd_client.tso(request_stream).await?.into_inner(); while let Some(Ok(resp)) = responses.next().await { - let span = info_span!("handle_response"); - let _enter = span.enter(); + let _span = LocalSpan::enter_with_local_parent("handle_response"); debug!("got response: {:?}", resp); { @@ -136,7 +134,7 @@ struct TsoRequestStream { impl Stream for TsoRequestStream { type Item = TsoRequest; - #[instrument(name = "TsoRequestStream::poll_next", skip_all)] + #[minitrace::trace] fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let mut this = self.project(); diff --git a/src/raw/client.rs b/src/raw/client.rs index df276a69..fc733015 100644 --- a/src/raw/client.rs +++ b/src/raw/client.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use std::u32; use futures::StreamExt; -use tracing::debug; +use log::debug; use crate::backoff::DEFAULT_REGION_BACKOFF; use crate::common::Error; 
diff --git a/src/request/mod.rs b/src/request/mod.rs index 1f2db6d4..b975dc4b 100644 --- a/src/request/mod.rs +++ b/src/request/mod.rs @@ -129,7 +129,7 @@ mod test { impl HasLocks for MockRpcResponse {} - #[derive(Clone, Debug)] + #[derive(Debug, Clone)] struct MockKvRequest { test_invoking_count: Arc, } diff --git a/src/request/plan.rs b/src/request/plan.rs index 6fa82e03..1cbd6276 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -7,14 +7,12 @@ use async_recursion::async_recursion; use async_trait::async_trait; use futures::future::try_join_all; use futures::prelude::*; +use log::debug; +use log::info; +use minitrace::future::FutureExt; +use minitrace::prelude::*; use tokio::sync::Semaphore; use tokio::time::sleep; -use tracing::debug; -use tracing::info; -use tracing::info_span; -use tracing::instrument; -// use tracing::span; -// use tracing::Instrument; use crate::backoff::Backoff; use crate::pd::PdClient; @@ -61,7 +59,7 @@ pub struct Dispatch { impl Plan for Dispatch { type Result = Req::Response; - #[instrument(name = "Dispatch::execute", skip_all, fields(label = self.request.label()))] + #[minitrace::trace] async fn execute(&self) -> Result { let stats = tikv_stats(self.request.label()); let result = self @@ -109,7 +107,7 @@ where { // A plan may involve multiple shards #[async_recursion] - // #[instrument(skip_all)] + #[minitrace::trace] async fn single_plan_handler( pd_client: Arc, current_plan: P, @@ -121,20 +119,20 @@ where let mut handles = Vec::new(); for shard in shards { let (shard, region_store) = shard?; - // let span = span!(tracing::Level::INFO, "shard", ?region_store); - let mut clone = current_plan.clone(); clone.apply_shard(shard, ®ion_store)?; - let handle = tokio::spawn(Self::single_shard_handler( - pd_client.clone(), - clone, - region_store, - backoff.clone(), - permits.clone(), - preserve_region_results, - )); + let handle = tokio::spawn( + Self::single_shard_handler( + pd_client.clone(), + clone, + region_store, + backoff.clone(), + permits.clone(), + preserve_region_results, + ) + .in_span(Span::enter_with_local_parent("single_shard_handler")), + ); handles.push(handle); - // handles.push(handle.instrument(span)); } let results = try_join_all(handles).await?; @@ -158,7 +156,7 @@ where } #[async_recursion] - // #[instrument(skip_all, fields(region_store = ?region_store))] + #[minitrace::trace] async fn single_shard_handler( pd_client: Arc, plan: P, @@ -220,7 +218,7 @@ where // 1. Ok(true): error has been resolved, retry immediately // 2. Ok(false): backoff, and then retry // 3. Err(Error): can't be resolved, return the error to upper level - // #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))] + #[minitrace::trace] async fn handle_region_error( pd_client: Arc, e: errorpb::Error, @@ -282,7 +280,7 @@ where // 1. Ok(true): error has been resolved, retry immediately // 2. Ok(false): backoff, and then retry // 3. 
Err(Error): can't be resolved, return the error to upper level - // #[instrument(skip_all, fields(region_store = ?region_store, error = ?error))] + #[minitrace::trace] async fn on_region_epoch_not_match( pd_client: Arc, region_store: RegionStore, @@ -319,7 +317,7 @@ where Ok(false) } - // #[instrument(skip_all, fields(region_store = ?region_store, error = ?e))] + #[minitrace::trace] async fn handle_grpc_error( pd_client: Arc, plan: P, @@ -329,6 +327,7 @@ where preserve_region_results: bool, e: Error, ) -> Result<::Result> { + debug!("handle grpc error: {:?}", e); let ver_id = region_store.region_with_leader.ver_id(); pd_client.invalidate_region_cache(ver_id).await; match backoff.next_delay_duration() { @@ -366,7 +365,7 @@ where { type Result = Vec>; - #[instrument(name = "RetryableMultiRegion::execute", skip_all)] + #[minitrace::trace] async fn execute(&self) -> Result { // Limit the maximum concurrency of multi-region request. If there are // too many concurrent requests, TiKV is more likely to return a "TiKV @@ -487,7 +486,7 @@ impl>>, M: Me { type Result = M::Out; - #[instrument(name = "MergeResponse::execute", skip_all)] + #[minitrace::trace] async fn execute(&self) -> Result { self.merge.merge(self.inner.execute().await?) } @@ -584,17 +583,15 @@ where { type Result = P::Result; + #[minitrace::trace] async fn execute(&self) -> Result { - // let span = info_span!("ResolveLock::execute"); - // let _enter = span.enter(); - let mut result = self.inner.execute().await?; let mut clone = self.clone(); let mut retry_cnt = 0; loop { retry_cnt += 1; - let span = info_span!("ResolveLock::execute::retry", retry_cnt); - let _enter = span.enter(); + let _span = LocalSpan::enter_with_local_parent("ResolveLock::execute::retry") + .with_property(|| ("retry_count", retry_cnt.to_string())); let locks = result.take_locks(); if locks.is_empty() { @@ -704,7 +701,7 @@ where { type Result = CleanupLocksResult; - #[instrument(name = "CleanupLocks::execute", skip_all)] + #[minitrace::trace] async fn execute(&self) -> Result { let mut result = CleanupLocksResult::default(); let mut inner = self.inner.clone(); diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs index 519ff5b8..e66ce03f 100644 --- a/src/request/plan_builder.rs +++ b/src/request/plan_builder.rs @@ -2,7 +2,6 @@ use std::marker::PhantomData; use std::sync::Arc; -use tracing::instrument; use super::plan::PreserveShard; use crate::backoff::Backoff; @@ -160,7 +159,7 @@ where /// Preserve all results, even some of them are Err. /// To pass all responses to merge, and handle partial successful results correctly. - #[instrument(skip_all)] + #[minitrace::trace] pub fn retry_multi_region_preserve_results( self, backoff: Backoff, diff --git a/src/transaction/client.rs b/src/transaction/client.rs index f3c0c3ef..55275a3c 100644 --- a/src/transaction/client.rs +++ b/src/transaction/client.rs @@ -2,10 +2,9 @@ use std::sync::Arc; -use tracing::debug; -use tracing::info; -use tracing::instrument; -use tracing::Span; +use log::as_debug; +use log::debug; +use log::info; use crate::backoff::{DEFAULT_REGION_BACKOFF, DEFAULT_STORE_BACKOFF}; use crate::config::Config; @@ -250,16 +249,15 @@ impl Client { /// /// This is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). /// We skip the second step "delete ranges" which is an optimization for TiDB. 
- #[instrument(skip(self, range), fields(range), ret(Debug))] + #[minitrace::trace] pub async fn gc(&self, range: impl Into<BoundRange>, safepoint: Timestamp) -> Result<bool> { - debug!("invoking transactional gc request"); + let range = range.into(); + debug!(range = as_debug!(range); "invoking transactional gc request"); let options = ResolveLocksOptions { batch_size: SCAN_LOCK_BATCH_SIZE, ..Default::default() }; - let range = range.into(); - Span::current().record("range", &tracing::field::debug(&range)); self.cleanup_locks(range, &safepoint, options).await?; // update safepoint to PD let res: bool = self @@ -275,7 +273,7 @@ impl Client { Ok(res) } - #[instrument(skip(self, range), fields(range), ret(Debug))] + #[minitrace::trace] pub async fn cleanup_locks( &self, range: impl Into<BoundRange>, safepoint: &Timestamp, options: ResolveLocksOptions, ) -> Result<CleanupLocksResult> { - debug!("invoking cleanup locks"); + let range = range.into(); + debug!(range = as_debug!(range); "invoking cleanup locks"); // scan all locks with ts <= safepoint let ctx = ResolveLocksContext::default(); let backoff = Backoff::equal_jitter_backoff(100, 10000, 50); - let range = range.into(); - Span::current().record("range", &tracing::field::debug(&range)); let req = new_scan_lock_request(range, safepoint, options.batch_size); let encoded_req = EncodedRequest::new(req, self.pd.get_codec()); let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req) .cleanup_locks(ctx.clone(), options, backoff) diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs index 2b466d59..ab91ed6b 100644 --- a/src/transaction/lock.rs +++ b/src/transaction/lock.rs @@ -5,12 +5,12 @@ use std::collections::HashSet; use std::sync::Arc; use fail::fail_point; +use log::as_display; +use log::debug; +use log::error; +use log::info; +use minitrace::prelude::*; use tokio::sync::RwLock; -use tracing::debug; -use tracing::error; -use tracing::info_span; -use tracing::instrument; -use tracing::Span; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; @@ -45,7 +45,7 @@ const RESOLVE_LOCK_RETRY_LIMIT: usize = 10; /// the key. We first use `CleanupRequest` to let the status of the primary lock converge and get /// its status (committed or rolled back). Then, we use the status of its primary lock to determine /// the status of the other keys in the same transaction. -#[instrument(skip_all)] +#[minitrace::trace] pub async fn resolve_locks( locks: Vec<kvrpcpb::LockInfo>, pd_client: Arc<impl PdClient>, @@ -68,8 +68,13 @@ pub async fn resolve_locks( let mut commit_versions: HashMap<u64, u64> = HashMap::new(); let mut clean_regions: HashMap<u64, HashSet<RegionVerId>> = HashMap::new(); for lock in expired_locks { - let span = info_span!("cleanup_expired_lock", lock.lock_version, lock.primary_lock = %HexRepr(&lock.primary_lock)); - let _enter = span.enter(); + let _span = + LocalSpan::enter_with_local_parent("cleanup_expired_lock").with_properties(|| { + [ + ("lock_version", lock.lock_version.to_string()), + ("primary_lock", HexRepr(&lock.primary_lock).to_string()), + ] + }); let region_ver_id = pd_client .region_for_key(&lock.primary_lock.clone().into()) .await? @@ -118,7 +123,7 @@ pub async fn resolve_locks( Ok(live_locks) } -#[instrument(skip(key, pd_client), fields(key = %HexRepr(key)))] +#[minitrace::trace] async fn resolve_lock_with_retry( #[allow(clippy::ptr_arg)] key: &Vec<u8>, start_version: u64, @@ -225,7 +230,7 @@ impl LockResolver { /// _Cleanup_ the given locks. Returns whether all the given locks are resolved. /// /// Note: Will rollback RUNNING transactions. ONLY use in GC. 
- #[instrument(skip_all, fields(store = ?store, locks = ?locks))] + #[minitrace::trace] pub async fn cleanup_locks( &mut self, store: RegionStore, @@ -354,7 +359,7 @@ impl LockResolver { } #[allow(clippy::too_many_arguments)] - #[instrument(skip(self, pd_client, primary), fields(primary), ret(Debug))] + #[minitrace::trace] pub async fn check_txn_status( &mut self, pd_client: Arc, @@ -366,7 +371,7 @@ impl LockResolver { force_sync_commit: bool, resolving_pessimistic_lock: bool, ) -> Result> { - Span::current().record("primary", &tracing::field::display(HexRepr(&primary))); + info!("primary" = as_display!(HexRepr(&primary)); "check_txn_status"); if let Some(txn_status) = self.ctx.get_resolved(txn_id).await { return Ok(txn_status); @@ -408,7 +413,7 @@ impl LockResolver { Ok(res) } - #[instrument(skip(self, pd_client), fields(keys = ?keys), ret(Debug))] + #[minitrace::trace] async fn check_all_secondaries( &mut self, pd_client: Arc, @@ -425,7 +430,7 @@ impl LockResolver { plan.execute().await } - #[instrument(skip(self, pd_client, txn_infos), fields(store = ?store, txn_infos = ?txn_infos), ret(Debug))] + #[minitrace::trace] async fn batch_resolve_locks( &mut self, pd_client: Arc, diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs index 491002d4..798a68ea 100644 --- a/src/transaction/requests.rs +++ b/src/transaction/requests.rs @@ -8,8 +8,7 @@ use either::Either; use futures::stream::BoxStream; use futures::stream::{self}; use futures::StreamExt; -use tracing::debug; -use tracing::instrument; +use log::debug; use super::transaction::TXN_COMMIT_BATCH_SIZE; use crate::collect_first; @@ -177,7 +176,7 @@ shardable_range!(kvrpcpb::ScanRequest); impl Merge for Collect { type Out = Vec; - #[instrument(name = "Collect::merge", skip_all)] + #[minitrace::trace] fn merge(&self, input: Vec>) -> Result { let length: usize = input .iter() diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs index 452e2dd4..85573d99 100644 --- a/src/transaction/snapshot.rs +++ b/src/transaction/snapshot.rs @@ -1,9 +1,8 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use derive_new::new; +use log::debug; use std::marker::PhantomData; -use tracing::debug; -use tracing::instrument; use crate::codec::ApiV1TxnCodec; use crate::pd::{PdClient, PdRpcClient}; @@ -51,7 +50,7 @@ impl> Snapshot { } /// Scan a range, return at most `limit` key-value pairs that lying in the range. 
- #[instrument(name = "Snapshot::scan", skip_all)] + #[minitrace::trace] pub async fn scan( &mut self, range: impl Into, diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index bcc1b1ab..16e26383 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -6,16 +6,16 @@ use std::marker::PhantomData; use std::sync::atomic; use std::sync::atomic::AtomicU8; use std::sync::Arc; -use std::time::Duration; use std::time::Instant; use derive_new::new; use fail::fail_point; use futures::prelude::*; -use tracing::debug; -use tracing::instrument; -use tracing::warn; -use tracing::Span; +use log::as_debug; +use log::debug; +use log::info; +use log::warn; +use std::time::Duration; use crate::backoff::Backoff; use crate::backoff::DEFAULT_REGION_BACKOFF; @@ -368,7 +368,7 @@ impl> Transaction { /// txn.commit().await.unwrap(); /// # }); /// ``` - #[instrument(name = "Transaction::scan", skip_all)] + #[minitrace::trace] pub async fn scan( &mut self, range: impl Into, @@ -405,7 +405,7 @@ impl> Transaction { /// txn.commit().await.unwrap(); /// # }); /// ``` - #[instrument(name = "Transaction::scan_keys", skip_all)] + #[minitrace::trace] pub async fn scan_keys( &mut self, range: impl Into, @@ -421,7 +421,7 @@ impl> Transaction { /// Create a 'scan_reverse' request. /// /// Similar to [`scan`](Transaction::scan), but scans in the reverse direction. - #[instrument(name = "Transaction::scan_reverse", skip_all)] + #[minitrace::trace] pub async fn scan_reverse( &mut self, range: impl Into, @@ -434,7 +434,7 @@ impl> Transaction { /// Create a 'scan_keys_reverse' request. /// /// Similar to [`scan`](Transaction::scan_keys), but scans in the reverse direction. - #[instrument(name = "Transaction::scan_keys_reverse", skip_all)] + #[minitrace::trace] pub async fn scan_keys_reverse( &mut self, range: impl Into, @@ -773,7 +773,7 @@ impl> Transaction { plan.execute().await } - #[instrument(skip(self, range), fields(range, version=self.timestamp.version()))] + #[minitrace::trace] async fn scan_inner( &mut self, range: impl Into, @@ -787,7 +787,7 @@ impl> Transaction { let retry_options = self.options.retry_options.clone(); let range = range.into(); - Span::current().record("range", &tracing::field::debug(&range)); + info!(range = as_debug!(&range); "scanning range"); self.buffer .scan_and_fetch( @@ -1028,7 +1028,7 @@ impl> Transaction { } impl> Drop for Transaction { - #[instrument(skip_all, fields(version=self.timestamp.version()))] + #[minitrace::trace] fn drop(&mut self) { debug!("dropping transaction"); if std::thread::panicking() { diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 2566bf6f..4d63dd56 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -7,6 +7,8 @@ use std::convert::TryInto; use std::env; use std::time::Duration; +use log::info; +use log::warn; use rand::Rng; use tikv_client::Key; use tikv_client::RawClient; @@ -15,8 +17,6 @@ use tikv_client::Transaction; use tikv_client::TransactionClient; use tikv_client::{ColumnFamily, Snapshot, TransactionOptions}; use tokio::time::sleep; -use tracing::info; -use tracing::warn; const ENV_PD_ADDRS: &str = "PD_ADDRS"; const ENV_ENABLE_MULIT_REGION: &str = "MULTI_REGION"; diff --git a/tests/failpoint_tests.rs b/tests/failpoint_tests.rs index 6906b0ff..f34dff48 100644 --- a/tests/failpoint_tests.rs +++ b/tests/failpoint_tests.rs @@ -9,6 +9,7 @@ use std::time::Duration; use common::*; use fail::FailScenario; +use log::info; use rand::thread_rng; use serial_test::serial; use 
tikv_client::transaction::Client; @@ -20,7 +21,6 @@ use tikv_client::Result; use tikv_client::RetryOptions; use tikv_client::TransactionClient; use tikv_client::TransactionOptions; -use tracing::info; #[tokio::test] #[serial]
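
Note on consuming these spans: the `#[minitrace::trace]` attributes introduced in this series record nothing unless the embedding application installs a reporter and opens a root span. A minimal consumer sketch, assuming minitrace 0.6 with its bundled `ConsoleReporter` (the function and span names below are illustrative, not part of this patch set):

use minitrace::collector::{Config, ConsoleReporter};
use minitrace::prelude::*;

#[minitrace::trace]
async fn traced_work() {
    // Child spans attach to the thread-local parent, the same pattern the
    // patches use inside RetryClient::retry and the TSO response handler.
    let _span = LocalSpan::enter_with_local_parent("inner_step");
}

#[tokio::main]
async fn main() {
    // Report finished spans to the console; a real deployment would plug in
    // a Jaeger or OpenTelemetry reporter here instead.
    minitrace::set_reporter(ConsoleReporter, Config::default());

    {
        // Without an active root span, #[minitrace::trace] functions are no-ops.
        let root = Span::root("client-request", SpanContext::random());
        let _guard = root.set_local_parent();
        traced_work().await;
    }

    // Make sure buffered spans are reported before the process exits.
    minitrace::flush();
}

Spans are collected in thread-local buffers and reported in batches, which is what keeps the per-attempt `LocalSpan`s added in `RetryClient::retry` and `run_tso` cheap compared with the earlier `tracing` guards.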