Commit

Merge pull request #3 from permutive-engineering/trace4cats

janstenpickle authored Jan 19, 2023
2 parents ee30fd0 + c6ffc73 commit 76443e0
Showing 4 changed files with 336 additions and 75 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -86,11 +86,11 @@ jobs:

- name: Make target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
- run: mkdir -p testkit-munit-bigtable/target target functional-google-cloud-bigtable/target functional-gax/target .js/target .jvm/target .native/target functional-google-cloud-bigtable-pureconfig/target project/target
+ run: mkdir -p testkit-munit-bigtable/target target functional-google-cloud-bigtable/target functional-gax/target functional-google-cloud-bigtable-trace4cats/target .js/target .jvm/target .native/target functional-google-cloud-bigtable-pureconfig/target project/target

- name: Compress target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
- run: tar cf targets.tar testkit-munit-bigtable/target target functional-google-cloud-bigtable/target functional-gax/target .js/target .jvm/target .native/target functional-google-cloud-bigtable-pureconfig/target project/target
+ run: tar cf targets.tar testkit-munit-bigtable/target target functional-google-cloud-bigtable/target functional-gax/target functional-google-cloud-bigtable-trace4cats/target .js/target .jvm/target .native/target functional-google-cloud-bigtable-pureconfig/target project/target

- name: Upload target directories
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main')
11 changes: 11 additions & 0 deletions build.sbt
@@ -37,6 +37,7 @@ lazy val root =
functionalGax,
functionalGoogleCloudBigtable,
functionalGoogleCloudBigtablePureconfig,
functionalGoogleCloudBigtableTrace4Cats,
testkitMunitBigtable
)

@@ -101,6 +102,16 @@ lazy val functionalGoogleCloudBigtablePureconfig = project
)
.dependsOn(functionalGoogleCloudBigtable)

lazy val functionalGoogleCloudBigtableTrace4Cats = project
.in(file("functional-google-cloud-bigtable-trace4cats"))
.settings(
name := "functional-google-cloud-bigtable-trace4cats",
libraryDependencies ++= Seq(
"io.janstenpickle" %%% "trace4cats-core" % "0.14.1"
)
)
.dependsOn(functionalGoogleCloudBigtable)

lazy val testkitMunitBigtable = project
.in(file("testkit-munit-bigtable"))
.settings(
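For a downstream project, the new module would be consumed as a published artifact. A minimal sketch of the dependency line (the organisation and the version placeholder are assumptions, not stated in this commit; the artifact name matches the "name" setting above):

// build.sbt of a consuming project; organisation and version are assumptions
libraryDependencies += "com.permutive" %% "functional-google-cloud-bigtable-trace4cats" % "<version>"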
@@ -0,0 +1,208 @@
/*
* Copyright 2022 Permutive
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.permutive.google.bigtable.data.trace4cats

import cats.FlatMap
import cats.data.Kleisli
import cats.effect.Async
import cats.effect.kernel.Resource
import cats.implicits._
import com.google.api.gax.grpc.GrpcCallContext
import com.google.cloud.bigtable.data.v2.models._
import com.permutive.google.bigtable.data.FunctionalBigtableDataClient.RowKeyByteString
import com.permutive.google.bigtable.data.{BigtableDataClientSettings, FunctionalBigtableDataClient}
import com.permutive.google.gax.FunctionalGax.FunctionalBatcher
import fs2.Chunk
import trace4cats.model.AttributeValue
import trace4cats.model.AttributeValue.StringValue
import trace4cats.{SpanKind, Trace}

class TracedFunctionalBigtableDataClient[F[_]: Trace: FlatMap] private (
settings: BigtableDataClientSettings[F],
underlying: FunctionalBigtableDataClient[F]
) extends FunctionalBigtableDataClient[F] {
private val trace = Trace[F]
private val spanPrefix: String = "Bigtable"
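// Attribute keys attached to spans around traced Bigtable calls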
private val labelRowKey = "bigtable.rowKey"
private val labelInstance = "bigtable.instance"
private val labelTable = "bigtable.table"
private val labelOperation = "bigtable.operation"

// Render row key ByteStrings as UTF-8 string attribute values
implicit private def rowKeyByteStringToTraceValue(value: => RowKeyByteString): AttributeValue = StringValue(
value.toStringUtf8
)

override def exists(tableId: String, rowKey: String): F[Boolean] =
trace.span(s"$spanPrefix exists($tableId, ...)", SpanKind.Client)(
trace.putAll(
labelRowKey -> rowKey,
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "exists"
) >> underlying.exists(tableId, rowKey)
)

override def exists(tableId: String, rowKey: RowKeyByteString): F[Boolean] =
trace.span(s"$spanPrefix exists($tableId, ...)", SpanKind.Client)(
trace.putAll(
labelRowKey -> rowKey,
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "exists"
) >> underlying.exists(tableId, rowKey)
)

override def readRow(tableId: String, rowKey: String, filter: Option[Filters.Filter]): F[Option[Row]] =
trace.span(s"$spanPrefix readRow($tableId, ...)", SpanKind.Client)(
trace.putAll(
labelRowKey -> rowKey,
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "readRow"
) >> underlying.readRow(tableId, rowKey, filter)
)

override def readRow(tableId: String, rowKey: RowKeyByteString, filter: Option[Filters.Filter]): F[Option[Row]] =
trace.span(s"$spanPrefix readRow($tableId, ...)", SpanKind.Client)(
trace.putAll(
labelRowKey -> rowKey,
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "readRow"
) >> underlying.readRow(tableId, rowKey, filter)
)

/** Note that, because of the difficulty in tracing streams, a span is only open whilst the stream is being
* _created_. This is likely to be instantaneous, and will not track the time spent consuming each item of the
* stream.
*/
override def readRows(query: Query, streamChunkSize: Int): fs2.Stream[F, Row] =
fs2.Stream
.eval(
trace.span(s"$spanPrefix readRows(...)", SpanKind.Client)(
trace
.putAll(
labelInstance -> settings.instanceId,
"bigtable.query" -> query.toString,
labelOperation -> "readRows"
)
.map(_ => underlying.readRows(query, streamChunkSize))
)
)
.flatten

override def sampleRowKeys(tableId: String): F[Chunk[KeyOffset]] =
trace.span(s"$spanPrefix sampleRowKeys($tableId)", SpanKind.Client)(
trace.putAll(
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "sampleRowKeys"
) >> underlying.sampleRowKeys(tableId)
)

override def mutateRow(rowMutation: RowMutation): F[Unit] =
trace.span(s"$spanPrefix mutateRow(...)", SpanKind.Client)(
trace.putAll(
labelInstance -> settings.instanceId,
labelOperation -> "mutateRow"
) >> underlying.mutateRow(rowMutation)
)

override def bulkMutateRows(bulkMutation: BulkMutation): F[Unit] =
trace.span(s"$spanPrefix bulkMutateRows(...)", SpanKind.Client)(
underlying.bulkMutateRows(bulkMutation)
)

override def bulkMutateRowsBatcher(
tableId: String,
grpcCallContext: Option[GrpcCallContext]
): Resource[F, FunctionalBatcher[F, RowMutationEntry, Unit]] =
underlying
.bulkMutateRowsBatcher(tableId, grpcCallContext)
.map(underlyingBatcher =>
Kleisli(mutationEntry =>
underlyingBatcher
.apply(mutationEntry)
.map(completionF =>
trace.span(s"$spanPrefix bulkMutateRows($tableId)", SpanKind.Client)(
trace.putAll(
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "bulkMutateRows"
) >> completionF
)
)
)
)

override def bulkReadRowsBatcher(
tableId: String,
filter: Option[Filters.Filter],
grpcCallContext: Option[GrpcCallContext]
): Resource[F, FunctionalBatcher[F, RowKeyByteString, Option[Row]]] =
underlying
.bulkReadRowsBatcher(tableId, filter, grpcCallContext)
.map(underlyingBatcher =>
Kleisli(rowKey =>
underlyingBatcher
.apply(rowKey)
.map(resultF =>
trace.span(s"$spanPrefix bulkReadRows($tableId)", SpanKind.Client)(
trace.putAll(
labelRowKey -> rowKey,
labelInstance -> settings.instanceId,
labelTable -> tableId,
labelOperation -> "bulkReadRows"
) >> resultF
)
)
)
)

override def checkAndMutateRow(mutation: ConditionalRowMutation): F[Boolean] =
trace.span(s"$spanPrefix checkAndMutateRow(...)", SpanKind.Client)(
trace.putAll(
labelInstance -> settings.instanceId,
labelOperation -> "checkAndMutateRow"
) >> underlying.checkAndMutateRow(mutation)
)

override def readModifyWriteRow(mutation: ReadModifyWriteRow): F[Row] =
trace.span(s"$spanPrefix readModifyWriteRow(...)", SpanKind.Client)(
trace.putAll(
labelInstance -> settings.instanceId,
labelOperation -> "readModifyWriteRow"
) >> underlying.readModifyWriteRow(mutation)
)
}

/** Wraps a [[FunctionalBigtableDataClient]], adding trace spans and labels for each call.
*/
object TracedFunctionalBigtableDataClient {
def resource[F[_]: Trace: Async](
settings: BigtableDataClientSettings[F]
): Resource[F, TracedFunctionalBigtableDataClient[F]] =
FunctionalBigtableDataClient
.resource(settings)
.map(TracedFunctionalBigtableDataClient(settings, _))

def apply[F[_]: Trace: FlatMap](
settings: BigtableDataClientSettings[F],
underlying: FunctionalBigtableDataClient[F]
): TracedFunctionalBigtableDataClient[F] = new TracedFunctionalBigtableDataClient[F](settings, underlying)

}
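A rough usage sketch, not part of this commit: with trace4cats's Kleisli-based Trace instance in scope (provided by the Trace companion object) and an already-built BigtableDataClientSettings, the traced client is obtained via the resource constructor above. The names TracedClientExample, TracedIO and mkTracedClient below are illustrative assumptions.

// Sketch only: assumes a Trace instance for Kleisli[F, Span[F], *] (from the
// trace4cats Trace companion) and a BigtableDataClientSettings built elsewhere.
import cats.data.Kleisli
import cats.effect.{IO, Resource}
import com.permutive.google.bigtable.data.BigtableDataClientSettings
import com.permutive.google.bigtable.data.trace4cats.TracedFunctionalBigtableDataClient
import trace4cats.Span

object TracedClientExample {
  // Every Bigtable call runs in Kleisli, so a parent span can be supplied when it is run.
  type TracedIO[A] = Kleisli[IO, Span[IO], A]

  def mkTracedClient(
      settings: BigtableDataClientSettings[TracedIO] // assumed to be constructed elsewhere
  ): Resource[TracedIO, TracedFunctionalBigtableDataClient[TracedIO]] =
    TracedFunctionalBigtableDataClient.resource[TracedIO](settings)

  // Each call, e.g. client.exists("my-table", "row-key"), is then wrapped in a
  // "Bigtable exists(my-table, ...)" client span; for readRows only the stream
  // creation is covered, as noted in the scaladoc above.
}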