Skip to content

Commit

Permalink
rawkv: Reuse scheduler worker pool for raw modify command (tikv#13286)
Browse files Browse the repository at this point in the history
ref tikv#13284

Signed-off-by: haojinming <[email protected]>

Co-authored-by: Ti Chi Robot <[email protected]>
Signed-off-by: fengou1 <[email protected]>
  • Loading branch information
2 people authored and fengou1 committed Aug 30, 2022
1 parent e1a27ed commit f0f59be
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 99 deletions.
284 changes: 185 additions & 99 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ use rand::prelude::*;
use resource_metering::{FutureExt, ResourceTagFactory};
use tikv_kv::SnapshotExt;
use tikv_util::{
deadline::Deadline,
quota_limiter::QuotaLimiter,
time::{duration_to_ms, Instant, ThreadReadId},
};
Expand Down Expand Up @@ -1446,6 +1447,29 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
Ok(())
}

// Schedule raw modify commands, which reuse the scheduler worker pool.
// TODO: separate the txn and raw commands if needed in the future.
fn sched_raw_command<T>(&self, tag: CommandKind, future: T) -> Result<()>
where
T: Future + Send + 'static,
{
SCHED_STAGE_COUNTER_VEC.get(tag).new.inc();
self.sched
.get_sched_pool(CommandPri::Normal)
.pool
.spawn(future)
.map_err(|_| Error::from(ErrorInner::SchedTooBusy))
}

/// Builds the execution deadline for a request from its context.
///
/// When `ctx.max_execution_duration_ms` is zero, the scheduler's
/// `DEFAULT_EXECUTION_DURATION_LIMIT` is used; otherwise the value is taken
/// as the limit in milliseconds. The deadline is measured from now.
fn get_deadline(ctx: &Context) -> Deadline {
    let limit = match ctx.max_execution_duration_ms {
        0 => crate::storage::txn::scheduler::DEFAULT_EXECUTION_DURATION_LIMIT,
        millis => ::std::time::Duration::from_millis(millis),
    };
    Deadline::from_now(limit)
}

/// Delete all keys in the range [`start_key`, `end_key`).
///
/// All keys in the range will be deleted permanently regardless of their
Expand Down Expand Up @@ -1817,44 +1841,60 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
if !F::IS_TTL_ENABLED && ttl != 0 {
return Err(Error::from(ErrorInner::TtlNotEnabled));
}
let deadline = Self::get_deadline(&ctx);
let cf = Self::rawkv_cf(&cf, self.api_version)?;
let engine = self.engine.clone();
self.sched_raw_command(CMD, async move {
if let Err(e) = deadline.check() {
return callback(Err(Error::from(e)));
}
let command_duration = tikv_util::time::Instant::now();
let raw_value = RawValue {
user_value: value,
expire_ts: ttl_to_expire_ts(ttl),
is_delete: false,
};
let m = Modify::Put(
cf,
F::encode_raw_key_owned(key, None),
F::encode_raw_value_owned(raw_value),
);

let raw_value = RawValue {
user_value: value,
expire_ts: ttl_to_expire_ts(ttl),
is_delete: false,
};
let m = Modify::Put(
Self::rawkv_cf(&cf, self.api_version)?,
F::encode_raw_key_owned(key, None),
F::encode_raw_value_owned(raw_value),
);

let mut batch = WriteData::from_modifies(vec![m]);
batch.set_allowed_on_disk_almost_full();

self.engine.async_write(
&ctx,
batch,
Box::new(|res| callback(res.map_err(Error::from))),
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_put.inc();
Ok(())
let mut batch = WriteData::from_modifies(vec![m]);
batch.set_allowed_on_disk_almost_full();
let (cb, f) = tikv_util::future::paired_future_callback();
let async_ret =
engine.async_write(&ctx, batch, Box::new(|res| cb(res.map_err(Error::from))));
let v: Result<()> = match async_ret {
Err(e) => Err(Error::from(e)),
Ok(_) => f.await.unwrap(),
};
callback(v);
KV_COMMAND_COUNTER_VEC_STATIC.get(CMD).inc();
SCHED_STAGE_COUNTER_VEC.get(CMD).write_finish.inc();
SCHED_HISTOGRAM_VEC_STATIC
.get(CMD)
.observe(command_duration.saturating_elapsed().as_secs_f64());
})
}

fn raw_batch_put_requests_to_modifies(
cf: CfName,
pairs: Vec<KvPair>,
ttls: Vec<u64>,
) -> Result<Vec<Modify>> {
fn check_ttl_valid(key_cnt: usize, ttls: &Vec<u64>) -> Result<()> {
if !F::IS_TTL_ENABLED {
if ttls.iter().any(|&x| x != 0) {
return Err(Error::from(ErrorInner::TtlNotEnabled));
}
} else if ttls.len() != pairs.len() {
} else if ttls.len() != key_cnt {
return Err(Error::from(ErrorInner::TtlLenNotEqualsToPairs));
}
Ok(())
}

let modifies = pairs
fn raw_batch_put_requests_to_modifies(
cf: CfName,
pairs: Vec<KvPair>,
ttls: Vec<u64>,
) -> Vec<Modify> {
pairs
.into_iter()
.zip(ttls)
.map(|((k, v), ttl)| {
Expand All @@ -1869,8 +1909,7 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
F::encode_raw_value_owned(raw_value),
)
})
.collect();
Ok(modifies)
.collect()
}

