Skip to content

Commit

Permalink
1.1.5
Browse files Browse the repository at this point in the history
Balanced:
- Adds new "nats.js.define-consumer" config parameter (defaults to true) which means the connector will _not_ try to create the stream consumer
- "nats.js.define-stream" config parameter now defaults to true

Partitioned:
- no change (as it already assumes the streams/consumers are defined administratively)

Signed-off-by: Jean-Noël Moyne <[email protected]>
  • Loading branch information
jnmoyne committed Nov 20, 2023
1 parent 1b46efc commit c6d5c11
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 18 deletions.
2 changes: 1 addition & 1 deletion load_balanced/build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import sbt.Def.settings

name := "nats-spark-connector-balanced"
version := "1.1.4"
version := "1.1.5"
scalaVersion := "2.12.14"

val sparkVersion = "3.3.0"
Expand Down
40 changes: 25 additions & 15 deletions load_balanced/src/main/scala/natsconnector/NatsConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class NatsConfig(isSource: Boolean) {
// If zero messages then subsciber will wait messageReceiveWaitTime before giving up.

// ============== JetStream stream Config Values
var defineStream = false // configurable
var defineStream = true // configurable
var defineConsumer = true // configurable
var replicationCount = 1 // configurable
var streamName: Option[String] = None // configurable
var storageType: StorageType = StorageType.File // configurable
Expand Down Expand Up @@ -224,6 +225,13 @@ class NatsConfig(isSource: Boolean) {
case e: NoSuchElementException =>
}

try {
val param = parameters("nats.js.define-consumer").toBoolean
this.defineConsumer = param
} catch {
case e: NoSuchElementException =>
}

this.server = Some(s"nats://${this.host}:${this.port}")
this.options = Some(createConnectionOptions(this.server.get, this.allowReconnect))

Expand Down Expand Up @@ -332,21 +340,23 @@ class NatsConfig(isSource: Boolean) {
}
}

val subjectArray = this.streamSubjects.get.replace(" ", "").split(",")
subjectArray.zipWithIndex.foreach {
case (subject, idx) => {
val configBuilder = ConsumerConfiguration
.builder()
.ackWait(this.msgAckWaitTime)
.ackPolicy(this.ackPolicy)
.filterSubject(subject)
.deliverPolicy(this.deliverPolicy)
if(this.durable.isDefined)
configBuilder.durable(s"${this.durable.get}-${idx}")
else {
// TODO: Add configBuilder.InactiveThreshold()
if (this.defineConsumer) {
val subjectArray = this.streamSubjects.get.replace(" ", "").split(",")
subjectArray.zipWithIndex.foreach {
case (subject, idx) => {
val configBuilder = ConsumerConfiguration
.builder()
.ackWait(this.msgAckWaitTime)
.ackPolicy(this.ackPolicy)
.filterSubject(subject)
.deliverPolicy(this.deliverPolicy)
if (this.durable.isDefined)
configBuilder.durable(s"${this.durable.get}-${idx}")
else {
// TODO: Add configBuilder.InactiveThreshold()
}
jsm.addOrUpdateConsumer(this.streamName.get, configBuilder.build())
}
jsm.addOrUpdateConsumer(this.streamName.get, configBuilder.build())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ object NatsBatchTestDriver extends App {
"nats.host" -> "localhost",
"nats.port" -> "4222",
"nats.msg.ack.wait.secs" -> "10",
"nats.durable.name" -> "Durable"
"nats.durable.name" -> "Durable",
"nats.js.define-stream" -> "true",
"nats.js.define-consumer" -> "true",
)
NatsConfigSource.config.setConnection(parameters)
NatsConfigSink.config.streamName = Some("TestBatchStream")
Expand Down
1 change: 1 addition & 0 deletions load_balanced/src/test/scala/natstest/NatsTestDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object NatsTestDriver extends App {
"nats.durable.name" -> "Durable",
"nats.storage.replicas" -> "1",
"nats.js.define-stream" -> "true",
"nats.js.define-consumer" -> "true",
)
NatsConfigSource.config.setConnection(parameters)
NatsConfigSink.config.streamName = Some("TestStream")
Expand Down
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion partitioned/build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@

name := "nats-spark-connector-partitioned"
version := "1.1.4"
version := "1.1.5"
scalaVersion := "2.12.14"

val sparkVersion = "3.3.0"
Expand Down
Binary file not shown.
Binary file not shown.

0 comments on commit c6d5c11

Please sign in to comment.