Adding maxBytesPerTrigger tag for Pulsar Admission Control (#151)
* init

* correcting bytesLeftInLedger calculation

* adding check for startoffset

* adminUrl correction

* only MessageId is null tests failing

* adding pulsaroption

* test case

* moving functionality to PulsarHelper

* feedback and refactoring

* feedback

* dealing with startLedgerId == ledger.id

* fix

* check readLimit greater than 0

* early return if readLimitLeft == 0

* increasing processing time

* removing unnecessary code

* checking if consumer is connected + pulsaradmissionhelper

* putting latestOffsetForTopic in AdmissionControlHelper

* added more tests for admission control

* changing where pulsarAdmin is set

* test where we add a new topic partition after stream has started

* fetchlatest -> gettopicpartitions

* more tests

* changing AddPulsarDataWithPartition from Set(topic) -> topic

* adding test case concurrent topic writes

* changing getAdminUrl and reducing offsets, startpartitionoffsets redundancy

* setting partition index in messageId correctly

* removing info logs

* removing maxEntriesPerLedger option

* adding maxEntriesPerLedger in test

* maxEntriesPerLedger works

* spacing

* checking numInputRows per microbatch in query

* removing exact checklastbatch

* updating README

* updating readme

* changing admin.url in readme

* build errors
ericm-db authored Aug 28, 2023
1 parent f1b8d99 commit cd93f8d
Showing 9 changed files with 553 additions and 15 deletions.
13 changes: 11 additions & 2 deletions README.md
@@ -219,12 +219,21 @@ A possible solution to remove duplicates when reading the written data could be
</tr>

<tr>
<td>`admin.url` (Deprecated)</td>
<td>`admin.url`</td>
<td>A service HTTP URL of your Pulsar cluster</td>
<td>No</td>
<td>None</td>
<td>Streaming and Batch</td>
<td>The Pulsar `serviceHttpUrl` configuration. </td>
<td>The Pulsar `serviceHttpUrl` configuration. Only needed when `maxBytesPerTrigger` is specified.</td>
</tr>

<tr>
<td>`maxBytesPerTrigger`</td>
<td>A long value, in bytes</td>
<td>No</td>
<td>None</td>
<td>Streaming and Batch</td>
<td>A soft limit on the maximum number of bytes to process per microbatch. If this is specified, `admin.url` must also be specified.</td>
</tr>

<tr>
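(The options table continues above but is truncated in this view.) For context, a minimal usage sketch, not part of this commit: it assumes an active SparkSession named `spark`, the connector's usual `pulsar` format name and `topic` option, and illustrative localhost URLs.

val df = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080")           // required whenever maxBytesPerTrigger is set
  .option("topic", "persistent://public/default/events")  // hypothetical topic
  .option("maxBytesPerTrigger", 10L * 1024 * 1024)         // soft cap of ~10 MB per microbatch
  .load()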
111 changes: 110 additions & 1 deletion src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
@@ -22,16 +22,22 @@ import scala.collection.mutable
import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.{MessageId, PulsarClient}
import org.apache.pulsar.client.impl.{MessageIdImpl, PulsarClientImpl}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.client.internal.DefaultImplementation
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo
import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit}
import org.apache.spark.sql.pulsar.PulsarOptions._
import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
import org.apache.spark.sql.types.StructType

/**
@@ -40,6 +46,7 @@ import org.apache.spark.sql.types.StructType
*/
private[pulsar] case class PulsarHelper(
serviceUrl: String,
adminUrl: Option[String],
clientConf: ju.Map[String, Object],
driverGroupIdPrefix: String,
caseInsensitiveParameters: Map[String, String],
@@ -55,6 +62,12 @@ private[pulsar] case class PulsarHelper(
private var topics: Seq[String] = _
private var topicPartitions: Seq[String] = _

// Calling adminUrl.get here is safe: admissionControlHelper is only used
// when latestOffset is called, and PulsarProvider throws an exception if
// maxBytesPerTrigger is set but adminUrl is not.
private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get)

override def close(): Unit = {
// do nothing
}
@@ -122,7 +135,9 @@ private[pulsar] case class PulsarHelper(
offset.foreach { case (tp, mid) =>
try {
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
CachedConsumer.getOrCreate(tp, subscription, client).seek(mid)
val consumer = CachedConsumer.getOrCreate(tp, subscription, client)
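// Calling getLastMessageId first makes sure the consumer is connected before the seek below.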
if (!consumer.isConnected) consumer.getLastMessageId
consumer.seek(mid)
} catch {
case e: Throwable =>
throw new RuntimeException(
@@ -207,6 +222,35 @@
}.toMap)
}

def latestOffsets(startingOffset: streaming.Offset,
totalReadLimit: Long): SpecificPulsarOffset = {
// Discover the current topic partitions. Partitions added after the stream
// started have no recorded start offset and are read from MessageId.earliest.
val topicPartitions = getTopicPartitions
val existingStartOffsets = if (startingOffset != null) {
getTopicOffsets(startingOffset.asInstanceOf[org.apache.spark.sql.execution.streaming.Offset])
} else {
Map[String, MessageId]()
}
val newTopics = topicPartitions.toSet.diff(existingStartOffsets.keySet)
val startPartitionOffsets = existingStartOffsets ++
newTopics.map(topicPartition => topicPartition -> MessageId.earliest)
val offsets = mutable.Map[String, MessageId]()
val numPartitions = startPartitionOffsets.size
// Split the total byte budget evenly across all topic partitions.
val readLimit = totalReadLimit / numPartitions
startPartitionOffsets.keys.foreach { topicPartition =>
val startMessageId = startPartitionOffsets.apply(topicPartition)
offsets += (topicPartition ->
admissionControlHelper.latestOffsetForTopicPartition(
topicPartition, startMessageId, readLimit))
}
SpecificPulsarOffset(offsets.toMap)
}

def fetchLatestOffsetForTopic(topic: String): MessageId = {
val messageId =
try {
@@ -472,3 +516,68 @@ private[pulsar] case class PulsarHelper(
CachedConsumer.getOrCreate(topic, subscriptionName, client).getLastMessageId
}
}

class PulsarAdmissionControlHelper(adminUrl: String)
extends Logging {

private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()

import scala.collection.JavaConverters._

def latestOffsetForTopicPartition(topicPartition: String,
startMessageId: MessageId,
readLimit: Long): MessageId = {
val startLedgerId = getLedgerId(startMessageId)
val startEntryId = getEntryId(startMessageId)
val stats = pulsarAdmin.topics.getInternalStats(topicPartition)
val ledgers = stats.ledgers.
asScala.filter(_.ledgerId >= startLedgerId).sortBy(_.ledgerId)
// The last ledger of the ledgers list doesn't have .size or .entries
// properly populated, and the corresponding info is in currentLedgerSize
// and currentLedgerEntries
if (ledgers.nonEmpty) {
ledgers.last.size = stats.currentLedgerSize
ledgers.last.entries = stats.currentLedgerEntries
}
val partitionIndex = if (topicPartition.contains(PartitionSuffix)) {
topicPartition.split(PartitionSuffix)(1).toInt
} else {
-1
}
var messageId = startMessageId
var readLimitLeft = readLimit
ledgers.filter(_.entries != 0).sortBy(_.ledgerId).foreach { ledger =>
assert(readLimitLeft >= 0)
if (readLimitLeft == 0) {
return messageId
}
val avgBytesPerEntries = ledger.size / ledger.entries
// Approximate the bytes left in this ledger, to handle the case where
// the start offset falls in the middle of the ledger
val bytesLeftInLedger = if (ledger.ledgerId == startLedgerId) {
avgBytesPerEntries * (ledger.entries - startEntryId - 1)
} else {
ledger.size
}
if (readLimitLeft > bytesLeftInLedger) {
readLimitLeft -= bytesLeftInLedger
messageId = DefaultImplementation
.getDefaultImplementation
.newMessageId(ledger.ledgerId, ledger.entries - 1, partitionIndex)
} else {
val numEntriesToRead = Math.max(1, readLimitLeft / avgBytesPerEntries)
val lastEntryId = if (ledger.ledgerId != startLedgerId) {
numEntriesToRead - 1
} else {
startEntryId + numEntriesToRead
}
val lastEntryRead = Math.min(ledger.entries - 1, lastEntryId)
messageId = DefaultImplementation
.getDefaultImplementation
.newMessageId(ledger.ledgerId, lastEntryRead, partitionIndex)
readLimitLeft = 0
}
}
messageId
}
}
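To make the arithmetic in latestOffsetForTopicPartition concrete, here is a small self-contained sketch, not part of this commit, that replays it with made-up numbers. readLimit stands for the per-partition share totalReadLimit / numPartitions computed in PulsarHelper.latestOffsets above.

// Hypothetical walk-through of the admission-control arithmetic.
// Assume one partition gets readLimit = 1,000,000 bytes and its start
// ledger holds 1,000 entries totalling 5,000,000 bytes (5,000 bytes/entry),
// with the start offset at entry 0 of that ledger.
object AdmissionControlArithmeticSketch {
  def main(args: Array[String]): Unit = {
    val readLimit = 1000000L
    val ledgerSize = 5000000L
    val ledgerEntries = 1000L
    val startEntryId = 0L
    val avgBytesPerEntry = ledgerSize / ledgerEntries                 // 5,000
    // bytesLeftInLedger = 5,000 * (1,000 - 0 - 1) = 4,995,000, which exceeds
    // readLimit, so the batch ends inside this ledger:
    val numEntriesToRead = math.max(1L, readLimit / avgBytesPerEntry) // 200
    val lastEntryId = startEntryId + numEntriesToRead                 // 200
    println(s"microbatch ends at entry $lastEntryId of the start ledger")
  }
}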
src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
@@ -36,6 +36,7 @@ private[pulsar] object PulsarOptions {
val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)

val ServiceUrlOptionKey: String = "service.url"
val AdminUrlOptionKey: String = "admin.url"
val StartingOffsetsOptionKey: String = "startingOffsets".toLowerCase(Locale.ROOT)
val StartingTime: String = "startingTime".toLowerCase(Locale.ROOT)
val EndingTime: String = "endingTime".toLowerCase(Locale.ROOT)
@@ -45,6 +46,7 @@
val SubscriptionPrefix: String = "subscriptionPrefix".toLowerCase(Locale.ROOT)
val PredefinedSubscription: String = "predefinedSubscription".toLowerCase(Locale.ROOT)

val MaxBytesPerTrigger: String = "maxBytesPerTrigger".toLowerCase(Locale.ROOT)
val PollTimeoutMS: String = "pollTimeoutMs".toLowerCase(Locale.ROOT)
val FailOnDataLossOptionKey: String = "failOnDataLoss".toLowerCase(Locale.ROOT)

32 changes: 27 additions & 5 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -56,12 +56,13 @@ private[pulsar] class PulsarProvider
parameters: Map[String, String]): (String, StructType) = {

val caseInsensitiveParams = validateStreamOptions(parameters)
val (clientConfig, _, serviceUrlConfig) = prepareConfForReader(parameters)
val (clientConfig, _, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)

val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}"
val inferredSchema = Utils.tryWithResource(
PulsarHelper(
serviceUrlConfig,
adminUrl,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -84,13 +85,14 @@
logDebug(s"Creating Pulsar source: $parameters")

val caseInsensitiveParams = validateStreamOptions(parameters)
val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters)
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
logDebug(
s"Client config: $clientConfig; Reader config: $readerConfig; Service URL: $serviceUrl")

val subscriptionNamePrefix = getSubscriptionPrefix(parameters)
val pulsarHelper = PulsarHelper(
serviceUrl,
adminUrl,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -105,6 +107,12 @@
pulsarHelper.offsetForEachTopic(caseInsensitiveParams, LatestOffset, StartOptionKey)
pulsarHelper.setupCursor(offset)

val maxBytes = maxBytesPerTrigger(caseInsensitiveParams)
if (adminUrl.isEmpty && maxBytes != 0L) {
throw new IllegalArgumentException("admin.url " +
"must be specified if maxBytesPerTrigger is specified")
}

new PulsarSource(
sqlContext,
pulsarHelper,
@@ -113,6 +121,7 @@
metadataPath,
offset,
pollTimeoutMs(caseInsensitiveParams),
maxBytesPerTrigger(caseInsensitiveParams),
failOnDataLoss(caseInsensitiveParams),
subscriptionNamePrefix,
jsonOptions)
@@ -125,10 +134,11 @@

val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true)

val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters)
val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (start, end, schema, pSchema) = Utils.tryWithResource(
PulsarHelper(
serviceUrl,
adminUrl,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -366,6 +376,10 @@
parameters(ServiceUrlOptionKey)
}

private def getAdminUrl(parameters: Map[String, String]): Option[String] = {
parameters.get(AdminUrlOptionKey)
}

private def getAllowDifferentTopicSchemas(parameters: Map[String, String]): Boolean = {
parameters.getOrElse(AllowDifferentTopicSchemas, "false").toBoolean
}
@@ -380,6 +394,13 @@
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
.toInt

private def maxBytesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams
.getOrElse(
PulsarOptions.MaxBytesPerTrigger,
0L.toString
).toLong

private def validateGeneralOptions(
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
@@ -486,17 +507,18 @@
}

private def prepareConfForReader(parameters: Map[String, String])
: (ju.Map[String, Object], ju.Map[String, Object], String) = {
: (ju.Map[String, Object], ju.Map[String, Object], String, Option[String]) = {

val serviceUrl = getServiceUrl(parameters)
val adminUrl = getAdminUrl(parameters)
var clientParams = getClientParams(parameters)
clientParams += (ServiceUrlOptionKey -> serviceUrl)
val readerParams = getReaderParams(parameters)

(
paramsToPulsarConf("pulsar.client", clientParams),
paramsToPulsarConf("pulsar.reader", readerParams),
serviceUrl)
serviceUrl, adminUrl)
}

private def prepareConfForProducer(parameters: Map[String, String])
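The PulsarProvider changes above amount to: parse maxBytesPerTrigger (defaulting to 0, meaning no byte limit), read admin.url as an Option, and fail fast when the former is set without the latter. Below is a compact illustrative sketch of that flow, not the connector's actual code path, using the lower-cased option key defined in PulsarOptions; the object and method names here are hypothetical.

object AdmissionControlOptionsSketch {
  // Mirrors the validation added to PulsarProvider: maxBytesPerTrigger without
  // admin.url is rejected, otherwise both values are handed to the source.
  def parse(parameters: Map[String, String]): (Long, Option[String]) = {
    val caseInsensitive = parameters.map { case (k, v) => k.toLowerCase -> v }
    val maxBytes = caseInsensitive.getOrElse("maxbytespertrigger", "0").toLong
    val adminUrl = caseInsensitive.get("admin.url")
    if (maxBytes != 0L && adminUrl.isEmpty) {
      throw new IllegalArgumentException(
        "admin.url must be specified if maxBytesPerTrigger is specified")
    }
    (maxBytes, adminUrl)
  }
}

// e.g. parse(Map("maxBytesPerTrigger" -> "1048576", "admin.url" -> "http://localhost:8080"))
//      returns (1048576, Some("http://localhost:8080"))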
