From f13be7e655e601e1368cf58de83c1e0590816c93 Mon Sep 17 00:00:00 2001
From: Chris Jansen
Date: Thu, 19 Jan 2023 09:50:26 +0000
Subject: [PATCH] Add traced implementation

- restore client interface
- add tracing implementation
---
 build.sbt                                     |  11 +
 .../TracedFunctionalBigtableDataClient.scala  | 194 ++++++++++++++++++
 .../data/FunctionalBigtableDataClient.scala   | 193 ++++++++++-------
 3 files changed, 320 insertions(+), 78 deletions(-)
 create mode 100644 functional-google-cloud-bigtable-trace4cats/src/main/scala/com/permutive/google/bigtable/data/trace4cats/TracedFunctionalBigtableDataClient.scala

diff --git a/build.sbt b/build.sbt
index ebf94b5..3ea72c5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -37,6 +37,7 @@ lazy val root =
     functionalGax,
     functionalGoogleCloudBigtable,
     functionalGoogleCloudBigtablePureconfig,
+    functionalGoogleCloudBigtableTrace4Cats,
     testkitMunitBigtable
   )

@@ -101,6 +102,16 @@ lazy val functionalGoogleCloudBigtablePureconfig = project
   )
   .dependsOn(functionalGoogleCloudBigtable)

+lazy val functionalGoogleCloudBigtableTrace4Cats = project
+  .in(file("functional-google-cloud-bigtable-trace4cats"))
+  .settings(
+    name := "functional-google-cloud-bigtable-trace4cats",
+    libraryDependencies ++= Seq(
+      "io.janstenpickle" %% "trace4cats-core" % "0.14.1"
+    )
+  )
+  .dependsOn(functionalGoogleCloudBigtable)
+
 lazy val testkitMunitBigtable = project
   .in(file("testkit-munit-bigtable"))
   .settings(
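
For downstream builds the new module is consumed like any other artifact published from
this build; a minimal sketch (the `com.permutive` organisation is an assumption, as it
is not shown in this diff, and the version is a placeholder):

  libraryDependencies += "com.permutive" %% "functional-google-cloud-bigtable-trace4cats" % "<version>"
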
diff --git a/functional-google-cloud-bigtable-trace4cats/src/main/scala/com/permutive/google/bigtable/data/trace4cats/TracedFunctionalBigtableDataClient.scala b/functional-google-cloud-bigtable-trace4cats/src/main/scala/com/permutive/google/bigtable/data/trace4cats/TracedFunctionalBigtableDataClient.scala
new file mode 100644
index 0000000..7b1165a
--- /dev/null
+++ b/functional-google-cloud-bigtable-trace4cats/src/main/scala/com/permutive/google/bigtable/data/trace4cats/TracedFunctionalBigtableDataClient.scala
@@ -0,0 +1,194 @@
package com.permutive.google.bigtable.data.trace4cats

import cats.FlatMap
import cats.data.Kleisli
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.implicits._
import com.google.api.gax.grpc.GrpcCallContext
import com.google.cloud.bigtable.data.v2.models._
import com.permutive.google.bigtable.data.FunctionalBigtableDataClient.RowKeyByteString
import com.permutive.google.bigtable.data.{BigtableDataClientSettings, FunctionalBigtableDataClient}
import com.permutive.google.gax.FunctionalGax.FunctionalBatcher
import fs2.Chunk
import trace4cats.model.AttributeValue
import trace4cats.model.AttributeValue.StringValue
import trace4cats.{SpanKind, Trace}

class TracedFunctionalBigtableDataClient[F[_]: Trace: FlatMap] private (
    settings: BigtableDataClientSettings[F],
    underlying: FunctionalBigtableDataClient[F]
) extends FunctionalBigtableDataClient[F] {
  private val trace = Trace[F]
  private val spanPrefix: String = "Bigtable"
  private val labelRowKey = "bigtable.rowKey"
  private val labelInstance = "bigtable.instance"
  private val labelTable = "bigtable.table"
  private val labelOperation = "bigtable.operation"

  implicit private def rowKeyByteStringToTraceValue(value: => RowKeyByteString): AttributeValue = StringValue(
    value.toStringUtf8
  )

  override def exists(tableId: String, rowKey: String): F[Boolean] =
    trace.span(s"$spanPrefix exists($tableId, ...)", SpanKind.Client)(
      trace.putAll(
        labelRowKey -> rowKey,
        labelInstance -> settings.instanceId,
        labelTable -> tableId,
        labelOperation -> "exists"
      ) >> underlying.exists(tableId, rowKey)
    )

  override def exists(tableId: String, rowKey: RowKeyByteString): F[Boolean] =
    trace.span(s"$spanPrefix exists($tableId, ...)", SpanKind.Client)(
      trace.putAll(
        labelRowKey -> rowKey,
        labelInstance -> settings.instanceId,
        labelTable -> tableId,
        labelOperation -> "exists"
      ) >> underlying.exists(tableId, rowKey)
    )

  override def readRow(tableId: String, rowKey: String, filter: Option[Filters.Filter]): F[Option[Row]] =
    trace.span(s"$spanPrefix readRow($tableId, ...)", SpanKind.Client)(
      trace.putAll(
        labelRowKey -> rowKey,
        labelInstance -> settings.instanceId,
        labelTable -> tableId,
        labelOperation -> "readRow"
      ) >> underlying.readRow(tableId, rowKey, filter)
    )

  override def readRow(tableId: String, rowKey: RowKeyByteString, filter: Option[Filters.Filter]): F[Option[Row]] =
    trace.span(s"$spanPrefix readRow($tableId, ...)", SpanKind.Client)(
      trace.putAll(
        labelRowKey -> rowKey,
        labelInstance -> settings.instanceId,
        labelTable -> tableId,
        labelOperation -> "readRow"
      ) >> underlying.readRow(tableId, rowKey, filter)
    )

  /** @inheritdoc
    *
    * Note that, because of the difficulty in tracing streams, a span is only open whilst the stream is being
    * _created_. This is likely to be instantaneous, and will not track the time spent consuming each item of the
    * stream.
    */
  override def readRows(query: Query, streamChunkSize: Int): fs2.Stream[F, Row] =
    fs2.Stream
      .eval(
        trace.span(s"$spanPrefix readRows(...)", SpanKind.Client)(
          trace
            .putAll(
              labelInstance -> settings.instanceId,
              "bigtable.query" -> query.toString,
              labelOperation -> "readRows"
            )
            .map(_ => underlying.readRows(query, streamChunkSize))
        )
      )
      .flatten
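
  // Caller-side sketch (hypothetical helper, not part of this class): to time
  // the full drain rather than just stream creation, open a span around the
  // compile step; a Sync constraint supplies fs2's stream compiler.
  //
  //   def readAllRowsInSpan[G[_]: cats.effect.Sync: Trace](
  //       client: FunctionalBigtableDataClient[G],
  //       query: Query
  //   ): G[List[Row]] =
  //     Trace[G].span("Bigtable readRows drain")(
  //       client.readRows(query, streamChunkSize = 128).compile.toList
  //     )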

  override def sampleRowKeys(tableId: String): F[Chunk[KeyOffset]] =
    trace.span(s"$spanPrefix sampleRowKeys($tableId)", SpanKind.Client)(
      trace.putAll(
        labelInstance -> settings.instanceId,
        labelTable -> tableId,
        labelOperation -> "sampleRowKeys"
      ) >> underlying.sampleRowKeys(tableId)
    )

  override def mutateRow(rowMutation: RowMutation): F[Unit] =
    trace.span(s"$spanPrefix mutateRow(...)", SpanKind.Client)(
      trace.putAll(
        labelInstance -> settings.instanceId,
        labelOperation -> "mutateRow"
      ) >> underlying.mutateRow(rowMutation)
    )

  override def bulkMutateRows(bulkMutation: BulkMutation): F[Unit] =
    trace.span(s"$spanPrefix bulkMutateRows(...)", SpanKind.Client)(
      trace.putAll(
        labelInstance -> settings.instanceId,
        labelOperation -> "bulkMutateRows"
      ) >> underlying.bulkMutateRows(bulkMutation)
    )

  override def bulkMutateRowsBatcher(
      tableId: String,
      grpcCallContext: Option[GrpcCallContext]
  ): Resource[F, FunctionalBatcher[F, RowMutationEntry, Unit]] =
    underlying
      .bulkMutateRowsBatcher(tableId, grpcCallContext)
      .map(underlyingBatcher =>
        Kleisli(mutationEntry =>
          underlyingBatcher
            .apply(mutationEntry)
            .map(completionF =>
              trace.span(s"$spanPrefix bulkMutateRows($tableId)", SpanKind.Client)(
                trace.putAll(
                  labelInstance -> settings.instanceId,
                  labelTable -> tableId,
                  labelOperation -> "bulkMutateRows"
                ) >> completionF
              )
            )
        )
      )

  override def bulkReadRowsBatcher(
      tableId: String,
      filter: Option[Filters.Filter],
      grpcCallContext: Option[GrpcCallContext]
  ): Resource[F, FunctionalBatcher[F, RowKeyByteString, Option[Row]]] =
    underlying
      .bulkReadRowsBatcher(tableId, filter, grpcCallContext)
      .map(underlyingBatcher =>
        Kleisli(rowKey =>
          underlyingBatcher
            .apply(rowKey)
            .map(resultF =>
              trace.span(s"$spanPrefix bulkReadRows($tableId)", SpanKind.Client)(
                trace.putAll(
                  labelRowKey -> rowKey,
                  labelInstance -> settings.instanceId,
                  labelTable -> tableId,
                  labelOperation -> "bulkReadRows"
                ) >> resultF
              )
            )
        )
      )

  override def checkAndMutateRow(mutation: ConditionalRowMutation): F[Boolean] =
    trace.span(s"$spanPrefix checkAndMutateRow(...)", SpanKind.Client)(
      trace.putAll(
        labelInstance -> settings.instanceId,
        labelOperation -> "checkAndMutateRow"
      ) >> underlying.checkAndMutateRow(mutation)
    )

  override def readModifyWriteRow(mutation: ReadModifyWriteRow): F[Row] =
    trace.span(s"$spanPrefix readModifyWriteRow(...)", SpanKind.Client)(
      trace.putAll(
        labelInstance -> settings.instanceId,
        labelOperation -> "readModifyWriteRow"
      ) >> underlying.readModifyWriteRow(mutation)
    )
}

/** Wraps a [[FunctionalBigtableDataClient]], adding trace spans and labels for each call.
  */
object TracedFunctionalBigtableDataClient {

  def resource[F[_]: Trace: Async](
      settings: BigtableDataClientSettings[F]
  ): Resource[F, TracedFunctionalBigtableDataClient[F]] =
    FunctionalBigtableDataClient
      .resource(settings)
      .map(TracedFunctionalBigtableDataClient(settings, _))

  def apply[F[_]: Trace: FlatMap](
      settings: BigtableDataClientSettings[F],
      underlying: FunctionalBigtableDataClient[F]
  ): TracedFunctionalBigtableDataClient[F] = new TracedFunctionalBigtableDataClient[F](settings, underlying)

}
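
A minimal wiring sketch for the traced client (illustrative only: `settings` is an
assumed value, and the `Trace` instance for `Kleisli[IO, Span[IO], *]` is provided by
trace4cats):

  import cats.data.Kleisli
  import cats.effect.{IO, Resource}
  import trace4cats.Span

  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  val client: Resource[TracedIO, TracedFunctionalBigtableDataClient[TracedIO]] =
    TracedFunctionalBigtableDataClient.resource[TracedIO](settings)

  // each call then runs inside a client span such as "Bigtable exists(my-table, ...)":
  //   client.use(_.exists("my-table", "row-key"))
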
diff --git a/functional-google-cloud-bigtable/src/main/scala/com/permutive/google/bigtable/data/FunctionalBigtableDataClient.scala b/functional-google-cloud-bigtable/src/main/scala/com/permutive/google/bigtable/data/FunctionalBigtableDataClient.scala
index 7c73e0c..7d39e6c 100644
--- a/functional-google-cloud-bigtable/src/main/scala/com/permutive/google/bigtable/data/FunctionalBigtableDataClient.scala
+++ b/functional-google-cloud-bigtable/src/main/scala/com/permutive/google/bigtable/data/FunctionalBigtableDataClient.scala
@@ -26,6 +26,7 @@ import com.google.cloud.bigtable.data.v2.models.Filters.Filter
 import com.google.cloud.bigtable.data.v2.models._
 import com.google.cloud.bigtable.data.v2.{BigtableDataClient => JBigtableDataClient}
 import com.google.protobuf.ByteString
+import com.permutive.google.bigtable.data.FunctionalBigtableDataClient.RowKeyByteString
 import com.permutive.google.gax.FunctionalGax
 import com.permutive.google.gax.FunctionalGax.FunctionalBatcher
 import fs2.{Chunk, Stream}
@@ -47,32 +48,22 @@ import scala.jdk.CollectionConverters._
  * Unfortunately ScalaDoc cannot link to the exact methods in `BigtableDataClient` (as that is a Java class). Instead
  * links just go to documentation for the class, with the correct method name as the text.
  */
-sealed abstract class FunctionalBigtableDataClient[F[_]: Async] private (
-    dataClient: JBigtableDataClient
-) {
-
-  /** Type alias to make intentions clearer in `FunctionalBatcher` */
-  type RowKeyByteString = ByteString
+trait FunctionalBigtableDataClient[F[_]] {

   /** Checks if a given row exists or not.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#exists]]
    */
-  def exists(tableId: String, rowKey: String): F[Boolean] =
-    delayConvert(dataClient.existsAsync(tableId, rowKey))
-      // Convert Java => Scala, doesn't "just work" unfortunately
-      .map(b => b)
+  def exists(tableId: String, rowKey: String): F[Boolean]

   /** Checks if a given row exists or not.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#exists]]
    */
-  def exists(tableId: String, rowKey: ByteString): F[Boolean] =
-    delayConvert(dataClient.existsAsync(tableId, rowKey))
-      // Convert Java => Scala, doesn't "just work" unfortunately
-      .map(b => b)
+  def exists(tableId: String, rowKey: ByteString): F[Boolean]

   /** Read a single row with an optional filter.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#readRow]]
    */
   def readRow(
@@ -85,10 +76,7 @@
       tableId: String,
       rowKey: String,
       filter: Option[Filter]
-  ): F[Option[Row]] =
-    delayConvert(dataClient.readRowAsync(tableId, rowKey, filter.orNull))
-      // Result can be null
-      .map(Option(_))
+  ): F[Option[Row]]

   /** Read a single row with an optional filter.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#readRow]]
    */
   def readRow(
@@ -101,10 +89,7 @@
       tableId: String,
       rowKey: ByteString,
       filter: Option[Filter]
-  ): F[Option[Row]] =
-    delayConvert(dataClient.readRowAsync(tableId, rowKey, filter.orNull))
-      // Result can be null
-      .map(Option(_))
+  ): F[Option[Row]]

@@ -113,36 +98,26 @@
   /** Stream a series of result rows from the provided query.
    *
    * @param streamChunkSize
    *   the size of chunks in the stream
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#readRows]]
    */
-  def readRows(query: Query, streamChunkSize: Int): Stream[F, Row] =
-    delayStream(dataClient.readRows(query), streamChunkSize)
+  def readRows(query: Query, streamChunkSize: Int): Stream[F, Row]

   /** Sample the row keys present in the table. Returned keys delimit contiguous sections, approximately equal in size.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#sampleRowKeys]]
-   *
    * @note
    *   this returns a [[fs2.Chunk Chunk]] because it's the easiest immutable collection to return which wraps the
    *   underlying Java list; other collections iterate to construct.
    */
-  def sampleRowKeys(tableId: String): F[Chunk[KeyOffset]] =
-    delayConvert(dataClient.sampleRowKeysAsync(tableId)).map(jList => Chunk.iterable(jList.asScala))
+  def sampleRowKeys(tableId: String): F[Chunk[KeyOffset]]

   /** Mutate a single row atomically.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#mutateRow]]
    */
-  def mutateRow(rowMutation: RowMutation): F[Unit] =
-    delayConvert(dataClient.mutateRowAsync(rowMutation)).void
+  def mutateRow(rowMutation: RowMutation): F[Unit]

   /** Mutate multiple rows in a batch. Each individual row is mutated atomically.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#bulkMutateRows]]
    */
-  def bulkMutateRows(bulkMutation: BulkMutation): F[Unit] =
-    delayConvert(dataClient.bulkMutateRowsAsync(bulkMutation)).void
+  def bulkMutateRows(bulkMutation: BulkMutation): F[Unit]

   /** Create a `FunctionalBatcher` which mutates multiple rows in a batch. Each individual row is mutated atomically.
    *
@@ -168,10 +143,8 @@
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#newBulkMutationBatcher]]
-   *
    * @note
    *   Closing the resource may semantically block for some time as it flushes the current batch and awaits results
-   *
    * @note
    *   `BigtableDataSettings.Builder.enableBatchMutationLatencyBasedThrottling` may be useful as well. It dynamically
    *   alters the number of in-flight requests to achieve a target RPC latency. To use this setting you will manually
@@ -183,12 +156,7 @@
   def bulkMutateRowsBatcher(
       tableId: String,
       grpcCallContext: Option[GrpcCallContext] = None
-  ): Resource[F, FunctionalBatcher[F, RowMutationEntry, Unit]] =
-    delayBatcher(
-      dataClient.newBulkMutationBatcher(tableId, grpcCallContext.orNull)
-    )
-      // We need to define a new batcher to convert the Java `void` to a unit
-      .map(_.map(_.void))
+  ): Resource[F, FunctionalBatcher[F, RowMutationEntry, Unit]]
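+
+  // Usage sketch (hypothetical, assuming `entries: List[RowMutationEntry]` and
+  // cats syntax in scope): submit every entry, collecting the deferred
+  // completion effects, then await them all before the resource is closed.
+  //
+  //   bulkMutateRowsBatcher("my-table").use { batcher =>
+  //     entries.traverse(batcher.apply).flatMap(_.sequence_)
+  //   }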

   /** Create a `FunctionalBatcher` which reads multiple rows, with an optional filter, in a batch.
    *
@@ -217,7 +185,6 @@
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#newBulkReadRowsBatcher]]
-   *
    * @note
    *   Closing the resource may semantically block for some time as it flushes the current batch and awaits results
    */
   def bulkReadRowsBatcher(
       tableId: String,
       filter: Option[Filter],
       grpcCallContext: Option[GrpcCallContext] = None
-  ): Resource[F, FunctionalBatcher[F, RowKeyByteString, Option[Row]]] =
-    delayBatcher(
-      dataClient.newBulkReadRowsBatcher(
-        tableId,
-        filter.orNull,
-        grpcCallContext.orNull
-      )
-    )
-      // We need to define a new batcher to handle the case where the row returned is null
-      .map(_.map(_.map(Option(_))))
+  ): Resource[F, FunctionalBatcher[F, RowKeyByteString, Option[Row]]]

   /** Mutate a row atomically based on the output of a filter.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#checkAndMutateRow]]
    */
@@ -243,45 +201,124 @@
   def checkAndMutateRow(
       mutation: ConditionalRowMutation
-  ): F[Boolean] =
-    delayConvert(dataClient.checkAndMutateRowAsync(mutation))
-      // Convert Java Boolean => Scala, doesn't "just work" unfortunately
-      .map(b => b)
+  ): F[Boolean]
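+
+  // For example (sketch with hypothetical values): set a cell only when the
+  // row already contains a cell in family "cf".
+  //
+  //   checkAndMutateRow(
+  //     ConditionalRowMutation
+  //       .create("my-table", "row-key")
+  //       .condition(Filters.FILTERS.family().exactMatch("cf"))
+  //       .`then`(Mutation.create().setCell("cf", "qualifier", "value"))
+  //   )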

   /** Modify a row atomically on the server based on its current value. Returns the new contents of all modified cells.
    *
    * @see
    *   [[com.google.cloud.bigtable.data.v2.BigtableDataClient BigtableDataClient#readModifyWriteRow]]
    */
-  def readModifyWriteRow(mutation: ReadModifyWriteRow): F[Row] =
-    // [Ben: 2020-06-17] Underlying method does _not_ state the response can be null. It says it returns "the new
-    // contents of all modified cells".
-    // Chasing the code a bit it seems to fill a default stub row if no changes were made
-    delayConvert(dataClient.readModifyWriteRowAsync(mutation))
+  def readModifyWriteRow(mutation: ReadModifyWriteRow): F[Row]
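+
+  // For example (sketch with hypothetical values): atomically increment a
+  // counter cell, receiving the updated row back.
+  //
+  //   readModifyWriteRow(
+  //     ReadModifyWriteRow.create("my-table", "row-key").increment("cf", "count", 1L)
+  //   )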

-  private def delayConvert[A](a: => ApiFuture[A]): F[A] =
-    FunctionalGax.convertApiFuture(Sync[F].delay(a))
-
-  private def delayStream[A](
-      a: => ServerStream[A],
-      chunkSize: Int
-  ): Stream[F, A] =
-    FunctionalGax.convertServerStream(Sync[F].delay(a), chunkSize)
-
-  private def delayBatcher[A, B](
-      a: => Batcher[A, B]
-  ): Resource[F, FunctionalBatcher[F, A, B]] =
-    FunctionalGax.convertBatcher(Sync[F].delay(a))
 }

 object FunctionalBigtableDataClient {

+  /** Type alias to make intentions clearer in `FunctionalBatcher` */
+  type RowKeyByteString = ByteString
+
   /** Construct a [[FunctionalBigtableDataClient]] given the provided settings for the underlying client.
    */
   def resource[F[_]: Async](
       dataClientSettings: BigtableDataClientSettings[F]
   ): Resource[F, FunctionalBigtableDataClient[F]] =
     BigtableDataClientResource(dataClientSettings).map(
-      new FunctionalBigtableDataClient[F](_) {}
+      new FunctionalBigtableDataClientImpl[F](_)
     )
+
+  private class FunctionalBigtableDataClientImpl[F[_]: Async](
+      dataClient: JBigtableDataClient
+  ) extends FunctionalBigtableDataClient[F] {
+
+    override def exists(tableId: String, rowKey: String): F[Boolean] =
+      delayConvert(dataClient.existsAsync(tableId, rowKey))
+        // Convert Java => Scala, doesn't "just work" unfortunately
+        .map(b => b)
+
+    override def exists(tableId: String, rowKey: ByteString): F[Boolean] =
+      delayConvert(dataClient.existsAsync(tableId, rowKey))
+        // Convert Java => Scala, doesn't "just work" unfortunately
+        .map(b => b)
+
+    override def readRow(
+        tableId: String,
+        rowKey: String,
+        filter: Option[Filter]
+    ): F[Option[Row]] =
+      delayConvert(dataClient.readRowAsync(tableId, rowKey, filter.orNull))
+        // Result can be null
+        .map(Option(_))
+
+    override def readRow(
+        tableId: String,
+        rowKey: ByteString,
+        filter: Option[Filter]
+    ): F[Option[Row]] =
+      delayConvert(dataClient.readRowAsync(tableId, rowKey, filter.orNull))
+        // Result can be null
+        .map(Option(_))
+
+    override def readRows(query: Query, streamChunkSize: Int): Stream[F, Row] =
+      delayStream(dataClient.readRows(query), streamChunkSize)
+
+    override def sampleRowKeys(tableId: String): F[Chunk[KeyOffset]] =
+      delayConvert(dataClient.sampleRowKeysAsync(tableId)).map(jList => Chunk.iterable(jList.asScala))
+
+    override def mutateRow(rowMutation: RowMutation): F[Unit] =
+      delayConvert(dataClient.mutateRowAsync(rowMutation)).void
+
+    override def bulkMutateRows(bulkMutation: BulkMutation): F[Unit] =
+      delayConvert(dataClient.bulkMutateRowsAsync(bulkMutation)).void
+
+    override def bulkMutateRowsBatcher(
+        tableId: String,
+        grpcCallContext: Option[GrpcCallContext] = None
+    ): Resource[F, FunctionalBatcher[F, RowMutationEntry, Unit]] =
+      delayBatcher(
+        dataClient.newBulkMutationBatcher(tableId, grpcCallContext.orNull)
+      )
+        // We need to define a new batcher to convert the Java `void` to a unit
+        .map(_.map(_.void))
+
+    override def bulkReadRowsBatcher(
+        tableId: String,
+        filter: Option[Filter],
+        grpcCallContext: Option[GrpcCallContext] = None
+    ): Resource[F, FunctionalBatcher[F, RowKeyByteString, Option[Row]]] =
+      delayBatcher(
+        dataClient.newBulkReadRowsBatcher(
+          tableId,
+          filter.orNull,
+          grpcCallContext.orNull
+        )
+      )
+        // We need to define a new batcher to handle the case where the row returned is null
+        .map(_.map(_.map(Option(_))))
+
+    override def checkAndMutateRow(
+        mutation: ConditionalRowMutation
+    ): F[Boolean] =
+      delayConvert(dataClient.checkAndMutateRowAsync(mutation))
+        // Convert Java Boolean => Scala, doesn't "just work" unfortunately
+        .map(b => b)
+
+    override def readModifyWriteRow(mutation: ReadModifyWriteRow): F[Row] =
+      // [Ben: 2020-06-17] Underlying method does _not_ state the response can be null. It says it returns "the new
+      // contents of all modified cells".
+      // Chasing the code a bit it seems to fill a default stub row if no changes were made
+      delayConvert(dataClient.readModifyWriteRowAsync(mutation))
+
+    private def delayConvert[A](a: => ApiFuture[A]): F[A] =
+      FunctionalGax.convertApiFuture(Sync[F].delay(a))
+
+    private def delayStream[A](
+        a: => ServerStream[A],
+        chunkSize: Int
+    ): Stream[F, A] =
+      FunctionalGax.convertServerStream(Sync[F].delay(a), chunkSize)
+
+    private def delayBatcher[A, B](
+        a: => Batcher[A, B]
+    ): Resource[F, FunctionalBatcher[F, A, B]] =
+      FunctionalGax.convertBatcher(Sync[F].delay(a))
+  }
 }
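
Taken together, the restored trait and the new module also allow wrapping an existing
client by hand rather than building one through the traced resource; a sketch reusing
the `TracedIO` alias and the assumed `settings` value from the earlier example:

  val traced: Resource[TracedIO, FunctionalBigtableDataClient[TracedIO]] =
    FunctionalBigtableDataClient
      .resource[TracedIO](settings)
      .map(TracedFunctionalBigtableDataClient(settings, _))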