Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix 67 - Removing processDataMemory() #68

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions action.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ inputs:
property_path:
description: 'Property path to be used by bucketizers'
required: false
stream_data:
description: 'Boolean indicating whether to stream the LDES members or to load them in memory'
required: false
default: 'false'
timeout:
description: 'Amount of time in milliseconds to wait for the datasource to fetch data in a single run'
required: false
Expand Down Expand Up @@ -72,7 +68,6 @@ runs:
INPUT_FRAGMENTATION_PAGE_SIZE: ${{ inputs.fragmentation_page_size }}
INPUT_DATASOURCE_STRATEGY: ${{ inputs.datasource_strategy }}
INPUT_PROPERTY_PATH: ${{ inputs.property_path }}
INPUT_STREAM_DATA: ${{ inputs.stream_data }}
INPUT_TIMEOUT: ${{ inputs.timeout }}
INPUT_ANNOUNCE: ${{ inputs.announce }}
- name: Run post script
Expand Down
6 changes: 0 additions & 6 deletions src/data-source-strategy/DatasourceContext.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import type { EventStream } from '@treecg/actor-init-ldes-client';
import type { Member, Bucketizer } from '@treecg/types';
import type { Config } from '../utils/Config';
import type Datasource from '../utils/interfaces/Datasource';

