Feature/instrument process node #1

Status: Open. Wants to merge 56 commits into base: master.

Changes shown from 24 of 56 commits.

56 commits
77e3d4f
integrate with Kamon 2.0.0-RC
Jul 21, 2019
5207d59
Set version to 2.0.0-SNAPSHOT
Jul 21, 2019
2941659
Add current kafka record's key as tag to the span
Jul 26, 2019
499ea65
Add kafka's Scala sugar
Jul 26, 2019
50d7403
Add optional record.key() as tag to the span
Jul 26, 2019
779a031
Fix module name in stream's reference.conf
Jul 26, 2019
a77ff44
Add streaming test for context propagation
Jul 26, 2019
c1ebb00
WIP: add node tracing
Jul 26, 2019
cc1505a
Add consistent stream and node instrumentation
Jul 28, 2019
99b0bfb
Use default propagation, improve test stability by increasing zk conn…
Aug 8, 2019
5b76e66
Cleanup code, remove println output
Aug 8, 2019
4bd2bfe
Adapt to 2.0 final
Aug 8, 2019
76cd874
update travis to use openjdk8 instead of oraclejdk8
Aug 9, 2019
f38e917
Update readme
Aug 11, 2019
75243a5
Avoid javac errors on unknown characters
Aug 11, 2019
8c1562e
Update kamon-sbt-umbrella plugin reference
Aug 11, 2019
493f4d0
Try different workaround for package.type undefined problem ...
Aug 11, 2019
71a4a29
Try again another workaround for package.type undefined problem ...
Aug 11, 2019
c0eda0f
Try again another workaround ... properly integrated
Aug 11, 2019
0c3685a
Try again another workaround ...
Aug 11, 2019
eef7611
Try again another workaround ...
Aug 13, 2019
8078100
Move to sbt 1.3.0
Sep 5, 2019
b9d59d5
Increase mem for sbt tests
Sep 5, 2019
19c18b8
Fix cross scala version build, bump scala version to 2.12.9
Sep 5, 2019
9e166d6
Add description to the module configuration
Sep 28, 2019
b8c3d95
Extract context serialization into a Scala class in order to hide the…
Sep 28, 2019
b6d7351
Simplify stream instrumentation by removing the Java-based Advisor an…
Sep 30, 2019
f2333a0
Introduce config for enable/disable stream topology node tracing
Sep 30, 2019
6710301
Add "Apache" to the module name in reference.conf
Oct 2, 2019
9814616
Add DOT file generator and add some more tests for slightly more comp…
Oct 2, 2019
f302d30
Upgrade Kafka libs to 2.3.0, adapt instrumentation, refactor client t…
Oct 6, 2019
e1d3318
Address PR feedback: Split client's Java advisors, fix (start) timing…
Oct 9, 2019
e6cea35
Address PR feedback: remove superfluous code
Oct 9, 2019
5e25ad6
Address PR feedback: use linked spans in not following
Oct 9, 2019
91ac200
Address PR feedback: add clientId (consumer/publisher) and groupId (c…
Oct 9, 2019
e637a87
Refactor configuration and introduce support for delayed spans
Oct 10, 2019
ef802a9
Refactor RecordProcessor (client poll instrumentation) for consistent…
Oct 12, 2019
ab27353
Fix single abstract message issue
Oct 14, 2019
e0b7281
Fix single abstract message issue - again, forgot about cross compila…
Oct 14, 2019
63cd34b
Fix single abstract message issue - again, forgot about cross compila…
Oct 14, 2019
c6ea37b
Add kafka.timestamp and kafka.timestampType as tags to kafka client s…
Oct 14, 2019
b10a825
Add test for new tags
Oct 14, 2019
1035c9c
Trigger build
Oct 14, 2019
1e88b87
Add proper test for new span tags timestamp and timestampType
Oct 15, 2019
94aa3e5
Add support for storing and accessing spans associated with consumer …
Oct 15, 2019
3a50957
Add proper context propagation for streams
Oct 18, 2019
751808e
Refactor package names to kamon.instrumentation.kafka.*
Oct 18, 2019
fc97b9a
Clean up convenience functions for extracting the context from Consum…
Oct 18, 2019
50c94c6
Trigger build
Oct 19, 2019
07ae1dd
Add configuration for include/exclude streams tracing based on applic…
Oct 20, 2019
c0d63a7
Improve tests with better expression of which spans are expected,
Oct 21, 2019
cd656f2
Provide proper current context when executing ProcessNode.process()
Nov 5, 2019
bc02e60
Refactor node instrumentation of streams to properly honor their life…
Nov 10, 2019
93590b7
Add proper exception logging for stream and node spans
Nov 10, 2019
3c87f2a
Add specific instrumentation for source and sink node to capture in/ou…
Nov 11, 2019
27ae452
Start tracing for JOIN operations - fix context propagation for store…
Nov 16, 2019
4 changes: 4 additions & 0 deletions .sbtopts
@@ -0,0 +1,4 @@
+-J-XX:MaxMetaspaceSize=2048m
+-J-Xmx4096m
+-J-XX:+CMSClassUnloadingEnabled
+-J-XX:+UseConcMarkSweepGC
8 changes: 4 additions & 4 deletions .travis.yml
@@ -1,12 +1,12 @@
 language: scala
-sbt_args: -Djava.io.tmpdir=$TRAVIS_BUILD_DIR/tmp
 script:
-- sbt test
+- sbt +test
 scala:
-- 2.12.8
+- 2.12.9
 jdk:
-- oraclejdk8
+- openjdk8
 before_script:
 - mkdir $TRAVIS_BUILD_DIR/tmp
+- export SBT_OPTS="-Djava.io.tmpdir=$TRAVIS_BUILD_DIR/tmp"
 sudo: false

6 changes: 3 additions & 3 deletions README.md
@@ -14,19 +14,19 @@ the requests you are doing are behaving.

 ### Getting Started
 
-Kamon Cassandra module is currently available for Scala 2.11 and 2.12.
+Kamon Kafka module is currently available for Scala 2.11 and 2.12.
 
 Supported releases and dependencies are shown below.
 
 | kamon-kafka | status | jdk | scala
 |:------:|:------:|:----:|------------------
-| 1.0.0 | stable | 1.8+ | 2.11, 2.12
+| 2.0.0 | snapshot | 1.8+ | 2.11, 2.12
 
 To get started with SBT, simply add the following to your `build.sbt`
 file:
 
 ```scala
-libraryDependencies += "io.kamon" %% "kamon-kafka" % "1.0.0"
+libraryDependencies += "io.kamon" %% "kamon-kafka" % "2.0.0-SNAPSHOT"
 ```
21 changes: 11 additions & 10 deletions build.sbt
@@ -13,12 +13,13 @@
  * =========================================================================================
  */
 
-val kamonCore = "io.kamon" %% "kamon-core" % "1.1.3"
-val kamonTestkit = "io.kamon" %% "kamon-testkit" % "1.1.3"
-val scalaExtension = "io.kamon" %% "kanela-scala-extension" % "0.0.10"
+val kamonCore = "io.kamon" %% "kamon-core" % "2.0.0"
+val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.0.0"
+val scalaExtension = "io.kamon" %% "kamon-instrumentation-common" % "2.0.0"
 
 val kafkaClient = "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
Review thread on this line:

Is there any particular reason for not using the latest kafka clients/streams versions?

Author: Migrated to Kafka 2.3.0.

The signature of the instrumented method in the Kafka client has changed; how should we address compatibility with older Kafka versions (pre 2.3.0)? Shall I create a dedicated module depending on Kafka 2.0.0?

Contributor: [comment body not captured]

Author: If we kept it all in the kamon-kafka project and used such property-based instrumentation, wouldn't that cause eviction issues with (older) Kafka libraries for users of such a kamon-kafka lib?
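
A possible direction for the version question, sketched purely as an illustration (the chained `.advise` call and the `Duration` matcher are assumptions, not part of this PR): KafkaConsumer kept the old `poll(long)` overload alongside the `poll(Duration)` variant introduced in Kafka 2.0, so a single module could advise both signatures instead of splitting into per-version modules:

```scala
import java.time.Duration
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kamon.kafka.client.instrumentation.advisor.Advisors.PollMethodAdvisor

// Sketch only: cover both poll() overloads so one artifact works for pre- and post-2.0 clients.
class VersionTolerantConsumerInstrumentation extends InstrumentationBuilder {
  onType("org.apache.kafka.clients.consumer.KafkaConsumer")
    .advise(method("poll").and(withArgument(0, classOf[Long])), classOf[PollMethodAdvisor])     // Kafka < 2.0: poll(long)
    .advise(method("poll").and(withArgument(0, classOf[Duration])), classOf[PollMethodAdvisor]) // Kafka >= 2.0: poll(Duration)
}
```

Marking the Kafka artifacts as `Provided` in build.sbt would additionally keep kamon-kafka from pulling its own client version onto users' classpaths, so dependency eviction could not bite.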

 val kafkaStreams = "org.apache.kafka" % "kafka-streams" % "2.0.0"
+val kafkaStreamsScala = "org.apache.kafka" %% "kafka-streams-scala" % "2.0.0"
 
 val kafkaTest = "net.manub" %% "scalatest-embedded-kafka" % "2.0.0"
 val kafkaStreamTest = "net.manub" %% "scalatest-embedded-kafka-streams" % "2.0.0"
@@ -34,9 +35,9 @@ lazy val kafkaClients = (project in file("kamon-kafka-clients"))
   .enablePlugins(JavaAgent)
   .settings(bintrayPackage := "kamon-kafka")
   .settings(name := "kamon-kafka-clients")
-  .settings(scalaVersion := "2.12.8")
-  .settings(crossScalaVersions := Seq("2.11.12", "2.12.8"))
-  .settings(javaAgents += "io.kamon" % "kanela-agent" % "0.0.15" % "compile;test")
+  .settings(scalaVersion := "2.12.9")
+  .settings(crossScalaVersions := Seq("2.11.12", "2.12.9"))
+  .settings(javaAgents += "io.kamon" % "kanela-agent" % "1.0.1" % "compile;test")
   .settings(resolvers += Resolver.bintrayRepo("kamon-io", "snapshots"))
   .settings(resolvers += Resolver.mavenLocal)
   .settings(
@@ -50,13 +51,13 @@ lazy val kafkaStream = (project in file("kamon-kafka-streams"))
   .dependsOn(kafkaClients % "compile->compile;test->test")
   .settings(bintrayPackage := "kamon-kafka")
   .settings(name := "kamon-kafka-streams")
-  .settings(scalaVersion := "2.12.8")
-  .settings(crossScalaVersions := Seq("2.11.12", "2.12.8"))
-  .settings(javaAgents += "io.kamon" % "kanela-agent" % "0.0.15" % "compile;test")
+  .settings(scalaVersion := "2.12.9")
+  .settings(crossScalaVersions := Seq("2.11.12", "2.12.9"))
+  .settings(javaAgents += "io.kamon" % "kanela-agent" % "1.0.1" % "compile;test")
   .settings(resolvers += Resolver.bintrayRepo("kamon-io", "snapshots"))
   .settings(resolvers += Resolver.mavenLocal)
   .settings(
     libraryDependencies ++=
-      compileScope(kamonCore, kafkaStreams, scalaExtension) ++
+      compileScope(kamonCore, kafkaStreams, kafkaStreamsScala, scalaExtension) ++
       providedScope(lombok) ++
       testScope(kamonTestkit, scalatest, slf4jApi, logbackClassic, kafkaStreamTest))
@@ -1,6 +1,5 @@
-/*
- * =========================================================================================
- * Copyright © 2013-2019 the kamon project <http://kamon.io/>
+/* =========================================================================================
+ * Copyright (C) 2013-2019 the kamon project <http://kamon.io/>
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  * except in compliance with the License. You may obtain a copy of the License at
@@ -28,6 +27,8 @@
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
+import java.io.ByteArrayOutputStream;
+
 @Value
 public class Advisors {
   /**
@@ -48,23 +49,26 @@ public static class SendMethodAdvisor {
   public static void onEnter(@Advice.Argument(value = 0, readOnly = false) ProducerRecord record,
                              @Advice.Argument(value = 1, readOnly = false) Callback callback,
                              @Advice.Local("scope") Storage.Scope scope) {
 
       val currentContext = Kamon.currentContext();
       val topic = record.topic() == null ? "kafka" : record.topic();
       val partition = record.partition() == null ? "unknown-partition" : record.partition().toString();
       val value = record.key() == null ? "unknown-key" : record.key().toString();
 
-      val span = Kamon.buildSpan("kafka.produce")
-          .asChildOf(currentContext.get(Span.ContextKey()))
-          .withMetricTag("span.kind", "producer")
-          .withTag("kafka.key", value)
-          .withTag("kafka.partition", partition)
-          .withTag("kafka.topic", topic)
+      val span = Kamon.producerSpanBuilder("kafka.produce", "kafkaPublisher")
+          .asChildOf(currentContext.get(Span.Key()))
+          .tag("span.kind", "producer")
+          .tag("kafka.key", value)
+          .tag("kafka.partition", partition)
+          .tag("kafka.topic", topic)
           .start();
 
-      val ctx = currentContext.withKey(Span.ContextKey(), span);
+      val ctx = currentContext.withEntry(Span.Key(), span);
 
+      val out = new ByteArrayOutputStream();
+      // ugly, ugly, ugly ... :(
+      Kamon.defaultBinaryPropagation().write(ctx, kamon.context.BinaryPropagation$ByteStreamWriter$.MODULE$.of(out));
 
-      record.headers().add("kamon-context", Kamon.contextCodec().Binary().encode(ctx).array());
+      record.headers().add("kamon-context", out.toByteArray());
 
       scope = Kamon.storeContext(ctx);
       callback = new ProducerCallback(callback, scope);
@@ -75,8 +79,8 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Produce
   public static void onExit(@Advice.Local("scope") Storage.Scope scope,
                             @Advice.Thrown final Throwable throwable) {
 
-      val span = scope.context().get(Span.ContextKey());
-      if (throwable != null) span.addError(throwable.getMessage(), throwable);
+      val span = scope.context().get(Span.Key());
+      if (throwable != null) span.fail(throwable.getMessage(), throwable);
       span.finish();
       scope.close();
   }
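The header handling above replaces Kamon 1.x's `Kamon.contextCodec()` with Kamon 2.x binary propagation. A minimal, self-contained sketch of the same round trip in plain Scala, where the `BinaryPropagation$ByteStreamWriter$.MODULE$` workaround needed from Java is unnecessary (the object and method names here are illustrative, not part of this PR):

```scala
import java.io.ByteArrayOutputStream
import kamon.Kamon
import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
import kamon.context.Context

// Sketch: serialize a Context into bytes for a "kamon-context" record header, then read it back.
object ContextHeaderRoundTrip {
  def toHeaderBytes(ctx: Context): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    Kamon.defaultBinaryPropagation().write(ctx, ByteStreamWriter.of(out))
    out.toByteArray
  }

  def fromHeaderBytes(bytes: Array[Byte]): Context =
    Kamon.defaultBinaryPropagation().read(ByteStreamReader.of(bytes))
}
```

The consumer side of this PR (RecordProcessor below) performs exactly the read half whenever it finds a "kamon-context" header on a record.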
7 changes: 3 additions & 4 deletions kamon-kafka-clients/src/main/scala/kamon/kafka/Kafka.scala
@@ -17,17 +17,16 @@
 package kamon.kafka
 
 import com.typesafe.config.Config
-import kamon.{Kamon, OnReconfigureHook}
+import kamon.Kamon
 
 object Kafka {
   @volatile var followStrategy: Boolean = followStrategyFromConfig(Kamon.config())
 
   private def followStrategyFromConfig(config: Config): Boolean =
     Kamon.config.getBoolean("kamon.kafka.follow-strategy")
 
-  Kamon.onReconfigure(new OnReconfigureHook {
-    override def onReconfigure(newConfig: Config): Unit = {
+  Kamon.onReconfigure((newConfig: Config) => {
     followStrategy = followStrategyFromConfig(newConfig)
   }
-  })
+  )
 }
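
As a usage illustration (a sketch, not part of this PR; `Kamon.reconfigure` and the Typesafe Config calls are standard API): flipping the flag at runtime fires the hook registered above and updates `followStrategy` without a restart:

```scala
import com.typesafe.config.ConfigFactory
import kamon.Kamon

// Sketch: override kamon.kafka.follow-strategy on top of the current configuration.
// Kamon.reconfigure invokes every registered onReconfigure hook, including Kafka's.
val updated = ConfigFactory
  .parseString("kamon.kafka.follow-strategy = false")
  .withFallback(Kamon.config())
Kamon.reconfigure(updated)
```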
@@ -16,31 +16,32 @@

 package kamon.kafka.instrumentation
 
-import java.nio.ByteBuffer
+import java.io.ByteArrayOutputStream
 
 import kamon.Kamon
+import kamon.context.BinaryPropagation.{ByteStreamReader, ByteStreamWriter}
 import kamon.context.Context
 import kamon.kafka.Kafka
 import kamon.kafka.client.instrumentation.advisor.Advisors.PollMethodAdvisor
 import kamon.trace.Span
-import kanela.agent.scala.KanelaInstrumentation
+import kanela.agent.api.instrumentation.InstrumentationBuilder
 import org.apache.kafka.clients.consumer.ConsumerRecords
 
 import scala.collection.mutable
 
-class ConsumerInstrumentation extends KanelaInstrumentation {
+class ConsumerInstrumentation extends InstrumentationBuilder {
 
   /**
    * Instruments org.apache.kafka.clients.consumer.KafkaConsumer::poll(long)
    */
-  forTargetType("org.apache.kafka.clients.consumer.KafkaConsumer") { builder =>
-    builder
-      .withAdvisorFor(method("poll").and(withArgument(0, classOf[Long])), classOf[PollMethodAdvisor])
-      .build()
-  }
+  onType("org.apache.kafka.clients.consumer.KafkaConsumer")
+    .advise(method("poll").and(withArgument(0, classOf[Long])), classOf[PollMethodAdvisor])
 }
 
 object RecordProcessor {
 
+  import scala.collection.JavaConverters._
 
   /**
    * Inject Context into Records
    */
@@ -51,33 +52,40 @@ object RecordProcessor {

     val consumerSpansForTopic = new mutable.LinkedHashMap[String, Span]()
 
-    records.partitions().forEach(partition => {
+    records.partitions().asScala.foreach(partition => {
       val topic = partition.topic
 
-      records.records(partition).forEach(record => {
+      records.records(partition).asScala.foreach(record => {
         val header = Option(record.headers.lastHeader("kamon-context"))
 
-        val currentContext = header.map(h => Kamon.contextCodec.Binary.decode(ByteBuffer.wrap(h.value))).getOrElse(Context.Empty)
+        val currentContext = header.map { h =>
+          Kamon.defaultBinaryPropagation().read(ByteStreamReader.of(h.value()))
+        }.getOrElse(Context.Empty)
 
         val span = consumerSpansForTopic.getOrElseUpdate(topic, {
-          val spanBuilder = Kamon.buildSpan("poll")
-            .withMetricTag("span.kind", "consumer")
-            .withTag("kafka.partition", partition.partition)
-            .withTag("kafka.topic", topic)
-            .withTag("kafka.offset", record.offset)
-            .withFrom(instant)
-
-          if(Kafka.followStrategy) spanBuilder.asChildOf(currentContext.get(Span.ContextKey))
+          val spanBuilder = Kamon.spanBuilder("poll")
+            .tag("span.kind", "consumer")
+            .tag("kafka.partition", partition.partition)
+            .tag("kafka.topic", topic)
+            .tag("kafka.offset", record.offset)
+
+          // Key could be optional ... see tests
+          Option(record.key()).foreach(k => spanBuilder.tag("kafka.key", k.toString))
+
+          if(Kafka.followStrategy) spanBuilder.asChildOf(currentContext.get(Span.Key))
           else {
-            val context = currentContext.get(Span.ContextKey).context()
-            spanBuilder.withTag("trace.related.trace_id", context.traceID.string)
-            spanBuilder.withTag("trace.related.span_id", context.spanID.string)
+            val currentSpan = currentContext.get(Span.Key)
+            spanBuilder
+              .tag("trace.related.trace_id", currentSpan.trace.id.string)
+              .tag("trace.related.span_id", currentSpan.id.string)
           }
           spanBuilder.start()
         })
 
-        val ctx = currentContext.withKey(Span.ContextKey, span)
-        record.headers.add("kamon-context", Kamon.contextCodec.Binary.encode(ctx).array())
+        val out = new ByteArrayOutputStream()
+        Kamon.defaultBinaryPropagation().write(currentContext.withEntry(Span.Key, span), ByteStreamWriter.of(out))
+
+        record.headers.add("kamon-context", out.toByteArray)
       })
     })

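Distilled from the branching above into a standalone sketch (the helper name `buildPollSpan` is illustrative only): with follow-strategy enabled the "poll" span joins the producer's trace as a child; with it disabled the span starts a fresh trace and only records the producer's identifiers as plain tags.

```scala
import kamon.Kamon
import kamon.trace.Span

// Sketch of the two linking modes implemented by RecordProcessor above.
def buildPollSpan(incoming: Span, follow: Boolean): Span = {
  val builder = Kamon.spanBuilder("poll").tag("span.kind", "consumer")
  if (follow)
    builder.asChildOf(incoming)                                 // same trace, parent/child link
  else
    builder
      .tag("trace.related.trace_id", incoming.trace.id.string)  // new trace, loose reference only
      .tag("trace.related.span_id", incoming.id.string)
  builder.start()
}
```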
@@ -19,28 +19,26 @@ package kamon.kafka.instrumentation
 import kamon.context.Storage.Scope
 import kamon.kafka.client.instrumentation.advisor.Advisors.SendMethodAdvisor
 import kamon.trace.Span
-import kanela.agent.scala.KanelaInstrumentation
+import kanela.agent.api.instrumentation.InstrumentationBuilder
 import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
 
-class ProducerInstrumentation extends KanelaInstrumentation {
+class ProducerInstrumentation extends InstrumentationBuilder {
 
   /**
    * Instruments org.apache.kafka.clients.producer.KafkaProducer::send()
    */
-  forTargetType("org.apache.kafka.clients.producer.KafkaProducer") { builder =>
-    builder
-      .withAdvisorFor(method("send").and(takesArguments(2)), classOf[SendMethodAdvisor])
-      .build()
-  }
+  onType("org.apache.kafka.clients.producer.KafkaProducer")
+    .advise(method("send").and(takesArguments(2)), classOf[SendMethodAdvisor])
 
 }
 
 /**
  * Producer Callback Wrapper
  */
 final class ProducerCallback(callback: Callback, scope: Scope) extends Callback {
   override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
-    val span = scope.context.get(Span.ContextKey)
-    if(exception != null) span.addError(exception.getMessage, exception)
+    val span = scope.context.get(Span.Key)
+    if(exception != null) span.fail(exception.getMessage, exception)
     try if(callback != null) callback.onCompletion(metadata, exception)
     finally {
       span.finish()