Skip to content

Commit

Permalink
Feat 34 - save LDES Client state (#42)
Browse files Browse the repository at this point in the history
* bump: bump dependency version

* feat: update .gitignore to ignore .ldes folder

* bump: bump dependency version

* feat: switch from default type Readable to type EventStream

* feat: export LDES Client EventStream state after end event

* feat: move saveState and loadState to separate utils file

* bump: bump dependency version

* feat: start using type State from @treecg/actor-init-ldes-client

* feat: use state in createReadStream when state present

* bump: bump dependency version

* feat: react to the 'now only syncing' event instead of the 'end' event and use the LDES Client synchronization mode

* feat: comment out the delete folder step

* dist: new dist

* feat: use .ldes/${options.storage}/state.json to save state

* dist: new dist
  • Loading branch information
KasperZutterman authored Oct 28, 2021
1 parent 59251fb commit eb15441
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 191 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,7 @@ typings/
.tern-port

# Test output
output/
output/

# LDES state folder
.ldes/
419 changes: 258 additions & 161 deletions dist/index.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion dist/index.js.map

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"@actions/exec": "^1.1.0",
"@rdfjs/data-model": "^1.3.3",
"@rdfjs/types": "^1.0.1",
"@treecg/actor-init-ldes-client": "^2.4.1",
"@treecg/actor-init-ldes-client": "^2.5.5",
"@treecg/bucketizers": "^1.0.2",
"@treecg/types": "0.0.12",
"date-fns": "^2.23.0",
Expand Down
6 changes: 3 additions & 3 deletions src/data-source-strategy/DatasourceContext.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Readable } from 'stream';
import type { EventStream } from '@treecg/actor-init-ldes-client';
import type { Member, Bucketizer } from '@treecg/types';
import type { Config } from '../utils/Config';
import type Datasource from '../utils/interfaces/Datasource';
Expand All @@ -18,8 +18,8 @@ class DatasourceContext {
return this.datasource.getData(config, bucketizer);
}

public getLinkedDataEventStream(url: string): Readable {
return this.datasource.getLinkedDataEventStream(url);
public getLinkedDataEventStream(url: string, storage: string): EventStream {
return this.datasource.getLinkedDataEventStream(url, storage);
}
}

Expand Down
25 changes: 19 additions & 6 deletions src/data-source-strategy/LDESClientDatasource.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { Readable } from 'stream';
import type { EventStream, State } from '@treecg/actor-init-ldes-client';
import { newEngine } from '@treecg/actor-init-ldes-client';
import type { Member, Bucketizer } from '@treecg/types';
import type { Config } from '../utils/Config';
import type Datasource from '../utils/interfaces/Datasource';
import { loadState, saveState } from '../utils/State';

