Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ds): Implement recalibration for sampling mode PROJECT #80241

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/sentry/dynamic_sampling/rules/biases/recalibration_bias.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from sentry.dynamic_sampling.rules.biases.base import Bias
from sentry.dynamic_sampling.rules.utils import RESERVED_IDS, PolymorphicRule, RuleType
from sentry.dynamic_sampling.tasks.helpers.recalibrate_orgs import get_adjusted_factor
from sentry.dynamic_sampling.tasks.helpers.recalibrate_orgs import (
get_adjusted_factor,
get_adjusted_project_factor,
)
from sentry.dynamic_sampling.utils import is_project_mode_sampling
from sentry.models.project import Project


Expand All @@ -17,7 +21,11 @@ class RecalibrationBias(Bias):
"""

def generate_rules(self, project: Project, base_sample_rate: float) -> list[PolymorphicRule]:
adjusted_factor = get_adjusted_factor(project.organization.id)
if is_project_mode_sampling(project.organization):
adjusted_factor = get_adjusted_project_factor(project.id)
else:
adjusted_factor = get_adjusted_factor(project.organization.id)

# We don't want to generate any rule in case the factor is 1.0 since we should multiply the factor and 1.0
# is the identity of the multiplication.
if adjusted_factor == 1.0:
Expand Down
14 changes: 4 additions & 10 deletions src/sentry/dynamic_sampling/tasks/boost_low_volume_projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from collections import defaultdict
from collections.abc import Mapping, Sequence
from datetime import datetime, timedelta
from enum import Enum

import sentry_sdk
from snuba_sdk import (
Expand Down Expand Up @@ -57,7 +56,7 @@
dynamic_sampling_task_with_context,
sample_function,
)
from sentry.dynamic_sampling.types import DynamicSamplingMode
from sentry.dynamic_sampling.types import DynamicSamplingMode, SamplingMeasure
from sentry.dynamic_sampling.utils import has_dynamic_sampling, is_project_mode_sampling
from sentry.models.options import OrganizationOption
from sentry.models.organization import Organization
Expand All @@ -79,13 +78,6 @@
logger = logging.getLogger(__name__)


class SamplingMeasure(Enum):
"""The type of data being measured for dynamic sampling rebalancing."""

SPANS = "spans"
TRANSACTIONS = "transactions"


@instrumented_task(
name="sentry.dynamic_sampling.tasks.boost_low_volume_projects",
queue="dynamicsampling",
Expand Down Expand Up @@ -121,7 +113,9 @@ def boost_low_volume_projects(context: TaskContext) -> None:


@metrics.wraps("dynamic_sampling.partition_by_measure")
def partition_by_measure(org_ids: list[OrganizationId]) -> Mapping[SamplingMeasure, list[int]]:
def partition_by_measure(
org_ids: list[OrganizationId],
) -> Mapping[SamplingMeasure, list[OrganizationId]]:
"""
Partitions the orgs by the measure that will be used to adjust the sample
rates. This is controlled through a feature flag on the organization,
Expand Down
42 changes: 42 additions & 0 deletions src/sentry/dynamic_sampling/tasks/helpers/recalibrate_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,48 @@ def delete_adjusted_factor(org_id: int) -> None:
redis_client.delete(cache_key)


def generate_recalibrate_projects_cache_key(project_id: int) -> str:
    """Build the Redis key under which a project's recalibration factor is stored."""
    return "ds::p:%d:rate_rebalance_factor2" % project_id


def set_guarded_adjusted_project_factor(project_id: int, adjusted_factor: float) -> None:
    """Store the recalibration factor for a project in Redis, or clear it.

    A factor of 1.0 is the identity of multiplication: the generated rule would
    multiply the sample rate by 1.0 and change nothing, so instead of storing it
    we delete any previously stored factor.
    """
    redis_client = get_redis_client_for_ds()
    cache_key = generate_recalibrate_projects_cache_key(project_id)

    if adjusted_factor != 1.0:
        # Set the value and its expiry atomically (instead of set + pexpire) so
        # that a failure between two calls can never leave a factor behind
        # without a TTL. The TTL is kept small so that errors cannot cause the
        # system to drift significantly from the target sample rate.
        redis_client.set(cache_key, adjusted_factor, px=ADJUSTED_FACTOR_REDIS_CACHE_KEY_TTL)
    else:
        delete_adjusted_project_factor(project_id)


