Skip to content

Commit

Permalink
feat: Implement dead letter endpoints
Browse files Browse the repository at this point in the history
* Implement backend api endpoints for dead letter CRUD
* Include dead letter queue count in client status data
* Implement dead letter client view
* Partially working retry/remove actions for client (request is OK, RTK not working for update yet)
  • Loading branch information
FoxxMD committed Oct 11, 2023
1 parent c8c5f5d commit fbb67a8
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 30 deletions.
6 changes: 1 addition & 5 deletions src/backend/common/infrastructure/Atomic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {Logger} from '@foxxmd/winston';
import TupleMap from "../TupleMap";
import {Request, Response} from "express";
import {NextFunction, ParamsDictionary, Query} from "express-serve-static-core";
import { LogLevel, logLevels, PlayMeta, PlayObject } from "../../../core/Atomic";
import {LogLevel, logLevels, PlayMeta, PlayObject} from "../../../core/Atomic";

export type SourceType = 'spotify' | 'plex' | 'tautulli' | 'subsonic' | 'jellyfin' | 'lastfm' | 'deezer' | 'ytmusic' | 'mpris' | 'mopidy' | 'listenbrainz' | 'jriver' | 'kodi' | 'webscrobbler';
export const sourceTypes: SourceType[] = ['spotify', 'plex', 'tautulli', 'subsonic', 'jellyfin', 'lastfm', 'deezer', 'ytmusic', 'mpris', 'mopidy', 'listenbrainz', 'jriver', 'kodi', 'webscrobbler'];
Expand Down Expand Up @@ -213,7 +213,3 @@ export const TIME_WEIGHT = 0.5;
export const REFERENCE_WEIGHT = 0.5;
export const DUP_SCORE_THRESHOLD = 1;

export interface SourceScrobble {
source: string
play: PlayObject
}
44 changes: 25 additions & 19 deletions src/backend/scrobblers/AbstractScrobbleClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import {
INITIALIZING,
InitState,
NOT_INITIALIZED, REFERENCE_WEIGHT,
ScrobbledPlayObject, SourceScrobble, TIME_WEIGHT, TITLE_WEIGHT,
ScrobbledPlayObject, TIME_WEIGHT, TITLE_WEIGHT,
} from "../common/infrastructure/Atomic";
import winston, {Logger} from '@foxxmd/winston';
import { CommonClientConfig } from "../common/infrastructure/config/client/index";
import { ClientConfig } from "../common/infrastructure/config/client/clients";
import { Notifiers } from "../notifier/Notifiers";
import {FixedSizeList} from 'fixed-size-list';
import { PlayObject, TrackStringOptions } from "../../core/Atomic";
import {DeadLetterScrobble, PlayObject, QueuedScrobble, SourceScrobble, TrackStringOptions} from "../../core/Atomic";
import {buildTrackString, capitalize, truncateStringToLength} from "../../core/StringUtils";
import EventEmitter from "events";
import {compareScrobbleArtists, compareScrobbleTracks, normalizeStr} from "../utils/StringUtils";
Expand Down Expand Up @@ -62,8 +62,8 @@ export default abstract class AbstractScrobbleClient {
scrobbleRetries: number = 0;
scrobbling: boolean = false;
userScrobblingStopSignal: undefined | any;
queuedScrobbles: (SourceScrobble & {id: string})[] = [];
deadLetterScrobbles: (SourceScrobble & {id: string, retries: number})[] = [];
queuedScrobbles: QueuedScrobble<PlayObject>[] = [];
deadLetterScrobbles: DeadLetterScrobble<PlayObject>[] = [];

config: CommonClientConfig;
logger: Logger;
Expand Down Expand Up @@ -401,7 +401,8 @@ ${closestMatch.breakdowns.join('\n')}`);
}
}
try {
return this.doScrobble(playObj);
throw new UpstreamError('a test', {showStopper: false});
//return this.doScrobble(playObj);
} finally {
this.lastScrobbleAttempt = dayjs();
}
Expand Down Expand Up @@ -588,29 +589,29 @@ ${closestMatch.breakdowns.join('\n')}`);
return;
}

const idsToRemove = [];
const removedIds = [];
for (const deadScrobble of this.deadLetterScrobbles) {
if (deadScrobble.retries < retries) {
const scrobbled = await this.processDeadLetterScrobble(deadScrobble.id);
const [scrobbled, dead] = await this.processDeadLetterScrobble(deadScrobble.id);
if (scrobbled) {
idsToRemove.push(deadScrobble.id);
removedIds.push(deadScrobble.id);
}
}
}
if (idsToRemove.length > 0) {
this.deadLetterScrobbles = this.deadLetterScrobbles.filter(x => !idsToRemove.includes(x.id));
this.logger.info(`Removed ${idsToRemove.length} scrobbles from dead letter queue`, {leaf: 'Dead Letter'});
if (removedIds.length > 0) {
this.logger.info(`Removed ${removedIds.length} scrobbles from dead letter queue`, {leaf: 'Dead Letter'});
}
}