class LDESClientDatasource implements Datasource {
public async getData(
Expand All @@ -11,7 +12,7 @@ class LDESClientDatasource implements Datasource {
): Promise<Member[]> {
return new Promise<Member[]>((resolve, reject) => {
try {
const ldes = this.getLinkedDataEventStream(config.url);
const ldes = this.getLinkedDataEventStream(config.url, config.storage);

const data: Member[] = [];

Expand All @@ -20,7 +21,14 @@ class LDESClientDatasource implements Datasource {
data.push(member);
});

ldes.on('end', () => {
ldes.on('now only syncing', () => {
console.log('now only syncing');
ldes.pause();
});

ldes.on('pause', () => {
saveState(ldes.exportState(), config.storage);

console.log('No more data!');
resolve(data);
});
Expand All @@ -31,16 +39,21 @@ class LDESClientDatasource implements Datasource {
});
}

public getLinkedDataEventStream(url: string): Readable {
public getLinkedDataEventStream(url: string, storage: string): EventStream {
const options = {
emitMemberOnce: true,
disablePolling: true,
disableSynchronization: false,
mimeType: 'text/turtle',
representation: 'Quads',
};

const LDESClient = newEngine();
return LDESClient.createReadStream(url, options);
const state: State | null = loadState(storage);
if (state === null) {
return LDESClient.createReadStream(url, options);
}

return LDESClient.createReadStream(url, options, state);
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import BucketizerFragmentStrategy from './fragment-strategy/BucketizerFragmentSt
import FragmentContext from './fragment-strategy/FragmentContext';
import type { Config } from './utils/Config';
import { FileExtension } from './utils/FileExtension';
import { saveState } from './utils/State';

export class Data {
private readonly config: Config;
Expand Down Expand Up @@ -52,6 +53,7 @@ export class Data {
private async processDataStreamingly(): Promise<void> {
const ldes = this.datasourceContext.getLinkedDataEventStream(
this.config.url,
this.config.storage,
);
const bucketizer = await this.fragmentContext.getStrategy().initBucketizer(this.config);

Expand All @@ -66,7 +68,14 @@ export class Data {
tasks.push(this.fragmentContext.fragment(member, this.config));
});

ldes.on('end', async () => {
ldes.on('now only syncing', () => {
console.log('Now only syncing');
ldes.pause();
});

ldes.on('pause', async () => {
saveState(ldes.exportState(), this.config.storage);

await Promise.all(tasks);

const hypermediaControls = bucketizer.getBucketHypermediaControlsMap();
Expand Down
9 changes: 4 additions & 5 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// SOURCE: https://github.com/githubocto/flat/blob/main/src/main.ts
import { execSync } from 'child_process';
import { rmdirSync } from 'fs';
import * as core from '@actions/core';
import { exec } from '@actions/exec';
import { Data } from './data';
Expand All @@ -15,10 +14,10 @@ const run = async (): Promise<void> => {
await exec('git', ['config', 'user.email', `${config.git_email}`]);
core.endGroup();

// Delete output folder to have a clean start, no symlinks and no old data
core.startGroup('Delete output folder');
rmdirSync(config.storage, { recursive: true });
core.endGroup();
// // Delete output folder to have a clean start, no symlinks and no old data
// core.startGroup('Delete output folder');
// rmdirSync(config.storage, { recursive: true });
// core.endGroup();

// Fetches the LDES and applies a fragmentation strategy
core.startGroup('Fetch and write data');
Expand Down
18 changes: 18 additions & 0 deletions src/utils/State.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { existsSync, mkdirSync, writeFileSync, readFileSync } from 'fs';
import type { State } from '@treecg/actor-init-ldes-client';

/**
 * Persists the LDES client state as JSON to `./.ldes/<storage>/state.json`
 * (resolved against the current working directory).
 *
 * The target folder is created together with any missing parent folders, so
 * the very first run (when `./.ldes` does not exist yet) and nested `storage`
 * values such as `a/b` both succeed. An existing state file is overwritten.
 *
 * @param state - State object to serialize with `JSON.stringify`.
 * @param storage - Name (or relative path) of the storage folder below `./.ldes`.
 * @throws If the folder cannot be created or the file cannot be written.
 */
export const saveState = (state: State, storage: string): void => {
  const folder = `./.ldes/${storage}`;
  // `recursive: true` creates missing parents and does not fail when the
  // folder already exists, so no separate existence check is needed.
  mkdirSync(folder, { recursive: true });
  writeFileSync(`${folder}/state.json`, JSON.stringify(state));
};

/**
 * Reads the previously saved LDES client state for a storage folder.
 *
 * @param storage - Name of the storage folder below `./.ldes`.
 * @returns The parsed content of `./.ldes/<storage>/state.json`, or `null`
 * when that file does not exist.
 */
export const loadState = (storage: string): State | null => {
  const stateFile = `./.ldes/${storage}/state.json`;

  // No state file means there is nothing to resume from.
  if (!existsSync(stateFile)) {
    return null;
  }

  const serialized = readFileSync(stateFile).toString();
  return JSON.parse(serialized);
};
4 changes: 2 additions & 2 deletions src/utils/interfaces/Datasource.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import type { Readable } from 'stream';
import type { EventStream } from '@treecg/actor-init-ldes-client';
import type { Member, Bucketizer } from '@treecg/types';
import type { Config } from '../Config';

interface Datasource {
getData: (config: Config, bucketizer: Bucketizer) => Promise<Member[]>;
getLinkedDataEventStream: (url: string) => Readable;
getLinkedDataEventStream: (url: string, storage: string) => EventStream;
}

export default Datasource;

0 comments on commit eb15441

Please sign in to comment.