Skip to content

Commit

Permalink
feat: Notifier behind NATS (#57)
Browse files Browse the repository at this point in the history
* test changes

* final testing done

* conditional flag for the nats model is added

* logger added for testing purpose

* error handling

* Update .env

* filter sub changed

* Update .env

* added the error handlers

* changes made in filter subjects

* wip

* changed the name of topic

* changes made in adding consumer

* changes made in some error handling in stream and consumer

* final dev testing is completed

* fix filter subject issue and refactor connecting nats part

* fix previous reviewed code suggestions

* remove unnecessary dent in notification service

* chore: comment unnecessary

* chore: remove commented code and restore template service file

* chore: remove commented code and restore template service file

* chore: remove commented code and restore template service file

* fix: error logging

* chore: changes made after review

* fix: remove numReplicas in func getStreamConfig

* chore: testing flow

* chore: remove test loggings

* fix: msg acknowledgement

* chore: update error logging to give msg payload too

---------

Co-authored-by: Eshank Vaish <[email protected]>
Co-authored-by: Shashwat Dadhich <[email protected]>
Co-authored-by: komalreddy3 <[email protected]>
  • Loading branch information
4 people authored Jun 7, 2024
1 parent 4cc998e commit 6db8a54
Show file tree
Hide file tree
Showing 7 changed files with 372 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1 +1 @@
BASE_URL="http://demo.devtron.info:32080"
BASE_URL="http://demo.devtron.info:32080"
37 changes: 37 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"json-rules-engine": "^2.3.6",
"moment-timezone": "^0.5.31",
"mustache": "^3.0.1",
"nats": "2.10.0",
"notifme-sdk": "^1.14.1",
"pg": "^8.2.1",
"reflect-metadata": "^0.1.13",
Expand Down
206 changes: 206 additions & 0 deletions src/pubSub/pubSub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import {createInbox, JetStreamClient, NatsConnection, NatsError, StreamInfo, StringCodec} from "nats";
import {
GetStreamSubjects,
NatsConsumerConfig,
NatsConsumerWiseConfigMapping,
NatsStreamConfig,
NatsStreamWiseConfigMapping,
NatsTopic,
NatsTopicMapping,
} from "./utils";

import {ConsumerOptsBuilderImpl} from "nats/lib/nats-base-client/jsconsumeropts";

import {ConsumerInfo, ConsumerUpdateConfig, JetStreamManager, StreamConfig} from "nats/lib/nats-base-client/types";


export interface PubSubService {
Subscribe(topic: string, callback: (msg: string) => void): void
}


export class PubSubServiceImpl implements PubSubService {
private nc: NatsConnection
private js: JetStreamClient
private jsm: JetStreamManager
private logger: any


constructor(conn: NatsConnection, jsm: JetStreamManager, logger: any) {
this.nc = conn
this.js = this.nc.jetstream()
this.jsm = jsm
this.logger = logger
}

// ********** Subscribe function provided by consumer

async Subscribe(topic: string, callback: (msg: string) => void) {
const natsTopicConfig: NatsTopic = NatsTopicMapping.get(topic)
const streamName = natsTopicConfig.streamName
const consumerName = natsTopicConfig.consumerName
const queueName = natsTopicConfig.queueName
const inbox = createInbox()
const consumerOptsDetails = new ConsumerOptsBuilderImpl({
name: consumerName,
deliver_subject: inbox,
durable_name: consumerName,
ack_wait: 120 * 1e9,
num_replicas: 0,
filter_subject: topic,


}).bindStream(streamName).callback((err, msg) => {
try {
const msgString = getJsonString(msg.data)
callback(msgString)
} catch (err) {
this.logger.error("msg: "+msg.data+" err: "+err);
}
msg.ack();
})
// *******Creating/Updating stream

const streamConfiguration = NatsStreamWiseConfigMapping.get(streamName)
const streamConfigParsed = getStreamConfig(streamConfiguration, streamName)
await this.addOrUpdateStream(streamName, streamConfigParsed)

//******* Getting consumer configuration

const consumerConfiguration = NatsConsumerWiseConfigMapping.get(consumerName)

// *** newConsumerFound check the consumer is new or not

const newConsumerFound = await this.updateConsumer(streamName, consumerName, consumerConfiguration)

// ********** Creating a consumer

if (newConsumerFound) {
try {
await this.jsm.consumers.add(streamName, {
name: consumerName,
deliver_subject: inbox,
durable_name: consumerName,
ack_wait: 120 * 1e9,
num_replicas: 0,
filter_subject: topic,

})
this.logger.info("consumer added successfully")
} catch (err) {
this.logger.error("error occurred while adding consumer", err)
}


}

// ********* Nats Subscribe() function
try {
await this.js.subscribe(topic, consumerOptsDetails)
this.logger.info("subscribed to nats successfully")

} catch (err) {
this.logger.error("error occurred while subscribing", err)
}


}


async updateConsumer(streamName: string, consumerName: string, consumerConfiguration: NatsConsumerConfig): Promise<boolean> {
let updatesDetected: boolean = false
try {
const info: ConsumerInfo | null = await this.jsm.consumers.info(streamName, consumerName)
if (info) {
if (consumerConfiguration.ack_wait > 0 && info.config.ack_wait != consumerConfiguration.ack_wait) {
info.config.ack_wait = consumerConfiguration.ack_wait
updatesDetected = true
}
if (updatesDetected === true) {

await this.jsm.consumers.update(streamName, consumerName, info.config)
this.logger.info("consumer updated successfully, consumerName: ", consumerName)

}
}
} catch (err) {
if (err instanceof NatsError) {
this.logger.error("error occurred due to reason:", err)

if (err.api_error.err_code === 10014) { // 10014 error code depicts that consumer is not found
return true
}
}
}
return false

}

async addOrUpdateStream(streamName: string, streamConfig: StreamConfig) {
try {
const Info: StreamInfo | null = await this.jsm.streams.info(streamName)
if (Info) {
if (await this.checkConfigChangeReqd(Info.config, streamConfig)) {
await this.jsm.streams.update(streamName, Info.config).catch(
(err) => {
this.logger.error("error occurred during updating streams", err)
}
)
this.logger.info("streams updated successfully")
}
}
} catch (err) {
if (err instanceof NatsError) {
if (err.api_error.err_code === 10059) {

// const cfgToSet = getNewConfig(streamName, streamConfig)
streamConfig.name = streamName
try {
await this.jsm.streams.add(streamConfig)
this.logger.info(" stream added successfully")
} catch (err) {
this.logger.error("error occurred during adding streams", err)
}


} else {
this.logger.error("error occurred due to :", err)
}

}

}

}

async checkConfigChangeReqd(existingStreamInfo: StreamConfig, toUpdateConfig: StreamConfig):Promise<boolean> {
let configChanged: boolean = false
if (toUpdateConfig.max_age != 0 && (toUpdateConfig.max_age != existingStreamInfo.max_age)) {
existingStreamInfo.max_age = toUpdateConfig.max_age
configChanged = true
}
if (!existingStreamInfo.subjects.includes(toUpdateConfig.subjects[0])) { // filter subject if not present already
// If the value is not in the array, append it
existingStreamInfo.subjects.push(toUpdateConfig.subjects[0]);
configChanged = true
}

return configChanged
}

}

function getJsonString(bytes: Uint8Array) {
const sc = StringCodec();
return JSON.stringify(sc.decode(bytes))

}

function getStreamConfig(streamConfig: NatsStreamConfig, streamName: string) {

return {
max_age: streamConfig.max_age,
subjects: GetStreamSubjects(streamName),
} as StreamConfig
}

63 changes: 63 additions & 0 deletions src/pubSub/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import * as process from "process";

export const NOTIFICATION_EVENT_TOPIC: string = "NOTIFICATION_EVENT_TOPIC"
export const NOTIFICATION_EVENT_GROUP: string = "NOTIFICATION_EVENT_GROUP"
export const NOTIFICATION_EVENT_DURABLE: string = "NOTIFICATION_EVENT_DURABLE"
export const ORCHESTRATOR_STREAM: string = "ORCHESTRATOR"
const ackWait: number = parseInt(process.env.ACK_WAIT)
const consumerReplica: number = parseInt(process.env.CONSUMER_REPLICAS)
const maxAge: number = parseInt(process.env.MAX_AGE)
const streamReplica: number = parseInt(process.env.STREAM_REPLICA)

export interface NatsTopic {
topicName: string
streamName: string
queueName: string
consumerName: string
}

export interface NatsConsumerConfig {
ack_wait: number
}

export interface NatsStreamConfig {
max_age: number
}

export let NatsTopicMapping = new Map<string, NatsTopic>([
[NOTIFICATION_EVENT_TOPIC, {
topicName: NOTIFICATION_EVENT_TOPIC,
streamName: ORCHESTRATOR_STREAM,
queueName: NOTIFICATION_EVENT_GROUP,
consumerName: NOTIFICATION_EVENT_DURABLE
}]
])


export const NatsConsumerWiseConfigMapping = new Map<string, NatsConsumerConfig>(
[[NOTIFICATION_EVENT_DURABLE, {

ack_wait: !isNaN(ackWait) ? ackWait * 1e9 : 120 * 1e9,

}]
]);

export const NatsStreamWiseConfigMapping = new Map<string, NatsStreamConfig>(
[[ORCHESTRATOR_STREAM, {

max_age: !isNaN(maxAge) ? maxAge * 1e9 : 86400 * 1e9,

}]
]);

export function GetStreamSubjects(streamName: string): string[] {
let subjArr: string[] = [];
for (const [_, natsTopic] of NatsTopicMapping) {
if (natsTopic.streamName === streamName) {
subjArr.push(natsTopic.topicName);
}
}

return subjArr;

}
Loading

0 comments on commit 6db8a54

Please sign in to comment.