def get_adjusted_project_factor(project_id: int) -> float:
    """Return the stored recalibration factor of a project, defaulting to 1.0.

    The default is the identity of multiplication: the recalibration rule is a
    factor rule that gets multiplied with the first sample rate rule matching
    after it, so returning 1.0 leaves the sample rate untouched.
    """
    redis_client = get_redis_client_for_ds()
    cache_key = generate_recalibrate_projects_cache_key(project_id)

    try:
        raw_factor = redis_client.get(cache_key)
        return 1.0 if raw_factor is None else float(raw_factor)
    except (TypeError, ValueError):
        return 1.0


def delete_adjusted_project_factor(project_id: int) -> None:
    """Remove any stored recalibration factor for the given project."""
    cache_key = generate_recalibrate_projects_cache_key(project_id)
    get_redis_client_for_ds().delete(cache_key)


def compute_adjusted_factor(
prev_factor: float, effective_sample_rate: float, target_sample_rate: float
) -> float | None:
Expand Down
143 changes: 115 additions & 28 deletions src/sentry/dynamic_sampling/tasks/recalibrate_orgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import sentry_sdk

from sentry import quotas
from sentry.constants import SAMPLING_MODE_DEFAULT, TARGET_SAMPLE_RATE_DEFAULT
from sentry.dynamic_sampling.rules.utils import DecisionKeepCount, OrganizationId, ProjectId
from sentry.dynamic_sampling.tasks.boost_low_volume_projects import (
fetch_projects_with_total_root_transaction_count_and_rates,
)
from sentry.dynamic_sampling.tasks.common import GetActiveOrgsVolumes, TimedIterator
from sentry.dynamic_sampling.tasks.constants import (
MAX_REBALANCE_FACTOR,
Expand All @@ -12,8 +17,11 @@
from sentry.dynamic_sampling.tasks.helpers.recalibrate_orgs import (
compute_adjusted_factor,
delete_adjusted_factor,
delete_adjusted_project_factor,
get_adjusted_factor,
get_adjusted_project_factor,
set_guarded_adjusted_factor,
set_guarded_adjusted_project_factor,
)
from sentry.dynamic_sampling.tasks.helpers.sample_rate import get_org_sample_rate
from sentry.dynamic_sampling.tasks.logging import log_sample_rate_source
Expand All @@ -23,7 +31,10 @@
dynamic_sampling_task_with_context,
sample_function,
)
from sentry.dynamic_sampling.types import DynamicSamplingMode, SamplingMeasure
from sentry.dynamic_sampling.utils import has_dynamic_sampling
from sentry.models.options.organization_option import OrganizationOption
from sentry.models.options.project_option import ProjectOption
from sentry.models.organization import Organization
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task
Expand All @@ -44,14 +55,29 @@ def recalibrate_orgs(context: TaskContext) -> None:
context,
GetActiveOrgsVolumes(),
):
valid_orgs = []
modes = OrganizationOption.objects.get_value_bulk_id(
[v.org_id for v in org_volumes], "sentry:sampling_mode", SAMPLING_MODE_DEFAULT
)

orgs_batch = []
projects_batch = []

for org_volume in org_volumes:
if org_volume.is_valid_for_recalibration():
valid_orgs.append((org_volume.org_id, org_volume.total, org_volume.indexed))
if not org_volume.is_valid_for_recalibration():
continue

# We run an asynchronous job for recalibrating a batch of orgs whose size is specified in
# `GetActiveOrgsVolumes`.
recalibrate_orgs_batch.delay(valid_orgs)
if modes[org_volume.org_id] == DynamicSamplingMode.PROJECT:
projects_batch.append(org_volume.org_id)
else:
orgs_batch.append((org_volume.org_id, org_volume.total, org_volume.indexed))

# We run an asynchronous job for recalibrating a batch of orgs whose
# size is specified in `GetActiveOrgsVolumes`.
if orgs_batch:
recalibrate_orgs_batch.delay(orgs_batch)

if projects_batch:
recalibrate_projects_batch.delay(projects_batch)


