Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alarms powered by cron #5

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/publish.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ jobs:
runs-on: ubuntu-latest

permissions:
contents: read
id-token: write
contents: read
id-token: write

steps:
- uses: actions/checkout@v4
Expand Down
30 changes: 15 additions & 15 deletions .github/workflows/releaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ on:
- main

permissions:
contents: write # Necessary for accessing and modifying repository content
pull-requests: write # Necessary for interacting with pull requests
actions: write # Necessary for triggering other workflows
contents: write # Necessary for accessing and modifying repository content
pull-requests: write # Necessary for interacting with pull requests
actions: write # Necessary for triggering other workflows

jobs:
tag-discussion:
Expand All @@ -21,8 +21,8 @@ jobs:
- name: Checkout Code
uses: actions/checkout@v3
with:
ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository)
repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo
ref: ${{ github.event.pull_request.base.ref }} # Checkout the base branch (target repository)
repository: ${{ github.event.pull_request.base.repo.full_name }} # Checkout from the target repo

- name: Calculate new versions
id: calculate_versions
Expand Down Expand Up @@ -165,15 +165,15 @@ jobs:
git push origin ${{ steps.determine_version.outputs.new_version }}
- name: Trigger Release Workflow
run: |
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.everest-preview+json" \
https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \
-d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.everest-preview+json" \
https://api.github.com/repos/${{ github.repository }}/actions/workflows/release.yaml/dispatches \
-d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
- name: Trigger Publish Workflow
run: |
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.everest-preview+json" \
https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \
-d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
curl -X POST \
-H "Authorization: token ${{ secrets.GITHUB_TOKEN }}" \
-H "Accept: application/vnd.github.everest-preview+json" \
https://api.github.com/repos/${{ github.repository }}/actions/workflows/publish.yaml/dispatches \
-d '{"ref":"main", "inputs":{"tag_name":"${{ steps.determine_version.outputs.new_version }}"}}'
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
},
"tasks": {
"check": "deno fmt && deno lint --fix && deno check ./src/actors/mod.ts ./src/actors/hono/middleware.ts",
"test": "rm kv;deno test -A --unstable-kv .",
"test": "rm kv;deno test -A --env --unstable-cron --unstable-kv .",
"release": "deno run -A jsr:@deco/scripts/release"
},
"lock": false,
Expand Down
3 changes: 3 additions & 0 deletions src/actors/env.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export const DENO_ISOLATE_INSTANCE_ID: string | undefined = Deno.env.get(
"DENO_ISOLATE_INSTANCE_ID",
);
14 changes: 14 additions & 0 deletions src/actors/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { assertEquals, assertFalse } from "@std/assert";
import { actors } from "./proxy.ts";
import { ActorRuntime } from "./runtime.ts";
import type { ActorState } from "./state.ts";
import { triggerAlarms } from "./storage/denoKv/alarms.ts";
import type { ChannelUpgrader } from "./util/channels/channel.ts";
import { WatchTarget } from "./util/watch.ts";

Expand All @@ -22,6 +23,13 @@ class Counter {
this.watchTarget.notify(this.count);
return this.count;
}
async scheduleIncrement(): Promise<void> {
await this.state.storage.setAlarm(Date.now() + 1); // next tick
}

async alarm() {
await this.increment();
}

async decrement(): Promise<number> {
this.count--;
Expand Down Expand Up @@ -110,4 +118,10 @@ Deno.test("counter tests", async () => {
assertEquals(done, false);
}
watcher.return?.();

const counterWatcher = await actor.watch();
await actor.scheduleIncrement();
await triggerAlarms();
assertEquals(await counterWatcher.next().then((x) => x.value), 2);
counterWatcher.return?.();
});
5 changes: 3 additions & 2 deletions src/actors/runtime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { type ServerSentEventMessage, ServerSentEventStream } from "@std/http";
import { DENO_ISOLATE_INSTANCE_ID } from "./env.ts";
import { ACTOR_ID_HEADER_NAME, ACTOR_ID_QS_NAME } from "./proxy.ts";
import { ActorState } from "./state.ts";
import { DenoKvActorStorage } from "./storage/denoKv.ts";
import { DenoKvActorStorage } from "./storage/denoKv/storage.ts";
import { EVENT_STREAM_RESPONSE_HEADER } from "./stream.ts";
import { isUpgrade, makeWebSocket } from "./util/channels/channel.ts";

