Adding maxBytesPerTrigger tag for Pulsar Admission Control #151

Merged on Aug 28, 2023 · 38 commits

Commits
66667d4 init (ericm-db, Aug 8, 2023)
9d5ceef correcting bytesLeftInLedger calculation (ericm-db, Aug 8, 2023)
10b9444 adding check for startoffset (ericm-db, Aug 8, 2023)
63ac0a9 adminUrl correction (ericm-db, Aug 8, 2023)
eb73093 only MessageId is null tests failing (ericm-db, Aug 10, 2023)
6b858d7 adding pulsaroption (ericm-db, Aug 14, 2023)
4347744 test case (ericm-db, Aug 15, 2023)
b491844 moving functionality to PulsarHelper (ericm-db, Aug 15, 2023)
423efe4 feedback and refactoring (ericm-db, Aug 16, 2023)
769890e feedback (ericm-db, Aug 16, 2023)
dbadb7e dealing with startLedgerId == ledger.id (ericm-db, Aug 16, 2023)
54ea233 fix (ericm-db, Aug 16, 2023)
843f7c9 check readLimit greater than 0 (ericm-db, Aug 16, 2023)
8f51130 early return if readLimitLeft == 0 (ericm-db, Aug 16, 2023)
0cf4dd7 increasing processing time (ericm-db, Aug 16, 2023)
fb645ec removing unnecessary code (ericm-db, Aug 16, 2023)
f4a3b39 checking if consumer is connected + pulsaradmissionhelper (ericm-db, Aug 16, 2023)
be7d93c putting latestOffsetForTopic in AdmissionControlHelper (ericm-db, Aug 18, 2023)
9a4b1a4 added more tests for admission control (ericm-db, Aug 18, 2023)
b0a4450 changing where pulsarAdmin is set (ericm-db, Aug 18, 2023)
dbe6528 test where we add a new topic partition after stream has started (ericm-db, Aug 18, 2023)
b6a114c fetchlatest -> gettopicpartitions (ericm-db, Aug 18, 2023)
2e69b5f more tests (ericm-db, Aug 21, 2023)
4d00f88 changing AddPulsarDataWithPartition from Set(topic) -> topic (ericm-db, Aug 21, 2023)
9369424 adding test case concurrent topic writes (ericm-db, Aug 21, 2023)
3103b66 changing getAdminUrl and reducing offsets, startpartitionoffsets redu… (ericm-db, Aug 21, 2023)
4712cf4 setting partition index in messageId correctly (ericm-db, Aug 21, 2023)
a3acee5 removing info logs (ericm-db, Aug 21, 2023)
bf3dfe6 removing maxEntriesPerLedger option (ericm-db, Aug 21, 2023)
12deb6b adding maxEntriesPerLedger in test (ericm-db, Aug 21, 2023)
b922024 maxEntriesPerLedger works (ericm-db, Aug 21, 2023)
eeb0b45 spacing (ericm-db, Aug 21, 2023)
c115f6f checking numInputRows per microbatch in query (ericm-db, Aug 21, 2023)
8c4a7e2 removing exact checklastbatch (ericm-db, Aug 21, 2023)
6e7b93c updating README (ericm-db, Aug 24, 2023)
c69d34b updating readme (ericm-db, Aug 24, 2023)
305fb2c changing admin.url in readme (ericm-db, Aug 24, 2023)
3fdc0b7 build errors (ericm-db, Aug 24, 2023)

Files changed
13 changes: 11 additions & 2 deletions README.md
@@ -219,12 +219,21 @@ A possible solution to remove duplicates when reading the written data could be
</tr>

<tr>
-<td>`admin.url` (Deprecated)</td>
+<td>`admin.url`</td>
<td>A service HTTP URL of your Pulsar cluster</td>
<td>No</td>
<td>None</td>
<td>Streaming and Batch</td>
-<td>The Pulsar `serviceHttpUrl` configuration. </td>
+<td>The Pulsar `serviceHttpUrl` configuration. Only needed when `maxBytesPerTrigger` is specified</td>
</tr>

<tr>
<td>`maxBytesPerTrigger`</td>
<td>A long value, in bytes</td>
<td>No</td>
<td>None</td>
<td>Streaming and Batch</td>
<td>A soft limit on the maximum number of bytes to process per microbatch. If this is specified, `admin.url` must also be specified.</td>
</tr>

<tr>
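For orientation, here is a minimal usage sketch combining the two options documented above; the URLs and topic name are placeholders, and `spark` is assumed to be an existing SparkSession:

```scala
val stream = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://localhost:6650")
  .option("admin.url", "http://localhost:8080") // required once maxBytesPerTrigger is set
  .option("topic", "example-topic")
  .option("maxBytesPerTrigger", "10485760") // soft cap of ~10 MiB per microbatch
  .load()
```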
111 changes: 110 additions & 1 deletion src/main/scala/org/apache/spark/sql/pulsar/PulsarHelper.scala
@@ -22,16 +22,22 @@ import scala.collection.mutable
import scala.language.postfixOps
import scala.util.control.NonFatal

import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.{MessageId, PulsarClient}
import org.apache.pulsar.client.impl.{MessageIdImpl, PulsarClientImpl}
import org.apache.pulsar.client.impl.schema.BytesSchema
import org.apache.pulsar.client.internal.DefaultImplementation
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace
import org.apache.pulsar.common.naming.TopicName
import org.apache.pulsar.common.schema.SchemaInfo
import org.apache.pulsar.shade.com.google.common.util.concurrent.Uninterruptibles

import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit}
import org.apache.spark.sql.pulsar.PulsarOptions._
import org.apache.spark.sql.pulsar.PulsarSourceUtils.{getEntryId, getLedgerId}
import org.apache.spark.sql.pulsar.SpecificPulsarOffset.getTopicOffsets
import org.apache.spark.sql.types.StructType

/**
@@ -40,6 +46,7 @@ import org.apache.spark.sql.types.StructType
*/
private[pulsar] case class PulsarHelper(
serviceUrl: String,
adminUrl: Option[String],
clientConf: ju.Map[String, Object],
driverGroupIdPrefix: String,
caseInsensitiveParameters: Map[String, String],
@@ -55,6 +62,12 @@ private[pulsar] case class PulsarHelper(
private var topics: Seq[String] = _
private var topicPartitions: Seq[String] = _

// We can safely call adminUrl.get here: admissionControlHelper is only
// used when latestOffsets is called, and PulsarProvider throws an
// exception if maxBytesPerTrigger is set but adminUrl is not.
private lazy val admissionControlHelper = new PulsarAdmissionControlHelper(adminUrl.get)

override def close(): Unit = {
// do nothing
}
@@ -122,7 +135,9 @@ private[pulsar] case class PulsarHelper(
offset.foreach { case (tp, mid) =>
try {
val (subscription, _) = extractSubscription(predefinedSubscription, tp)
-CachedConsumer.getOrCreate(tp, subscription, client).seek(mid)
+val consumer = CachedConsumer.getOrCreate(tp, subscription, client)
+if (!consumer.isConnected) consumer.getLastMessageId
[Review comment, resolved] Contributor: I know this is a bug where the Pulsar consumer does not attempt to reconnect when doing seek(); can you leave a comment here explaining why this change is needed, and a TODO that we will get rid of this once we upgrade to a version that has the fix?

+consumer.seek(mid)
} catch {
case e: Throwable =>
throw new RuntimeException(
@@ -207,6 +222,35 @@ private[pulsar] case class PulsarHelper(
}.toMap)
}

def latestOffsets(startingOffset: streaming.Offset,
totalReadLimit: Long): SpecificPulsarOffset = {
// The helper is implemented inside PulsarHelper so that it can reuse getTopicPartitions.
val topicPartitions = getTopicPartitions
// Discover partitions through PulsarAdmin; any partition not present in the
// existing start offsets is new and starts reading from the earliest offset.
val existingStartOffsets = if (startingOffset != null) {
getTopicOffsets(startingOffset.asInstanceOf[org.apache.spark.sql.execution.streaming.Offset])
} else {
Map[String, MessageId]()
}
val newTopics = topicPartitions.toSet.diff(existingStartOffsets.keySet)
val startPartitionOffsets = existingStartOffsets ++ newTopics.map(topicPartition
=> {
topicPartition -> MessageId.earliest
})
val offsets = mutable.Map[String, MessageId]()
val numPartitions = startPartitionOffsets.size
// move all topic partition logic to helper function
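// Split the byte budget evenly across partitions: for example, with
// totalReadLimit = 4,000,000 bytes and 4 topic partitions, each partition
// may admit roughly 1,000,000 bytes in this microbatch.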
val readLimit = totalReadLimit / numPartitions
startPartitionOffsets.keys.foreach { topicPartition =>
val startMessageId = startPartitionOffsets.apply(topicPartition)
offsets += (topicPartition ->
admissionControlHelper.latestOffsetForTopicPartition(
topicPartition, startMessageId, readLimit))
}
SpecificPulsarOffset(offsets.toMap)
}

def fetchLatestOffsetForTopic(topic: String): MessageId = {
val messageId =
try {
@@ -472,3 +516,68 @@ private[pulsar] case class PulsarHelper(
CachedConsumer.getOrCreate(topic, subscriptionName, client).getLastMessageId
}
}

class PulsarAdmissionControlHelper(adminUrl: String)
extends Logging {

private lazy val pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()

import scala.collection.JavaConverters._

def latestOffsetForTopicPartition(topicPartition: String,
startMessageId: MessageId,
readLimit: Long): MessageId = {
val startLedgerId = getLedgerId(startMessageId)
val startEntryId = getEntryId(startMessageId)
val stats = pulsarAdmin.topics.getInternalStats(topicPartition)
val ledgers = stats.ledgers.
asScala.filter(_.ledgerId >= startLedgerId).sortBy(_.ledgerId)
// The last ledger of the ledgers list doesn't have .size or .entries
// properly populated, and the corresponding info is in currentLedgerSize
// and currentLedgerEntries
if (ledgers.nonEmpty) {
ledgers.last.size = stats.currentLedgerSize
ledgers.last.entries = stats.currentLedgerEntries
}
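// Pulsar names partitioned topics like "my-topic-partition-3"; extract the
// index after the partition suffix, or use -1 for a non-partitioned topic.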
val partitionIndex = if (topicPartition.contains(PartitionSuffix)) {
topicPartition.split(PartitionSuffix)(1).toInt
} else {
-1
}
var messageId = startMessageId
var readLimitLeft = readLimit
ledgers.filter(_.entries != 0).sortBy(_.ledgerId).foreach { ledger =>
assert(readLimitLeft >= 0)
if (readLimitLeft == 0) {
return messageId
}
val avgBytesPerEntries = ledger.size / ledger.entries
// Approximate the bytes left in the ledger, to handle the case where
// the start offset falls in the middle of the ledger.
val bytesLeftInLedger = if (ledger.ledgerId == startLedgerId) {
avgBytesPerEntries * (ledger.entries - startEntryId - 1)
} else {
ledger.size
}
if (readLimitLeft > bytesLeftInLedger) {
readLimitLeft -= bytesLeftInLedger
messageId = DefaultImplementation
.getDefaultImplementation
.newMessageId(ledger.ledgerId, ledger.entries - 1, partitionIndex)
} else {
val numEntriesToRead = Math.max(1, readLimitLeft / avgBytesPerEntries)
val lastEntryId = if (ledger.ledgerId != startLedgerId) {
numEntriesToRead - 1
} else {
startEntryId + numEntriesToRead
}
val lastEntryRead = Math.min(ledger.entries - 1, lastEntryId)
messageId = DefaultImplementation
.getDefaultImplementation
.newMessageId(ledger.ledgerId, lastEntryRead, partitionIndex)
readLimitLeft = 0
}
}
messageId
}
}
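To make the ledger arithmetic above concrete, here is a small illustrative walkthrough with made-up numbers; the ledger IDs, entry counts, and sizes are hypothetical, and this mirrors the loop above rather than calling the Pulsar admin API:

```scala
// Hypothetical per-partition state: budget of 60,000 bytes, two ledgers.
//   ledger 10: 100 entries, 50,000 bytes; ledger 11: 100 entries, 80,000 bytes
var readLimitLeft = 60000L

// Ledger 10 fits entirely (50,000 <= 60,000): consume it whole and move the
// candidate end offset to its last entry, i.e. (ledgerId = 10, entryId = 99).
readLimitLeft -= 50000L // 10,000 bytes remain

// Ledger 11 does not fit: avgBytesPerEntries = 80,000 / 100 = 800, so admit
// numEntriesToRead = max(1, 10,000 / 800) = 12 entries. The batch ends at
// (ledgerId = 11, entryId = 11) and readLimitLeft is set to 0.
val numEntriesToRead = math.max(1L, readLimitLeft / 800L) // 12
```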
2 changes: 2 additions & 0 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala
@@ -36,6 +36,7 @@ private[pulsar] object PulsarOptions {
val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern)

val ServiceUrlOptionKey: String = "service.url"
val AdminUrlOptionKey: String = "admin.url"
val StartingOffsetsOptionKey: String = "startingOffsets".toLowerCase(Locale.ROOT)
val StartingTime: String = "startingTime".toLowerCase(Locale.ROOT)
val EndingTime: String = "endingTime".toLowerCase(Locale.ROOT)
@@ -45,6 +46,7 @@
val SubscriptionPrefix: String = "subscriptionPrefix".toLowerCase(Locale.ROOT)
val PredefinedSubscription: String = "predefinedSubscription".toLowerCase(Locale.ROOT)

val MaxBytesPerTrigger: String = "maxBytesPerTrigger".toLowerCase(Locale.ROOT)
val PollTimeoutMS: String = "pollTimeoutMs".toLowerCase(Locale.ROOT)
val FailOnDataLossOptionKey: String = "failOnDataLoss".toLowerCase(Locale.ROOT)

32 changes: 27 additions & 5 deletions src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -56,12 +56,13 @@ private[pulsar] class PulsarProvider
parameters: Map[String, String]): (String, StructType) = {

val caseInsensitiveParams = validateStreamOptions(parameters)
-val (clientConfig, _, serviceUrlConfig) = prepareConfForReader(parameters)
+val (clientConfig, _, serviceUrlConfig, adminUrl) = prepareConfForReader(parameters)

val subscriptionNamePrefix = s"spark-pulsar-${UUID.randomUUID}"
val inferredSchema = Utils.tryWithResource(
PulsarHelper(
serviceUrlConfig,
adminUrl,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -84,13 +85,14 @@
logDebug(s"Creating Pulsar source: $parameters")

val caseInsensitiveParams = validateStreamOptions(parameters)
-val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters)
+val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
logDebug(
s"Client config: $clientConfig; Reader config: $readerConfig; Service URL: $serviceUrl")

val subscriptionNamePrefix = getSubscriptionPrefix(parameters)
val pulsarHelper = PulsarHelper(
serviceUrl,
adminUrl,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -105,6 +107,12 @@
pulsarHelper.offsetForEachTopic(caseInsensitiveParams, LatestOffset, StartOptionKey)
pulsarHelper.setupCursor(offset)

val maxBytes = maxBytesPerTrigger(caseInsensitiveParams)
if (adminUrl.isEmpty && maxBytes != 0L) {
throw new IllegalArgumentException("admin.url " +
"must be specified if maxBytesPerTrigger is specified")
}

new PulsarSource(
sqlContext,
pulsarHelper,
@@ -113,6 +121,7 @@
metadataPath,
offset,
pollTimeoutMs(caseInsensitiveParams),
maxBytesPerTrigger(caseInsensitiveParams),
failOnDataLoss(caseInsensitiveParams),
subscriptionNamePrefix,
jsonOptions)
@@ -125,10 +134,11 @@

val subscriptionNamePrefix = getSubscriptionPrefix(parameters, isBatch = true)

-val (clientConfig, readerConfig, serviceUrl) = prepareConfForReader(parameters)
+val (clientConfig, readerConfig, serviceUrl, adminUrl) = prepareConfForReader(parameters)
val (start, end, schema, pSchema) = Utils.tryWithResource(
PulsarHelper(
serviceUrl,
adminUrl,
clientConfig,
subscriptionNamePrefix,
caseInsensitiveParams,
@@ -366,6 +376,10 @@ private[pulsar] object PulsarProvider extends Logging {
parameters(ServiceUrlOptionKey)
}

private def getAdminUrl(parameters: Map[String, String]): Option[String] = {
parameters.get(AdminUrlOptionKey)
}

private def getAllowDifferentTopicSchemas(parameters: Map[String, String]): Boolean = {
parameters.getOrElse(AllowDifferentTopicSchemas, "false").toBoolean
}
@@ -380,6 +394,13 @@
(SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000).toString)
.toInt

private def maxBytesPerTrigger(caseInsensitiveParams: Map[String, String]): Long =
caseInsensitiveParams
.getOrElse(
PulsarOptions.MaxBytesPerTrigger,
0L.toString
).toLong

private def validateGeneralOptions(
caseInsensitiveParams: Map[String, String]): Map[String, String] = {
if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) {
@@ -486,17 +507,18 @@ private[pulsar] object PulsarProvider extends Logging {
}

private def prepareConfForReader(parameters: Map[String, String])
-: (ju.Map[String, Object], ju.Map[String, Object], String) = {
+: (ju.Map[String, Object], ju.Map[String, Object], String, Option[String]) = {

val serviceUrl = getServiceUrl(parameters)
val adminUrl = getAdminUrl(parameters)
var clientParams = getClientParams(parameters)
clientParams += (ServiceUrlOptionKey -> serviceUrl)
val readerParams = getReaderParams(parameters)

(
paramsToPulsarConf("pulsar.client", clientParams),
paramsToPulsarConf("pulsar.reader", readerParams),
-serviceUrl)
+serviceUrl, adminUrl)
}

private def prepareConfForProducer(parameters: Map[String, String])
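As a quick illustration of the option handling above, assuming the connector lower-cases option keys before lookup (as the `toLowerCase` key definitions in PulsarOptions suggest); the maps below are hypothetical:

```scala
maxBytesPerTrigger(Map("maxbytespertrigger" -> "1048576")) // 1048576L
maxBytesPerTrigger(Map.empty)                              // 0L, i.e. no byte limit

// createSource rejects a byte limit without an admin endpoint:
//   adminUrl = None and maxBytes != 0L
//   => IllegalArgumentException("admin.url must be specified if maxBytesPerTrigger is specified")
```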