Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix akka-http instrumentation for HTTP2 #1374

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2013-2024 The Kamon Project <https://kamon.io>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kamon.instrumentation.akka.http;

import akka.NotUsed;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.stream.scaladsl.Flow;
import kanela.agent.libs.net.bytebuddy.asm.Advice;

public class Http2BlueprintAsyncAdvice {

public static class EndpointInfo {
public final String listenInterface;
public final int listenPort;

public EndpointInfo(String listenInterface, int listenPort) {
this.listenInterface = listenInterface;
this.listenPort = listenPort;
}
}

public static ThreadLocal<EndpointInfo> currentEndpoint = new ThreadLocal<>();

@Advice.OnMethodExit
public static void onExit(@Advice.Return(readOnly = false) Flow<HttpRequest, HttpResponse, NotUsed> returnedFlow) {
EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get();

if(bindAndHandlerEndpoint != null) {
returnedFlow = ServerFlowWrapper.apply(
returnedFlow,
bindAndHandlerEndpoint.listenInterface,
bindAndHandlerEndpoint.listenPort
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,20 @@

package kamon.instrumentation.akka.http;

import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import kanela.agent.libs.net.bytebuddy.asm.Advice;
import scala.Function1;
import scala.concurrent.Future;

public class Http2ExtBindAndHandleAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Function1<HttpRequest, Future<HttpResponse>> handler,
@Advice.Argument(1) String iface,
@Advice.Argument(2) Integer port) {
public static void onEnter(@Advice.Argument(1) String iface, @Advice.Argument(2) Integer port) {

FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port));
handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler);
Http2BlueprintAsyncAdvice.currentEndpoint.set(new Http2BlueprintAsyncAdvice.EndpointInfo(iface, port));
}

@Advice.OnMethodExit
public static void onExit() {
FlowOpsMapAsyncAdvice.currentEndpoint.remove();
Http2BlueprintAsyncAdvice.currentEndpoint.remove();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {

/**
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
*/
onType("akka.http.scaladsl.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
Expand Down Expand Up @@ -306,6 +306,7 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}


object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
Expand All @@ -315,10 +316,8 @@ object Http2BlueprintInterceptor {
}

@RuntimeType
def handleWithStreamIdHeader(
@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
): Flow[HttpRequest, HttpResponse, NotUsed] = {
def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
case HandlerWithEndpoint(interface, port, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {

/**
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
*/

onType("akka.http.impl.engine.http2.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor)
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
Expand Down Expand Up @@ -329,27 +329,3 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}
}

object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
extends (HttpRequest => Future[HttpResponse]) {

override def apply(request: HttpRequest): Future[HttpResponse] = handler(request)
}

@RuntimeType
def handleWithStreamIdHeader(
@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
case HandlerWithEndpoint(interface, port, _) =>
ServerFlowWrapper(zuper.call(), interface, port)

case _ =>
zuper.call()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,22 @@ package kamon.instrumentation.akka.http
import java.util.concurrent.Callable
import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller}
import akka.http.scaladsl.model.StatusCodes.Redirection
import akka.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri}
import akka.http.scaladsl.model.{HttpHeader, StatusCode, Uri}
import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched}
import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet}
import akka.http.scaladsl.server.directives.RouteDirectives.reject
import akka.http.scaladsl.server._
import akka.http.scaladsl.server.util.{Tuple, Tupler}
import akka.http.scaladsl.util.FastFuture
import kamon.Kamon
import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext
import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext}
import kanela.agent.api.instrumentation.InstrumentationBuilder
import kanela.agent.api.instrumentation.mixin.Initializer
import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._

import scala.concurrent.{Batchable, ExecutionContext, Future, Promise}
import scala.util.control.NonFatal
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import java.util.regex.Pattern
import akka.NotUsed
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.stream.scaladsl.Flow
import kamon.context.Context
import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic

Expand All @@ -46,15 +41,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder {

/**
* For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow
* creation happen at different times we are wrapping the handler with the interface/port data and reading that
* information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1.
* creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port
* data and reading that information on method exit to wrap it the same way we would for HTTP/1.
*
*/
onType("akka.http.impl.engine.http2.Http2Ext")
.advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice])

onType("akka.http.impl.engine.http2.Http2Blueprint$")
.intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor])
.advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice])

/**
* The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free
Expand Down Expand Up @@ -314,28 +309,3 @@ object PathDirectivesRawPathPrefixInterceptor {
}
}
}

class Http2BlueprintInterceptor
object Http2BlueprintInterceptor {

case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse])
extends (HttpRequest => Future[HttpResponse]) {

override def apply(request: HttpRequest): Future[HttpResponse] = handler(request)
}

@RuntimeType
@static def handleWithStreamIdHeader(
@Argument(1) handler: HttpRequest => Future[HttpResponse],
@SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]
): Flow[HttpRequest, HttpResponse, NotUsed] = {

handler match {
case HandlerWithEndpoint(interface, port, _) =>
ServerFlowWrapper(zuper.call(), interface, port)

case _ =>
zuper.call()
}
}
}