/// Write some keys to the storage in a batch.
Expand All @@ -1882,10 +1921,11 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
ttls: Vec<u64>,
callback: Callback<()>,
) -> Result<()> {
const CMD: CommandKind = CommandKind::raw_batch_put;
Self::check_api_version(
self.api_version,
ctx.api_version,
CommandKind::raw_batch_put,
CMD,
pairs.iter().map(|(ref k, _)| k),
)?;

Expand All @@ -1896,18 +1936,32 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
self.max_key_size,
callback
);
Self::check_ttl_valid(pairs.len(), &ttls)?;

let modifies = Self::raw_batch_put_requests_to_modifies(cf, pairs, ttls)?;
let mut batch = WriteData::from_modifies(modifies);
batch.set_allowed_on_disk_almost_full();

self.engine.async_write(
&ctx,
batch,
Box::new(|res| callback(res.map_err(Error::from))),
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_batch_put.inc();
Ok(())
let engine = self.engine.clone();
let deadline = Self::get_deadline(&ctx);
self.sched_raw_command(CMD, async move {
if let Err(e) = deadline.check() {
return callback(Err(Error::from(e)));
}
let command_duration = tikv_util::time::Instant::now();
let modifies = Self::raw_batch_put_requests_to_modifies(cf, pairs, ttls);
let mut batch = WriteData::from_modifies(modifies);
batch.set_allowed_on_disk_almost_full();
let (cb, f) = tikv_util::future::paired_future_callback();
let async_ret =
engine.async_write(&ctx, batch, Box::new(|res| cb(res.map_err(Error::from))));
let v: Result<()> = match async_ret {
Err(e) => Err(Error::from(e)),
Ok(_) => f.await.unwrap(),
};
callback(v);
KV_COMMAND_COUNTER_VEC_STATIC.get(CMD).inc();
SCHED_STAGE_COUNTER_VEC.get(CMD).write_finish.inc();
SCHED_HISTOGRAM_VEC_STATIC
.get(CMD)
.observe(command_duration.saturating_elapsed().as_secs_f64());
})
}