class DatasourceContext {
Expand All @@ -14,10 +12,6 @@ class DatasourceContext {
this.datasource = datasource;
}

/**
 * Forward a data request to the currently configured datasource.
 *
 * @param config - configuration handed through unchanged
 * @param bucketizer - bucketizer handed through unchanged
 * @returns the promise produced by the datasource's own `getData`
 */
public getData(config: Config, bucketizer: Bucketizer): Promise<Member[]> {
  const { datasource } = this;
  return datasource.getData(config, bucketizer);
}

/**
 * Forward an event stream request to the currently configured datasource.
 *
 * @param url - handed through unchanged
 * @param storage - handed through unchanged
 * @returns the event stream produced by the datasource
 */
public getLinkedDataEventStream(url: string, storage: string): EventStream {
  const { datasource } = this;
  return datasource.getLinkedDataEventStream(url, storage);
}
Expand Down
41 changes: 1 addition & 40 deletions src/data-source-strategy/LDESClientDatasource.ts
Original file line number Diff line number Diff line change
@@ -1,49 +1,10 @@
import type { EventStream } from '@treecg/actor-init-ldes-client';
import { newEngine } from '@treecg/actor-init-ldes-client';
import type { Member, Bucketizer } from '@treecg/types';
import type { Config } from '../utils/Config';
import type Datasource from '../utils/interfaces/Datasource';
import type { LDESActionState } from '../utils/State';
import { loadState, saveState } from '../utils/State';
import { loadState } from '../utils/State';

class LDESClientDatasource implements Datasource {
/**
 * Collect LDES members until the client pauses, then resolve with all of them.
 *
 * Every member received is bucketized and buffered. The client is paused either
 * when it emits 'now only syncing' or when `config.timeout` milliseconds have
 * elapsed. On 'pause' the exported client and bucketizer state are handed to
 * `saveState` together with `config.storage`, and the promise resolves.
 *
 * @param config - supplies the LDES url, the storage location and the timeout (ms)
 * @param bucketizer - applied to every member as it arrives
 * @returns the members received before the client paused
 */
public getData(
  config: Config,
  bucketizer: Bucketizer,
): Promise<Member[]> {
  return new Promise<Member[]>((resolve, reject) => {
    try {
      const ldes = this.getLinkedDataEventStream(config.url, config.storage);

      const data: Member[] = [];

      // If the run takes longer than the configured timeout, pause the LDES Client
      const timeout = setTimeout(() => ldes.pause(), config.timeout);

      ldes.on('data', (member: Member) => {
        bucketizer.bucketize(member.quads, member.id.value);
        data.push(member);
      });

      ldes.on('now only syncing', () => {
        // Cancel the timer instead of merely unref-ing it, so the stale
        // pause callback can never fire after we are done.
        clearTimeout(timeout);
        console.log('now only syncing');
        ldes.pause();
      });

      // NOTE(review): assumes EventStream follows Node stream semantics, where
      // an 'error' event without a listener is thrown — confirm.
      ldes.on('error', (error: unknown) => {
        clearTimeout(timeout);
        console.error(error);
        reject(error);
      });

      ldes.on('pause', () => {
        // Harmless if the timer already fired or was cleared above.
        clearTimeout(timeout);
        saveState(ldes.exportState(), bucketizer.exportState(), config.storage);

        console.log('No more data!');
        resolve(data);
      });
    } catch (error: unknown) {
      console.error(error);
      reject(error);
    }
  });
}

public getLinkedDataEventStream(url: string, storage: string): EventStream {
const options = {
emitMemberOnce: true,
Expand Down
67 changes: 3 additions & 64 deletions src/data.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { existsSync, mkdirSync } from 'fs';

import type * as RDF from '@rdfjs/types';
import type { Member, Bucketizer, RelationParameters } from '@treecg/types';
import type { Member } from '@treecg/types';
import DatasourceContext from './data-source-strategy/DatasourceContext';
import LDESClientDatasource from './data-source-strategy/LDESClientDatasource';
import BucketizerFragmentStrategy from './fragment-strategy/BucketizerFragmentStrategy';
Expand All @@ -15,7 +15,7 @@ export class Data {
private readonly datasourceContext: DatasourceContext;
private readonly fragmentContext: FragmentContext;

private RDFData: Member[];
private readonly RDFData: Member[];

public constructor(config: Config) {
this.config = config;
Expand All @@ -37,18 +37,7 @@ export class Data {
}

/**
 * Run the processing pipeline selected by the `stream_data` config flag:
 * the streaming variant when it is truthy, the in-memory variant otherwise.
 */
public processData(): Promise<void> {
  if (this.config.stream_data) {
    return this.processDataStreamingly();
  }

  return this.processDataMemory();
}

/**
 * In-memory processing: initialise the bucketizer from the fragment strategy,
 * fetch the data with it, then write the fetched data together with the
 * bucketizer's hypermedia controls and property path quads.
 */
private async processDataMemory(): Promise<void> {
const bucketizer = await this.fragmentContext.getStrategy().initBucketizer(this.config);
await this.fetchData(bucketizer);

// Both values are read from the bucketizer only after fetching has completed.
const hypermediaControls = bucketizer.getBucketHypermediaControlsMap();
const propertyPathQuads = bucketizer.getPropertyPathQuads();
await this.writeData(hypermediaControls, propertyPathQuads);
// NOTE(review): this text comes from a diff view without +/- markers; the
// statement below may belong to a different revision of `processData` — confirm.
return this.processDataStreamingly();
}

private async processDataStreamingly(): Promise<void> {
Expand Down Expand Up @@ -105,56 +94,6 @@ export class Data {
this.datasourceContext.setDatasource(new LDESClientDatasource());
}

/**
 * Fetch data using the configured Datasource and store the result in `RDFData`.
 *
 * @param bucketizer - passed on to the datasource together with the config
 * @throws rethrows whatever the datasource rejects with, after logging it
 */
public async fetchData(bucketizer: Bucketizer): Promise<void> {
  try {
    // Await directly: wrapping an existing promise in `new Promise(async ...)`
    // adds nothing and can hide errors thrown inside the executor.
    this.RDFData = await this.datasourceContext.getData(
      this.config,
      bucketizer,
    );
  } catch (error: unknown) {
    console.error(error);
    throw error;
  }
}

/**
 * Write fetched data and hypermedia controls to the output directory supplied in the config file.
 *
 * Every member in `RDFData` is fragmented; once all fragment tasks have settled
 * successfully the hypermedia controls are added.
 *
 * @param hypermediaControls - relation parameters per bucket, forwarded to the fragment context
 * @param propertyPathQuads - property path quads, forwarded to the fragment context
 * @throws rethrows any failure from fragmenting or adding controls, after logging it
 */
public async writeData(
  hypermediaControls: Map<string, RelationParameters[]>,
  propertyPathQuads: RDF.Quad[],
): Promise<void> {
  try {
    const tasks = this.RDFData.map(member => {
      // The extension is set immediately before each fragment call, as the
      // fragment context holds it as shared state.
      const extension = this.getOutputExtension(member.quads);
      this.fragmentContext.setFileExtension(extension);

      return this.fragmentContext.fragment(member, this.config);
    });

    await Promise.all(tasks);
    await this.fragmentContext.addHypermediaControls(
      hypermediaControls,
      propertyPathQuads,
      this.config,
    );
  } catch (error: unknown) {
    console.error(error);
    throw error;
  }
}

private getOutputExtension(quads: RDF.Quad[]): FileExtension {
const graphlessQuads = quads.filter(quad => quad.graph.termType === 'DefaultGraph');
return graphlessQuads.length > 0 ? FileExtension.Turtle : FileExtension.TriG;
Expand Down
4 changes: 0 additions & 4 deletions src/utils/Config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ export interface Config {
// Property path that will be resolved by bucketizer
property_path: string;

// Whether to process each LDES member as soon as it is received or to load all of them in memory first
stream_data: boolean;

// Amount of time to wait for the datasource to fetch data in a single run
timeout: number;

Expand All @@ -56,7 +53,6 @@ export function getConfig(): Config {
),
datasource_strategy: core.getInput('datasource_strategy'),
property_path: core.getInput('property_path'),
stream_data: core.getBooleanInput('stream_data') || false,
timeout: Number.parseInt(core.getInput('timeout'), 10) || 1_000 * 60 * 60 * 1,
announce: core.getBooleanInput('announce') || false,
};
Expand Down
3 changes: 0 additions & 3 deletions src/utils/interfaces/Datasource.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import type { EventStream } from '@treecg/actor-init-ldes-client';
import type { Member, Bucketizer } from '@treecg/types';
import type { Config } from '../Config';

/**
 * Contract for a datasource: something that can deliver members for a given
 * configuration and expose a linked data event stream.
 */
interface Datasource {
// Takes the config and a bucketizer; resolves with an array of members.
getData: (config: Config, bucketizer: Bucketizer) => Promise<Member[]>;
// Takes a url and a storage string; returns an EventStream.
getLinkedDataEventStream: (url: string, storage: string) => EventStream;
}

Expand Down