Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding PulsarAdmin configs to PulsarHelper #158

Merged
merged 9 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import org.apache.spark.sql.types.StructType
private[pulsar] case class PulsarHelper(
serviceUrl: String,
adminUrl: Option[String],
adminConf: ju.Map[String, Object],
clientConf: ju.Map[String, Object],
driverGroupIdPrefix: String,
caseInsensitiveParameters: Map[String, String],
Expand All @@ -66,7 +67,7 @@ private[pulsar] case class PulsarHelper(
// will only be called if latestOffset is called and there should
// be an exception thrown in PulsarProvider if maxBytes is set,
// and adminUrl is not set
private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get)
private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get, adminConf)

override def close(): Unit = {
// do nothing
Expand Down Expand Up @@ -136,6 +137,9 @@ private[pulsar] case class PulsarHelper(
try {
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
val consumer = CachedConsumer.getOrCreate(tp, subscription, client)
// We need to do this because the consumer does not attempt to
// reconnect after calling .seek().
// TODO: Remove this once we have upgraded to a version so that this is no longer needed
if (!consumer.isConnected) consumer.getLastMessageId
consumer.seek(mid)
} catch {
Expand Down Expand Up @@ -517,10 +521,10 @@ private[pulsar] case class PulsarHelper(
}
}

class PulsarAdmissionControlHelper(adminUrl: String)
class PulsarAdmissionControlHelper(adminUrl: String, conf: ju.Map[String, Object])
extends Logging {

private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()
private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).loadConf(conf).build()

import scala.collection.JavaConverters._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ private[pulsar] object PulsarOptions {

// option key prefix for different modules
val PulsarClientOptionKeyPrefix: String = "pulsar.client."
val PulsarAdminOptionKeyPrefix: String = "pulsar.admin."
val PulsarProducerOptionKeyPrefix: String = "pulsar.producer."
val PulsarReaderOptionKeyPrefix: String = "pulsar.reader."

Expand Down
27 changes: 23 additions & 4 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,14 @@ private[pulsar] class PulsarProvider
parameters: Map[String, String]): (String, StructType) = {

val caseInsensitiveParams = validateStreamOptions(parameters)
val (clientConfig, _, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)
val (clientConfig, _, adminConfig, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)

val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}"
val inferredSchema = Utils.tryWithResource(
PulsarHelper(
serviceUrlConfig,
adminUrl,
adminConfig,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
Expand All @@ -85,14 +86,15 @@ private[pulsar] class PulsarProvider
logDebug(s"Creating Pulsar source: $parameters")

val caseInsensitiveParams = validateStreamOptions(parameters)
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (clientConfig, readerConfig, adminConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
logDebug(
s"Client config: $clientConfig; Reader config: $readerConfig; Service URL: $serviceUrl")

val subscriptionNamePrefix = getSubscriptionPrefix(parameters)
val pulsarHelper = PulsarHelper(
serviceUrl,
adminUrl,
adminConfig,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
Expand Down Expand Up @@ -134,11 +136,12 @@ private[pulsar] class PulsarProvider

val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true)

val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (clientConfig, readerConfig, adminConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (start, end, schema, pSchema) = Utils.tryWithResource(
PulsarHelper(
serviceUrl,
adminUrl,
adminConfig,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
Expand Down Expand Up @@ -258,6 +261,20 @@ private[pulsar] object PulsarProvider extends Logging {
}
}

private def getAdminParams(parameters: Map[String, String]): Map[String, String] = {
ericm-db marked this conversation as resolved.
Show resolved Hide resolved
val lowercaseKeyMap = parameters.keySet
.filter(_.startsWith(PulsarAdminOptionKeyPrefix))
.map { k =>
k.drop(PulsarAdminOptionKeyPrefix.length).toString -> parameters(k)
}
.toMap
lowercaseKeyMap.map { case (k, v) =>
clientConfKeys.getOrElse(
k,
throw new IllegalArgumentException(s"$k not supported by pulsar")) -> v
}
}

private def getProducerParams(parameters: Map[String, String]): Map[String, String] = {
getModuleParams(parameters, PulsarProducerOptionKeyPrefix, producerConfKeys)
}
Expand Down Expand Up @@ -507,17 +524,19 @@ private[pulsar] object PulsarProvider extends Logging {
}

private def prepareConfForReader(parameters: Map[String, String])
: (ju.Map[String, Object], ju.Map[String, Object], String, Option[String]) = {
: (ju.Map[String, Object], ju.Map[String, Object], ju.Map[String, Object], String, Option[String]) = {

val serviceUrl = getServiceUrl(parameters)
val adminUrl = getAdminUrl(parameters)
var clientParams = getClientParams(parameters)
clientParams += (ServiceUrlOptionKey -> serviceUrl)
val readerParams = getReaderParams(parameters)
val adminParams = getAdminParams(parameters)

(
paramsToPulsarConf("pulsar.client", clientParams),
paramsToPulsarConf("pulsar.reader", readerParams),
paramsToPulsarConf("pulsar.admin", adminParams),
serviceUrl, adminUrl)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.apache.spark.sql.pulsar

import java.{util => ju}

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.internal.DefaultImplementation
Expand Down Expand Up @@ -33,7 +35,7 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
val firstLedger = getLedgerId(firstMid)
val firstEntry = getEntryId(firstMid)
require(getLatestOffsets(Set(topic)).size === 1)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, new ju.HashMap[String, Object]())
val offset = admissionControlHelper.latestOffsetForTopicPartition(topic, MessageId.earliest, approxSizeOfInt)
assert(getLedgerId(offset) == firstLedger && getEntryId(offset) == firstEntry)
}
Expand All @@ -44,7 +46,7 @@ class PulsarAdmissionControlSuite extends PulsarSourceTest {
val firstMid = messageIds.head._2
val secondMid = messageIds.apply(1)._2
require(getLatestOffsets(Set(topic)).size === 1)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl)
val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl, new ju.HashMap[String, Object]())
val offset = admissionControlHelper.latestOffsetForTopicPartition(topic, firstMid, approxSizeOfInt)
assert(getLedgerId(offset) == getLedgerId(secondMid) && getEntryId(offset) == getEntryId(secondMid))

Expand Down
Loading