Skip to content

Commit

Permalink
Merge pull request #344 from permutive/larger-chunks-ce2
Browse files Browse the repository at this point in the history
  • Loading branch information
CremboC authored Jan 7, 2022
2 parents 326fb8e + c3dd1d8 commit 71f078f
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 24 deletions.
1 change: 0 additions & 1 deletion .bsp/sbt.json

This file was deleted.

9 changes: 8 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ target/
.idea
.idea_modules

.bsp
.bsp/

.bloop
.metals
.vscode
project/.bloop
project/metals.sbt
project/project
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ object PubsubGoogleConsumer {
): Stream[F, ConsumerRecord[F, A]] =
PubsubSubscriber
.subscribe(blocker, projectId, subscription, config)
.flatMap { case internal.Model.Record(msg, ack, nack) =>
.evalMapChunk { case internal.Model.Record(msg, ack, nack) =>
MessageDecoder[A].decode(msg.getData.toByteArray) match {
case Left(e) => Stream.eval_(errorHandler(msg, e, ack, nack))
case Left(e) =>
errorHandler(msg, e, ack, nack).as(none[ConsumerRecord[F, A]])
case Right(v) =>
Stream.emit(ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit))
ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit).some.pure[F]
}
}
.unNone

/**
* Subscribe with automatic acknowledgement
Expand All @@ -67,12 +69,13 @@ object PubsubGoogleConsumer {
): Stream[F, A] =
PubsubSubscriber
.subscribe(blocker, projectId, subscription, config)
.flatMap { case internal.Model.Record(msg, ack, nack) =>
.evalMapChunk { case internal.Model.Record(msg, ack, nack) =>
MessageDecoder[A].decode(msg.getData.toByteArray) match {
case Left(e) => Stream.eval_(errorHandler(msg, e, ack, nack))
case Right(v) => Stream.eval(ack >> v.pure)
case Left(e) => errorHandler(msg, e, ack, nack).as(none[A])
case Right(v) => ack >> v.some.pure
}
}
.unNone

/**
* Subscribe to the raw stream, receiving the message as retrieved from PubSub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumer.InternalPubSubError
import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumerConfig
import com.permutive.pubsub.consumer.{Model => PublicModel}
import fs2.Stream
import fs2.{Chunk, Stream}
import org.threeten.bp.Duration

import scala.collection.JavaConverters._

import java.util
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

private[consumer] object PubsubSubscriber {
Expand Down Expand Up @@ -70,11 +73,19 @@ private[consumer] object PubsubSubscriber {
queue.put(Left(InternalPubSubError(failure)))
}

def takeNextElement[F[_]: Sync: ContextShift, A](messages: BlockingQueue[A], blocker: Blocker): F[A] =
def takeNextElements[F[_]: Sync: ContextShift, A](messages: BlockingQueue[A], blocker: Blocker): F[Chunk[A]] =
for {
nextOpt <- Sync[F].delay(Option(messages.poll())) // `poll` is non-blocking, returning `null` if queue is empty
next <- nextOpt.fold(blocker.delay(messages.take()))(Applicative[F].pure) // `take` can wait for an element
} yield next
nextOpt <- Sync[F].delay(messages.poll()) // `poll` is non-blocking, returning `null` if queue is empty
// `take` can wait for an element
next <- if (nextOpt == null) blocker.delay(messages.take()) else Applicative[F].pure(nextOpt)
chunk <- Sync[F].delay {
val elements = new util.ArrayList[A]
elements.add(next)
messages.drainTo(elements)

Chunk.buffer(elements.asScala)
}
} yield chunk

def subscribe[F[_]: Sync: ContextShift](
blocker: Blocker,
Expand All @@ -83,12 +94,12 @@ private[consumer] object PubsubSubscriber {
config: PubsubGoogleConsumerConfig[F],
): Stream[F, Model.Record[F]] =
for {

queue <- Stream.eval(
Sync[F].delay(new LinkedBlockingQueue[Either[InternalPubSubError, Model.Record[F]]](config.maxQueueSize))
)
_ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue, blocker))
next <- Stream.repeatEval(takeNextElement(queue, blocker))
msg <- Stream.fromEither[F](next)
_ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue, blocker))
taken <- Stream.repeatEval(takeNextElements(queue, blocker))
// Only retains the first error (if there are multiple), but that is OK, the stream is failing anyway...
msg <- Stream.fromEither[F](taken.sequence).flatMap(Stream.chunk)
} yield msg
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ object PubsubHttpConsumer {
): Stream[F, ConsumerRecord[F, A]] =
PubsubSubscriber
.subscribe(projectId, subscription, serviceAccountPath, config, httpClient, httpClientRetryPolicy)
.flatMap { record =>
.evalMapChunk { record =>
MessageDecoder[A].decode(Base64.getDecoder.decode(record.value.data.getBytes)) match {
case Left(e) => Stream.eval_(errorHandler(record.value, e, record.ack, record.nack))
case Right(v) => Stream.emit(record.toConsumerRecord(v))
case Left(e) => errorHandler(record.value, e, record.ack, record.nack).as(none[ConsumerRecord[F, A]])
case Right(v) => record.toConsumerRecord(v).some.pure
}
}
.unNone

/**
* Subscribe with automatic acknowledgement
Expand All @@ -74,12 +75,13 @@ object PubsubHttpConsumer {
): Stream[F, A] =
PubsubSubscriber
.subscribe(projectId, subscription, serviceAccountPath, config, httpClient, httpClientRetryPolicy)
.flatMap { record =>
.evalMapChunk { record =>
MessageDecoder[A].decode(Base64.getDecoder.decode(record.value.data.getBytes)) match {
case Left(e) => Stream.eval_(errorHandler(record.value, e, record.ack, record.nack))
case Right(v) => Stream.eval(record.ack >> v.pure)
case Left(e) => errorHandler(record.value, e, record.ack, record.nack).as(none[A])
case Right(v) => record.ack >> v.some.pure
}
}
.unNone

/**
* Subscribe to the raw stream, receiving the message as retrieved from PubSub
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import com.google.cloud.pubsub.v1.{SubscriptionAdminClient, TopicAdminClient}
import com.google.pubsub.v1.{ProjectSubscriptionName, TopicName}
import com.permutive.pubsub.consumer.ConsumerRecord
import com.permutive.pubsub.consumer.http.Example.ValueHolder
import com.permutive.pubsub.producer.PubsubProducer
import com.permutive.pubsub.producer.Model.SimpleRecord
import com.permutive.pubsub.producer.{Model, PubsubProducer}
import fs2.Stream
import io.chrisdavenport.log4cats.Logger
import io.chrisdavenport.log4cats.slf4j.Slf4jLogger
Expand Down Expand Up @@ -169,4 +170,39 @@ class PingPongSpec extends PubSubSpec with BeforeAndAfterEach {
.unsafeRunSync()
}

/**
  * Runs the test consumer while checking the size of every chunk it emits.
  *
  * The stream fails with a `RuntimeException` as soon as a chunk whose size differs from
  * `chunkSizeExpected` is observed. Otherwise each record is counted in `elementsReceived`,
  * acknowledged, and emitted downstream.
  *
  * @param client            HTTP client handed to the test `consumer`
  * @param elementsReceived  counter incremented once per record received
  * @param chunkSizeExpected the exact size every emitted chunk must have
  */
