From 08fd74752a976a26c5a64927a2021211de9370a5 Mon Sep 17 00:00:00 2001 From: psnep Date: Tue, 22 Oct 2024 13:24:23 +0300 Subject: [PATCH] Fix akka-http instrumentation for HTTP2 --- .../akka/http/Http2BlueprintAsyncAdvice.java | 51 +++++++++++++++++++ .../http/Http2ExtBindAndHandleAdvice.java | 11 ++-- .../http/AkkaHttpServerInstrumentation.scala | 13 +++-- .../http/AkkaHttpServerInstrumentation.scala | 30 ++--------- .../http/AkkaHttpServerInstrumentation.scala | 40 ++------------- 5 files changed, 68 insertions(+), 77 deletions(-) create mode 100644 instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2BlueprintAsyncAdvice.java diff --git a/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2BlueprintAsyncAdvice.java b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2BlueprintAsyncAdvice.java new file mode 100644 index 000000000..3ece58262 --- /dev/null +++ b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2BlueprintAsyncAdvice.java @@ -0,0 +1,51 @@ +/* + * Copyright 2013-2024 The Kamon Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kamon.instrumentation.akka.http; + +import akka.NotUsed; +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.stream.scaladsl.Flow; +import kanela.agent.libs.net.bytebuddy.asm.Advice; + +public class Http2BlueprintAsyncAdvice { + + public static class EndpointInfo { + public final String listenInterface; + public final int listenPort; + + public EndpointInfo(String listenInterface, int listenPort) { + this.listenInterface = listenInterface; + this.listenPort = listenPort; + } + } + + public static ThreadLocal currentEndpoint = new ThreadLocal<>(); + + @Advice.OnMethodExit + public static void onExit(@Advice.Return(readOnly = false) Flow returnedFlow) { + EndpointInfo bindAndHandlerEndpoint = currentEndpoint.get(); + + if(bindAndHandlerEndpoint != null) { + returnedFlow = ServerFlowWrapper.apply( + returnedFlow, + bindAndHandlerEndpoint.listenInterface, + bindAndHandlerEndpoint.listenPort + ); + } + } +} diff --git a/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java index d63769368..7115a896c 100644 --- a/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java +++ b/instrumentation/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java @@ -16,25 +16,20 @@ package kamon.instrumentation.akka.http; -import akka.http.scaladsl.model.HttpRequest; -import akka.http.scaladsl.model.HttpResponse; import kanela.agent.libs.net.bytebuddy.asm.Advice; -import scala.Function1; -import scala.concurrent.Future; public class Http2ExtBindAndHandleAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Function1> handler, - @Advice.Argument(1) String iface, - @Advice.Argument(2) Integer port) { + public static void onEnter(@Advice.Argument(1) String iface, @Advice.Argument(2) Integer port) { FlowOpsMapAsyncAdvice.currentEndpoint.set(new FlowOpsMapAsyncAdvice.EndpointInfo(iface, port)); - handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler); + Http2BlueprintAsyncAdvice.currentEndpoint.set(new Http2BlueprintAsyncAdvice.EndpointInfo(iface, port)); } @Advice.OnMethodExit public static void onExit() { FlowOpsMapAsyncAdvice.currentEndpoint.remove(); + Http2BlueprintAsyncAdvice.currentEndpoint.remove(); } } diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index eec9f91b4..924a75880 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -45,14 +45,14 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { /** * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow - * creation happen at different times we are wrapping the handler with the interface/port data and reading that - * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + * creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port + * data and reading that information on method exit to wrap it the same way we would for HTTP/1. */ onType("akka.http.scaladsl.Http2Ext") .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) onType("akka.http.impl.engine.http2.Http2Blueprint$") - .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) + .advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice]) /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free @@ -306,6 +306,7 @@ object PathDirectivesRawPathPrefixInterceptor { } } + object Http2BlueprintInterceptor { case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) @@ -315,10 +316,8 @@ object Http2BlueprintInterceptor { } @RuntimeType - def handleWithStreamIdHeader( - @Argument(1) handler: HttpRequest => Future[HttpResponse], - @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]] - ): Flow[HttpRequest, HttpResponse, NotUsed] = { + def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], + @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { handler match { case HandlerWithEndpoint(interface, port, _) => diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 2a48e71fb..0c5e95200 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -62,15 +62,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { /** * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow - * creation happen at different times we are wrapping the handler with the interface/port data and reading that - * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + * creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port + * data and reading that information on method exit to wrap it the same way we would for HTTP/1. */ onType("akka.http.impl.engine.http2.Http2Ext") .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) onType("akka.http.impl.engine.http2.Http2Blueprint$") - .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) + .advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice]) /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free @@ -329,27 +329,3 @@ object PathDirectivesRawPathPrefixInterceptor { } } } - -object Http2BlueprintInterceptor { - - case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) - extends (HttpRequest => Future[HttpResponse]) { - - override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) - } - - @RuntimeType - def handleWithStreamIdHeader( - @Argument(1) handler: HttpRequest => Future[HttpResponse], - @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]] - ): Flow[HttpRequest, HttpResponse, NotUsed] = { - - handler match { - case HandlerWithEndpoint(interface, port, _) => - ServerFlowWrapper(zuper.call(), interface, port) - - case _ => - zuper.call() - } - } -} diff --git a/instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 320d8dda1..98e1fd6c2 100644 --- a/instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/instrumentation/kamon-akka-http/src/main/scala-2.13+/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -3,27 +3,22 @@ package kamon.instrumentation.akka.http import java.util.concurrent.Callable import akka.http.scaladsl.marshalling.{ToEntityMarshaller, ToResponseMarshallable, ToResponseMarshaller} import akka.http.scaladsl.model.StatusCodes.Redirection -import akka.http.scaladsl.model.{HttpHeader, HttpRequest, HttpResponse, StatusCode, Uri} +import akka.http.scaladsl.model.{HttpHeader, StatusCode, Uri} import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} import akka.http.scaladsl.server.directives.RouteDirectives.reject import akka.http.scaladsl.server._ import akka.http.scaladsl.server.util.{Tuple, Tupler} -import akka.http.scaladsl.util.FastFuture import kamon.Kamon import kamon.instrumentation.akka.http.HasMatchingContext.PathMatchingContext -import kamon.instrumentation.context.{HasContext, InvokeWithCapturedContext} import kanela.agent.api.instrumentation.InstrumentationBuilder import kanela.agent.api.instrumentation.mixin.Initializer import kanela.agent.libs.net.bytebuddy.implementation.bind.annotation._ -import scala.concurrent.{Batchable, ExecutionContext, Future, Promise} -import scala.util.control.NonFatal +import scala.concurrent.Future import scala.util.{Failure, Success, Try} import java.util.regex.Pattern -import akka.NotUsed import akka.http.scaladsl.server.RouteResult.Rejected -import akka.stream.scaladsl.Flow import kamon.context.Context import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic @@ -46,15 +41,15 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { /** * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow - * creation happen at different times we are wrapping the handler with the interface/port data and reading that - * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + * creation happen at different times we are advising the handleWithStreamIdHeader method with the interface/port + * data and reading that information on method exit to wrap it the same way we would for HTTP/1. * */ onType("akka.http.impl.engine.http2.Http2Ext") .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) onType("akka.http.impl.engine.http2.Http2Blueprint$") - .intercept(method("handleWithStreamIdHeader"), classOf[Http2BlueprintInterceptor]) + .advise(method("handleWithStreamIdHeader"), classOf[Http2BlueprintAsyncAdvice]) /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free @@ -314,28 +309,3 @@ object PathDirectivesRawPathPrefixInterceptor { } } } - -class Http2BlueprintInterceptor -object Http2BlueprintInterceptor { - - case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) - extends (HttpRequest => Future[HttpResponse]) { - - override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) - } - - @RuntimeType - @static def handleWithStreamIdHeader( - @Argument(1) handler: HttpRequest => Future[HttpResponse], - @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]] - ): Flow[HttpRequest, HttpResponse, NotUsed] = { - - handler match { - case HandlerWithEndpoint(interface, port, _) => - ServerFlowWrapper(zuper.call(), interface, port) - - case _ => - zuper.call() - } - } -}