@instrumented_task(
Expand All @@ -64,7 +90,7 @@ def recalibrate_orgs(context: TaskContext) -> None:
silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task
def recalibrate_orgs_batch(orgs: Sequence[tuple[int, int, int]]) -> None:
def recalibrate_orgs_batch(orgs: Sequence[tuple[OrganizationId, int, int]]) -> None:
for org_id, total, indexed in orgs:
try:
recalibrate_org(org_id, total, indexed)
Expand All @@ -73,7 +99,7 @@ def recalibrate_orgs_batch(orgs: Sequence[tuple[int, int, int]]) -> None:
continue


def recalibrate_org(org_id: int, total: int, indexed: int) -> None:
def recalibrate_org(org_id: OrganizationId, total: int, indexed: int) -> None:
try:
# We need the organization object for the feature flag.
organization = Organization.objects.get_from_cache(id=org_id)
Expand All @@ -92,26 +118,16 @@ def recalibrate_org(org_id: int, total: int, indexed: int) -> None:
org_id=org_id,
default_sample_rate=quotas.backend.get_blended_sample_rate(organization_id=org_id),
)
if success:
sample_function(
function=log_sample_rate_source,
_sample_rate=0.1,
org_id=org_id,
project_id=None,
used_for="recalibrate_orgs",
source="sliding_window_org",
sample_rate=target_sample_rate,
)
else:
sample_function(
function=log_sample_rate_source,
_sample_rate=0.1,
org_id=org_id,
project_id=None,
used_for="recalibrate_orgs",
source="blended_sample_rate",
sample_rate=target_sample_rate,
)

sample_function(
function=log_sample_rate_source,
_sample_rate=0.1,
org_id=org_id,
project_id=None,
used_for="recalibrate_orgs",
source="sliding_window_org" if success else "blended_sample_rate",
sample_rate=target_sample_rate,
)

# If we didn't find any sample rate, we can't recalibrate the organization.
if target_sample_rate is None:
Expand Down Expand Up @@ -141,3 +157,74 @@ def recalibrate_org(org_id: int, total: int, indexed: int) -> None:

# At the end we set the adjusted factor.
set_guarded_adjusted_factor(org_id, adjusted_factor)


@instrumented_task(
    name="sentry.dynamic_sampling.tasks.recalibrate_projects_batch",
    queue="dynamicsampling",
    default_retry_delay=5,
    max_retries=5,
    soft_time_limit=25 * 60,
    # The hard limit must exceed the soft limit; the previous value
    # (2 * 60 + 5) was smaller than the soft limit, so the graceful
    # soft-timeout handling could never fire before the hard kill.
    time_limit=25 * 60 + 5,
    silo_mode=SiloMode.REGION,
)
@dynamic_sampling_task_with_context(max_task_execution=MAX_TASK_SECONDS)
def recalibrate_projects_batch(context: TaskContext, orgs: list[OrganizationId]) -> None:
    """Recalibrate the sample-rate factor of every project in the given orgs.

    Used for organizations in PROJECT sampling mode: volumes are fetched per
    project and each project is adjusted against its own
    ``sentry:target_sample_rate`` option. Failures are captured and skipped so
    that one bad project does not abort the rest of the batch.
    """
    for org_id, projects in fetch_projects_with_total_root_transaction_count_and_rates(
        context, org_ids=orgs, measure=SamplingMeasure.SPANS
    ).items():
        # Bulk-load each project's configured target sample rate (None when unset).
        sample_rates = ProjectOption.objects.get_value_bulk_id(
            [t[0] for t in projects], "sentry:target_sample_rate"
        )

        for project_id, total, keep, _ in projects:
            try:
                recalibrate_project(org_id, project_id, total, keep, sample_rates[project_id])
            except Exception as e:
                sentry_sdk.capture_exception(e)
                continue


def recalibrate_project(
    org_id: OrganizationId,
    project_id: ProjectId,
    total: int,
    indexed: DecisionKeepCount,
    target_sample_rate: float | None,
) -> None:
    """Recompute and persist the recalibration factor of a single project.

    Compares the effective sample rate observed in the last time window
    (``indexed / total``) with the project's target sample rate and derives a
    multiplicative correction factor that is stored in Redis (where the
    recalibration bias later picks it up).

    :param org_id: organization owning the project (used for logging).
    :param project_id: the project being recalibrated.
    :param total: total volume counted in the window.
    :param indexed: number of kept (indexed) items in the window.
    :param target_sample_rate: configured target rate; falls back to
        ``TARGET_SAMPLE_RATE_DEFAULT`` when ``None``.
    """
    if target_sample_rate is None:
        target_sample_rate = TARGET_SAMPLE_RATE_DEFAULT

    sample_function(
        function=log_sample_rate_source,
        _sample_rate=0.1,
        org_id=org_id,
        project_id=project_id,
        used_for="recalibrate_orgs",
        source="project_setting",
        sample_rate=target_sample_rate,
    )

    # Without any volume we cannot compute an effective sample rate (the
    # division below would raise ZeroDivisionError), and there is nothing to
    # recalibrate anyway.
    if total == 0:
        return

    # We compute the effective sample rate that we had in the last considered time window.
    effective_sample_rate = indexed / total
    # We get the previous factor that was used for the recalibration.
    previous_factor = get_adjusted_project_factor(project_id)

    # We want to compute the new adjusted factor.
    adjusted_factor = compute_adjusted_factor(
        previous_factor, effective_sample_rate, target_sample_rate
    )
    if adjusted_factor is None:
        # The message previously said "org recalibration", which was misleading
        # in this project-level code path.
        sentry_sdk.capture_message(
            "The adjusted factor for project recalibration could not be computed"
        )
        return

    if adjusted_factor < MIN_REBALANCE_FACTOR or adjusted_factor > MAX_REBALANCE_FACTOR:
        # In case the new factor would result into too much recalibration, we want to remove it from cache,
        # effectively removing the generated rule.
        delete_adjusted_project_factor(project_id)
        return

    # At the end we set the adjusted factor.
    set_guarded_adjusted_project_factor(project_id, adjusted_factor)