fn raw_delete_request_to_modify(cf: CfName, key: Vec<u8>) -> Modify {
Expand All @@ -1928,26 +1982,35 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
key: Vec<u8>,
callback: Callback<()>,
) -> Result<()> {
Self::check_api_version(
self.api_version,
ctx.api_version,
CommandKind::raw_delete,
[&key],
)?;
const CMD: CommandKind = CommandKind::raw_delete;
Self::check_api_version(self.api_version, ctx.api_version, CMD, [&key])?;

check_key_size!(Some(&key).into_iter(), self.max_key_size, callback);

let m = Self::raw_delete_request_to_modify(Self::rawkv_cf(&cf, self.api_version)?, key);
let mut batch = WriteData::from_modifies(vec![m]);
batch.set_allowed_on_disk_almost_full();

self.engine.async_write(
&ctx,
batch,
Box::new(|res| callback(res.map_err(Error::from))),
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_delete.inc();
Ok(())
let cf = Self::rawkv_cf(&cf, self.api_version)?;
let engine = self.engine.clone();
let deadline = Self::get_deadline(&ctx);
self.sched_raw_command(CMD, async move {
if let Err(e) = deadline.check() {
return callback(Err(Error::from(e)));
}
let command_duration = tikv_util::time::Instant::now();
let m = Self::raw_delete_request_to_modify(cf, key);
let mut batch = WriteData::from_modifies(vec![m]);
batch.set_allowed_on_disk_almost_full();
let (cb, f) = tikv_util::future::paired_future_callback();
let async_ret =
engine.async_write(&ctx, batch, Box::new(|res| cb(res.map_err(Error::from))));
let v: Result<()> = match async_ret {
Err(e) => Err(Error::from(e)),
Ok(_) => f.await.unwrap(),
};
callback(v);
KV_COMMAND_COUNTER_VEC_STATIC.get(CMD).inc();
SCHED_STAGE_COUNTER_VEC.get(CMD).write_finish.inc();
SCHED_HISTOGRAM_VEC_STATIC
.get(CMD)
.observe(command_duration.saturating_elapsed().as_secs_f64());
})
}

/// Delete all raw keys in [`start_key`, `end_key`).
Expand All @@ -1962,31 +2025,45 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
end_key: Vec<u8>,
callback: Callback<()>,
) -> Result<()> {
const CMD: CommandKind = CommandKind::raw_delete_range;
check_key_size!([&start_key, &end_key], self.max_key_size, callback);
Self::check_api_version_ranges(
self.api_version,
ctx.api_version,
CommandKind::raw_delete_range,
CMD,
[(Some(&start_key), Some(&end_key))],
)?;

let cf = Self::rawkv_cf(&cf, self.api_version)?;
let start_key = F::encode_raw_key_owned(start_key, None);
let end_key = F::encode_raw_key_owned(end_key, None);

let mut batch =
WriteData::from_modifies(vec![Modify::DeleteRange(cf, start_key, end_key, false)]);
batch.set_allowed_on_disk_almost_full();

// TODO: special notification channel for API V2.

self.engine.async_write(
&ctx,
batch,
Box::new(|res| callback(res.map_err(Error::from))),
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_delete_range.inc();
Ok(())
let engine = self.engine.clone();
let deadline = Self::get_deadline(&ctx);
self.sched_raw_command(CMD, async move {
if let Err(e) = deadline.check() {
return callback(Err(Error::from(e)));
}
let command_duration = tikv_util::time::Instant::now();
let start_key = F::encode_raw_key_owned(start_key, None);
let end_key = F::encode_raw_key_owned(end_key, None);

let mut batch =
WriteData::from_modifies(vec![Modify::DeleteRange(cf, start_key, end_key, false)]);
batch.set_allowed_on_disk_almost_full();

// TODO: special notification channel for API V2.
let (cb, f) = tikv_util::future::paired_future_callback();
let async_ret =
engine.async_write(&ctx, batch, Box::new(|res| cb(res.map_err(Error::from))));
let v: Result<()> = match async_ret {
Err(e) => Err(Error::from(e)),
Ok(_) => f.await.unwrap(),
};
callback(v);
KV_COMMAND_COUNTER_VEC_STATIC.get(CMD).inc();
SCHED_STAGE_COUNTER_VEC.get(CMD).write_finish.inc();
SCHED_HISTOGRAM_VEC_STATIC
.get(CMD)
.observe(command_duration.saturating_elapsed().as_secs_f64());
})
}

/// Delete some raw keys in a batch.
Expand All @@ -1999,30 +2076,38 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
keys: Vec<Vec<u8>>,
callback: Callback<()>,
) -> Result<()> {
Self::check_api_version(
self.api_version,
ctx.api_version,
CommandKind::raw_batch_delete,
&keys,
)?;
const CMD: CommandKind = CommandKind::raw_batch_delete;
Self::check_api_version(self.api_version, ctx.api_version, CMD, &keys)?;

let cf = Self::rawkv_cf(&cf, self.api_version)?;
check_key_size!(keys.iter(), self.max_key_size, callback);

let modifies = keys
.into_iter()
.map(|k| Self::raw_delete_request_to_modify(cf, k))
.collect();
let mut batch = WriteData::from_modifies(modifies);
batch.set_allowed_on_disk_almost_full();

self.engine.async_write(
&ctx,
batch,
Box::new(|res| callback(res.map_err(Error::from))),
)?;
KV_COMMAND_COUNTER_VEC_STATIC.raw_batch_delete.inc();
Ok(())
let engine = self.engine.clone();
let deadline = Self::get_deadline(&ctx);
self.sched_raw_command(CMD, async move {
if let Err(e) = deadline.check() {
return callback(Err(Error::from(e)));
}
let command_duration = tikv_util::time::Instant::now();
let modifies = keys
.into_iter()
.map(|k| Self::raw_delete_request_to_modify(cf, k))
.collect();
let mut batch = WriteData::from_modifies(modifies);
batch.set_allowed_on_disk_almost_full();
let (cb, f) = tikv_util::future::paired_future_callback();
let async_ret =
engine.async_write(&ctx, batch, Box::new(|res| cb(res.map_err(Error::from))));
let v: Result<()> = match async_ret {
Err(e) => Err(Error::from(e)),
Ok(_) => f.await.unwrap(),
};
callback(v);
KV_COMMAND_COUNTER_VEC_STATIC.get(CMD).inc();
SCHED_STAGE_COUNTER_VEC.get(CMD).write_finish.inc();
SCHED_HISTOGRAM_VEC_STATIC
.get(CMD)
.observe(command_duration.saturating_elapsed().as_secs_f64());
})
}

/// Scan raw keys in a range.
Expand Down Expand Up @@ -2444,7 +2529,8 @@ impl<E: Engine, L: LockManager, F: KvFormat> Storage<E, L, F> {
)?;

let cf = Self::rawkv_cf(&cf, self.api_version)?;
let modifies = Self::raw_batch_put_requests_to_modifies(cf, pairs, ttls)?;
Self::check_ttl_valid(pairs.len(), &ttls)?;
let modifies = Self::raw_batch_put_requests_to_modifies(cf, pairs, ttls);
let cmd = RawAtomicStore::new(cf, modifies, ctx);
self.sched_txn_command(cmd, callback)
}
Expand Down
Loading

0 comments on commit f0f59be

Please sign in to comment.