private def consumeExpectingChunksize(
  client: Client[IO],
  elementsReceived: Ref[IO, Int],
  chunkSizeExpected: Int,
): Stream[IO, ConsumerRecord[IO, ValueHolder]] =
  consumer(client).chunks
    .evalTap { received =>
      // Fail fast on the first chunk of unexpected size
      if (received.size == chunkSizeExpected) IO.unit
      else IO.raiseError[Unit](new RuntimeException(s"Chunks were of the wrong size, received ${received.size}"))
    }
    .flatMap(Stream.chunk)
    // Count the record first, then acknowledge it
    .evalTap(record => elementsReceived.update(_ + 1).flatMap(_ => record.ack))

it should "preserve chunksize in the underlying stream" in {
  val messagesToSend = 5

  // All messages go out through one `produceMany` call; produced individually, the consumer
  // would see them come back in individual chunks.
  val pings = List.fill[Model.Record[ValueHolder]](messagesToSend)(SimpleRecord(ValueHolder("ping")))

  val scenario =
    for {
      // The ack deadline (5s) is shorter than the 10s wait below, so a message that is not
      // acked will be redelivered before the end of the test
      (client, producer) <- Stream.resource(setup(ackDeadlineSeconds = 5))
      _                  <- Stream.eval(producer.produceMany(pings))
      counter            <- Stream.eval(Ref.of[IO, Int](0))
      // Run the consumer in the background for 10 seconds; it fails the stream if any chunk
      // is not exactly `messagesToSend` elements long
      _     <- Stream.sleep[IO](10.seconds).concurrently(consumeExpectingChunksize(client, counter, messagesToSend))
      total <- Stream.eval(counter.get)
    } yield total should ===(messagesToSend)

  scenario
    .as(ExitCode.Success)
    .compile
    .drain
    .unsafeRunSync()
}

}

0 comments on commit 71f078f

Please sign in to comment.