Expand Down Expand Up @@ -119,7 +120,7 @@ export class ActorRuntime {
async fetch(req: Request): Promise<Response> {
const url = new URL(req.url);
const actorId = req.headers.get(ACTOR_ID_HEADER_NAME) ??
url.searchParams.get(ACTOR_ID_QS_NAME);
url.searchParams.get(ACTOR_ID_QS_NAME) ?? DENO_ISOLATE_INSTANCE_ID;
if (!actorId) {
return new Response(`missing ${ACTOR_ID_HEADER_NAME} header`, {
status: 400,
Expand Down
3 changes: 3 additions & 0 deletions src/actors/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ export interface ActorStorage {
delete(keys: string[][], options?: ActorStoragePutOptions): Promise<number>;
deleteAll(options?: ActorStoragePutOptions): Promise<void>;
atomic(storage: (st: ActorStorage) => Promise<void>): Promise<void>;
setAlarm(dt: number): Promise<void>;
getAlarm(): Promise<number | null>;
deleteAlarm(): Promise<void>;
}
15 changes: 15 additions & 0 deletions src/actors/storage/cached.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,23 @@ import type {

export class CachedStorage implements ActorStorage {
protected cache: Map<string, any> = new Map<string, any>();
protected alarm: Promise<number | null> | null = null;

constructor(protected innerStorage: ActorStorage) {}
setAlarm(dt: number): Promise<void> {
return this.innerStorage.setAlarm(dt).then(() => {
this.alarm = Promise.resolve(dt);
});
}
getAlarm(): Promise<number | null> {
this.alarm ??= this.innerStorage.getAlarm();
return this.alarm;
}
deleteAlarm(): Promise<void> {
return this.innerStorage.deleteAlarm().then(() => {
this.alarm = null;
});
}

private async getMany<T = unknown>(
keys: string[][],
Expand Down
154 changes: 154 additions & 0 deletions src/actors/storage/denoKv/alarms.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import { actors } from "../../proxy.ts";
import { kv } from "./storage.ts";

function minutesNow(timestamp = Date.now()): number {
const date = new Date(timestamp);

// Set seconds and milliseconds to zero to floor to the start of the next minute
date.setSeconds(0, 0);

// Add one minute (60,000 milliseconds)
const nextMinute = new Date(date.getTime() + 60 * 1000);

// Subtract 1 millisecond to get the last millisecond of the current minute
return nextMinute.getTime() - 1;
}

export interface Alarm extends CreateAlarmPayload {
id: string;
retries?: Retry[];
}
export interface CreateAlarmPayload {
actorName: string;
actorId: string;
triggerAt: number;
}
export interface Retry {
reason: string;
}
const TRIGGERS = ["triggers"];
const ALARMS = ["alarms"];
export const Alarms = {
id: (alarm: Omit<CreateAlarmPayload, "triggerAt">) =>
`${alarm.actorName}:${alarm.actorId}`,
schedule: async (payload: CreateAlarmPayload): Promise<Alarm> => {
const id = Alarms.id(payload);
const currentAlarm = await Alarms.get(payload);

const triggerKey = [...TRIGGERS, payload.triggerAt, id];
const alarm = { ...payload, id };
let transaction = kv.atomic().set(triggerKey, alarm).check(currentAlarm);
if (currentAlarm.versionstamp !== null && currentAlarm.value) {
transaction = transaction.delete([
...TRIGGERS,
currentAlarm.value.triggerAt,
id,
]);
}
await Alarms.set(payload, transaction);
await transaction.commit();
return alarm;
},
get: async (payload: Omit<CreateAlarmPayload, "triggerAt">) => {
const id = Alarms.id(payload);
const key = [...ALARMS, id];
const result = await kv.get<Alarm>(key);
return result;
},
set: async (
payload: Omit<CreateAlarmPayload, "triggerAt"> | Alarm,
denoKv: { set: Deno.Kv["set"] | Deno.AtomicOperation["set"] } = kv,
) => {
const id = Alarms.id(payload);
const alarm = { ...payload, id };
const key = [...ALARMS, id];
const result = denoKv.set(key, alarm);
if (result instanceof Deno.AtomicOperation) {
return;
}
const awaited = await result;
if (awaited.ok) {
throw new Error(`alarm could not be created`);
}
return {
key,
value: alarm,
versionstamp: awaited.versionstamp,
};
},
ack: async (
alarm: Alarm | Omit<CreateAlarmPayload, "triggerAt">,
): Promise<void> => {
const id = Alarms.id(alarm);
const triggerAt = "triggerAt" in alarm
? alarm.triggerAt
: (await Alarms.get(alarm)).value?.triggerAt;
if (!triggerAt) {
throw new Error(`alarm ${id} has no triggerAt`);
}

await kv.atomic().delete([...ALARMS, id]).delete([
...TRIGGERS,
triggerAt,
id,
]).commit();
},
retry: async (alarm: Alarm, reason: string): Promise<void> => {
const savedAlarm = await Alarms.get(alarm);

const transaction = kv.atomic().check(savedAlarm);
await Alarms.set({
...alarm,
retries: [...alarm.retries ?? [], { reason }],
}, transaction);
await transaction
.commit();
},
getRetries: async (alarm: Alarm): Promise<Retry[] | undefined> => {
const retry = await Alarms.get(alarm);
return retry?.value?.retries;
},
next: async function* (): AsyncIterableIterator<Alarm> {
const selector = {
prefix: TRIGGERS,
end: [...TRIGGERS, minutesNow()],
};
const iter = kv.list<Alarm>(selector);

for await (const alarm of iter) yield alarm.value;
},
};

const tryAck = async (alarm: Alarm): Promise<void> => {
try {
const proxy = actors.proxy<{ alarm: () => Promise<void> }>(
alarm.actorName,
).id(alarm.actorId);
await proxy.alarm();
await Alarms.ack(alarm);
} catch (error) {
const retries = await Alarms.getRetries(alarm);
if (retries === undefined) {
console.error(`retrying ${alarm}`, error);
await Alarms.retry(alarm, (error as Error).message);
} else if (retries?.length === 10) {
console.error(`retrying ${alarm}`, error, retries);
await Alarms.ack(alarm);
}
}
};

const inflight: Record<string, Promise<void>> = {};

export const triggerAlarms = async () => {
for await (const alarm of Alarms.next()) {
inflight[alarm.id] ??= tryAck(alarm).finally(() =>
delete inflight[alarm.id]
);
}
await Promise.all(Object.values(inflight));
};
// TODO (@author M. Candeia): this will make all isolates to spin up every 1 minute, would this be a problem?
// Shouldn't instead have a single job that spin ups isolates if they need to be called instead?
// On the other hand it is good because it works locally and remotelly in the same way.
Deno.cron("schedulerD", "* * * * *", triggerAlarms);
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import type {
ActorStorage,
ActorStorageListOptions,
ActorStoragePutOptions,
} from "../storage.ts";
} from "../../storage.ts";
import { Alarms } from "./alarms.ts";

export interface StorageOptions {
actorName: string;
Expand All @@ -18,7 +19,7 @@ const ACTORS_DENO_KV_TOKEN = Deno.env.get("ACTORS_DENO_KV_TOKEN");
ACTORS_DENO_KV_TOKEN &&
Deno.env.set("DENO_KV_ACCESS_TOKEN", ACTORS_DENO_KV_TOKEN);

const kv = await Deno.openKv(ACTORS_KV_DATABASE);
export const kv = await Deno.openKv(ACTORS_KV_DATABASE);

interface AtomicOp {
kv: Deno.AtomicOperation;
Expand All @@ -35,6 +36,27 @@ export class DenoKvActorStorage implements ActorStorage {
this.kvOrTransaction = options.atomicOp?.kv ?? kv;
this.atomicOp = options.atomicOp;
}
async setAlarm(dt: number): Promise<void> {
await Alarms.schedule({
actorId: this.options.actorId,
actorName: this.options.actorName,
triggerAt: dt,
});
}
async getAlarm(): Promise<number | null> {
return await Alarms.get(
{
actorId: this.options.actorId,
actorName: this.options.actorName,
},
).then((alarm) => alarm.value?.triggerAt ?? null);
}
async deleteAlarm(): Promise<void> {
await Alarms.ack({
actorId: this.options.actorId,
actorName: this.options.actorName,
});
}

async atomic(_storage: (st: ActorStorage) => Promise<void>): Promise<void> {
if (this.atomicOp) {
Expand Down Expand Up @@ -62,7 +84,8 @@ export class DenoKvActorStorage implements ActorStorage {

// Build the full key based on actor name, id, and provided key
buildKey(key: string[]): string[] {
return [this.options.actorName, this.options.actorId, ...key];
// id should come first as multiples actors name can share same id
return [this.options.actorId, this.options.actorName, ...key];
}

// Single get method that handles both single and multiple keys
Expand Down Expand Up @@ -130,6 +153,10 @@ export class DenoKvActorStorage implements ActorStorage {
keys: string[][],
options?: ActorStoragePutOptions,
): Promise<number>;
async delete(
keys: string,
options?: ActorStoragePutOptions,
): Promise<number>;
async delete(
keyOrKeys: string | string[] | string[][],
_options?: ActorStoragePutOptions,
Expand Down Expand Up @@ -180,7 +207,6 @@ export class DenoKvActorStorage implements ActorStorage {
);

for await (const entry of iter) {
console.log(entry);
result.push([(entry.key as string[]).slice(-2), entry.value]);
}

Expand Down