Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

auto import extra considerations #850

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/api/src/app/import-jobs/dtos/create-userjob.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ export class CreateUserJobDto {
@IsString()
@IsOptional()
extra?: string;

@IsString()
@IsOptional()
authHeaderValue?: string;
}
17 changes: 9 additions & 8 deletions apps/api/src/app/import-jobs/import-jobs.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@ import { ApiTags, ApiSecurity, ApiOperation } from '@nestjs/swagger';
import { Body, Controller, Delete, Get, Param, ParseArrayPipe, Post, Put, UseGuards } from '@nestjs/common';
import {
CreateUserJob,
GetColumnSchemaMapping,
CreateJobMapping,
UpdateUserJob,
CreateJobMapping,
GetColumnSchemaMapping,
GetUserJob,
UserJobPause,
UserJobDelete,
UserJobResume,
UserJobTerminate,
UserJobDelete,
} from './usecase';
import { ACCESS_KEY_NAME } from '@impler/shared';
import { JwtAuthGuard } from '@shared/framework/auth.gaurd';
import { UpdateJobDto, CreateUserJobDto, UpdateJobMappingDto } from './dtos';

@ApiTags('Import-Jobs')
@ApiTags('Import Jobs')
@Controller('/import-jobs')
@UseGuards(JwtAuthGuard)
@ApiSecurity(ACCESS_KEY_NAME)
Expand All @@ -35,12 +35,13 @@ export class ImportJobsController {
@Post(':templateId')
@ApiOperation({ summary: 'Create User-Job' })
@ApiSecurity(ACCESS_KEY_NAME)
async createUserJobRoute(@Param('templateId') templateId: string, @Body() createUserJobData: CreateUserJobDto) {
async createUserJobRoute(@Param('templateId') templateId: string, @Body() jobData: CreateUserJobDto) {
return this.createUserJob.execute({
_templateId: templateId,
url: createUserJobData.url,
extra: createUserJobData.extra,
externalUserId: createUserJobData.externalUserId,
url: jobData.url,
extra: jobData.extra,
externalUserId: jobData.externalUserId,
authHeaderValue: jobData.authHeaderValue,
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export class CreateUserJobCommand {
url: string;
extra?: string;
_templateId: string;
externalUserId?: string;
extra?: string;
authHeaderValue?: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@ export class CreateUserJob {
private readonly userJobRepository: UserJobRepository
) {}

async execute({ _templateId, url, externalUserId, extra }: CreateUserJobCommand): Promise<UserJobEntity> {
async execute({
url,
extra,
_templateId,
externalUserId,
authHeaderValue,
}: CreateUserJobCommand): Promise<UserJobEntity> {
const mimeType = await this.rssService.getMimeType(url);
if (mimeType === FileMimeTypesEnum.XML || mimeType === FileMimeTypesEnum.TEXTXML) {
const { rssKeyHeading } = await this.rssService.parseRssFeed(url);
Expand All @@ -23,9 +29,10 @@ export class CreateUserJob {

return await this.userJobRepository.create({
url,
extra,
authHeaderValue,
headings: rssKeyHeading,
_templateId: _templateId,
extra,
externalUserId: externalUserId || (formattedExtra as unknown as Record<string, any>)?.externalUserId,
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ export class GetImportJobDataConsumer extends BaseConsumer {
const data = JSON.parse(message.content) as { _jobId: string };
const importJobHistoryId = this.commonRepository.generateMongoId().toString();
const importedData = await this.getJobImportedData(data._jobId);
const allDataFilePath = this.fileNameService.getAllJsonDataFilePath(importJobHistoryId);
await this.convertRecordsToJsonFile(importJobHistoryId, importedData);
await this.importJobHistoryRepository.create({
_id: importJobHistoryId,
_jobId: data._jobId,
allDataFilePath: this.fileNameService.getAllJsonDataFilePath(importJobHistoryId),
allDataFilePath,
status: ImportJobHistoryStatusEnum.PROCESSING,
});
const userJobInfo = await this.userJobRepository.getUserJobWithTemplate(data._jobId);
Expand All @@ -42,7 +43,7 @@ export class GetImportJobDataConsumer extends BaseConsumer {
});

if (webhookDestination?.callbackUrl) {
publishToQueue(QueuesEnum.SEND_IMPORT_JOB_DATA, { importJobHistoryId });
publishToQueue(QueuesEnum.SEND_IMPORT_JOB_DATA, { _jobId: data._jobId, allDataFilePath });
}

return;
Expand Down
2 changes: 1 addition & 1 deletion apps/queue-manager/src/consumers/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export * from './send-webhook-data.consumer';
export * from './end-import.consumer';
export * from './send-bubble-data.consumer';
export * from './import-job-data.consumer';
export * from './get-import-job-data.consumer';
export * from './send-import-job-data.consumer';
79 changes: 33 additions & 46 deletions apps/queue-manager/src/consumers/send-import-job-data.consumer.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,20 @@
import {
TemplateRepository,
WebhookDestinationRepository,
ColumnRepository,
TemplateRepository,
ImportJobHistoryRepository,
UserJobEntity,
WebhookDestinationRepository,
WebhookLogEntity,
UploadRepository,
WebhookLogRepository,
UserJobRepository,
} from '@impler/dal';
import { StorageService } from '@impler/services';
import {
ColumnTypesEnum,
SendImportJobData,
SendImportJobCachedData,
replaceVariablesInObject,
FileEncodingsEnum,
QueuesEnum,
ColumnTypesEnum,
UploadStatusEnum,
FileEncodingsEnum,
ColumnDelimiterEnum,
} from '@impler/shared';

Expand All @@ -30,17 +27,15 @@ const DEFAULT_PAGE = 1;

export class SendImportJobDataConsumer extends BaseConsumer {
private columnRepository: ColumnRepository = new ColumnRepository();
private uploadRepository: UploadRepository = new UploadRepository();
private userJobRepository: UserJobRepository = new UserJobRepository();
private templateRepository: TemplateRepository = new TemplateRepository();
private webhookLogRepository: WebhookLogRepository = new WebhookLogRepository();
private importJobHistoryRepository: ImportJobHistoryRepository = new ImportJobHistoryRepository();
private webhookDestinationRepository: WebhookDestinationRepository = new WebhookDestinationRepository();
private storageService: StorageService = getStorageServiceClass();

async message(message: { content: string }) {
const data = JSON.parse(message.content) as SendImportJobData;
const cachedData = data.cache || (await this.getInitialCachedData(data.importJobHistoryId));
const cachedData = data.cache || (await this.getInitialCachedData(data._jobId, data.allDataFilePath));
let allDataJson: null | any[] = null;

if (cachedData && cachedData.callbackUrl) {
Expand All @@ -53,7 +48,7 @@ export class SendImportJobDataConsumer extends BaseConsumer {
}
const { sendData, page } = this.buildSendData({
data: allDataJson,
uploadId: data.importJobHistoryId,
uploadId: data._jobId,
chunkSize: cachedData.chunkSize,
recordFormat: cachedData.recordFormat,
chunkFormat: cachedData.chunkFormat,
Expand All @@ -67,14 +62,14 @@ export class SendImportJobDataConsumer extends BaseConsumer {

const response = await this.makeApiCall({
data: sendData,
uploadId: data.importJobHistoryId,
uploadId: data._jobId,
page,
method: 'POST',
url: cachedData.callbackUrl,
headers,
});

this.makeResponseEntry(response);
await this.makeResponseEntry(response);

const nextPageNumber = this.getNextPageNumber({
totalRecords: allDataJson.length,
Expand All @@ -85,15 +80,16 @@ export class SendImportJobDataConsumer extends BaseConsumer {
if (nextPageNumber) {
// Make next call
publishToQueue(QueuesEnum.SEND_IMPORT_JOB_DATA, {
importJobHistoryId: data.importJobHistoryId,
_jobId: data._jobId,
allDataFilePath: data.allDataFilePath,
cache: {
...cachedData,
page: nextPageNumber,
},
});
} as SendImportJobData);
} else {
// Processing is done
this.finalizeUpload(data.importJobHistoryId);
this.finalizeUpload(data._jobId);
}
}
}
Expand All @@ -115,10 +111,10 @@ export class SendImportJobDataConsumer extends BaseConsumer {
Math.min(page * chunkSize, data.length)
);

if (multiSelectHeadings && Object.keys(multiSelectHeadings).length > 0) {
if (Array.isArray(multiSelectHeadings) && multiSelectHeadings.length > 0) {
slicedData = slicedData.map((obj) => {
Object.keys(multiSelectHeadings).forEach((heading) => {
obj.record[heading] = obj.record[heading] ? obj.record[heading].split(multiSelectHeadings[heading]) : [];
multiSelectHeadings.forEach((heading) => {
obj[heading] = obj[heading] ? (Array.isArray(obj[heading]) ? obj[heading] : obj[heading].split(',')) : [];
});

return obj;
Expand All @@ -144,21 +140,16 @@ export class SendImportJobDataConsumer extends BaseConsumer {
};
}

private async getInitialCachedData(_importJobHistoryId: string): Promise<SendImportJobCachedData> {
const importJobHistory = await this.importJobHistoryRepository.getHistoryWithJob(_importJobHistoryId, [
'_templateId',
]);
const userJobEmail = await this.userJobRepository.getUserEmailFromJobId(importJobHistory._jobId);
private async getInitialCachedData(_jobId: string, allDataFilePath: string): Promise<SendImportJobCachedData> {
const userJob = await this.userJobRepository.findById(_jobId);
const userJobEmail = await this.userJobRepository.getUserEmailFromJobId(_jobId);

const columns = await this.columnRepository.find({
_templateId: (importJobHistory._jobId as unknown as UserJobEntity)._templateId,
_templateId: userJob._templateId,
});
const templateData = await this.templateRepository.findById(
(importJobHistory._jobId as unknown as UserJobEntity)._templateId,
'name _projectId code'
);
const templateData = await this.templateRepository.findById(userJob._templateId, 'name _projectId code');
const webhookDestination = await this.webhookDestinationRepository.findOne({
_templateId: (importJobHistory._jobId as unknown as UserJobEntity)._templateId,
_templateId: userJob._templateId,
});

if (!webhookDestination || !webhookDestination.callbackUrl) {
Expand All @@ -175,33 +166,29 @@ export class SendImportJobDataConsumer extends BaseConsumer {
});
}

this.importJobHistoryRepository.create({
_jobId: importJobHistory._id,
});

return {
page: 1,
allDataFilePath,
email: userJobEmail,
_templateId: (importJobHistory._jobId as unknown as UserJobEntity)._templateId,
multiSelectHeadings,
extra: userJob.extra,
name: templateData.name,
_templateId: userJob._templateId,
authHeaderValue: userJob.authHeaderValue,
callbackUrl: webhookDestination?.callbackUrl,
chunkSize: webhookDestination?.chunkSize,
name: templateData.name,
page: 1,
extra: (importJobHistory._jobId as unknown as UserJobEntity).extra,
authHeaderName: webhookDestination.authHeaderName,
authHeaderValue: '',
allDataFilePath: importJobHistory.allDataFilePath,
defaultValues: JSON.stringify(defaultValueObj),
recordFormat: (importJobHistory._jobId as unknown as UserJobEntity).customRecordFormat,
chunkFormat: (importJobHistory._jobId as unknown as UserJobEntity).customChunkFormat,
multiSelectHeadings,
recordFormat: userJob.customRecordFormat,
chunkFormat: userJob.customChunkFormat,
};
}

private async makeResponseEntry(data: Partial<WebhookLogEntity>) {
return await this.webhookLogRepository.create(data);
return this.importJobHistoryRepository.create(data);
}

private async finalizeUpload(importJobHistoryId: string) {
return await this.uploadRepository.update({ _id: importJobHistoryId }, { status: UploadStatusEnum.COMPLETED });
private async finalizeUpload(_jobId: string) {
return await this.userJobRepository.update({ _id: _jobId }, { status: UploadStatusEnum.COMPLETED });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { useMutation } from '@tanstack/react-query';
import { useForm, SubmitHandler } from 'react-hook-form';

import { notifier } from '@util';
import { IAutoImportValues } from '@types';
import { useAppState } from '@store/app.context';
import { useAPIState } from '@store/api.context';
import { useJobsInfo } from '@store/jobinfo.context';
import { useImplerState } from '@store/impler.context';
import { IUserJob, IErrorObject } from '@impler/shared';
import { IAutoImportValues } from '@types';
import { useAppState } from '@store/app.context';

interface IUseAutoImportPhase1Props {
goNext: () => void;
Expand Down
2 changes: 2 additions & 0 deletions libs/dal/src/repositories/user-job/user-job.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export class UserJobEntity {

status: string;

authHeaderValue: string;

customRecordFormat: string;

customChunkFormat: string;
Expand Down
3 changes: 3 additions & 0 deletions libs/dal/src/repositories/user-job/user-job.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ const userJobSchema = new Schema(
externalUserId: {
type: Schema.Types.String,
},
authHeaderValue: {
type: Schema.Types.String,
},
status: {
type: Schema.Types.String,
},
Expand Down
4 changes: 2 additions & 2 deletions libs/shared/src/entities/UserJob/Userjob.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export interface IUserJob {
_id: string;
url: string;
_templateId: string;
headings: string[];
cron: string;
headings: string[];
_templateId: string;
}
3 changes: 2 additions & 1 deletion libs/shared/src/types/upload/upload.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ export type SendWebhookData = {
};

export type SendImportJobData = {
importJobHistoryId: string;
_jobId: string;
allDataFilePath: string;
cache?: SendImportJobCachedData;
};

Expand Down
Loading