diff --git a/.github/workflows/publish.yaml b/.github/workflows/publish.yaml index 2805448..1609a09 100644 --- a/.github/workflows/publish.yaml +++ b/.github/workflows/publish.yaml @@ -13,8 +13,8 @@ jobs: runs-on: ubuntu-latest permissions: - contents: read - id-token: write + contents: read + id-token: write steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/releaser.yaml b/.github/workflows/releaser.yaml index 617775d..66bfa86 100644 --- a/.github/workflows/releaser.yaml +++ b/.github/workflows/releaser.yaml @@ -9,9 +9,9 @@ on: - main permissions: - contents: write # Necessary for accessing and modifying repository content - pull-requests: write # Necessary for interacting with pull requests - actions: write # Necessary for triggering other workflows + contents: write # Necessary for accessing and modifying repository content + pull-requests: write # Necessary for interacting with pull requests + actions: write # Necessary for triggering other workflows jobs: tag-discussion: @@ -21,8 +21,8 @@ jobs: - name: Checkout Code uses: actions/checkout@v3 with: - ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository) - repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo + ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository) + repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo - name: Calculate new versions id: calculate_versions @@ -165,15 +165,15 @@ jobs: git push origin ${{ steps.determine_version.outputs.new_version }} - name: Trigger Release Workflow run: | - curl -X POST \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Accept: application/vnd.github.everest-preview+json" \ - https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \ - -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.everest-preview+json" \ + https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \ + -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' - name: Trigger Publish Workflow run: | - curl -X POST \ - -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ - -H "Accept: application/vnd.github.everest-preview+json" \ - https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \ - -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' + curl -X POST \ + -H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \ + -H "Accept: application/vnd.github.everest-preview+json" \ + https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \ + -d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}' diff --git a/deno.json b/deno.json index 7c279fe..4dca9c6 100644 --- a/deno.json +++ b/deno.json @@ -31,7 +31,7 @@ }, "tasks": { "check": "deno fmt && deno lint --fix && deno check ./src/actors/mod.ts ./src/actors/hono/middleware.ts", - "test": "rm kv;deno test -A --unstable-kv .", + "test": "rm kv;deno test -A --env --unstable-cron --unstable-kv .", "release": "deno run -A jsr:@deco/scripts/release" }, "lock": false, diff --git a/src/actors/env.ts b/src/actors/env.ts new file mode 100644 index 0000000..2d6f972 --- /dev/null +++ b/src/actors/env.ts @@ -0,0 +1,3 @@ +export const DENO_ISOLATE_INSTANCE_ID: string | undefined = Deno.env.get( + "DENO_ISOLATE_INSTANCE_ID", +); diff --git a/src/actors/runtime.test.ts b/src/actors/runtime.test.ts index 300bee7..6b25d1f 100644 --- a/src/actors/runtime.test.ts +++ b/src/actors/runtime.test.ts @@ -2,6 +2,7 @@ import { assertEquals, assertFalse } from "@std/assert"; import { actors } from "./proxy.ts"; import { ActorRuntime } from "./runtime.ts"; import type { ActorState } from "./state.ts"; +import { triggerAlarms } from "./storage/denoKv/alarms.ts"; import type { ChannelUpgrader } from "./util/channels/channel.ts"; import { WatchTarget } from "./util/watch.ts"; @@ -22,6 +23,13 @@ class Counter { this.watchTarget.notify(this.count); return this.count; } + async scheduleIncrement(): Promise { + await this.state.storage.setAlarm(Date.now() + 1); // next tick + } + + async alarm() { + await this.increment(); + } async decrement(): Promise { this.count--; @@ -110,4 +118,10 @@ Deno.test("counter tests", async () => { assertEquals(done, false); } watcher.return?.(); + + const counterWatcher = await actor.watch(); + await actor.scheduleIncrement(); + await triggerAlarms(); + assertEquals(await counterWatcher.next().then((x) => x.value), 2); + counterWatcher.return?.(); }); diff --git a/src/actors/runtime.ts b/src/actors/runtime.ts index 374d10f..03d2963 100644 --- a/src/actors/runtime.ts +++ b/src/actors/runtime.ts @@ -1,7 +1,8 @@ import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http"; +import { DENO_ISOLATE_INSTANCE_ID } from "./env.ts"; import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME } from "./proxy.ts"; import { ActorState } from "./state.ts"; -import { DenoKvActorStorage } from "./storage/denoKv.ts"; +import { DenoKvActorStorage } from "./storage/denoKv/storage.ts"; import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts"; import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts"; @@ -119,7 +120,7 @@ export class ActorRuntime { async fetch(req: Request): Promise { const url = new URL(req.url); const actorId = req.headers.get(ACTOR_ID_HEADER_NAME) ?? - url.searchParams.get(ACTOR_ID_QS_NAME); + url.searchParams.get(ACTOR_ID_QS_NAME) ?? DENO_ISOLATE_INSTANCE_ID; if (!actorId) { return new Response(`missing ${ACTOR_ID_HEADER_NAME} header`, { status: 400, diff --git a/src/actors/storage.ts b/src/actors/storage.ts index 9319a80..b62e77f 100644 --- a/src/actors/storage.ts +++ b/src/actors/storage.ts @@ -53,4 +53,7 @@ export interface ActorStorage { delete(keys: string[][], options?: ActorStoragePutOptions): Promise; deleteAll(options?: ActorStoragePutOptions): Promise; atomic(storage: (st: ActorStorage) => Promise): Promise; + setAlarm(dt: number): Promise; + getAlarm(): Promise; + deleteAlarm(): Promise; } diff --git a/src/actors/storage/cached.ts b/src/actors/storage/cached.ts index 7ba0e7e..35c3368 100644 --- a/src/actors/storage/cached.ts +++ b/src/actors/storage/cached.ts @@ -8,8 +8,23 @@ import type { export class CachedStorage implements ActorStorage { protected cache: Map = new Map(); + protected alarm: Promise | null = null; constructor(protected innerStorage: ActorStorage) {} + setAlarm(dt: number): Promise { + return this.innerStorage.setAlarm(dt).then(() => { + this.alarm = Promise.resolve(dt); + }); + } + getAlarm(): Promise { + this.alarm ??= this.innerStorage.getAlarm(); + return this.alarm; + } + deleteAlarm(): Promise { + return this.innerStorage.deleteAlarm().then(() => { + this.alarm = null; + }); + } private async getMany( keys: string[][], diff --git a/src/actors/storage/denoKv/alarms.ts b/src/actors/storage/denoKv/alarms.ts new file mode 100644 index 0000000..9b71391 --- /dev/null +++ b/src/actors/storage/denoKv/alarms.ts @@ -0,0 +1,154 @@ +import { actors } from "../../proxy.ts"; +import { kv } from "./storage.ts"; + +function minutesNow(timestamp = Date.now()): number { + const date = new Date(timestamp); + + // Set seconds and milliseconds to zero to floor to the start of the next minute + date.setSeconds(0, 0); + + // Add one minute (60,000 milliseconds) + const nextMinute = new Date(date.getTime() + 60 * 1000); + + // Subtract 1 millisecond to get the last millisecond of the current minute + return nextMinute.getTime() - 1; +} + +export interface Alarm extends CreateAlarmPayload { + id: string; + retries?: Retry[]; +} +export interface CreateAlarmPayload { + actorName: string; + actorId: string; + triggerAt: number; +} +export interface Retry { + reason: string; +} +const TRIGGERS = ["triggers"]; +const ALARMS = ["alarms"]; +export const Alarms = { + id: (alarm: Omit) => + `${alarm.actorName}:${alarm.actorId}`, + schedule: async (payload: CreateAlarmPayload): Promise => { + const id = Alarms.id(payload); + const currentAlarm = await Alarms.get(payload); + + const triggerKey = [...TRIGGERS, payload.triggerAt, id]; + const alarm = { ...payload, id }; + let transaction = kv.atomic().set(triggerKey, alarm).check(currentAlarm); + if (currentAlarm.versionstamp !== null && currentAlarm.value) { + transaction = transaction.delete([ + ...TRIGGERS, + currentAlarm.value.triggerAt, + id, + ]); + } + await Alarms.set(payload, transaction); + await transaction.commit(); + return alarm; + }, + get: async (payload: Omit) => { + const id = Alarms.id(payload); + const key = [...ALARMS, id]; + const result = await kv.get(key); + return result; + }, + set: async ( + payload: Omit | Alarm, + denoKv: { set: Deno.Kv["set"] | Deno.AtomicOperation["set"] } = kv, + ) => { + const id = Alarms.id(payload); + const alarm = { ...payload, id }; + const key = [...ALARMS, id]; + const result = denoKv.set(key, alarm); + if (result instanceof Deno.AtomicOperation) { + return; + } + const awaited = await result; + if (awaited.ok) { + throw new Error(`alarm could not be created`); + } + return { + key, + value: alarm, + versionstamp: awaited.versionstamp, + }; + }, + ack: async ( + alarm: Alarm | Omit, + ): Promise => { + const id = Alarms.id(alarm); + const triggerAt = "triggerAt" in alarm + ? alarm.triggerAt + : (await Alarms.get(alarm)).value?.triggerAt; + if (!triggerAt) { + throw new Error(`alarm ${id} has no triggerAt`); + } + + await kv.atomic().delete([...ALARMS, id]).delete([ + ...TRIGGERS, + triggerAt, + id, + ]).commit(); + }, + retry: async (alarm: Alarm, reason: string): Promise => { + const savedAlarm = await Alarms.get(alarm); + + const transaction = kv.atomic().check(savedAlarm); + await Alarms.set({ + ...alarm, + retries: [...alarm.retries ?? [], { reason }], + }, transaction); + await transaction + .commit(); + }, + getRetries: async (alarm: Alarm): Promise => { + const retry = await Alarms.get(alarm); + return retry?.value?.retries; + }, + next: async function* (): AsyncIterableIterator { + const selector = { + prefix: TRIGGERS, + end: [...TRIGGERS, minutesNow()], + }; + const iter = kv.list(selector); + + for await (const alarm of iter) yield alarm.value; + }, +}; + +const tryAck = async (alarm: Alarm): Promise => { + try { + const proxy = actors.proxy<{ alarm: () => Promise }>( + alarm.actorName, + ).id(alarm.actorId); + await proxy.alarm(); + await Alarms.ack(alarm); + } catch (error) { + const retries = await Alarms.getRetries(alarm); + if (retries === undefined) { + console.error(`retrying ${alarm}`, error); + await Alarms.retry(alarm, (error as Error).message); + } else if (retries?.length === 10) { + console.error(`retrying ${alarm}`, error, retries); + await Alarms.ack(alarm); + } + } +}; + +const inflight: Record> = {}; + +export const triggerAlarms = async () => { + for await (const alarm of Alarms.next()) { + inflight[alarm.id] ??= tryAck(alarm).finally(() => + delete inflight[alarm.id] + ); + } + await Promise.all(Object.values(inflight)); +}; +// TODO (@author M. Candeia): this will make all isolates to spin up every 1 minute, would this be a problem? +// Shouldn't instead have a single job that spin ups isolates if they need to be called instead? +// On the other hand it is good because it works locally and remotelly in the same way. +Deno.cron("schedulerD", "* * * * *", triggerAlarms); diff --git a/src/actors/storage/denoKv.ts b/src/actors/storage/denoKv/storage.ts similarity index 84% rename from src/actors/storage/denoKv.ts rename to src/actors/storage/denoKv/storage.ts index e2c6f40..28960e0 100644 --- a/src/actors/storage/denoKv.ts +++ b/src/actors/storage/denoKv/storage.ts @@ -3,7 +3,8 @@ import type { ActorStorage, ActorStorageListOptions, ActorStoragePutOptions, -} from "../storage.ts"; +} from "../../storage.ts"; +import { Alarms } from "./alarms.ts"; export interface StorageOptions { actorName: string; @@ -18,7 +19,7 @@ const ACTORS_DENO_KV_TOKEN = Deno.env.get("ACTORS_DENO_KV_TOKEN"); ACTORS_DENO_KV_TOKEN && Deno.env.set("DENO_KV_ACCESS_TOKEN", ACTORS_DENO_KV_TOKEN); -const kv = await Deno.openKv(ACTORS_KV_DATABASE); +export const kv = await Deno.openKv(ACTORS_KV_DATABASE); interface AtomicOp { kv: Deno.AtomicOperation; @@ -35,6 +36,27 @@ export class DenoKvActorStorage implements ActorStorage { this.kvOrTransaction = options.atomicOp?.kv ?? kv; this.atomicOp = options.atomicOp; } + async setAlarm(dt: number): Promise { + await Alarms.schedule({ + actorId: this.options.actorId, + actorName: this.options.actorName, + triggerAt: dt, + }); + } + async getAlarm(): Promise { + return await Alarms.get( + { + actorId: this.options.actorId, + actorName: this.options.actorName, + }, + ).then((alarm) => alarm.value?.triggerAt ?? null); + } + async deleteAlarm(): Promise { + await Alarms.ack({ + actorId: this.options.actorId, + actorName: this.options.actorName, + }); + } async atomic(_storage: (st: ActorStorage) => Promise): Promise { if (this.atomicOp) { @@ -62,7 +84,8 @@ export class DenoKvActorStorage implements ActorStorage { // Build the full key based on actor name, id, and provided key buildKey(key: string[]): string[] { - return [this.options.actorName, this.options.actorId, ...key]; + // id should come first as multiples actors name can share same id + return [this.options.actorId, this.options.actorName, ...key]; } // Single get method that handles both single and multiple keys @@ -130,6 +153,10 @@ export class DenoKvActorStorage implements ActorStorage { keys: string[][], options?: ActorStoragePutOptions, ): Promise; + async delete( + keys: string, + options?: ActorStoragePutOptions, + ): Promise; async delete( keyOrKeys: string | string[] | string[][], _options?: ActorStoragePutOptions, @@ -180,7 +207,6 @@ export class DenoKvActorStorage implements ActorStorage { ); for await (const entry of iter) { - console.log(entry); result.push([(entry.key as string[]).slice(-2), entry.value]); }