From 7badb80a463b1575e803e269e15e5a5fa49355cb Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Tue, 1 Oct 2024 20:57:01 -0300 Subject: [PATCH 1/6] Add scheduling on top of denokv Signed-off-by: Marcos Candeia --- deno.json | 2 +- src/actors/env.ts | 3 + src/actors/runtime.test.ts | 14 ++ src/actors/runtime.ts | 5 +- src/actors/storage.ts | 3 + src/actors/storage/cached.ts | 15 ++ src/actors/storage/denoKv/alarms.ts | 160 ++++++++++++++++++ .../storage/{denoKv.ts => denoKv/storage.ts} | 32 +++- 8 files changed, 228 insertions(+), 6 deletions(-) create mode 100644 src/actors/env.ts create mode 100644 src/actors/storage/denoKv/alarms.ts rename src/actors/storage/{denoKv.ts => denoKv/storage.ts} (85%) diff --git a/deno.json b/deno.json index 7c279fe..4dca9c6 100644 --- a/deno.json +++ b/deno.json @@ -31,7 +31,7 @@ }, "tasks": { "check": "deno fmt && deno lint --fix && deno check ./src/actors/mod.ts ./src/actors/hono/middleware.ts", - "test": "rm kv;deno test -A --unstable-kv .", + "test": "rm kv;deno test -A --env --unstable-cron --unstable-kv .", "release": "deno run -A jsr:@deco/scripts/release" }, "lock": false, diff --git a/src/actors/env.ts b/src/actors/env.ts new file mode 100644 index 0000000..2d6f972 --- /dev/null +++ b/src/actors/env.ts @@ -0,0 +1,3 @@ +export const DENO_ISOLATE_INSTANCE_ID: string | undefined = Deno.env.get( + "DENO_ISOLATE_INSTANCE_ID", +); diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 300bee7..6b25d1f 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -2,6 +2,7 @@ import { assertEquals, assertFalse } from "@std/assert"; import { actors } from "./proxy.ts"; import { ActorRuntime } from "./runtime.ts"; import type { ActorState } from "./state.ts"; +import { triggerAlarms } from "./storage/denoKv/alarms.ts"; import type { ChannelUpgrader } from "./util/channels/channel.ts"; import { WatchTarget } from "./util/watch.ts"; @@ -22,6 +23,13 @@ class Counter { this.watchTarget.notify(this.count); return this.count; } + async scheduleIncrement(): Promise { + await this.state.storage.setAlarm(Date.now() + 1); // next tick + } + + async alarm() { + await this.increment(); + } async decrement(): Promise { this.count--; @@ -110,4 +118,10 @@ Deno.test("counter tests", async () => { assertEquals(done, false); } watcher.return?.(); + + const counterWatcher = await actor.watch(); + await actor.scheduleIncrement(); + await triggerAlarms(); + assertEquals(await counterWatcher.next().then((x) => x.value), 2); + counterWatcher.return?.(); }); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index 374d10f..03d2963 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -1,7 +1,8 @@ import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http"; +import { DENO_ISOLATE_INSTANCE_ID } from "./env.ts"; import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME } from "./proxy.ts"; import { ActorState } from "./state.ts"; -import { DenoKvActorStorage } from "./storage/denoKv.ts"; +import { DenoKvActorStorage } from "./storage/denoKv/storage.ts"; import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts"; import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts"; @@ -119,7 +120,7 @@ export class ActorRuntime { async fetch(req: Request): Promise { const url = new URL(req.url); const actorId = req.headers.get(ACTOR_ID_HEADER_NAME) ?? - url.searchParams.get(ACTOR_ID_QS_NAME); + url.searchParams.get(ACTOR_ID_QS_NAME) ?? DENO_ISOLATE_INSTANCE_ID; if (!actorId) { return new Response(`missing ${ACTOR_ID_HEADER_NAME} header`, { status: 400, diff --git a/src/actors/storage.ts b/src/actors/storage.ts index 9319a80..b62e77f 100644 --- a/src/actors/storage.ts +++ b/src/actors/storage.ts @@ -53,4 +53,7 @@ export interface ActorStorage { delete(keys: string[][], options?: ActorStoragePutOptions): Promise; deleteAll(options?: ActorStoragePutOptions): Promise; atomic(storage: (st: ActorStorage) => Promise): Promise; + setAlarm(dt: number): Promise; + getAlarm(): Promise; + deleteAlarm(): Promise; } diff --git a/src/actors/storage/cached.ts b/src/actors/storage/cached.ts index 7ba0e7e..35c3368 100644 --- a/src/actors/storage/cached.ts +++ b/src/actors/storage/cached.ts @@ -8,8 +8,23 @@ import type { export class CachedStorage implements ActorStorage { protected cache: Map = new Map(); + protected alarm: Promise | null = null; constructor(protected innerStorage: ActorStorage) {} + setAlarm(dt: number): Promise { + return this.innerStorage.setAlarm(dt).then(() => { + this.alarm = Promise.resolve(dt); + }); + } + getAlarm(): Promise { + this.alarm ??= this.innerStorage.getAlarm(); + return this.alarm; + } + deleteAlarm(): Promise { + return this.innerStorage.deleteAlarm().then(() => { + this.alarm = null; + }); + } private async getMany( keys: string[][], diff --git a/src/actors/storage/denoKv/alarms.ts b/src/actors/storage/denoKv/alarms.ts new file mode 100644 index 0000000..b763df7 --- /dev/null +++ b/src/actors/storage/denoKv/alarms.ts @@ -0,0 +1,160 @@ +import { actors } from "../../proxy.ts"; +import { kv } from "./storage.ts"; + +function minutesNow(timestamp = Date.now()): number { + const date = new Date(timestamp); + + // Set seconds and milliseconds to zero to floor to the start of the next minute + date.setSeconds(0, 0); + + // Add one minute (60,000 milliseconds) + const nextMinute = new Date(date.getTime() + 60 * 1000); + + // Subtract 1 millisecond to get the last millisecond of the current minute + return nextMinute.getTime() - 1; +} + +export interface Alarm extends CreateAlarmPayload { + id: string; + retries?: Retry[]; +} +export interface CreateAlarmPayload { + actorName: string; + actorId: string; + triggerAt: number; +} +export interface Retry { + reason: string; +} +const TRIGGERS = ["triggers"]; +const ALARMS = ["alarms"]; +export const Alarms = { + id: (alarm: Omit) => + `${alarm.actorName}:${alarm.actorId}`, + schedule: async (payload: CreateAlarmPayload): Promise => { + const id = Alarms.id(payload); + const currentAlarm = await Alarms.get(payload); + + const triggerKey = [...TRIGGERS, payload.triggerAt, id]; + const alarm = { ...payload, id }; + let transaction = kv.atomic().set(triggerKey, alarm).check(currentAlarm); + if (currentAlarm.versionstamp !== null && currentAlarm.value) { + transaction = transaction.delete([ + ...TRIGGERS, + currentAlarm.value.triggerAt, + id, + ]); + } + await Alarms.set(payload, transaction); + await transaction.commit(); + return alarm; + }, + get: async (payload: Omit) => { + const id = Alarms.id(payload); + const key = [...ALARMS, id]; + const result = await kv.get(key); + return result; + }, + set: async ( + payload: Omit | Alarm, + denoKv: { set: Deno.Kv["set"] | Deno.AtomicOperation["set"] } = kv, + ) => { + const id = Alarms.id(payload); + const alarm = { ...payload, id }; + const key = [...ALARMS, id]; + const result = denoKv.set(key, alarm); + if (result instanceof Deno.AtomicOperation) { + return; + } + const awaited = await result; + if (awaited.ok) { + throw new Error(`alarm could not be created`); + } + return { + key, + value: alarm, + versionstamp: awaited.versionstamp, + }; + }, + ack: async (alarm: Alarm): Promise => { + await kv.atomic().delete([...ALARMS, alarm.id]).delete([ + ...TRIGGERS, + alarm.triggerAt, + alarm.id, + ]).commit(); + }, + retry: async (alarm: Alarm, reason: string): Promise => { + const savedAlarm = await Alarms.get(alarm); + + const transaction = kv.atomic().check(savedAlarm); + await Alarms.set({ + ...alarm, + retries: [...alarm.retries ?? [], { reason }], + }, transaction); + await transaction + .commit(); + }, + getRetries: async (alarm: Alarm): Promise => { + const retry = await Alarms.get(alarm); + return retry?.value?.retries; + }, + next: async function* (): AsyncIterableIterator { + const selector = { + prefix: TRIGGERS, + end: [...TRIGGERS, minutesNow()], + }; + const iter = kv.list(selector); + + for await (const alarm of iter) yield alarm.value; + }, + async *watch() { + while (true) { + let foundAlarms = false; + + // Fetch and yield alarms + for await (const alarm of Alarms.next()) { + foundAlarms = true; + yield alarm; + } + + // Sleep until the next minute if no alarms were found + if (!foundAlarms) { + const now = new Date(); + const msUntilNextMinute = 60 * 1000 - + (now.getSeconds() * 1000 + now.getMilliseconds()); + await new Promise((resolve) => setTimeout(resolve, msUntilNextMinute)); + } + } + }, +}; + +const tryAck = async (alarm: Alarm): Promise => { + try { + const proxy = actors.proxy<{ alarm: () => Promise }>( + alarm.actorName, + ).id(alarm.actorId); + await proxy.alarm(); + await Alarms.ack(alarm); + } catch (error) { + const retries = await Alarms.getRetries(alarm); + if (retries === undefined) { + console.error(`retrying ${alarm}`, error); + await Alarms.retry(alarm, (error as Error).message); + } else if (retries?.length === 10) { + console.error(`retrying ${alarm}`, error, retries); + await Alarms.ack(alarm); + } + } +}; + +const inflight: Record> = {}; + +export const triggerAlarms = async () => { + for await (const alarm of Alarms.next()) { + inflight[alarm.id] ??= tryAck(alarm).finally(() => + delete inflight[alarm.id] + ); + } + await Promise.all(Object.values(inflight)); +}; +Deno.cron("schedulerD", "* * * * *", triggerAlarms); diff --git a/src/actors/storage/denoKv.ts b/src/actors/storage/denoKv/storage.ts similarity index 85% rename from src/actors/storage/denoKv.ts rename to src/actors/storage/denoKv/storage.ts index e2c6f40..340d411 100644 --- a/src/actors/storage/denoKv.ts +++ b/src/actors/storage/denoKv/storage.ts @@ -3,7 +3,8 @@ import type { ActorStorage, ActorStorageListOptions, ActorStoragePutOptions, -} from "../storage.ts"; +} from "../../storage.ts"; +import { Alarms } from "./alarms.ts"; export interface StorageOptions { actorName: string; @@ -18,7 +19,9 @@ const ACTORS_DENO_KV_TOKEN = Deno.env.get("ACTORS_DENO_KV_TOKEN"); ACTORS_DENO_KV_TOKEN && Deno.env.set("DENO_KV_ACCESS_TOKEN", ACTORS_DENO_KV_TOKEN); -const kv = await Deno.openKv(ACTORS_KV_DATABASE); +export const kv = await Deno.openKv(ACTORS_KV_DATABASE); + +const ALARM_KEY = "alarm"; interface AtomicOp { kv: Deno.AtomicOperation; @@ -35,6 +38,24 @@ export class DenoKvActorStorage implements ActorStorage { this.kvOrTransaction = options.atomicOp?.kv ?? kv; this.atomicOp = options.atomicOp; } + async setAlarm(dt: number): Promise { + await Alarms.schedule({ + actorId: this.options.actorId, + actorName: this.options.actorName, + triggerAt: dt, + }); + } + async getAlarm(): Promise { + return await Alarms.get( + { + actorId: this.options.actorId, + actorName: this.options.actorName, + }, + ).then((alarm) => alarm.value?.triggerAt ?? null); + } + async deleteAlarm(): Promise { + await this.delete(ALARM_KEY); + } async atomic(_storage: (st: ActorStorage) => Promise): Promise { if (this.atomicOp) { @@ -62,7 +83,8 @@ export class DenoKvActorStorage implements ActorStorage { // Build the full key based on actor name, id, and provided key buildKey(key: string[]): string[] { - return [this.options.actorName, this.options.actorId, ...key]; + // id should come first as multiples actors name can share same id + return [this.options.actorId, this.options.actorName, ...key]; } // Single get method that handles both single and multiple keys @@ -130,6 +152,10 @@ export class DenoKvActorStorage implements ActorStorage { keys: string[][], options?: ActorStoragePutOptions, ): Promise; + async delete( + keys: string, + options?: ActorStoragePutOptions, + ): Promise; async delete( keyOrKeys: string | string[] | string[][], _options?: ActorStoragePutOptions, From 95783e014071acb7976302c9cbe343e7197da7c1 Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Tue, 1 Oct 2024 21:01:20 -0300 Subject: [PATCH 2/6] Use alarm ack to delete alarms Signed-off-by: Marcos Candeia --- src/actors/storage/denoKv/alarms.ts | 18 ++++++++++++++---- src/actors/storage/denoKv/storage.ts | 7 ++++--- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/actors/storage/denoKv/alarms.ts b/src/actors/storage/denoKv/alarms.ts index b763df7..271961e 100644 --- a/src/actors/storage/denoKv/alarms.ts +++ b/src/actors/storage/denoKv/alarms.ts @@ -76,11 +76,21 @@ export const Alarms = { versionstamp: awaited.versionstamp, }; }, - ack: async (alarm: Alarm): Promise => { - await kv.atomic().delete([...ALARMS, alarm.id]).delete([ + ack: async ( + alarm: Alarm | Omit, + ): Promise => { + const id = Alarms.id(alarm); + const triggerAt = "triggerAt" in alarm + ? alarm.triggerAt + : (await Alarms.get(alarm)).value?.triggerAt; + if (!triggerAt) { + throw new Error(`alarm ${id} has no triggerAt`); + } + + await kv.atomic().delete([...ALARMS, id]).delete([ ...TRIGGERS, - alarm.triggerAt, - alarm.id, + triggerAt, + id, ]).commit(); }, retry: async (alarm: Alarm, reason: string): Promise => { diff --git a/src/actors/storage/denoKv/storage.ts b/src/actors/storage/denoKv/storage.ts index 340d411..c0ccb77 100644 --- a/src/actors/storage/denoKv/storage.ts +++ b/src/actors/storage/denoKv/storage.ts @@ -21,8 +21,6 @@ ACTORS_DENO_KV_TOKEN && export const kv = await Deno.openKv(ACTORS_KV_DATABASE); -const ALARM_KEY = "alarm"; - interface AtomicOp { kv: Deno.AtomicOperation; dirty: Deno.KvEntryMaybe[]; @@ -54,7 +52,10 @@ export class DenoKvActorStorage implements ActorStorage { ).then((alarm) => alarm.value?.triggerAt ?? null); } async deleteAlarm(): Promise { - await this.delete(ALARM_KEY); + await Alarms.ack({ + actorId: this.options.actorId, + actorName: this.options.actorName, + }); } async atomic(_storage: (st: ActorStorage) => Promise): Promise { From acb0c9e0b1e9dad89f3b288ecad24e878e29ee3a Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Wed, 2 Oct 2024 09:20:42 -0300 Subject: [PATCH 3/6] format yaml Signed-off-by: Marcos Candeia --- .github/workflows/publish.yaml | 4 ++-- .github/workflows/releaser.yaml | 30 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 2805448..1609a09 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -13,8 +13,8 @@ jobs: runs-on: ubuntu-latest permissions: - contents: read - id-token: write + contents: read + id-token: write steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/releaser.yaml b/.github/workflows/releaser.yaml index 617775d..66bfa86 100644 --- a/.github/workflows/releaser.yaml +++ b/.github/workflows/releaser.yaml @@ -9,9 +9,9 @@ on: - main permissions: - contents: write # Necessary for accessing and modifying repository content - pull-requests: write # Necessary for interacting with pull requests - actions: write # Necessary for triggering other workflows + contents: write # Necessary for accessing and modifying repository content + pull-requests: write # Necessary for interacting with pull requests + actions: write # Necessary for triggering other workflows jobs: tag-discussion: @@ -21,8 +21,8 @@ jobs: - name: Checkout Code uses: actions/checkout@v3 with: - ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository) - repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo + ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository) + repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo - name: Calculate new versions id: calculate_versions @@ -165,15 +165,15 @@ jobs: git push origin ${{ steps.determine_version.outputs.new_version }} - name: Trigger Release Workflow run: | - curl -X POST \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Accept: application/vnd.github.everest-preview+json" \ - https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \ - -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.everest-preview+json" \ + https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \ + -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' - name: Trigger Publish Workflow run: | - curl -X POST \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Accept: application/vnd.github.everest-preview+json" \ - https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \ - -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.everest-preview+json" \ + https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \ + -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' From afe2a427169a2b5707b66e8bff40a5c7fab4def8 Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Wed, 2 Oct 2024 11:05:36 -0300 Subject: [PATCH 4/6] Remove storage log Signed-off-by: Marcos Candeia --- src/actors/storage/denoKv/storage.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/actors/storage/denoKv/storage.ts b/src/actors/storage/denoKv/storage.ts index c0ccb77..28960e0 100644 --- a/src/actors/storage/denoKv/storage.ts +++ b/src/actors/storage/denoKv/storage.ts @@ -207,7 +207,6 @@ export class DenoKvActorStorage implements ActorStorage { ); for await (const entry of iter) { - console.log(entry); result.push([(entry.key as string[]).slice(-2), entry.value]); } From 38ba01b113c4e198d1a140dcf949a419f6846e9b Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Wed, 2 Oct 2024 11:07:34 -0300 Subject: [PATCH 5/6] Remove unnecessary watch method Signed-off-by: Marcos Candeia --- src/actors/storage/denoKv/alarms.ts | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/src/actors/storage/denoKv/alarms.ts b/src/actors/storage/denoKv/alarms.ts index 271961e..16e6f4d 100644 --- a/src/actors/storage/denoKv/alarms.ts +++ b/src/actors/storage/denoKv/alarms.ts @@ -117,25 +117,6 @@ export const Alarms = { for await (const alarm of iter) yield alarm.value; }, - async *watch() { - while (true) { - let foundAlarms = false; - - // Fetch and yield alarms - for await (const alarm of Alarms.next()) { - foundAlarms = true; - yield alarm; - } - - // Sleep until the next minute if no alarms were found - if (!foundAlarms) { - const now = new Date(); - const msUntilNextMinute = 60 * 1000 - - (now.getSeconds() * 1000 + now.getMilliseconds()); - await new Promise((resolve) => setTimeout(resolve, msUntilNextMinute)); - } - } - }, }; const tryAck = async (alarm: Alarm): Promise => { From f9f9c22781ec9b5370e0829d3709a94f41388175 Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Wed, 2 Oct 2024 11:10:29 -0300 Subject: [PATCH 6/6] Add schedulerD comment Signed-off-by: Marcos Candeia --- src/actors/storage/denoKv/alarms.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/actors/storage/denoKv/alarms.ts b/src/actors/storage/denoKv/alarms.ts index 16e6f4d..9b71391 100644 --- a/src/actors/storage/denoKv/alarms.ts +++ b/src/actors/storage/denoKv/alarms.ts @@ -148,4 +148,7 @@ export const triggerAlarms = async () => { } await Promise.all(Object.values(inflight)); }; +// TODO (@author M. Candeia): this will make all isolates to spin up every 1 minute, would this be a problem? +// Shouldn't instead have a single job that spin ups isolates if they need to be called instead? +// On the other hand it is good because it works locally and remotelly in the same way. Deno.cron("schedulerD", "* * * * *", triggerAlarms);