diff --git a/README.md b/README.md index c2ba789a..89c5f829 100644 --- a/README.md +++ b/README.md @@ -327,53 +327,6 @@ all of them. complete backlog is read at once. - - -`forwardStrategy` - - -`simple`, `large-first` or `proportional` - -`simple` -Streaming query -If `maxEntriesPerTrigger` is set, this parameter controls -which forwarding strategy is in use during the read of multiple -topics. -
  • -`simple` just divides the allowed number of entries equally -between all topics, regardless of their backlog size -
  • -
  • -`large-first` will load the largest topic backlogs first, -as the maximum number of allowed entries allows -
  • -
  • -`proportional` will forward all topics proportional to the -topic backlog/overall backlog ratio -
  • - - - - - -`ensureEntriesPerTopic` - -Number to forward each topic with during a micro-batch. -0 -Streaming query -If multiple topics are read, and the maximum number of -entries is also specified, always forward all topics with the -amount of entries specified here. Using this, users can ensure that topics -with considerably smaller backlogs than others are also forwarded -and read. Note that: -
  • If this number is higher than the maximum allowed entries divided -by the number of topics, then this value is taken into account, overriding -the maximum number of entries per micro-batch. -
  • -
  • This parameter has an effect only for forwarding strategies -`large-first` and `proportional`.
  • - - `allowDifferentTopicSchemas` @@ -400,7 +353,6 @@ taken into account during operation. - #### Authentication diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala index 27b496ae..f560ad04 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarMetadataReader.scala @@ -261,30 +261,14 @@ private[pulsar] case class PulsarMetadataReader( }.toMap) } - - def forwardOffset(actualOffset: Map[String, MessageId], - strategy: String, - numberOfEntriesToForward: Long, - ensureEntriesPerTopic: Long): SpecificPulsarOffset = { + def fetchNextOffsetWithMaxEntries(actualOffset: Map[String, MessageId], + numberOfEntries: Long): SpecificPulsarOffset = { getTopicPartitions() // Collect internal stats for all topics val topicStats = topicPartitions.map( topic => { - val internalStats = admin.topics().getInternalStats(topic) - val topicActualMessageId = actualOffset.getOrElse(topic, MessageId.earliest) - topic -> TopicState(internalStats, - PulsarSourceUtils.getLedgerId(topicActualMessageId), - PulsarSourceUtils.getEntryId(topicActualMessageId)) - } ).toMap - - val forwarder = strategy match { - case PulsarOptions.ProportionalForwardStrategy => - new ProportionalForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic) - case PulsarOptions.LargeFirstForwardStrategy => - new LargeFirstForwardStrategy(numberOfEntriesToForward, ensureEntriesPerTopic) - case _ => - new LinearForwardStrategy(numberOfEntriesToForward) - } + topic -> admin.topics().getInternalStats(topic) + } ).toMap.asJava SpecificPulsarOffset(topicPartitions.map { topic => topic -> PulsarSourceUtils.seekableLatestMid { @@ -298,39 +282,41 @@ private[pulsar] case class PulsarMetadataReader( // Get the partition index val partitionIndex = PulsarSourceUtils.getPartitionIndex(topicActualMessageId) // Cache topic internal stats - val internalStats = topicStats.get(topic).get.internalStat + val internalStats = topicStats.get(topic) // Calculate the amount of messages we will pull in - val numberOfEntriesPerTopic = forwarder.forward(topicStats)(topic) - // Get a future message ID which corresponds - // to the maximum number of messages + val numberOfEntriesPerTopic = numberOfEntries / topics.size + // Get a next message ID which respects + // the maximum number of messages val (nextLedgerId, nextEntryId) = TopicInternalStatsUtils.forwardMessageId( internalStats, actualLedgerId, actualEntryId, numberOfEntriesPerTopic) - // Build a message id - val forwardedMessageId = - DefaultImplementation.newMessageId(nextLedgerId, nextEntryId, partitionIndex) + // Build the next message ID + val nextMessageId = + DefaultImplementation + .getDefaultImplementation + .newMessageId(nextLedgerId, nextEntryId, partitionIndex) // Log state - val forwardedEntry = TopicInternalStatsUtils.numOfEntriesUntil( + val entryCountUntilNextMessageId = TopicInternalStatsUtils.numOfEntriesUntil( internalStats, nextLedgerId, nextEntryId) val entryCount = internalStats.numberOfEntries - val progress = f"${forwardedEntry.toFloat / entryCount.toFloat}%1.3f" - val logMessage = s"Pulsar Connector forward on topic. " + - s"[$numberOfEntriesPerTopic/$numberOfEntriesToForward]" + + val progress = f"${entryCountUntilNextMessageId.toFloat / entryCount.toFloat}%1.3f" + val logMessage = s"Pulsar Connector offset step forward. " + + s"[$numberOfEntriesPerTopic/$numberOfEntries]" + s"${topic.reverse.take(30).reverse} $topicActualMessageId -> " + - s"$forwardedMessageId ($forwardedEntry/$entryCount) [$progress]" + s"$nextMessageId ($entryCountUntilNextMessageId/$entryCount) [$progress]" log.debug(logMessage) // Return the message ID - forwardedMessageId + nextMessageId } catch { case e: PulsarAdminException if e.getStatusCode == 404 => MessageId.earliest case e: Throwable => throw new RuntimeException( s"Failed to get forwarded messageId for ${TopicName.get(topic).toString} " + - s"(tried to forward ${forwarder.forward(topicStats)(topic)} messages " + - s"starting from `$topicActualMessageId` using strategy $strategy)", e) + s"(tried to forward ${numberOfEntries} messages " + + s"starting from `$topicActualMessageId`)", e) } } diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala index 98ba60e9..2c2425cb 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarOptions.scala @@ -38,10 +38,6 @@ private[pulsar] object PulsarOptions { val TopicOptionKeys: Set[String] = Set(TopicSingle, TopicMulti, TopicPattern) val MaxEntriesPerTrigger = "maxentriespertrigger" - val EnsureEntriesPerTopic = "ensureentriespertopic" - val ForwardStrategy = "forwardstrategy" - val ProportionalForwardStrategy = "proportional" - val LargeFirstForwardStrategy = "large-first" val ServiceUrlOptionKey: String = "service.url" val AdminUrlOptionKey: String = "admin.url" diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala index 0b9b3c65..b29abc37 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala @@ -114,9 +114,7 @@ private[pulsar] class PulsarProvider failOnDataLoss(caseInsensitiveParams), subscriptionNamePrefix, jsonOptions, - maxEntriesPerTrigger(caseInsensitiveParams), - minEntriesPerTopic(caseInsensitiveParams), - forwardStrategy(caseInsensitiveParams) + maxEntriesPerTrigger(caseInsensitiveParams) ) } @@ -402,12 +400,6 @@ private[pulsar] object PulsarProvider extends Logging { private def maxEntriesPerTrigger(caseInsensitiveParams: Map[String, String]): Long = caseInsensitiveParams.getOrElse(MaxEntriesPerTrigger, "-1").toLong - private def minEntriesPerTopic(caseInsensitiveParams: Map[String, String]): Long = - caseInsensitiveParams.getOrElse(EnsureEntriesPerTopic, "0").toLong - - private def forwardStrategy(caseInsensitiveParams: Map[String, String]): String = - caseInsensitiveParams.getOrElse(ForwardStrategy, "simple") - private def validateGeneralOptions( caseInsensitiveParams: Map[String, String]): Map[String, String] = { if (!caseInsensitiveParams.contains(ServiceUrlOptionKey)) { diff --git a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala index 93737d28..ee0a25df 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/PulsarSource.scala @@ -37,9 +37,7 @@ private[pulsar] class PulsarSource( failOnDataLoss: Boolean, subscriptionNamePrefix: String, jsonOptions: JSONOptionsInRead, - maxEntriesPerTrigger: Long, - ensureEntriesPerTopic: Long, - forwardStrategy: String) + maxEntriesPerTrigger: Long) extends Source with Logging { @@ -68,11 +66,11 @@ private[pulsar] class PulsarSource( } else { currentTopicOffsets match { case Some(value) => - metadataReader.forwardOffset(value, - forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic) + metadataReader.fetchNextOffsetWithMaxEntries(value, + maxEntriesPerTrigger) case _ => - metadataReader.forwardOffset(initialTopicOffsets.topicOffsets, - forwardStrategy, maxEntriesPerTrigger, ensureEntriesPerTopic) + metadataReader.fetchNextOffsetWithMaxEntries(initialTopicOffsets.topicOffsets, + maxEntriesPerTrigger) } } logDebug(s"GetOffset: ${nextOffsets.topicOffsets.toSeq.map(_.toString).sorted}") diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala deleted file mode 100644 index ed39885b..00000000 --- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategy.scala +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.pulsar.topicinternalstats.forward - -/** - * Forward strategy which sorts the topics by their backlog size starting - * with the largest, and forwards topics starting from the beginning of - * this list as the maximum entries parameter allows (taking into account - * the number entries that need to be added anyway if - * - * @param additionalEntriesPerTopic is set). - * - * If the maximum entries to forward is `100`, topics will be forwarded - * like this (provided there is no minimum entry number specified: - * | topic name | backlog size | forward amount | - * |------------|--------------|----------------| - * | topic-1 | 60 | 60 | - * | topic-2 | 50 | 40 | - * | topic-3 | 40 | 0 | - * - * If @param ensureEntriesPerTopic is specified, then every topic will be - * forwarded by that value in addition to this (taking the backlog size of - * the topic into account so that bandwidth is not wasted). Given maximum - * entries is `100`, minimum entries is `10`, topics will be forwarded like - * this: - * - * | topic name | backlog size | forward amount | - * |------------|--------------|----------------| - * | topic-1 | 60 | 10 + 50 = 60 | - * | topic-2 | 50 | 10 + 30 = 30 | - * | topic-3 | 40 | 10 + 0 = 10 | - * @param maxEntriesAltogetherToForward Maximum entries in all topics to forward. - * Individual topics forward values will sum - * up to this value. - * @param ensureEntriesPerTopic All topics will be forwarded by this value. The goal - * of this parameter is to ensure that topics with a very - * small backlog are also forwarded with a given minimal - * value. Has a higher precedence than - * @param maxEntriesAltogetherToForward. - */ -class LargeFirstForwardStrategy(maxEntriesAltogetherToForward: Long, - ensureEntriesPerTopic: Long) extends ForwardStrategy { - override def forward(topics: Map[String, TopicState]): Map[String, Long] = { - - // calculate all remaining entries per topic, ordering them by remaining entry count - // in a reverse order - val topicBacklogs = topics - .map{ - case(topicName, topicStat) => - val internalStat = topicStat.internalStat - val ledgerId = topicStat.actualLedgerId - val entryId = topicStat.actualEntryId - (topicName, TopicInternalStatsUtils.numOfEntriesAfter(internalStat, ledgerId, entryId)) - } - .toList - .sortBy{ case(_, numOfEntriesAfterPosition) => numOfEntriesAfterPosition } - .reverse - - // calculate quota based on the ensured entry count - // this will be distributed between individual topics - var quota = Math.max(maxEntriesAltogetherToForward - ensureEntriesPerTopic * topics.size, 0) - - val result = for ((topic, topicBacklogSize) <- topicBacklogs) yield { - // try to increase topic by this number - // - if we have already ran out of quota, do not move topic - // - if we do not have enough quota, proceed with the quota (exhaust it completely) - // - if we have enough quota, exhaust all topic content (and decrease it later) - // - take the number of ensured entries into account when calculating quota - val forwardTopicBy = if (quota > 0) { - Math.min(quota, topicBacklogSize - ensureEntriesPerTopic) - } else { - 0 - } - // calculate forward position for a topic, make sure that it is - // always increased by the configured ensure entry count - val resultEntry = topic -> (ensureEntriesPerTopic + forwardTopicBy) - // decrease the overall quota separately - quota -= (topicBacklogSize - ensureEntriesPerTopic) - // return already calculated forward position - resultEntry - } - - result.toMap - } -} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala deleted file mode 100644 index 5dc994a0..00000000 --- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategy.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.pulsar.topicinternalstats.forward - -/** - * Simple forward strategy, which forwards every topic evenly, not - * taking actual backlog sizes into account. Might waste bandwidth - * when the backlog of the topic is smaller than the calculated value - * for that topic. - * - * If the maximum entries to forward is `150`, topics will be forwarded - * like this (provided there is no minimum entry number specified: - * | topic name | backlog size | forward amount | - * |------------|--------------|----------------| - * | topic-1 | 60 | 50 | - * | topic-2 | 50 | 50 | - * | topic-3 | 40 | 50 | - * - * @param maxEntriesAltogetherToForward Maximum entries in all topics to - * forward. Will forward every topic - * by dividing this with the number of - * topics. - */ -class LinearForwardStrategy(maxEntriesAltogetherToForward: Long) extends ForwardStrategy { - override def forward(topics: Map[String, TopicState]): Map[String, Long] = - topics - .map{ case (topicName, _) => - topicName -> (maxEntriesAltogetherToForward / topics.size) } -} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategy.scala deleted file mode 100644 index b52f7049..00000000 --- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategy.scala +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.pulsar.topicinternalstats.forward - -/** - * This forward strategy will forward individual topic backlogs based on - * their size proportional to the size of the overall backlog (considering - * all topics). - * - * If the maximum entries to forward is `100`, topics will be forwarded - * like this (provided there is no minimum entry number specified: - * | topic name | backlog size | forward amount | - * |------------|--------------|--------------------------| - * |topic-1 | 60 | 100*(60/(60+50+40)) = 40 | - * |topic-2 | 50 | 100*(50/(60+50+40)) = 33 | - * |topic-3 | 40 | 100*(40/(60+50+40)) = 27 | - * - * If @param ensureEntriesPerTopic is specified, then every topic will be - * forwarded by that value in addition to this (taking the backlog size of - * the topic into account so that bandwidth is not wasted). - * Given maximum entries is `100`, minimum entries is `10`, topics will be - * forwarded like this: - * - * | topic name | backlog size | forward amount | - * |------------|--------------|----------------------------| - * |topic-1 | 60 | 10+70*(60/(60+50+40)) = 38 | - * |topic-2 | 50 | 10+70*(50/(60+50+40)) = 33 | - * |topic-3 | 40 | 10+70*(40/(60+50+40)) = 29 | - * - * @param maxEntriesAltogetherToForward Maximum entries in all topics to forward. - * Individual topics forward values will sum - * up to this value. - * @param ensureEntriesPerTopic All topics will be forwarded by this value. The goal - * of this parameter is to ensure that topics with a very - * small backlog are also forwarded with a given minimal - * value. Has a higher precedence than - * @param maxEntriesAltogetherToForward. - */ -class ProportionalForwardStrategy(maxEntriesAltogetherToForward: Long, - ensureEntriesPerTopic: Long) extends ForwardStrategy { - override def forward(topics: Map[String, TopicState]): Map[String, Long] = { - // calculate all remaining entries per topic - val topicBacklogs = topics - .map{ - case (topicName, topicStat) => - val internalStat = topicStat.internalStat - val ledgerId = topicStat.actualLedgerId - val entryId = topicStat.actualEntryId - (topicName, TopicInternalStatsUtils.numOfEntriesAfter(internalStat, ledgerId, entryId)) - } - .toList - - // this is the size of the complete backlog (the sum of all individual topic - // backlogs) - val completeBacklogSize = topicBacklogs - .map{ case (_, topicBacklogSize) => topicBacklogSize } - .sum - - // calculate quota based on the ensured entry count - // this will be distributed between individual topics - val quota = Math.max(maxEntriesAltogetherToForward - ensureEntriesPerTopic * topics.size, 0) - - topicBacklogs.map { - case (topicName: String, backLog: Long) => - // when calculating the coefficient, do not take the number of additional entries into - // account (that we will add anyway) - val topicBacklogCoefficient = if (completeBacklogSize == 0) { - 0.0 // do not forward if there is no backlog - } else { - // take the ensured entries into account when calculating - // backlog coefficient - val backlogWithoutAdditionalEntries = - Math.max(backLog - ensureEntriesPerTopic, 0).toFloat - val completeBacklogWithoutAdditionalEntries = - (completeBacklogSize - ensureEntriesPerTopic * topics.size).toFloat - backlogWithoutAdditionalEntries / completeBacklogWithoutAdditionalEntries - } - topicName -> (ensureEntriesPerTopic + (quota * topicBacklogCoefficient).toLong) - }.toMap - } -} diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtils.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtils.scala index 19761e01..f3f8c3a3 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtils.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtils.scala @@ -1,4 +1,4 @@ -/** +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at diff --git a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala similarity index 80% rename from src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala rename to src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala index ec75ba7b..fa970ab0 100644 --- a/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ForwardStrategy.scala +++ b/src/main/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicState.scala @@ -1,4 +1,4 @@ -/** +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -15,10 +15,6 @@ package org.apache.spark.sql.pulsar.topicinternalstats.forward import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats -trait ForwardStrategy { - def forward(topics: Map[String, TopicState]): Map[String, Long] -} - case class TopicState(internalStat: PersistentTopicInternalStats, - actualLedgerId: Long, - actualEntryId: Long) + ledgerId: Long, + entryId: Long) diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategySuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategySuite.scala deleted file mode 100644 index 993b383a..00000000 --- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LargeFirstForwardStrategySuite.scala +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.pulsar.topicinternalstats.forward - -import TopicStateFixture.{createLedgerInfo, _} -import org.apache.spark.SparkFunSuite - -class LargeFirstForwardStrategySuite extends SparkFunSuite { - - test("forward empty topics") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat(), - 0, 0 - )) - val testForwarder = new LargeFirstForwardStrategy(10, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 0) - } - - test("forward a single topic with a single ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200) - ), - 0, 0 - )) - val testForwarder = new LargeFirstForwardStrategy(10, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 10) - } - - test("forward a single topic with multiple ledgers") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - createLedgerInfo(2000, 200) - ), - 0, 0 - )) - val testForwarder = new LargeFirstForwardStrategy(350, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 350) - } - - test("forward a single topic with the biggest backlog") { - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 400), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 600), - ), - 0, 0 - )) - val testForwarder = new LargeFirstForwardStrategy(15, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - assert(result("topic3") == 15) - assert(result("topic2") == 0) - assert(result("topic1") == 0) - } - - test("forward multiple topics if the backlog is small enough") { - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 20), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 40), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 60), - ), - 0, 0 - )) - val testForwarder = new LargeFirstForwardStrategy(100, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - assert(result("topic3") == 60) - assert(result("topic2") == 40) - assert(result("topic1") == 0) - } - - test("forward by additional entries regardless of backlog size") { - val maxEntries = 130 - val additionalEntries = 10 - val topic1Backlog = 80 - val topic2Backlog = 60 - val topic3Backlog = 40 - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, topic1Backlog), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, topic2Backlog), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, topic3Backlog), - ), - 0, 0 - )) - val testForwarder = new LargeFirstForwardStrategy(maxEntries, additionalEntries) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - - assert(result("topic1") >= additionalEntries) - assert(result("topic2") >= additionalEntries) - assert(result("topic3") == additionalEntries) - - } - - test("additional entries to forward has a higher precedence than max allowed entries") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat(), - 0, 0 - )) - - val testForwarder = new LargeFirstForwardStrategy(10, 20) - val result = testForwarder.forward(fakeState) - - assert(result("topic1") == 20) - } - - test("forward from the middle of the first topic ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200) - ), - 1000, 20 - )) - val testForwarder = new LargeFirstForwardStrategy(80, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 80) - } - - test("forward from the middle of the last topic ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - createLedgerInfo(2000, 200), - createLedgerInfo(3000, 200) - ), - 3000, 20 - )) - val testForwarder = new LargeFirstForwardStrategy(80, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 80) - } - -} - diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategySuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategySuite.scala deleted file mode 100644 index 087356f8..00000000 --- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/LinearForwardStrategySuite.scala +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.pulsar.topicinternalstats.forward - -import TopicStateFixture._ - -import org.apache.spark.SparkFunSuite - -class LinearForwardStrategySuite extends SparkFunSuite { - - test("forward empty topics") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat(), - 0, 0 - )) - val testForwarder = new LinearForwardStrategy(10) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 10) - } - - test("forward a single topic with a single ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200) - ), - 0, 0 - )) - val testForwarder = new LinearForwardStrategy(10) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 10) - } - - test("forward a single topic with multiple ledgers") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - createLedgerInfo(2000, 200) - ), - 0, 0 - )) - val testForwarder = new LinearForwardStrategy(350) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 350) - } - - test("forward multiple topics with single ledger") { - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - ), - 0, 0 - )) - val testForwarder = new LinearForwardStrategy(15) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - assert(result("topic1") == 5) - assert(result("topic2") == 5) - assert(result("topic3") == 5) - } - - test("forward from the middle of the first topic ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200) - ), - 1000, 20 - )) - val testForwarder = new LinearForwardStrategy(80) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 80) - } - - test("forward from the middle of the last topic ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - createLedgerInfo(2000, 200), - createLedgerInfo(3000, 200) - ), - 3000, 20 - )) - val testForwarder = new LinearForwardStrategy(80) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 80) - } - -} - diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategySuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategySuite.scala deleted file mode 100644 index d811871d..00000000 --- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/ProportionalForwardStrategySuite.scala +++ /dev/null @@ -1,238 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.pulsar.topicinternalstats.forward - -import TopicStateFixture._ - -import org.apache.spark.SparkFunSuite - -class ProportionalForwardStrategySuite extends SparkFunSuite { - - test("forward empty topics") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat(), - 0, 0 - )) - val testForwarder = new ProportionalForwardStrategy(10, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 0) - } - - test("forward a single topic with a single ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200) - ), - 0, 0 - )) - val testForwarder = new ProportionalForwardStrategy(10, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 10) - } - - test("forward a single topic with multiple ledgers") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - createLedgerInfo(2000, 200) - ), - 0, 0 - )) - val testForwarder = new ProportionalForwardStrategy(350, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 350) - } - - test("forward a single topic with the biggest backlog") { - val maxEntries = 12 - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 400), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 600), - ), - 0, 0 - )) - val testForwarder = new ProportionalForwardStrategy(maxEntries, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - assert(result("topic1") == (maxEntries.toFloat / 6.0).toInt) - assert(result("topic2") == (maxEntries.toFloat / 3.0).toInt) - assert(result("topic3") == (maxEntries.toFloat / 2.0).toInt) - } - - test("forward multiple topics at the same time") { - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 20), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 40), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 60), - ), - 0, 0 - )) - val testForwarder = new ProportionalForwardStrategy(100, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - assert(result("topic3") > 0) - assert(result("topic2") > 0) - assert(result("topic1") > 0) - } - - test("forward by additional entries regardless of backlog size") { - val maxEntries = 50 - val additionalEntries = 10 - val topic1Backlog = 10000 - val topic2Backlog = 20000 - val topic3Backlog = 10 - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, topic1Backlog), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, topic2Backlog), - ), - 0, 0 - ), - "topic3" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, topic3Backlog), - ), - 0, 0 - )) - val testForwarder = new ProportionalForwardStrategy(maxEntries, additionalEntries) - val result = testForwarder.forward(fakeState) - - assert(result.size == 3) - - assert(result("topic1") >= additionalEntries) - assert(result("topic2") >= additionalEntries) - assert(result("topic3") == additionalEntries) - - } - - test("additional entries to forward has a higher precedence than topic backlog size") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 10) - ), - 0, 0 - )) - - val testForwarder = new ProportionalForwardStrategy(10, 20) - val result = testForwarder.forward(fakeState) - - assert(result("topic1") == 20) - } - - test("take the additional entries into account when calculating individual topic forward ratio") { - val fakeState = Map( - "topic1" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 1000), - ), - 0, 0 - ), - "topic2" -> createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 2000), - ), - 0, 0 - )) - val numberOfFakeTopics = fakeState.size - val ensureAdditionalEntriesPerTopic = 500 - val entriesOnTopOfAdditionalEntries = 100 - val maxEntries = entriesOnTopOfAdditionalEntries + ensureAdditionalEntriesPerTopic * numberOfFakeTopics - - val testForwarder = new ProportionalForwardStrategy(maxEntries, ensureAdditionalEntriesPerTopic) - val result = testForwarder.forward(fakeState) - - assert(result("topic1") == - (entriesOnTopOfAdditionalEntries.toFloat / 4.0).toInt - + ensureAdditionalEntriesPerTopic) - assert(result("topic2") == - (entriesOnTopOfAdditionalEntries.toFloat * 3.0 / 4.0).toInt - + ensureAdditionalEntriesPerTopic) - } - - test("forward from the middle of the first topic ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200) - ), - 1000, 20 - )) - val testForwarder = new ProportionalForwardStrategy(80, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 80) - } - - test("forward from the middle of the last topic ledger") { - val fakeState = Map( "topic1" -> - createTopicState( - createPersistentTopicInternalStat( - createLedgerInfo(1000, 200), - createLedgerInfo(2000, 200), - createLedgerInfo(3000, 200) - ), - 3000, 20 - )) - val testForwarder = new ProportionalForwardStrategy(80, 0) - val result = testForwarder.forward(fakeState) - - assert(result.size == 1) - assert(result("topic1") == 80) - } - -} diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtilsSuite.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtilsSuite.scala index 8573c125..9aa03b04 100644 --- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtilsSuite.scala +++ b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicInternalStatsUtilsSuite.scala @@ -1,4 +1,4 @@ -/** +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at diff --git a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicStateTestFixture.scala b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicStateTestFixture.scala index ff4d4ff6..05c003ac 100644 --- a/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicStateTestFixture.scala +++ b/src/test/scala/org/apache/spark/sql/pulsar/topicinternalstats/forward/TopicStateTestFixture.scala @@ -1,4 +1,4 @@ -/** +/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -13,10 +13,10 @@ */ package org.apache.spark.sql.pulsar.topicinternalstats.forward -import java.util - +import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.LedgerInfo import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo + +import java.util object TopicStateFixture {