From 6db8a54dc7c84099060652929d2006b92b8a72d8 Mon Sep 17 00:00:00 2001 From: kishan kumar <155713364+Kishan789dev@users.noreply.github.com> Date: Fri, 7 Jun 2024 11:52:19 +0530 Subject: [PATCH] feat: Notifier behind NATS (#57) * test changes * final testing done * conditional flag for the nats model is added * logger added for testing purpose * error handling * Update .env * filter sub changed * Update .env * added the error handlers * changes made in filter subjects * wip * changed the name of topic * changes made in adding consumer * changes made in some error handling in stream and consumer * final dev testing is completed * fix filter subject issue and refactor connecting nats part * fix previous reviewed code suggestions * remove unnecessary dent in notification service * chore: comment unnecessary * chore: remove commented code and restore template service file * chore: remove commented code and restore template service file * chore: remove commented code and restore template service file * fix: error logging * chore: changes made after review * fix: remove numReplicas in func getStreamConfig * chore: testing flow * chore: remove test loggings * fix: msg acknowledgement * chore: update error logging to give msg payload too --------- Co-authored-by: Eshank Vaish Co-authored-by: Shashwat Dadhich <92629050+ShashwatDadhich@users.noreply.github.com> Co-authored-by: komalreddy3 --- .env | 2 +- package-lock.json | 37 ++++++++ package.json | 1 + src/pubSub/pubSub.ts | 206 +++++++++++++++++++++++++++++++++++++++++ src/pubSub/utils.ts | 63 +++++++++++++ src/server.ts | 97 ++++++++++++------- src/templateService.ts | 4 +- 7 files changed, 372 insertions(+), 38 deletions(-) create mode 100644 src/pubSub/pubSub.ts create mode 100644 src/pubSub/utils.ts diff --git a/.env b/.env index f36b23b..fa94d10 100644 --- a/.env +++ b/.env @@ -1 +1 @@ -BASE_URL="http://demo.devtron.info:32080" \ No newline at end of file +BASE_URL="http://demo.devtron.info:32080" diff --git a/package-lock.json b/package-lock.json index 164b47c..a538441 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,6 +20,7 @@ "json-rules-engine": "^2.3.6", "moment-timezone": "^0.5.31", "mustache": "^3.0.1", + "nats": "2.10.0", "notifme-sdk": "^1.14.1", "pg": "^8.2.1", "reflect-metadata": "^0.1.13", @@ -1906,6 +1907,18 @@ "thenify-all": "^1.0.0" } }, + "node_modules/nats": { + "version": "2.10.0", + "resolved": "https://registry.npmjs.org/nats/-/nats-2.10.0.tgz", + "integrity": "sha512-ykhXN2IohSj7W8A2j7ptQO/tS8y6SGpi+H7xZoHJEBsBLK6bQcY5pVmOdu5ZTa9QfNlD7AWlFRL63kNAWny0lg==", + "dependencies": { + "nkeys.js": "1.0.3", + "web-streams-polyfill": "^3.2.1" + }, + "engines": { + "node": ">= 14.0.0" + } + }, "node_modules/negotiator": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", @@ -1914,6 +1927,17 @@ "node": ">= 0.6" } }, + "node_modules/nkeys.js": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/nkeys.js/-/nkeys.js-1.0.3.tgz", + "integrity": "sha512-p5Bpb/acPaQmCrbe4gNmMBY/naZJV8Q7m2B9UkXT8BQRC6wjX8zqD2ya8eZu9mpSXQffodV46HCP9OckmxcwYA==", + "dependencies": { + "tweetnacl": "1.0.3" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/node-adm": { "version": "0.9.1", "resolved": "https://registry.npmjs.org/node-adm/-/node-adm-0.9.1.tgz", @@ -2933,6 +2957,11 @@ "typescript": ">=2.1.0 || >=2.1.0-dev || >=2.2.0-dev || >=2.3.0-dev || >=2.4.0-dev || >=2.5.0-dev || >=2.6.0-dev || >=2.7.0-dev || >=2.8.0-dev || >=2.9.0-dev || >= 3.0.0-dev || >= 3.1.0-dev" } }, + "node_modules/tweetnacl": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-1.0.3.tgz", + "integrity": "sha512-6rt+RN7aOi1nGMyC4Xa5DdYiukl2UWCbcJft7YhxReBGQD7OAM8Pbxw6YMo4r2diNEA8FEmu32YOn9rhaiE5yw==" + }, "node_modules/type-is": { "version": "1.6.18", "resolved": "https://registry.npmjs.org/type-is/-/type-is-1.6.18.tgz", @@ -3312,6 +3341,14 @@ "safe-buffer": "^5.0.1" } }, + "node_modules/web-streams-polyfill": { + "version": "3.3.3", + "resolved": "https://registry.npmjs.org/web-streams-polyfill/-/web-streams-polyfill-3.3.3.tgz", + "integrity": "sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw==", + "engines": { + "node": ">= 8" + } + }, "node_modules/webidl-conversions": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/webidl-conversions/-/webidl-conversions-3.0.1.tgz", diff --git a/package.json b/package.json index 67eb97f..00d8433 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "json-rules-engine": "^2.3.6", "moment-timezone": "^0.5.31", "mustache": "^3.0.1", + "nats": "2.10.0", "notifme-sdk": "^1.14.1", "pg": "^8.2.1", "reflect-metadata": "^0.1.13", diff --git a/src/pubSub/pubSub.ts b/src/pubSub/pubSub.ts new file mode 100644 index 0000000..f6da424 --- /dev/null +++ b/src/pubSub/pubSub.ts @@ -0,0 +1,206 @@ +import {createInbox, JetStreamClient, NatsConnection, NatsError, StreamInfo, StringCodec} from "nats"; +import { + GetStreamSubjects, + NatsConsumerConfig, + NatsConsumerWiseConfigMapping, + NatsStreamConfig, + NatsStreamWiseConfigMapping, + NatsTopic, + NatsTopicMapping, +} from "./utils"; + +import {ConsumerOptsBuilderImpl} from "nats/lib/nats-base-client/jsconsumeropts"; + +import {ConsumerInfo, ConsumerUpdateConfig, JetStreamManager, StreamConfig} from "nats/lib/nats-base-client/types"; + + +export interface PubSubService { + Subscribe(topic: string, callback: (msg: string) => void): void +} + + +export class PubSubServiceImpl implements PubSubService { + private nc: NatsConnection + private js: JetStreamClient + private jsm: JetStreamManager + private logger: any + + + constructor(conn: NatsConnection, jsm: JetStreamManager, logger: any) { + this.nc = conn + this.js = this.nc.jetstream() + this.jsm = jsm + this.logger = logger + } + + // ********** Subscribe function provided by consumer + + async Subscribe(topic: string, callback: (msg: string) => void) { + const natsTopicConfig: NatsTopic = NatsTopicMapping.get(topic) + const streamName = natsTopicConfig.streamName + const consumerName = natsTopicConfig.consumerName + const queueName = natsTopicConfig.queueName + const inbox = createInbox() + const consumerOptsDetails = new ConsumerOptsBuilderImpl({ + name: consumerName, + deliver_subject: inbox, + durable_name: consumerName, + ack_wait: 120 * 1e9, + num_replicas: 0, + filter_subject: topic, + + + }).bindStream(streamName).callback((err, msg) => { + try { + const msgString = getJsonString(msg.data) + callback(msgString) + } catch (err) { + this.logger.error("msg: "+msg.data+" err: "+err); + } + msg.ack(); + }) + // *******Creating/Updating stream + + const streamConfiguration = NatsStreamWiseConfigMapping.get(streamName) + const streamConfigParsed = getStreamConfig(streamConfiguration, streamName) + await this.addOrUpdateStream(streamName, streamConfigParsed) + + //******* Getting consumer configuration + + const consumerConfiguration = NatsConsumerWiseConfigMapping.get(consumerName) + + // *** newConsumerFound check the consumer is new or not + + const newConsumerFound = await this.updateConsumer(streamName, consumerName, consumerConfiguration) + + // ********** Creating a consumer + + if (newConsumerFound) { + try { + await this.jsm.consumers.add(streamName, { + name: consumerName, + deliver_subject: inbox, + durable_name: consumerName, + ack_wait: 120 * 1e9, + num_replicas: 0, + filter_subject: topic, + + }) + this.logger.info("consumer added successfully") + } catch (err) { + this.logger.error("error occurred while adding consumer", err) + } + + + } + + // ********* Nats Subscribe() function + try { + await this.js.subscribe(topic, consumerOptsDetails) + this.logger.info("subscribed to nats successfully") + + } catch (err) { + this.logger.error("error occurred while subscribing", err) + } + + + } + + + async updateConsumer(streamName: string, consumerName: string, consumerConfiguration: NatsConsumerConfig): Promise { + let updatesDetected: boolean = false + try { + const info: ConsumerInfo | null = await this.jsm.consumers.info(streamName, consumerName) + if (info) { + if (consumerConfiguration.ack_wait > 0 && info.config.ack_wait != consumerConfiguration.ack_wait) { + info.config.ack_wait = consumerConfiguration.ack_wait + updatesDetected = true + } + if (updatesDetected === true) { + + await this.jsm.consumers.update(streamName, consumerName, info.config) + this.logger.info("consumer updated successfully, consumerName: ", consumerName) + + } + } + } catch (err) { + if (err instanceof NatsError) { + this.logger.error("error occurred due to reason:", err) + + if (err.api_error.err_code === 10014) { // 10014 error code depicts that consumer is not found + return true + } + } + } + return false + + } + + async addOrUpdateStream(streamName: string, streamConfig: StreamConfig) { + try { + const Info: StreamInfo | null = await this.jsm.streams.info(streamName) + if (Info) { + if (await this.checkConfigChangeReqd(Info.config, streamConfig)) { + await this.jsm.streams.update(streamName, Info.config).catch( + (err) => { + this.logger.error("error occurred during updating streams", err) + } + ) + this.logger.info("streams updated successfully") + } + } + } catch (err) { + if (err instanceof NatsError) { + if (err.api_error.err_code === 10059) { + + // const cfgToSet = getNewConfig(streamName, streamConfig) + streamConfig.name = streamName + try { + await this.jsm.streams.add(streamConfig) + this.logger.info(" stream added successfully") + } catch (err) { + this.logger.error("error occurred during adding streams", err) + } + + + } else { + this.logger.error("error occurred due to :", err) + } + + } + + } + + } + + async checkConfigChangeReqd(existingStreamInfo: StreamConfig, toUpdateConfig: StreamConfig):Promise { + let configChanged: boolean = false + if (toUpdateConfig.max_age != 0 && (toUpdateConfig.max_age != existingStreamInfo.max_age)) { + existingStreamInfo.max_age = toUpdateConfig.max_age + configChanged = true + } + if (!existingStreamInfo.subjects.includes(toUpdateConfig.subjects[0])) { // filter subject if not present already + // If the value is not in the array, append it + existingStreamInfo.subjects.push(toUpdateConfig.subjects[0]); + configChanged = true + } + + return configChanged + } + +} + +function getJsonString(bytes: Uint8Array) { + const sc = StringCodec(); + return JSON.stringify(sc.decode(bytes)) + +} + +function getStreamConfig(streamConfig: NatsStreamConfig, streamName: string) { + + return { + max_age: streamConfig.max_age, + subjects: GetStreamSubjects(streamName), + } as StreamConfig +} + diff --git a/src/pubSub/utils.ts b/src/pubSub/utils.ts new file mode 100644 index 0000000..1b7ff7e --- /dev/null +++ b/src/pubSub/utils.ts @@ -0,0 +1,63 @@ +import * as process from "process"; + +export const NOTIFICATION_EVENT_TOPIC: string = "NOTIFICATION_EVENT_TOPIC" +export const NOTIFICATION_EVENT_GROUP: string = "NOTIFICATION_EVENT_GROUP" +export const NOTIFICATION_EVENT_DURABLE: string = "NOTIFICATION_EVENT_DURABLE" +export const ORCHESTRATOR_STREAM: string = "ORCHESTRATOR" +const ackWait: number = parseInt(process.env.ACK_WAIT) +const consumerReplica: number = parseInt(process.env.CONSUMER_REPLICAS) +const maxAge: number = parseInt(process.env.MAX_AGE) +const streamReplica: number = parseInt(process.env.STREAM_REPLICA) + +export interface NatsTopic { + topicName: string + streamName: string + queueName: string + consumerName: string +} + +export interface NatsConsumerConfig { + ack_wait: number +} + +export interface NatsStreamConfig { + max_age: number +} + +export let NatsTopicMapping = new Map([ + [NOTIFICATION_EVENT_TOPIC, { + topicName: NOTIFICATION_EVENT_TOPIC, + streamName: ORCHESTRATOR_STREAM, + queueName: NOTIFICATION_EVENT_GROUP, + consumerName: NOTIFICATION_EVENT_DURABLE + }] +]) + + +export const NatsConsumerWiseConfigMapping = new Map( + [[NOTIFICATION_EVENT_DURABLE, { + + ack_wait: !isNaN(ackWait) ? ackWait * 1e9 : 120 * 1e9, + + }] + ]); + +export const NatsStreamWiseConfigMapping = new Map( + [[ORCHESTRATOR_STREAM, { + + max_age: !isNaN(maxAge) ? maxAge * 1e9 : 86400 * 1e9, + + }] + ]); + +export function GetStreamSubjects(streamName: string): string[] { + let subjArr: string[] = []; + for (const [_, natsTopic] of NatsTopicMapping) { + if (natsTopic.streamName === streamName) { + subjArr.push(natsTopic.topicName); + } + } + + return subjArr; + +} diff --git a/src/server.ts b/src/server.ts index e3184ba..c5bff3f 100644 --- a/src/server.ts +++ b/src/server.ts @@ -15,38 +15,45 @@ */ import express from 'express'; -import { NotificationService, Event, Handler } from './notification/service/notificationService' +import {Event, Handler, NotificationService} from './notification/service/notificationService' import "reflect-metadata" -import {ConnectionOptions, createConnection, getConnectionOptions, getManager} from "typeorm" -import { NotificationSettingsRepository } from "./repository/notificationSettingsRepository" -import { SlackService } from './destination/destinationHandlers/slackHandler' -import { SESService } from './destination/destinationHandlers/sesHandler' -import { SMTPService } from './destination/destinationHandlers/smtpHandler' -import { EventLogRepository } from './repository/notifierEventLogRepository' -import { EventLogBuilder } from './common/eventLogBuilder' -import { EventRepository } from './repository/eventsRepository' -import { NotificationTemplatesRepository } from "./repository/templatesRepository"; -import { SlackConfigRepository } from "./repository/slackConfigRepository"; -import { NotificationSettings } from "./entities/notificationSettings"; -import { NotifierEventLog } from "./entities/notifierEventLogs"; -import { NotificationTemplates } from "./entities/notificationTemplates"; -import { SlackConfig } from "./entities/slackConfig"; +import {ConnectionOptions, createConnection} from "typeorm" +import {NotificationSettingsRepository} from "./repository/notificationSettingsRepository" +import {SlackService} from './destination/destinationHandlers/slackHandler' +import {SESService} from './destination/destinationHandlers/sesHandler' +import {SMTPService} from './destination/destinationHandlers/smtpHandler' +import {EventLogRepository} from './repository/notifierEventLogRepository' +import {EventLogBuilder} from './common/eventLogBuilder' +import {EventRepository} from './repository/eventsRepository' +import {NotificationTemplatesRepository} from "./repository/templatesRepository"; +import {SlackConfigRepository} from "./repository/slackConfigRepository"; +import {NotificationSettings} from "./entities/notificationSettings"; +import {NotifierEventLog} from "./entities/notifierEventLogs"; +import {NotificationTemplates} from "./entities/notificationTemplates"; +import {SlackConfig} from "./entities/slackConfig"; import * as winston from 'winston'; -import { SesConfig } from "./entities/sesConfig"; -import { SESConfigRepository } from "./repository/sesConfigRepository"; -import { SMTPConfig } from "./entities/smtpConfig"; -import { SMTPConfigRepository } from "./repository/smtpConfigRepository"; -import { UsersRepository } from './repository/usersRepository'; -import { Users } from "./entities/users"; -import { send } from './tests/sendSlackNotification'; -import { MustacheHelper } from './common/mustacheHelper'; -import { WebhookConfigRepository } from './repository/webhookConfigRepository'; -import { WebhookService } from './destination/destinationHandlers/webhookHandler'; -import { WebhookConfig } from './entities/webhookconfig'; +import {SesConfig} from "./entities/sesConfig"; +import {SESConfigRepository} from "./repository/sesConfigRepository"; +import {SMTPConfig} from "./entities/smtpConfig"; +import {SMTPConfigRepository} from "./repository/smtpConfigRepository"; +import {UsersRepository} from './repository/usersRepository'; +import {Users} from "./entities/users"; +import {MustacheHelper} from './common/mustacheHelper'; +import {WebhookConfigRepository} from './repository/webhookConfigRepository'; +import {WebhookService} from './destination/destinationHandlers/webhookHandler'; +import {WebhookConfig} from './entities/webhookconfig'; import * as process from "process"; -import bodyParser from 'body-parser'; +import {connect, NatsConnection} from "nats"; +import {NOTIFICATION_EVENT_TOPIC} from "./pubSub/utils"; +import {PubSubServiceImpl} from "./pubSub/pubSub"; +import {send} from "./tests/sendSlackNotification"; +import bodyParser from "body-parser"; + const app = express(); +const natsUrl = process.env.NATS_URL app.use(bodyParser.json({ limit: '10mb' })); +app.use(express.json()); + let logger = winston.createLogger({ level: 'info', @@ -62,7 +69,7 @@ let logger = winston.createLogger({ let eventLogRepository: EventLogRepository = new EventLogRepository() let eventLogBuilder: EventLogBuilder = new EventLogBuilder() let slackConfigRepository: SlackConfigRepository = new SlackConfigRepository() -let webhookConfigRepository:WebhookConfigRepository = new WebhookConfigRepository() +let webhookConfigRepository: WebhookConfigRepository = new WebhookConfigRepository() let sesConfigRepository: SESConfigRepository = new SESConfigRepository() let smtpConfigRepository: SMTPConfigRepository = new SMTPConfigRepository() let usersRepository: UsersRepository = new UsersRepository() @@ -81,15 +88,15 @@ handlers.push(smtpService) let notificationService = new NotificationService(new EventRepository(), new NotificationSettingsRepository(), new NotificationTemplatesRepository(), handlers, logger) - - - let dbHost: string = process.env.DB_HOST; const dbPort: number = +process.env.DB_PORT; const user: string = process.env.DB_USER; const pwd: string = process.env.DB_PWD; const db: string = process.env.DB; + + + let dbOptions: ConnectionOptions = { type: "postgres", host: dbHost, @@ -99,18 +106,39 @@ let dbOptions: ConnectionOptions = { database: db, entities: [NotificationSettings, NotifierEventLog, Event, NotificationTemplates, SlackConfig, SesConfig, SMTPConfig, WebhookConfig, Users] } - createConnection(dbOptions).then(async connection => { logger.info("Connected to DB") + if(natsUrl){ + let conn: NatsConnection + (async () => { + logger.info("Connecting to NATS server..."); + conn = await connect({servers: natsUrl}) + const jsm = await conn.jetstreamManager() + const obj = new PubSubServiceImpl(conn, jsm,logger) + await obj.Subscribe(NOTIFICATION_EVENT_TOPIC, natsEventHandler) + })().catch( + (err) => { + logger.error("error occurred due to", err) + } + ) + } + }).catch(error => { logger.error("TypeORM connection error: ", error); logger.error("shutting down notifier due to un-successful database connection...") process.exit(1) }); +const natsEventHandler = (msg: string) => { + const eventAsString = JSON.parse(msg) + const event = JSON.parse(eventAsString) as Event + notificationService.sendNotification(event) +} + + app.get('/', (req, res) => res.send('Welcome to notifier Notifier!')) -app.get('/health', (req, res) =>{ +app.get('/health', (req, res) => { res.status(200).send("healthy") }) @@ -118,11 +146,10 @@ app.get('/test', (req, res) => { send(); res.send('Test!'); }) - app.post('/notify', (req, res) => { logger.info("notifications Received") notificationService.sendNotification(req.body) res.send('notifications sent') }); -app.listen(3000, () => logger.info('Notifier app listening on port 3000!')) \ No newline at end of file +app.listen(3000, () => logger.info('Notifier app listening on port 3000!')) diff --git a/src/templateService.ts b/src/templateService.ts index f0b16da..c054130 100644 --- a/src/templateService.ts +++ b/src/templateService.ts @@ -28,7 +28,7 @@ // getNotificationPayload(event: Event) { // if (this.slackTemplateMap.has(event.type)) { // let template = this.slackTemplateMap.get(event.type) - + // } else { // //err not supported // } @@ -37,4 +37,4 @@ // //const result = Mustache.render(template, hash); -// } +// } \ No newline at end of file