Feature/instrument process node #1
base: master

Conversation
…ection timeout, make test assumptions about follow-strategy explicit
kamon-kafka-clients/src/main/java/kamon/kafka/client/instrumentation/advisor/Advisors.java (outdated, resolved)
@boxsterman LGTM, I've left a simple comment. @ivantopo WDYT?
… details of accessing nested Scala objects from Java code.
@dpsoft Finally got around to addressing your feedback ... too much other work :(
Hey @boxsterman, I finally started looking into this PR. Solid work so far 🎉
I've only looked at the producer/consumer instrumentation for now, since I'm still wrapping my head around the streams part. Hopefully we can start polishing this side first, and I'll get back to you as soon as possible regarding the streams side of this 😄
Besides the minor comments on some methods, there are two other bigger things I would like to address:
Regular vs Delayed Spans
Besides the follow strategy we should also have a setting to determine whether the generated Span should be a "regular" Span or a "delayed" Span. The idea behind delayed Spans is that they can capture additional information about processing in asynchronous scenarios, in particular how long an operation was waiting before actually being processed. I think it would go like this (see the sketch after this list):
- When delayed Spans are enabled, we could count the "wait time" as the difference between the time a message was produced and the time the "poll" method started, and the actual processing time for that Span would be the time it takes for "poll" to complete.
- When not using delayed Spans, the only time reflected will be the processing time, which should be equal to the time taken for the "poll" function to run (btw, this is not the case at the moment, please see the relevant comments).
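A minimal sketch of the distinction, assuming Kamon 2.x's span-builder API (`Kamon.spanBuilder`, `delay`, `start(at)`); the helper itself and `pollStartedAt` are hypothetical stand-ins for the PR's actual wiring:

```scala
import java.time.Instant
import kamon.Kamon
import kamon.trace.Span
import org.apache.kafka.clients.consumer.ConsumerRecord

// Hypothetical helper: builds either a delayed or a regular span for a record.
def consumerSpan(record: ConsumerRecord[_, _], pollStartedAt: Instant, useDelayedSpans: Boolean): Span = {
  val builder = Kamon.spanBuilder("poll").tag("kafka.topic", record.topic())
  if (useDelayedSpans) {
    // Delayed span: "wait time" runs from the record's produce timestamp until
    // poll started; processing time is measured from pollStartedAt onwards.
    val delayed = builder.delay(Instant.ofEpochMilli(record.timestamp()))
    delayed.start(pollStartedAt)
    delayed
  } else {
    // Regular span: reflects only processing time, i.e. the poll itself.
    builder.start(pollStartedAt)
  }
}
```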
Continuing Tracing on the Consumer Side
Once a record has been consumed, the user will want to continue doing some processing of that record, and that processing should continue the same trace started/joined by the consumer. Currently I don't see any clear way to do that, and I think there should be one. One idea that comes to mind is to inject the generated Spans into the ConsumerRecord objects via instrumentation and provide a utility class for users to extract that Span and use it as the parent of their own Spans (a sketch of this shape appears further down, next to the comments on SpanExtractionSupport).
What do you think about the above?
build.sbt (outdated)
```diff
- val scalaExtension = "io.kamon" %% "kanela-scala-extension" % "0.0.10"
  val kamonCore      = "io.kamon" %% "kamon-core" % "2.0.0"
  val kamonTestkit   = "io.kamon" %% "kamon-testkit" % "2.0.0"
+ val scalaExtension = "io.kamon" %% "kamon-instrumentation-common" % "2.0.0"

  val kafkaClient    = "org.apache.kafka" % "kafka-clients" % "0.11.0.0"
```
Is there any particular reason for not using the latest kafka clients/streams versions?
Migrated to Kafka 2.3.0.
The signature of the instrumented method in the Kafka client has changed; how shall we address compatibility with older Kafka versions (pre 2.3.0)? Shall I create a dedicated module depending on Kafka 2.0.0?
@boxsterman you can use the kanela ClassRefiner in order to activate an instrumentation depending on some properties of the target class. Take a look:
https://github.com/kamon-io/kamon-play/blob/master/kamon-play/src/main/scala/kamon/instrumentation/play/PlayServerInstrumentation.scala#L34-L58
and https://github.com/kamon-io/kanela/blob/master/agent/src/test/scala/kanela/agent/classloader/ClassloaderNameMatcherSpec.scala#L25
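This is not the ClassRefiner API itself (the links above show that); just a sketch of the underlying idea: activate the 2.3-style instrumentation only when a marker class that exists solely in newer kafka-clients versions is on the classpath. The marker class name below is a placeholder, not a verified 2.3-only class.

```scala
// Sketch only: a plain classpath probe illustrating version-gated activation.
object KafkaVersionProbe {
  private def classPresent(name: String): Boolean =
    try { Class.forName(name, false, getClass.getClassLoader); true }
    catch { case _: ClassNotFoundException => false }

  // Placeholder marker class name, not a real kafka-clients 2.3 class.
  val isKafka23OrLater: Boolean =
    classPresent("org.apache.kafka.clients.SomeClassOnlyInKafka23")
}
```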
If we kept it all in the kamon-kafka project and used such property-based instrumentation, wouldn't that cause issues with eviction of (older) Kafka libraries on the side of the users of such a kamon-kafka lib?
kamon-kafka-clients/src/main/java/kamon/kafka/client/instrumentation/advisor/Advisors.java (outdated, resolved)
kamon-kafka-clients/src/main/scala/kamon/kafka/instrumentation/ConsumerInstrumentation.scala (outdated, resolved)
kamon-kafka-clients/src/main/scala/kamon/kafka/instrumentation/ConsumerInstrumentation.scala (outdated, resolved)
kamon-kafka-clients/src/main/scala/kamon/kafka/instrumentation/ConsumerInstrumentation.scala (outdated, resolved)
kamon-kafka-clients/src/main/java/kamon/kafka/client/instrumentation/advisor/Advisors.java (outdated, resolved)
kamon-kafka-clients/src/main/scala/kamon/kafka/instrumentation/ConsumerInstrumentation.scala (outdated, resolved)
```scala
  ContextSerializationHelper.fromByteArray(h.value())
}.getOrElse(Context.Empty)

val span = consumerSpansForTopic.getOrElseUpdate(topic, {
```
Hey, I was wondering about poll-span cardinality here.
This way it will produce one span per consumed topic within a poll; is this intentional, and what's the reasoning behind it?
Seems it could go two ways:
- One span per poll, in which case record-specific tags don't make sense (partition, key, offset). Any further streaming spans should link to the poll span due to the potentially large number of child spans (default `max.poll.records` is 500?).
- One span per polled record, which can be used as a root for all streaming-stage spans. This is aligned with what is done on the producing-side instrumentation, which tracks individual record `send()`s and ignores batching.
Hi @mladens,
thanks a lot for your feedback and for pointing out this span inconsistency!
I've refactored that part and now the following spans are created:
- one span for the actual `poll` operation: using its "own" (existing) traceId context, this allows tracing individual poll operations and how many records they actually polled.
- one span for each record polled: using the record's send span as parent, or as a linked span if `follow-strategy = false`; this enables tracing the business-related flow of the application (a sketch of the parent-vs-link choice follows below).

This way it also becomes quite visible how Kafka consumer settings like `autoCommit` can influence the consumer.
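Roughly, the parent-vs-link choice could look like this (`asChildOf` and `link` are Kamon 2.x `SpanBuilder` methods; `sendingSpan` and `followStrategy` are stand-ins for the instrumentation's actual inputs):

```scala
import kamon.Kamon
import kamon.trace.Span

// Stand-in sketch: create the per-record span either as a child of the
// producer's send span (solid edge) or merely linked to it (dashed edge).
def recordSpan(sendingSpan: Span, followStrategy: Boolean): Span = {
  val builder = Kamon.spanBuilder("consumed-record")
  if (followStrategy) builder.asChildOf(sendingSpan).start()
  else builder.link(sendingSpan, Span.Link.Kind.FollowsFrom).start()
}
```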
Here are two examples of how the instrumentation looks now:
[trace graph: simple publish/subscribe with two messages (autoCommit=true)]
[trace graph: simple publish/subscribe with two messages (autoCommit=false)]
- solid edges: parent/child spans
- dashed edges: linked spans
In the second example, the second record was not committed fast enough, so it popped up in the next poll operation. It will not be forwarded to the application since it is still "in flight" on the client side.
… span creation. Improve tests and DOT file renderer.
kamon-kafka-clients/src/main/scala/kamon/kafka/client/instrumentation/RecordProcessor.scala (outdated, resolved)
kamon-kafka-clients/src/main/scala/kamon/kafka/client/instrumentation/RecordProcessor.scala (outdated, resolved)
…pan, fix "missing" zeros in milli-to-nano conversion
Hi @ivantopo, I've added support for accessing the (completed) span of a consumer record in order to create further child spans with it. For that I've created a new mixin that only carries the span and not the whole context, to avoid dealing with changes/updates to the context itself. This is how the simple example looks now. What do you think? I'll now continue adding a "real" context mixin to the stream to properly keep that context.
```scala
 * to make the span available as parent for downstream operations
 */
onSubTypesOf("org.apache.kafka.clients.consumer.ConsumerRecord")
  .mixin(classOf[HasSpan.Mixin])
```
This should be `HasContext` instead of `HasSpan`, please see the comments on the `HasSpan` class.
```scala
import scala.util.Try

trait HasSpan {
```
Since the Context might have additional information, we cannot hide it from the users; instead of mixing in the Span we should mix in the Context that has the Span (it should be the deserialized Context with the new Span inside).
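A sketch of the suggested change, assuming kamon-instrumentation-common's `HasContext` mixin (the surrounding instrumentation setup is elided):

```scala
import kamon.instrumentation.context.HasContext

// Mix the whole Context (which carries the Span) into consumer records,
// instead of the bespoke HasSpan trait.
onSubTypesOf("org.apache.kafka.clients.consumer.ConsumerRecord")
  .mixin(classOf[HasContext.Mixin])
```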
```scala
object SpanExtractionSupport {

  implicit class SpanOps[K, V](cr: ConsumerRecord[K, V]) {
```
A few comments on this one:
- Is there a situation in which users themselves will want to write a Context into the ConsumerRecord? Maybe I'm missing something here.
- We need to add a method to get the Context, but we can also have one to get the Span, just because it is convenient. In that method please return just the `Span`, not `Option[Span]`; if there is no span in the context Kamon will return a `Span.Empty` instead, and the same goes for the Context. We use these empty objects instead of returning Options of things because it provides a much more consistent usage across Java/Scala/Kotlin.
- I would recommend moving this to the `Client` companion object and breaking it down into two parts: methods on the companion object itself like `extractContext(consumerRecord)`/`extractSpan(consumerRecord)`, and a `Syntax` implicit class that does what this class does. This is to allow Java users to access these functions while still keeping the cool stuff for the Scala folks 😄 (a sketch follows this list)
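A possible shape for that layout (the `Client` and `Syntax` names come straight from this comment; the `HasContext` match assumes the mixin discussed above):

```scala
import kamon.context.Context
import kamon.trace.Span
import kamon.instrumentation.context.HasContext
import org.apache.kafka.clients.consumer.ConsumerRecord

object Client {

  // Plain methods, callable from Java.
  def extractContext(record: ConsumerRecord[_, _]): Context = record match {
    case hc: HasContext => hc.context
    case _              => Context.Empty
  }

  def extractSpan(record: ConsumerRecord[_, _]): Span =
    extractContext(record).get(Span.Key)

  // Implicit syntax for the Scala folks.
  object Syntax {
    implicit class ConsumerRecordSyntax[K, V](val record: ConsumerRecord[K, V]) extends AnyVal {
      def context: Context = Client.extractContext(record)
      def span: Span       = Client.extractSpan(record)
    }
  }
}
```

Java users would call `Client.extractSpan(record)` directly, while Scala users could `import Client.Syntax._` and write `record.span`.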
```
@@ -4,7 +4,10 @@
kamon {
  kafka {
```
For consistency with the rest of the modules this should be under `kamon.instrumentation.kafka`. BTW, same goes for the code itself! It should be under the `kamon.instrumentation.kafka` package.
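Under that convention, the settings would move to something like this (the `follow-strategy` key is taken from earlier in this thread; the exact layout is illustrative):

```
kamon {
  instrumentation {
    kafka {
      # illustrative example setting under the new namespace
      follow-strategy = true
    }
  }
}
```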
@boxsterman the record should get the Context mixed in, not just the Span, because there might be more interesting data in the Context that also needs to be used by the client, like Context Tags or some other Context Entries besides the Span. I just made some comments on the relevant files.
@ivantopo Yes, I've noticed that the …
…ationId, add test using multiple threads per stream
add reset for configuration
… cycle (init/process/close)
…-changelog updates (send operation)
Hi @boxsterman, what are your plans regarding this PR? Do you have any more ideas/features you wanted to see in there? If there's anything I can do to help get this closer to release, let me know.
No description provided.