Skip to content

Commit

Permalink
Merge pull request #280 from backend-developers-ltd/db-manifest
Browse files Browse the repository at this point in the history
Validator db: separate executor count for each executor class
  • Loading branch information
adal-chiriliuc-reef authored Oct 24, 2024
2 parents 120d098 + aa38a40 commit 117fc92
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.15 on 2024-10-17 13:15

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add ``executor_class`` to ``MinerManifest`` and widen its uniqueness.

    The operations run in order: the ``unique_miner_manifest`` constraint is
    removed, the ``executor_class`` column is added, and a constraint with the
    same name is re-created over ``(miner, batch, executor_class)``.
    """

    dependencies = [
        ("validator", "0039_delete_jobfinishedreceipt_delete_jobstartedreceipt"),
    ]

    operations = [
        # Remove the constraint first; it is re-added at the end under the
        # same name with ``executor_class`` included in its fields.
        migrations.RemoveConstraint(
            model_name="minermanifest",
            name="unique_miner_manifest",
        ),
        # The default only back-fills rows that already exist:
        # ``preserve_default=False`` keeps it out of the resulting field
        # definition, so the model field itself carries no default.
        migrations.AddField(
            model_name="minermanifest",
            name="executor_class",
            field=models.CharField(
                default="spin_up-4min.gpu-24gb",
                max_length=255,
            ),
            preserve_default=False,
        ),
        # One manifest row per (miner, batch, executor_class).
        migrations.AddConstraint(
            model_name="minermanifest",
            constraint=models.UniqueConstraint(
                fields=("miner", "batch", "executor_class"),
                name="unique_miner_manifest",
            ),
        ),
    ]
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,15 @@ class MinerManifest(models.Model):
miner = models.ForeignKey(Miner, on_delete=models.CASCADE)
batch = models.ForeignKey(SyntheticJobBatch, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
executor_class = models.CharField(max_length=255)
executor_count = models.IntegerField(default=0)
online_executor_count = models.IntegerField(default=0)

class Meta:
constraints = [
UniqueConstraint(fields=["miner", "batch"], name="unique_miner_manifest"),
UniqueConstraint(
fields=["miner", "batch", "executor_class"], name="unique_miner_manifest"
),
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -407,8 +407,8 @@ class BatchContext:
clients: dict[str, MinerClient]
executors: dict[str, defaultdict[ExecutorClass, int]]
job_generators: dict[str, dict[ExecutorClass, list[BaseSyntheticJobGenerator]]]
online_executor_count: dict[str, int]
previous_online_executor_count: dict[str, int | None]
online_executor_count: dict[str, defaultdict[ExecutorClass, int]]
previous_online_executor_count: dict[str, defaultdict[ExecutorClass, int]]

manifests: dict[str, ExecutorManifest | None]
manifest_events: dict[str, asyncio.Event]
Expand Down Expand Up @@ -654,8 +654,8 @@ def _init_context(
ctx.clients[hotkey] = create_miner_client(ctx=ctx, miner_hotkey=hotkey)
ctx.executors[hotkey] = defaultdict(int)
ctx.job_generators[hotkey] = {}
ctx.online_executor_count[hotkey] = 0
ctx.previous_online_executor_count[hotkey] = None
ctx.online_executor_count[hotkey] = defaultdict(int)
ctx.previous_online_executor_count[hotkey] = defaultdict(int)
ctx.manifests[hotkey] = None
ctx.manifest_events[hotkey] = asyncio.Event()

Expand All @@ -665,10 +665,11 @@ def _init_context(
def _get_max_spin_up_time(ctx: BatchContext) -> int:
max_spin_up_time = _MIN_SPIN_UP_TIME
for executors in ctx.executors.values():
for executor_class in executors.keys():
spin_up_time = EXECUTOR_CLASS[executor_class].spin_up_time
assert spin_up_time is not None
max_spin_up_time = max(max_spin_up_time, spin_up_time)
for executor_class, count in executors.items():
if count > 0:
spin_up_time = EXECUTOR_CLASS[executor_class].spin_up_time
assert spin_up_time is not None
max_spin_up_time = max(max_spin_up_time, spin_up_time)
return max_spin_up_time


Expand Down Expand Up @@ -1368,16 +1369,18 @@ async def _score_jobs(ctx: BatchContext) -> None:
# compute for each hotkey how many executors finished successfully
for job in ctx.jobs.values():
if job.success:
ctx.online_executor_count[job.miner_hotkey] += 1
ctx.online_executor_count[job.miner_hotkey][job.executor_class] += 1

# apply manifest bonus
# do not combine with the previous loop, we use online_executor_count
for job in ctx.jobs.values():
if job.success:
try:
job.score_manifest_multiplier = await get_manifest_multiplier(
ctx.previous_online_executor_count[job.miner_hotkey],
ctx.online_executor_count[job.miner_hotkey],
ctx.previous_online_executor_count[job.miner_hotkey].get(
job.executor_class, None
),
ctx.online_executor_count[job.miner_hotkey].get(job.executor_class, 0),
)
except (Exception, asyncio.CancelledError) as exc:
logger.warning("%s failed to score: %r", job.name, exc)
Expand Down Expand Up @@ -1405,7 +1408,8 @@ def _db_get_previous_online_executor_count(ctx: BatchContext) -> None:
for manifest in MinerManifest.objects.filter(batch_id=previous_batch.id):
# only update if the miner is still serving
if manifest.miner.hotkey in ctx.previous_online_executor_count:
ctx.previous_online_executor_count[manifest.miner.hotkey] = (
executor_class = ExecutorClass(manifest.executor_class)
ctx.previous_online_executor_count[manifest.miner.hotkey][executor_class] = (
manifest.online_executor_count
)

Expand Down Expand Up @@ -1474,14 +1478,15 @@ def _db_persist(ctx: BatchContext) -> None:

miner_manifests: list[MinerManifest] = []
for miner in ctx.miners.values():
manifest = ctx.manifests[miner.hotkey]
if manifest is not None:
for executor_class, count in ctx.executors[miner.hotkey].items():
online_executor_count = ctx.online_executor_count[miner.hotkey].get(executor_class, 0)
miner_manifests.append(
MinerManifest(
miner=miner,
batch=batch,
executor_count=manifest.total_count,
online_executor_count=ctx.online_executor_count[miner.hotkey],
executor_class=executor_class,
executor_count=count,
online_executor_count=online_executor_count,
)
)
MinerManifest.objects.bulk_create(miner_manifests)
Expand Down Expand Up @@ -1573,7 +1578,7 @@ async def execute_synthetic_batch_run(
await ctx.checkpoint_system_event("_get_total_executor_count")
total_executor_count = _get_total_executor_count(ctx)

if total_executor_count != 0:
if total_executor_count > 0:
await ctx.checkpoint_system_event("_generate_jobs")
await _generate_jobs(ctx)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ async def test_manifest_dance_incentives(
await MinerManifest.objects.acreate(
miner=miner,
batch=batch,
executor_class=DEFAULT_EXECUTOR_CLASS,
executor_count=prev_online_executor_count,
online_executor_count=prev_online_executor_count,
)
Expand Down Expand Up @@ -168,6 +169,7 @@ async def test_synthetic_job_batch(
await MinerManifest.objects.acreate(
miner=miner,
batch=batch,
executor_class=DEFAULT_EXECUTOR_CLASS,
executor_count=prev_online_executor_count,
online_executor_count=prev_online_executor_count,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ async def test_manifest_dance_incentives(
await MinerManifest.objects.acreate(
miner=miner,
batch=batch,
executor_class=DEFAULT_EXECUTOR_CLASS,
executor_count=prev_online_executor_count,
online_executor_count=prev_online_executor_count,
)
Expand Down Expand Up @@ -511,6 +512,7 @@ def test_create_and_run_synthetic_job_batch(
MinerManifest.objects.create(
miner=miner,
batch=batch,
executor_class=DEFAULT_EXECUTOR_CLASS,
executor_count=previous_online_executors,
online_executor_count=previous_online_executors,
)
Expand Down

0 comments on commit 117fc92

Please sign in to comment.