12 changes: 12 additions & 0 deletions src/sentry/dynamic_sampling/types.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
from enum import Enum

from django.db import models
from django.utils.translation import gettext_lazy as _


class DynamicSamplingMode(models.TextChoices):
    """Defines the scope where target sample rates are configured in an
    organization."""

    # A single target sample rate applies to the whole organization.
    ORGANIZATION = "organization", _("Organization")
    # Target sample rates are configured per project.
    PROJECT = "project", _("Project")


class SamplingMeasure(Enum):
    """The type of data being measured for dynamic sampling rebalancing."""

    # Volumes are measured in spans.
    SPANS = "spans"
    # Volumes are measured in transactions.
    TRANSACTIONS = "transactions"
13 changes: 11 additions & 2 deletions src/sentry/models/options/organization_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,24 @@

class OrganizationOptionManager(OptionManager["OrganizationOption"]):
def get_value_bulk(
    self, instances: Sequence[Organization], key: str, default: Any = None
) -> Mapping[Organization, Any]:
    """Bulk-read one option value for several organizations.

    Returns a mapping from each organization in ``instances`` to its stored
    value for ``key``; organizations without a stored row map to ``default``.
    """
    by_id = {org.id: org for org in instances}
    values = {org: default for org in instances}
    for option in self.filter(organization__in=instances, key=key):
        values[by_id[option.organization_id]] = option.value
    return values

def get_value_bulk_id(
    self, ids: Sequence[int], key: str, default: Any = None
) -> Mapping[int, Any]:
    """Bulk-read one option value for several organizations by id.

    Returns a mapping from each id in ``ids`` to its stored value for
    ``key``; ids without a stored row map to ``default``.
    """
    values = dict.fromkeys(ids, default)
    for option in self.filter(organization_id__in=ids, key=key):
        values[option.organization_id] = option.value
    return values

def get_value(
self,
organization: Organization,
Expand Down
7 changes: 7 additions & 0 deletions src/sentry/models/options/project_option.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ def get_value_bulk(self, instances: Sequence[Project], key: str) -> Mapping[Proj
result[instance_map[obj.project_id]] = obj.value
return result

def get_value_bulk_id(
    self, ids: Sequence[int], key: str, default: Any = None
) -> Mapping[int, Any]:
    """Bulk-read one option value for several projects by id.

    Mirrors ``OrganizationOptionManager.get_value_bulk_id``: returns a mapping
    from each id in ``ids`` to its stored value for ``key``, with ``default``
    for projects that have no stored row. The ``default`` parameter is new and
    defaults to ``None``, preserving the previous behavior for existing
    callers.
    """
    queryset = self.filter(project_id__in=ids, key=key)
    result = {i: default for i in ids}
    for obj in queryset:
        result[obj.project_id] = obj.value
    return result

def get_value(
self,
project: int | Project,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from sentry.dynamic_sampling.rules.base import get_guarded_project_sample_rate
from sentry.dynamic_sampling.rules.utils import get_redis_client_for_ds
from sentry.dynamic_sampling.tasks.boost_low_volume_projects import (
SamplingMeasure,
boost_low_volume_projects,
boost_low_volume_projects_of_org_with_query,
fetch_projects_with_total_root_transaction_count_and_rates,
Expand All @@ -18,7 +17,7 @@
generate_sliding_window_org_cache_key,
)
from sentry.dynamic_sampling.tasks.task_context import TaskContext
from sentry.dynamic_sampling.types import DynamicSamplingMode
from sentry.dynamic_sampling.types import DynamicSamplingMode, SamplingMeasure
from sentry.models.options.organization_option import OrganizationOption
from sentry.models.organization import Organization
from sentry.models.project import Project
Expand Down
Loading
Loading