processDeadLetterScrobble = async (id: string) => {
processDeadLetterScrobble = async (id: string): Promise<[boolean, DeadLetterScrobble<PlayObject>?]> => {
const deadScrobbleIndex = this.deadLetterScrobbles.findIndex(x => x.id === id);
const deadScrobble = this.deadLetterScrobbles[deadScrobbleIndex];

if (!(await this.isReady())) {
this.logger.warn('Cannot process dead letter scrobble because client is not ready.', {leaf: 'Dead Letter'});
return;
return [false, deadScrobble];
}
const deadScrobbleIndex = this.deadLetterScrobbles.findIndex(x => x.id === id);
const deadScrobble = this.deadLetterScrobbles[deadScrobbleIndex];
if (this.lastScrobbleCheck.unix() < this.getLatestQueuePlayDate().unix()) {
if (this.getLatestQueuePlayDate() !== undefined && this.lastScrobbleCheck.unix() < this.getLatestQueuePlayDate().unix()) {
await this.refreshScrobbles();
}
const [timeFrameValid, timeFrameValidLog] = this.timeFrameIsValid(deadScrobble.play);
Expand All @@ -621,17 +622,22 @@ ${closestMatch.breakdowns.join('\n')}`);
this.addScrobbledTrack(deadScrobble.play, scrobbledPlay);
} catch (e) {
deadScrobble.retries++;
deadScrobble.lastRetry = dayjs();
this.logger.error(`Could not scrobble ${buildTrackString(deadScrobble.play)} from Source '${deadScrobble.source}' due to error`, {leaf: 'Dead Letter'});
this.logger.error(e);
this.deadLetterScrobbles[deadScrobbleIndex] = deadScrobble;
return false;
return [false, deadScrobble];
} finally {
await sleep(1000);
}
} else if (!timeFrameValid) {
this.logger.debug(`Will not scrobble ${buildTrackString(deadScrobble.play)} from Source '${deadScrobble.source}' because it ${timeFrameValidLog}`, {leaf: 'Dead Letter'});
}
return true;
if(deadScrobble !== undefined) {
this.removeDeadLetterScrobble(deadScrobble.id)
}

return [true];
}

removeDeadLetterScrobble = (id: string) => {
Expand All @@ -658,7 +664,7 @@ ${closestMatch.breakdowns.join('\n')}`);
this.queuedScrobbles.sort((a, b) => sortByOldestPlayDate(a.play, b.play));
}

protected addDeadLetterScrobble = (data: SourceScrobble & { id?: string, retries?: number }) => {
protected addDeadLetterScrobble = (data: QueuedScrobble<PlayObject>) => {
this.deadLetterScrobbles.push({id: nanoid(), retries: 0, ...data});
this.deadLetterScrobbles.sort((a, b) => sortByOldestPlayDate(a.play, b.play));
}
Expand Down
66 changes: 64 additions & 2 deletions src/backend/server/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {getRoot} from "../ioc";
import {makeClientCheckMiddle, makeSourceCheckMiddle} from "./middleware";
import AbstractSource from "../sources/AbstractSource";
import {
ClientStatusData,
ClientStatusData, DeadLetterScrobble,
LogInfo,
LogInfoJson,
LogLevel,
Expand Down Expand Up @@ -197,7 +197,8 @@ export const setupApi = (app: ExpressWithAsync, logger: Logger, initialLogOutput
hasAuth: requiresAuth,
hasAuthInteraction: requiresAuthInteraction,
authed,
initialized
initialized,
deadLetterScrobbles: x.deadLetterScrobbles.length
};
if (!initialized) {
base.status = 'Not Initialized';
Expand Down Expand Up @@ -225,6 +226,67 @@ export const setupApi = (app: ExpressWithAsync, logger: Logger, initialLogOutput
return res.json(result);
});

app.getAsync('/api/dead', clientMiddleFunc(true), async (req, res, next) => {
const {
// @ts-expect-error TS(2339): Property 'scrobbleSource' does not exist on type '... Remove this comment to see the full error message
scrobbleClient: client,
} = req;

let result: DeadLetterScrobble<PlayObject>[] = [];
if (client !== undefined) {
result = (client as AbstractScrobbleClient).deadLetterScrobbles;
}

return res.json(result);
});

app.putAsync('/api/dead/:id', clientMiddleFunc(true), async (req, res, next) => {
const {
// @ts-expect-error TS(2339): Property 'scrobbleSource' does not exist on type '... Remove this comment to see the full error message
scrobbleClient: client,
params: {
id
} = {}
} = req;

const deadId = id as string;

const deadScrobble = (client as AbstractScrobbleClient).deadLetterScrobbles.find(x => x.id === deadId);

if(deadScrobble === undefined) {
return res.sendStatus(400);
}

const [scrobbled, dead] = await (client as AbstractScrobbleClient).processDeadLetterScrobble(deadId);

if(scrobbled) {
return res.sendStatus(200);
}

return res.json(dead);
});

app.deleteAsync('/api/dead/:id', clientMiddleFunc(true), async (req, res, next) => {
const {
// @ts-expect-error TS(2339): Property 'scrobbleSource' does not exist on type '... Remove this comment to see the full error message
scrobbleClient: client,
params: {
id
} = {}
} = req;

const deadId = id as string;

const deadScrobble = (client as AbstractScrobbleClient).deadLetterScrobbles.find(x => x.id === deadId);

if(deadScrobble === undefined) {
return res.sendStatus(400);
}

(client as AbstractScrobbleClient).removeDeadLetterScrobble(deadId);
return res.sendStatus(200);
});

app.getAsync('/api/scrobbled', clientMiddleFunc(false), async (req, res, next) => {
const {
// @ts-ignore
Expand Down
5 changes: 5 additions & 0 deletions src/client/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {store} from './store';
import Dashboard from "./dashboard/dashboard";
import RecentPage from "./recent/RecentPage";
import ScrobbledPage from "./scrobbled/ScrobbledPage";
import DeadPage from "./deadLetter/DeadPage";

function NoMatch() {
let location = useLocation();
Expand All @@ -35,6 +36,10 @@ const router = createBrowserRouter([
path: "/scrobbled",
element: <ScrobbledPage />,
},
{
path: "/dead",
element: <DeadPage />,
},
{
path: "*",
element: <NoMatch/>
Expand Down
7 changes: 5 additions & 2 deletions src/client/components/statusCard/ClientStatusCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ const ClientStatusCard = (props: ClientStatusCardData) => {
name,
type,
display,
status
status,
tracksDiscovered = 0,
deadLetterScrobbles = 0
} = {}
} = props;
let header: string | undefined = display;
Expand All @@ -50,7 +52,8 @@ const ClientStatusCard = (props: ClientStatusCardData) => {

// TODO links
body = (<Fragment>
<div>{scrobbled}: {data.tracksDiscovered}</div>
<div>{scrobbled}: {tracksDiscovered}</div>
<div><Link to={`/dead?type=${type}&name=${name}`}>Failed Scrobbles</Link>: {deadLetterScrobbles}</div>
{hasAuth ? <a target="_blank" href={`/api/client/auth?name=${name}&type=${type}`}>(Re)authenticate or initialize</a> : null}
</Fragment>);
}
Expand Down
61 changes: 61 additions & 0 deletions src/client/deadLetter/DeadPage.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import React, {useCallback} from 'react';
import PlayDisplay from "../components/PlayDisplay";
import {recentIncludes} from "../../core/Atomic";
import {useSearchParams} from "react-router-dom";
import {
useGetDeadQuery,
//useLazyProcessDeadSingleQuery,
//useLazyRemoveDeadSingleQuery,
useRemoveDeadSingleMutation,
useProcessDeadSingleMutation
} from "./deadLetterDucks";
import dayjs from "dayjs";
import {Simulate} from "react-dom/test-utils";
import click = Simulate.click;

const displayOpts = {
include: recentIncludes,
includeWeb: true
}

const dead = () => {
let [searchParams, setSearchParams] = useSearchParams();
const {
data = [],
error,
isLoading,
isSuccess
} = useGetDeadQuery({name: searchParams.get('name'), type: searchParams.get('type')});

// const [removeDeadFetch] = useLazyRemoveDeadSingleQuery();
// const [retryDeadFetch] = useLazyProcessDeadSingleQuery();
const [removeDeadFetch] = useRemoveDeadSingleMutation();
const [retryDeadFetch] = useProcessDeadSingleMutation();

const retryDead = useCallback((id: string) => retryDeadFetch({name: searchParams.get('name'), type: searchParams.get('type'), id}), [retryDeadFetch, searchParams]);
const removeDead = useCallback((id: string) => removeDeadFetch({name: searchParams.get('name'), type: searchParams.get('type'), id}), [removeDeadFetch, searchParams]);

return (
<div className="grid">
<div className="shadow-md rounded bg-gray-500 text-white">
<div className="p-3 font-semibold bg-gray-700 text-white">
<h2>Failed Scrobbles
</h2>
</div>
<div className="p-5">
{isSuccess && !isLoading && data.length === 0 ? 'No failed scrobbles!' : null}
<ul>{data.map(x => (<li className="my-2.5" key={x.id}>
<div className="text-lg"><PlayDisplay data={x.play} buildOptions={displayOpts}/></div>
<div><span className="font-semibold">Source</span>:{x.source.replace('Source -', '')}</div>
<div><span className="font-semibold">Last Retried</span>: {x.lastRetry === undefined ? 'Never' : dayjs.duration(dayjs().diff(dayjs(x.lastRetry))).humanize(true)}</div>
<div><span className="font-semibold">Retries</span>: {x.retries}</div>
<div onClick={() => retryDead(x.id)} className="capitalize underline cursor-pointer">Retry</div>
<div onClick={() => removeDead(x.id)} className="capitalize underline cursor-pointer">Remove</div>
</li>))}</ul>
</div>
</div>
</div>
);
}

export default dead;
47 changes: 47 additions & 0 deletions src/client/deadLetter/deadLetterDucks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import {createApi, fetchBaseQuery} from "@reduxjs/toolkit/dist/query/react/index";
import {DeadLetterScrobble, JsonPlayObject} from "../../core/Atomic";
import {id} from "common-tags";

type DeadResponse = DeadLetterScrobble<JsonPlayObject, string>[];
export const deadApi = createApi({
reducerPath: 'deadApi',
baseQuery: fetchBaseQuery({ baseUrl: '/api/' }),
tagTypes: ['DeadLetters'],
endpoints: (builder) => ({
getDead: builder.query<DeadResponse, {name: string, type: string}>({
query: (params) => `dead?name=${params.name}&type=${params.type}`,
providesTags: ['DeadLetters']
}),
processDeadSingle: builder.mutation<DeadLetterScrobble<JsonPlayObject, string> | undefined, {name: string, type: string, id: string}>({
query:(params) => ({
url: `/dead/${params.id}`,
method: 'PUT',
params: {
name: params.name,
type: params.type
}
})
}),
removeDeadSingle: builder.mutation<DeadLetterScrobble<JsonPlayObject, string> | undefined, {name: string, type: string, id: string}>({
query:(params) => ({
url: `/dead/${params.id}`,
method: 'DELETE',
params: {
name: params.name,
type: params.type
},
transformResponse: (response, meta, arg) => {
if(response === undefined) {
return undefined;
} else {
return response;
}
},
invalidatesTags: ['DeadLetters']
}),
}),
}),
});

//export const { useGetDeadQuery, useLazyProcessDeadSingleQuery, useLazyRemoveDeadSingleQuery } = deadApi;
export const { useGetDeadQuery, useProcessDeadSingleMutation, useRemoveDeadSingleMutation } = deadApi;
4 changes: 3 additions & 1 deletion src/client/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import {logsReducer} from "./logs/logDucks";
import {logsApi} from "./logs/logsApi";
import {recentApi} from "./recent/recentDucks";
import {scrobbledApi} from "./scrobbled/scrobbledDucks";
import {deadApi} from "./deadLetter/deadLetterDucks";

export const store = configureStore({
reducer: {
// Add the generated reducer as a specific top-level slice
[statusApi.reducerPath]: statusApi.reducer,
[logsApi.reducerPath]: logsApi.reducer,
[recentApi.reducerPath]: recentApi.reducer,
[deadApi.reducerPath]: deadApi.reducer,
[scrobbledApi.reducerPath]: scrobbledApi.reducer,
//parts: statusReducer
clients: clientSlice.reducer,
Expand All @@ -23,7 +25,7 @@ export const store = configureStore({
// Adding the api middleware enables caching, invalidation, polling,
// and other useful features of `rtk-query`.
middleware: (getDefaultMiddleware) =>
getDefaultMiddleware().concat([statusApi.middleware, logsApi.middleware, recentApi.middleware, scrobbledApi.middleware]),
getDefaultMiddleware().concat([statusApi.middleware, logsApi.middleware, recentApi.middleware, scrobbledApi.middleware, deadApi.middleware]),
})

// optional, but required for refetchOnFocus/refetchOnReconnect behaviors
Expand Down
Loading

0 comments on commit fbb67a8

Please sign in to comment.