diff --git a/build.sbt b/build.sbt index c13543d..e72b237 100644 --- a/build.sbt +++ b/build.sbt @@ -27,8 +27,10 @@ val kanelaAgent = "io.kamon" % "kanela-agent" % "1.0.1 val akkaHttpJson = "de.heikoseeberger" %% "akka-http-json4s" % "1.27.0" val json4sNative = "org.json4s" %% "json4s-native" % "3.6.7" val http25 = "com.typesafe.akka" %% "akka-http" % "10.1.9" +val http2Support = "com.typesafe.akka" %% "akka-http2-support" % "10.1.9" val httpTestKit25 = "com.typesafe.akka" %% "akka-http-testkit" % "10.1.9" val stream25 = "com.typesafe.akka" %% "akka-stream" % "2.5.24" +val okHttp = "com.squareup.okhttp3" % "okhttp" % "3.14.2" lazy val root = (project in file(".")) @@ -46,7 +48,8 @@ lazy val kamonAkkaHttp25 = Project("kamon-akka-http", file("kamon-akka-http")) moduleName := "kamon-akka-http", bintrayPackage := "kamon-akka-http", crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.0")), + javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "test", libraryDependencies ++= compileScope(kamonCore, kamonAkka25, kamonCommon) ++ - providedScope(kanelaAgent, http25, stream25) ++ - testScope(httpTestKit25, scalatest, slf4jApi, slf4jnop, kamonTestKit, akkaHttpJson, json4sNative)) + providedScope(kanelaAgent, http25, http2Support, stream25) ++ + testScope(httpTestKit25, scalatest, slf4jApi, slf4jnop, kamonTestKit, akkaHttpJson, json4sNative, okHttp)) diff --git a/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java b/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java new file mode 100644 index 0000000..95d0a31 --- /dev/null +++ b/kamon-akka-http/src/main/java/kamon/instrumentation/akka/http/Http2ExtBindAndHandleAdvice.java @@ -0,0 +1,18 @@ +package kamon.instrumentation.akka.http; + +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import kanela.agent.libs.net.bytebuddy.asm.Advice; +import scala.Function1; +import scala.concurrent.Future; + +public class Http2ExtBindAndHandleAdvice { + + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Function1> handler, + @Advice.Argument(1) String iface, + @Advice.Argument(2) Integer port) { + + handler = new Http2BlueprintInterceptor.HandlerWithEndpoint(iface, port, handler); + } +} diff --git a/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 020f7e4..4cd4db0 100644 --- a/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/kamon-akka-http/src/main/scala-2.11/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -4,7 +4,7 @@ import java.util.concurrent.Callable import akka.http.scaladsl.marshalling.{ToResponseMarshallable, ToResponseMarshaller} import akka.http.scaladsl.model.StatusCodes.Redirection -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} import akka.http.scaladsl.server.directives.RouteDirectives.reject @@ -23,7 +23,10 @@ import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} import java.util.regex.Pattern +import akka.NotUsed +import akka.stream.scaladsl.Flow import kamon.context.Context +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic class AkkaHttpServerInstrumentation extends InstrumentationBuilder { @@ -40,6 +43,16 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { onType("akka.http.scaladsl.HttpExt") .advise(method("bindAndHandle"), classOf[HttpExtBindAndHandleAdvice]) + /** + * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow + * creation happen at different times we are wrapping the handler with the interface/port data and reading that + * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + */ + onType("akka.http.scaladsl.Http2Ext") + .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) + + onType("akka.http.impl.engine.http2.Http2Blueprint$") + .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free @@ -79,7 +92,6 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { .intercept(method("transformWith$extension1"), FastFutureTransformWithAdvice) } - trait HasMatchingContext { def defaultOperationName: String def matchingContext: Seq[PathMatchingContext] @@ -208,7 +220,6 @@ object ResolveOperationNameOnRouteInterceptor { } } - class LastAutomaticOperationNameEdit(@volatile var operationName: String) object LastAutomaticOperationNameEdit { @@ -252,7 +263,6 @@ object PathDirectivesRawPathPrefixInterceptor { } } - object FastFutureTransformWithAdvice { @RuntimeType @@ -291,4 +301,26 @@ object FastFutureTransformWithAdvice { } } } +} + +object Http2BlueprintInterceptor { + + case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) + extends (HttpRequest => Future[HttpResponse]) { + + override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) + } + + @RuntimeType + def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], + @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { + + handler match { + case HandlerWithEndpoint(interface, port, _) => + ServerFlowWrapper(zuper.call(), interface, port) + + case _ => + zuper.call() + } + } } \ No newline at end of file diff --git a/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index fbc8046..0fada81 100644 --- a/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/kamon-akka-http/src/main/scala-2.12/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -4,7 +4,7 @@ import java.util.concurrent.Callable import akka.http.scaladsl.marshalling.{ToResponseMarshallable, ToResponseMarshaller} import akka.http.scaladsl.model.StatusCodes.Redirection -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} import akka.http.scaladsl.server.directives.RouteDirectives.reject @@ -23,7 +23,10 @@ import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} import java.util.regex.Pattern +import akka.NotUsed +import akka.stream.scaladsl.Flow import kamon.context.Context +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic class AkkaHttpServerInstrumentation extends InstrumentationBuilder { @@ -40,6 +43,17 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { onType("akka.http.scaladsl.HttpExt") .advise(method("bindAndHandle"), classOf[HttpExtBindAndHandleAdvice]) + /** + * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow + * creation happen at different times we are wrapping the handler with the interface/port data and reading that + * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + */ + onType("akka.http.scaladsl.Http2Ext") + .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) + + onType("akka.http.impl.engine.http2.Http2Blueprint$") + .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) + /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free * of variables) and take a Sampling Decision in case none has been taken so far. @@ -286,4 +300,27 @@ object FastFutureTransformWithAdvice { } } } +} + + +object Http2BlueprintInterceptor { + + case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) + extends (HttpRequest => Future[HttpResponse]) { + + override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) + } + + @RuntimeType + def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], + @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { + + handler match { + case HandlerWithEndpoint(interface, port, _) => + ServerFlowWrapper(zuper.call(), interface, port) + + case _ => + zuper.call() + } + } } \ No newline at end of file diff --git a/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala b/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala index 36b8758..b46266c 100644 --- a/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala +++ b/kamon-akka-http/src/main/scala-2.13/kamon/instrumentation/akka/http/AkkaHttpServerInstrumentation.scala @@ -4,7 +4,7 @@ import java.util.concurrent.Callable import akka.http.scaladsl.marshalling.{ToResponseMarshallable, ToResponseMarshaller} import akka.http.scaladsl.model.StatusCodes.Redirection -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} import akka.http.scaladsl.server.PathMatcher.{Matched, Unmatched} import akka.http.scaladsl.server.directives.{BasicDirectives, CompleteOrRecoverWithMagnet, OnSuccessMagnet} import akka.http.scaladsl.server.directives.RouteDirectives.reject @@ -23,7 +23,10 @@ import scala.util.control.NonFatal import scala.util.{Failure, Success, Try} import java.util.regex.Pattern +import akka.NotUsed +import akka.stream.scaladsl.Flow import kamon.context.Context +import kanela.agent.libs.net.bytebuddy.matcher.ElementMatchers.isPublic class AkkaHttpServerInstrumentation extends InstrumentationBuilder { @@ -40,6 +43,17 @@ class AkkaHttpServerInstrumentation extends InstrumentationBuilder { onType("akka.http.scaladsl.HttpExt") .advise(method("bindAndHandle"), classOf[HttpExtBindAndHandleAdvice]) + /** + * For the HTTP/2 instrumentation, since the parts where we can capture the interface/port and the actual flow + * creation happen at different times we are wrapping the handler with the interface/port data and reading that + * information when turning the handler function into a flow and wrapping it the same way we would for HTTP/1. + */ + onType("akka.http.scaladsl.Http2Ext") + .advise(method("bindAndHandleAsync") and isPublic(), classOf[Http2ExtBindAndHandleAdvice]) + + onType("akka.http.impl.engine.http2.Http2Blueprint$") + .intercept(method("handleWithStreamIdHeader"), Http2BlueprintInterceptor) + /** * The rest of these sections are just about making sure that we can generate an appropriate operation name (i.e. free * of variables) and take a Sampling Decision in case none has been taken so far. @@ -292,4 +306,27 @@ object FastFutureTransformWithAdvice { } } } +} + + +object Http2BlueprintInterceptor { + + case class HandlerWithEndpoint(interface: String, port: Int, handler: HttpRequest => Future[HttpResponse]) + extends (HttpRequest => Future[HttpResponse]) { + + override def apply(request: HttpRequest): Future[HttpResponse] = handler(request) + } + + @RuntimeType + def handleWithStreamIdHeader(@Argument(1) handler: HttpRequest => Future[HttpResponse], + @SuperCall zuper: Callable[Flow[HttpRequest, HttpResponse, NotUsed]]): Flow[HttpRequest, HttpResponse, NotUsed] = { + + handler match { + case HandlerWithEndpoint(interface, port, _) => + ServerFlowWrapper(zuper.call(), interface, port) + + case _ => + zuper.call() + } + } } \ No newline at end of file diff --git a/kamon-akka-http/src/test/resources/application.conf b/kamon-akka-http/src/test/resources/application.conf index acb87b0..51f62a4 100644 --- a/kamon-akka-http/src/test/resources/application.conf +++ b/kamon-akka-http/src/test/resources/application.conf @@ -1,3 +1,5 @@ kamon { trace.sampler = "always" -} \ No newline at end of file +} + +akka.http.server.preview.enable-http2 = on \ No newline at end of file diff --git a/kamon-akka-http/src/test/resources/https/chain.pem b/kamon-akka-http/src/test/resources/https/chain.pem new file mode 100644 index 0000000..fdee54e --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/chain.pem @@ -0,0 +1,40 @@ +-----BEGIN CERTIFICATE----- +MIIDJDCCAgwCFCBVhkPbLsrPvH5jjO4+u9O6rcRzMA0GCSqGSIb3DQEBCwUAMEUx +CzAJBgNVBAYTAkhSMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl +cm5ldCBXaWRnaXRzIFB0eSBMdGQwHhcNMTkwODEwMDgyODU3WhcNMjkwNTA5MDgy +ODU3WjBYMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UE +CgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMREwDwYDVQQDDAhrYW1vbi5pbzCC +ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKmiEbbgaz+JzLCG0gDcPWFF +oecyEK18AwrrLLwSz+C3PhvYCGv9s/oqbEAfYX4LeFraVB9FopOhBYtpAvC1Kofz +5vVigFNx+Wq09fotHdKTwwHEa1gJczIic3eonKWNLTM6Ge0v7LY/b9Z9lB331TRU +2rePGYZIeO9DRF6snjLYt1YXpqLf7KS0Ue8rZKa8Y1fMUvOSATCX188XDIHalbNi +X4Ph+bJPHsJDGNvIY5/7PVX66bGjEOSoHovZu04mtn7icMUh24QF1YfTEYF7yVcy +7EP1+jLYHvBY3cgHt1j1n1StVMQVttD19T37dBYTLHlITO5ANtgTAbMemkS3vMsC +AwEAATANBgkqhkiG9w0BAQsFAAOCAQEAFN7wTeOq7xTWYc4ffWvjf/uAYrhUXoRz +7fQRYaPdqNrmik3Dv+eo9akRr8PQeT8S8wH8nZyQi/V4HOCXOTjqIE9A+XVDysFF +8Ehs+nKSw6uZAQ5LMVab9GFVAK8CLwRuP03yclEB4thDzykAusZTz7CrfHI96lco +CTJda1YZAEX1Tq4oUsnBzjQ1Y67x9LR/svnLghdDFaN9EvXQOVp1uMDfWfIlD8Pd +ZqH/FC+PIZk8oT/DcgZDfH9JOnnqRpqmuZ3h6sdn66opDRCqD6wXpCwbgFwJAF/h +SOU4bRRVhMsWIoHc76Gc99P+5WQRSIfxU/pi0iqP7V8vRnfynuD0pg== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUDA+mHhmaAXWoplJ+4vuWX27TNewwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCSFIxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0xOTA4MTAwODI3NTdaFw0yOTA1 +MDkwODI3NTdaMEUxCzAJBgNVBAYTAkhSMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDDcWKGaVMQgf2NaQCFsKRKSYI2B18gjVAeRDkvVWuf +XiR2ZH4DNMJ7H+FDN8G22ngkO/+3kIQ5cFh2lKgstjQUMu00Zc/xv+SYzL7WpgR6 +r1rxMs4LSRvEg6UivLk79trdSME7fz6u/H3L5rR5J6B9ha+BKxn8+bOFZvC/GAM6 +75u+C+7ymcVFEqZIKzPvY4kD+naIIr40hnHf1nwh0ku7wxr+ih460jwSQIM9iVPQ +LTmH1YbxtizBs4Fk3/qhEkiujtZPluQAD20wPkKU7kwMt0tVp2eBEahFkY+erwLS +my8viAT3Py4ACvMWGBTTg0GYr8s8/cpz4+UW3H99kx0rAgMBAAGjUzBRMB0GA1Ud +DgQWBBQbUz5bJSWJ6P8MEGTrMESAgcWSejAfBgNVHSMEGDAWgBQbUz5bJSWJ6P8M +EGTrMESAgcWSejAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAa +ffFvW1xdjQExZSANCOIxPpQL/tqVjzhVUEU4MK9vyZynemqdEXGeWg0P638wXsIc +U3baTmi0IFv+7XdLDSLPB83R6h4m03TBmYY8IfmedqppZVcKd4e1H+bzakMJS3FJ +Xq5/mt80Lb5FYaDw7OgYntdFPKW+wrIE+Hsz/lB0PxGUW2Qqvlt48fFuBbj0xR8n +InCUbr7bUz5K01med27ZrMrWrMfwMuUmxki3ZV6Gw0RYAottTfi5vjjR4j5Xm0nk +2AkQKVrKuVJvTGZdtXP80s0r7Ckk/inlKq7OyJth9Ff17uyIPcL1pBRhH7gVfc3I +p7S/IIgiwccbfMqVAKxe +-----END CERTIFICATE----- diff --git a/kamon-akka-http/src/test/resources/https/rootCA.crt b/kamon-akka-http/src/test/resources/https/rootCA.crt new file mode 100644 index 0000000..537134e --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/rootCA.crt @@ -0,0 +1,21 @@ +-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUDA+mHhmaAXWoplJ+4vuWX27TNewwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCSFIxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0xOTA4MTAwODI3NTdaFw0yOTA1 +MDkwODI3NTdaMEUxCzAJBgNVBAYTAkhSMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDDcWKGaVMQgf2NaQCFsKRKSYI2B18gjVAeRDkvVWuf +XiR2ZH4DNMJ7H+FDN8G22ngkO/+3kIQ5cFh2lKgstjQUMu00Zc/xv+SYzL7WpgR6 +r1rxMs4LSRvEg6UivLk79trdSME7fz6u/H3L5rR5J6B9ha+BKxn8+bOFZvC/GAM6 +75u+C+7ymcVFEqZIKzPvY4kD+naIIr40hnHf1nwh0ku7wxr+ih460jwSQIM9iVPQ +LTmH1YbxtizBs4Fk3/qhEkiujtZPluQAD20wPkKU7kwMt0tVp2eBEahFkY+erwLS +my8viAT3Py4ACvMWGBTTg0GYr8s8/cpz4+UW3H99kx0rAgMBAAGjUzBRMB0GA1Ud +DgQWBBQbUz5bJSWJ6P8MEGTrMESAgcWSejAfBgNVHSMEGDAWgBQbUz5bJSWJ6P8M +EGTrMESAgcWSejAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAa +ffFvW1xdjQExZSANCOIxPpQL/tqVjzhVUEU4MK9vyZynemqdEXGeWg0P638wXsIc +U3baTmi0IFv+7XdLDSLPB83R6h4m03TBmYY8IfmedqppZVcKd4e1H+bzakMJS3FJ +Xq5/mt80Lb5FYaDw7OgYntdFPKW+wrIE+Hsz/lB0PxGUW2Qqvlt48fFuBbj0xR8n +InCUbr7bUz5K01med27ZrMrWrMfwMuUmxki3ZV6Gw0RYAottTfi5vjjR4j5Xm0nk +2AkQKVrKuVJvTGZdtXP80s0r7Ckk/inlKq7OyJth9Ff17uyIPcL1pBRhH7gVfc3I +p7S/IIgiwccbfMqVAKxe +-----END CERTIFICATE----- diff --git a/kamon-akka-http/src/test/resources/https/rootCA.key b/kamon-akka-http/src/test/resources/https/rootCA.key new file mode 100644 index 0000000..8011f75 --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/rootCA.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAw3FihmlTEIH9jWkAhbCkSkmCNgdfII1QHkQ5L1Vrn14kdmR+ +AzTCex/hQzfBttp4JDv/t5CEOXBYdpSoLLY0FDLtNGXP8b/kmMy+1qYEeq9a8TLO +C0kbxIOlIry5O/ba3UjBO38+rvx9y+a0eSegfYWvgSsZ/PmzhWbwvxgDOu+bvgvu +8pnFRRKmSCsz72OJA/p2iCK+NIZx39Z8IdJLu8Ma/ooeOtI8EkCDPYlT0C05h9WG +8bYswbOBZN/6oRJIro7WT5bkAA9tMD5ClO5MDLdLVadngRGoRZGPnq8C0psvL4gE +9z8uAArzFhgU04NBmK/LPP3Kc+PlFtx/fZMdKwIDAQABAoIBAG/vna07344h1TVL +gTgQjlfZuBD3sdzz8oITMulQNB6HjbydG6r8abKY9KxJ39G5WHvwPSpGQ+Sd2py3 +0YYiKLu02zRaZ3mfHO8CvP41AXW+vwhLv8So75VijI7TpgeY/4sjY0CPRTh1dhr1 +HEITlxCtI3KIXA8OeGocJiBcQWVb28GWx6A3R1W+CWcgydw4mR2S8YeCnS7y175s +Ni6emuQIb7CZp6sqGK5xXs/iy0RMLRNi6f5IEPH/zn+l1G+XmfR7HAL+CRTkuNDU +dlp30gUuOyntVZgzU5rSrt00vDg7TR92GLsFQscmdT+uIdhpy79yksA6K4e+VEOQ +D+hTSdECgYEA6tjZche6qcCM7vjKTqIEvcpKG4UdWAPH6qNwkn/V/hwqVwXzovej +TJwXvo9oq9MNOC3vBqpYpxa+pgcbfSEab8NtP4eMFZkB6mSDPMaQ+Mh3XfKio7U9 +ghjnvLK0ravY0/sPltTZOjv6srY9+wGCD/Ung4SH9AhEmrXaD9P3T2MCgYEA1Qvy +TqW/s/xAC7JP/r3sLucitDfe4VV8KziwKv4rkjz1lbqp+NpJ6SYrn87R6HCjLzUA +rMCruN/JyIaHvcIEBJFeaLkld2BBGEeH4EEGuX1AiC3kUuxjrUbbsjLgq/edHXEi +sN7GaAIeqrcya5CSuXFmpB3Uuuas0K2gZeEsGZkCgYBhC98/gILIZyNWFUU0nUss +So25NZbcqiNQ2N1KDL2XVnhAodr+OysmG1LMkmKErqBF2OVvcbFUytdZsJIxcR6F +lNJucEr5GdNq0sJQuRVrWRvKnNuMnvad7kDE/2weYGcnohXdFHP31pVQiHKwaP0g +LwR3Gqs7srb237MO217VVQKBgByzgFBCGiJoQESTIB3EflYPQ2ieAkO/HXxBJdKU +7U/FMJycShvBZKWpQ8VCupqi2gkZDd84EapVU7zVCuJwidQHtX1MPBTp/bsEn/SB +LiO9EP2HmTPmrsMAQcau/f+M2zjFLhQ/3uDSMEl1ZrCBCJM9CMPhVPBc9TkjuvEe +ta85AoGBALxexsnE4SX3KREUmw7etEGOmnk2bpgVfCFe1ewDbCnsajAFKU9cizcx +EYVIOEkCuwX+VfHt2Z1yEJrWYtT8SoLwIJiZRNIns2iAtQiifaDOkBRewwNcM6p1 +nKzia++0GpKhsj0CZ6zMpGx1/LzT26IceaRLSIHsE58OHG9kKlVz +-----END RSA PRIVATE KEY----- diff --git a/kamon-akka-http/src/test/resources/https/rootCA.srl b/kamon-akka-http/src/test/resources/https/rootCA.srl new file mode 100644 index 0000000..288a47b --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/rootCA.srl @@ -0,0 +1 @@ +20558643DB2ECACFBC7E638CEE3EBBD3BAADC473 diff --git a/kamon-akka-http/src/test/resources/https/server.crt b/kamon-akka-http/src/test/resources/https/server.crt new file mode 100644 index 0000000..1a92258 --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/server.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDJDCCAgwCFCBVhkPbLsrPvH5jjO4+u9O6rcRzMA0GCSqGSIb3DQEBCwUAMEUx +CzAJBgNVBAYTAkhSMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRl +cm5ldCBXaWRnaXRzIFB0eSBMdGQwHhcNMTkwODEwMDgyODU3WhcNMjkwNTA5MDgy +ODU3WjBYMQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UE +CgwYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMREwDwYDVQQDDAhrYW1vbi5pbzCC +ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKmiEbbgaz+JzLCG0gDcPWFF +oecyEK18AwrrLLwSz+C3PhvYCGv9s/oqbEAfYX4LeFraVB9FopOhBYtpAvC1Kofz +5vVigFNx+Wq09fotHdKTwwHEa1gJczIic3eonKWNLTM6Ge0v7LY/b9Z9lB331TRU +2rePGYZIeO9DRF6snjLYt1YXpqLf7KS0Ue8rZKa8Y1fMUvOSATCX188XDIHalbNi +X4Ph+bJPHsJDGNvIY5/7PVX66bGjEOSoHovZu04mtn7icMUh24QF1YfTEYF7yVcy +7EP1+jLYHvBY3cgHt1j1n1StVMQVttD19T37dBYTLHlITO5ANtgTAbMemkS3vMsC +AwEAATANBgkqhkiG9w0BAQsFAAOCAQEAFN7wTeOq7xTWYc4ffWvjf/uAYrhUXoRz +7fQRYaPdqNrmik3Dv+eo9akRr8PQeT8S8wH8nZyQi/V4HOCXOTjqIE9A+XVDysFF +8Ehs+nKSw6uZAQ5LMVab9GFVAK8CLwRuP03yclEB4thDzykAusZTz7CrfHI96lco +CTJda1YZAEX1Tq4oUsnBzjQ1Y67x9LR/svnLghdDFaN9EvXQOVp1uMDfWfIlD8Pd +ZqH/FC+PIZk8oT/DcgZDfH9JOnnqRpqmuZ3h6sdn66opDRCqD6wXpCwbgFwJAF/h +SOU4bRRVhMsWIoHc76Gc99P+5WQRSIfxU/pi0iqP7V8vRnfynuD0pg== +-----END CERTIFICATE----- diff --git a/kamon-akka-http/src/test/resources/https/server.csr b/kamon-akka-http/src/test/resources/https/server.csr new file mode 100644 index 0000000..d19ffb2 --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/server.csr @@ -0,0 +1,17 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIICszCCAZsCAQAwWDELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUx +ITAfBgNVBAoMGEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDERMA8GA1UEAwwIa2Ft +b24uaW8wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCpohG24Gs/icyw +htIA3D1hRaHnMhCtfAMK6yy8Es/gtz4b2Ahr/bP6KmxAH2F+C3ha2lQfRaKToQWL +aQLwtSqH8+b1YoBTcflqtPX6LR3Sk8MBxGtYCXMyInN3qJyljS0zOhntL+y2P2/W +fZQd99U0VNq3jxmGSHjvQ0RerJ4y2LdWF6ai3+yktFHvK2SmvGNXzFLzkgEwl9fP +FwyB2pWzYl+D4fmyTx7CQxjbyGOf+z1V+umxoxDkqB6L2btOJrZ+4nDFIduEBdWH +0xGBe8lXMuxD9foy2B7wWN3IB7dY9Z9UrVTEFbbQ9fU9+3QWEyx5SEzuQDbYEwGz +HppEt7zLAgMBAAGgFjAUBgkqhkiG9w0BCQcxBwwFa2Ftb24wDQYJKoZIhvcNAQEL +BQADggEBAJd3GMvju7earx/3sd0BxUzFIqyeQToGy34ie6WP6ZuF+RhVR0sfXETF +QMCkvthEgGNi/4GOCcK1LLCVET9sz+iMMxx0gQddcNB1jbxaU4/MNvBR8oxf9YNu +zcmgO1wHKPK0TKbRJtB7MGyKRsKLa4wZ5lEVxKTuKUdURbBOJdETvS1Wy3uJaK0m +LlzEu7IVJNIM9M21BsZvu8dK8jxMhJBkfWjE6du1oTJHYPQgwLipcxKBxtE2Jren +32lm5yyWmsNlQ8Y7pV7PME2HeJ33Fmhg7YVI6Vo06HglFz4ENWmC2t/JjBGgui6y +MVl69Qx69j5L7RkL+jWZrvMWCMNg1Qg= +-----END CERTIFICATE REQUEST----- diff --git a/kamon-akka-http/src/test/resources/https/server.key b/kamon-akka-http/src/test/resources/https/server.key new file mode 100644 index 0000000..3c38f61 --- /dev/null +++ b/kamon-akka-http/src/test/resources/https/server.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAqaIRtuBrP4nMsIbSANw9YUWh5zIQrXwDCussvBLP4Lc+G9gI +a/2z+ipsQB9hfgt4WtpUH0Wik6EFi2kC8LUqh/Pm9WKAU3H5arT1+i0d0pPDAcRr +WAlzMiJzd6icpY0tMzoZ7S/stj9v1n2UHffVNFTat48Zhkh470NEXqyeMti3Vhem +ot/spLRR7ytkprxjV8xS85IBMJfXzxcMgdqVs2Jfg+H5sk8ewkMY28hjn/s9Vfrp +saMQ5Kgei9m7Tia2fuJwxSHbhAXVh9MRgXvJVzLsQ/X6Mtge8FjdyAe3WPWfVK1U +xBW20PX1Pft0FhMseUhM7kA22BMBsx6aRLe8ywIDAQABAoIBAGJJosP4soulN3HN +HF8dPX9gDlhcXOd4ZHbuHwR2Tfahlh4iBXc1EBRSgliBFkcnNDxIJtfbzECH2yOU +2/xGrHcLrnXd9gbjkiXu5ltnytDZhvM+MQhYqWOSLJ9XljQiYd89ugoBa8GJbi60 +op7em61vwS78fkidM11G95V3pU5F697JjsKtxh5M0gQwLBOy10/QNcChLKBAHDan +/KpgdpUxE5Z6cLC98/S0Ygn7RWSR0LzpzdyJV+VUAWKt+zZNVBv9QmxWA+eKYAPh +WMyFgAdKSu6pWZRarUuyxrcUiEHefbErCp/OOkAbFGlr9aEEPwCYVOwP5guTA+Y5 +1nYmd7kCgYEA20mh5FuBFF1PwpzIbrsZDgh/OxD4J4WGcXKFbeM01Se+rgKjdvdw +pPGk3xh8K8POSqO/ystj20zu+VZngfM79sEPwWdJPWJ5cACQsqqvKv9FoXvLeBSk +PA+NXEDHrTqzmajqtHSaGEKNjeGrycC3oQAZopk50oC0KsGVF5/+FR8CgYEAxghP +nifQ6Gr4BHbgtCnX9xBnqj2GnxShVloPQSa5+HZ//b4DWE90/5CAu5PJkBbAkUZT +1I6f4aXohU5eFmeUs+hk10C4J7XlaoWt5Rl8ScxOOMg0YFueAjXvPZZNc/4ppbnw +kKoH3zTPLuVi5GR8ld66r4HN2WLl88Tjm4wQltUCgYAaL5bHgC3P0ry9jp9Yqbr6 +NAWNdh9MCOPfFD/euW0Lry1T9jiy8iVfbQO1KGVbjIxL2XYDr3oDLBK1b534pKUa +eD97ZuwWCnZZ65db3ooAZm9YM0I+2qgqC+ljhNDTXNkplkRAvFPSZdAlizdKZlsH +PM3S3t1Kx9e761X0dkSPHQKBgQCB7G/37l10LsH7g9bWvOEw+fVZTrZk5k8XbUy2 +zOaUKYK9gg2FwdOb3D1pU4OZYiQC6+YR/WTN0WClHQ5Dmr+H7T9DrfVkMEWMxpmZ +RkgxzrW/MTKTyWf4QVRtzo+QOz8tuLko4DT77xTCysI/3+GRHijS/tGD/wupDBLc +OV+k5QKBgQC+scfAqGuMN1mwPg3/c1u7W/vEA07yIhi2zGA+kpmdjQ/yjGWtaWKi +Jw5E7RP2z5klL821HlAxZQUuXfVGKQ7rEEqBF7BVfYA/qDG3Lxq276N0mjbWr0+G +jeYE4jrsR9UBGZyPF86DbGMRdEWZt1/4AEQTRKyZyhAI79fLkzPG4A== +-----END RSA PRIVATE KEY----- diff --git a/kamon-akka-http/src/test/resources/https/server.p12 b/kamon-akka-http/src/test/resources/https/server.p12 new file mode 100644 index 0000000..b4c8bf7 Binary files /dev/null and b/kamon-akka-http/src/test/resources/https/server.p12 differ diff --git a/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala b/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala index 537405e..f7a8bf6 100644 --- a/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala +++ b/kamon-akka-http/src/test/scala/kamon/akka/http/AkkaHttpServerTracingSpec.scala @@ -1,11 +1,11 @@ /* * ========================================================================================= - * Copyright © 2013-2016 the kamon project + * Copyright © 2013-2016 the kamon project * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file * except in compliance with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, @@ -19,18 +19,18 @@ package kamon.akka.http import java.util.UUID import akka.actor.ActorSystem -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.UseHttp2 import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.ActorMaterializer +import javax.net.ssl.{HostnameVerifier, SSLSession} import kamon.testkit._ -import kamon.tag.Lookups.{plain, plainLong, plainBoolean} +import kamon.tag.Lookups.{plain, plainBoolean, plainLong} import kamon.trace.Span.Mark import org.scalatest._ import org.scalatest.concurrent.{Eventually, ScalaFutures} import scala.concurrent.duration._ -import akka.http.scaladsl.model.StatusCodes +import okhttp3.{OkHttpClient, Request} class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with ScalaFutures with Inside with BeforeAndAfterAll with MetricInspection.Syntax with Reconfigure with TestWebServer with Eventually with OptionValues with TestSpanReporter { @@ -41,195 +41,208 @@ class AkkaHttpServerTracingSpec extends WordSpecLike with Matchers with ScalaFut implicit private val executor = system.dispatcher implicit private val materializer = ActorMaterializer() + val (sslSocketFactory, trustManager) = clientSSL() + val okHttp = new OkHttpClient.Builder() + .sslSocketFactory(sslSocketFactory, trustManager) + .hostnameVerifier(new HostnameVerifier { override def verify(s: String, sslSession: SSLSession): Boolean = true }) + .build() + val timeoutTest: FiniteDuration = 5 second val interface = "127.0.0.1" - val port = 8081 - val webServer = startServer(interface, port) + val http1WebServer = startServer(interface, 8081, https = false) + val http2WebServer = startServer(interface, 8082, https = true) - "the Akka HTTP server instrumentation" should { - "create a server Span when receiving requests" in { - val target = s"http://$interface:$port/$dummyPathOk" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + testSuite("HTTP/1", http1WebServer) + testSuite("HTTP/2", http2WebServer) - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.tags.get(plain("http.url")) shouldBe target - span.metricTags.get(plain("component")) shouldBe "akka.http.server" - span.metricTags.get(plain("http.method")) shouldBe "GET" - span.metricTags.get(plainLong("http.status_code")) shouldBe 200L + def testSuite(httpVersion: String, server: WebServer) = { + val interface = server.interface + val port = server.port + val protocol = server.protocol + + s"the Akka HTTP server instrumentation with ${httpVersion}" should { + "create a server Span when receiving requests" in { + val target = s"$protocol://$interface:$port/$dummyPathOk" + okHttp.newCall(new Request.Builder().url(target).build()).execute() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "akka.http.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200L + } } - } - "not include variables in operation name" when { - "including nested directives" in { - val path = s"extraction/nested/42/fixed/anchor/32/${UUID.randomUUID().toString}/fixed/44/CafE" - val expected = "/extraction/nested/{}/fixed/anchor/{}/{}/fixed/{}/{}" - val target = s"http://$interface:$port/$path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + "not include variables in operation name" when { + "including nested directives" in { + val path = s"extraction/nested/42/fixed/anchor/32/${UUID.randomUUID().toString}/fixed/44/CafE" + val expected = "/extraction/nested/{}/fixed/anchor/{}/{}/fixed/{}/{}" + val target = s"$protocol://$interface:$port/$path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } + } - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected - } - } + "not fail when request url contains special regexp chars" in { + val path = "extraction/segment/special**" + val expected = "/extraction/segment/{}" + val target = s"$protocol://$interface:$port/$path" + val response = okHttp.newCall(new Request.Builder().url(target).build()).execute() - "not fail when request url contains special regexp chars" in { - val path = "extraction/segment/special**" - val expected = "/extraction/segment/{}" - val target = s"http://$interface:$port/$path" - val response = Http().singleRequest(HttpRequest(uri = target)).futureValue + response.code() shouldBe 200 + response.body().string() shouldBe "special**" - response.status shouldBe StatusCodes.OK - Unmarshal(response).to[String].futureValue shouldBe "special**" + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } + } - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected + "take a sampling decision when the routing tree hits an onComplete directive" in { + val path = "extraction/on-complete/42/more-path" + val expected = "/extraction/on-complete/{}/more-path" + val target = s"$protocol://$interface:$port/$path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } } - } - "take a sampling decision when the routing tree hits an onComplete directive" in { - val path = "extraction/on-complete/42/more-path" - val expected = "/extraction/on-complete/{}/more-path" - val target = s"http://$interface:$port/$path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + "take a sampling decision when the routing tree hits an onSuccess directive" in { + val path = "extraction/on-success/42/after" + val expected = "/extraction/on-success/{}/after" + val target = s"$protocol://$interface:$port/$path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } } - } - "take a sampling decision when the routing tree hits an onSuccess directive" in { - val path = "extraction/on-success/42/after" - val expected = "/extraction/on-success/{}/after" - val target = s"http://$interface:$port/$path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + "take a sampling decision when the routing tree hits a completeOrRecoverWith directive with a failed future" in { + val path = "extraction/complete-or-recover-with/42/after" + val expected = "/extraction/complete-or-recover-with/{}/after" + val target = s"$protocol://$interface:$port/$path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } } - } - "take a sampling decision when the routing tree hits a completeOrRecoverWith directive with a failed future" in { - val path = "extraction/complete-or-recover-with/42/after" - val expected = "/extraction/complete-or-recover-with/{}/after" - val target = s"http://$interface:$port/$path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + "take a sampling decision when the routing tree hits a completeOrRecoverWith directive with a successful future" in { + val path = "extraction/complete-or-recover-with-success/42/after" + val expected = "/extraction/complete-or-recover-with-success/{}" + val target = s"$protocol://$interface:$port/$path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } + } + + "including ambiguous nested directives" in { + val path = s"v3/user/3/post/3" + val expected = "/v3/user/{}/post/{}" + val target = s"$protocol://$interface:$port/$path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } } } - "take a sampling decision when the routing tree hits a completeOrRecoverWith directive with a successful future" in { - val path = "extraction/complete-or-recover-with-success/42/after" - val expected = "/extraction/complete-or-recover-with-success/{}" - val target = s"http://$interface:$port/$path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + "change the Span operation name when using the operationName directive" in { + val target = s"$protocol://$interface:$port/$traceOk" + okHttp.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected + span.operationName shouldBe "user-supplied-operation" + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "akka.http.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200L } } - "including ambiguous nested directives" in { - val path = s"v3/user/3/post/3" - val expected = "/v3/user/{}/post/{}" - val target = s"http://$interface:$port/$path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + "mark spans as failed when request fails" in { + val target = s"$protocol://$interface:$port/$dummyPathError" + okHttp.newCall(new Request.Builder().url(target).build()).execute() eventually(timeout(10 seconds)) { val span = testSpanReporter().nextSpan().value - span.operationName shouldBe expected + span.operationName shouldBe s"/$dummyPathError" + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "akka.http.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainBoolean("error")) shouldBe true + span.metricTags.get(plainLong("http.status_code")) shouldBe 500L } } - } - "change the Span operation name when using the operationName directive" in { - val target = s"http://$interface:$port/$traceOk" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe "user-supplied-operation" - span.tags.get(plain("http.url")) shouldBe target - span.metricTags.get(plain("component")) shouldBe "akka.http.server" - span.metricTags.get(plain("http.method")) shouldBe "GET" - span.metricTags.get(plainLong("http.status_code")) shouldBe 200L + "change the operation name to 'unhandled' when the response status code is 404" in { + val target = s"$protocol://$interface:$port/unknown-path" + okHttp.newCall(new Request.Builder().url(target).build()).execute() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "unhandled" + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "akka.http.server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainBoolean("error")) shouldBe false + span.metricTags.get(plainLong("http.status_code")) shouldBe 404L + } } - } - "mark spans as failed when request fails" in { - val target = s"http://$interface:$port/$dummyPathError" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe s"/$dummyPathError" - span.tags.get(plain("http.url")) shouldBe target - span.metricTags.get(plain("component")) shouldBe "akka.http.server" - span.metricTags.get(plain("http.method")) shouldBe "GET" - span.metricTags.get(plainBoolean("error")) shouldBe true - span.metricTags.get(plainLong("http.status_code")) shouldBe 500L - } - } + "correctly time entity transfer timings" in { + val target = s"$protocol://$interface:$port/$stream" + okHttp.newCall(new Request.Builder().url(target).build()).execute() + val span = eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe "/stream" + span + } - "change the operation name to 'unhandled' when the response status code is 404" in { - val target = s"http://$interface:$port/unknown-path" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) + inside(span.marks){ + case List(_ @ Mark(_, "http.response.ready")) => + } - eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe "unhandled" span.tags.get(plain("http.url")) shouldBe target span.metricTags.get(plain("component")) shouldBe "akka.http.server" span.metricTags.get(plain("http.method")) shouldBe "GET" - span.metricTags.get(plainBoolean("error")) shouldBe false - span.metricTags.get(plainLong("http.status_code")) shouldBe 404L } - } + "include the trace-id and keep all user-provided headers in the responses" in { + val target = s"$protocol://$interface:$port/extra-header" + val response = okHttp.newCall(new Request.Builder().url(target).build()).execute() - "correctly time entity transfer timings" in { - val target = s"http://$interface:$port/$stream" - Http().singleRequest(HttpRequest(uri = target)).map(_.discardEntityBytes()) - - val span = eventually(timeout(10 seconds)) { - val span = testSpanReporter().nextSpan().value - span.operationName shouldBe "/stream" - span - } - - inside(span.marks){ - case List(m2 @ Mark(_, "http.response.ready")) => - } - - span.tags.get(plain("http.url")) shouldBe target - span.metricTags.get(plain("component")) shouldBe "akka.http.server" - span.metricTags.get(plain("http.method")) shouldBe "GET" - } - - "include the trace-id and keep all user-provided headers in the responses" in { - val target = s"http://$interface:$port/extra-header" - val response = Http().singleRequest(HttpRequest(uri = target)) - - whenReady(response, timeout(5 seconds)) { httpResponse => - httpResponse.headers.map(_.name()) should contain allOf ( + response.headers().names() should contain allOf ( "trace-id", "extra" ) } } - } override protected def afterAll(): Unit = { - webServer.shutdown() + http1WebServer.shutdown() + http2WebServer.shutdown() } diff --git a/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala b/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala index 02d5f5b..0097e81 100644 --- a/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala +++ b/kamon-akka-http/src/test/scala/kamon/testkit/TestWebServer.scala @@ -16,35 +16,34 @@ package kamon.testkit +import java.security.cert.{Certificate, CertificateFactory} +import java.security.{KeyStore, SecureRandom} + import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import akka.http.scaladsl.{Http, HttpsConnectionContext, UseHttp2} import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse} import de.heikoseeberger.akkahttpjson4s.Json4sSupport import akka.http.scaladsl.model.StatusCodes.{BadRequest, InternalServerError, OK} import akka.http.scaladsl.model.headers.{Connection, RawHeader} import akka.http.scaladsl.server.Directives._ -import akka.http.scaladsl.server.RequestContext +import akka.http.scaladsl.server.{RequestContext, Route} import akka.stream.ActorMaterializer import akka.stream.scaladsl.Source import akka.util.ByteString +import javax.net.ssl.{KeyManagerFactory, SSLContext, SSLSocketFactory, TrustManagerFactory, X509TrustManager} import kamon.Kamon import kamon.instrumentation.akka.http.TracingDirectives import org.json4s.{DefaultFormats, native} - -import scala.concurrent.duration._ import kamon.tag.Lookups.plain import kamon.trace.Trace - -import scala.util.{Failure, Success} import scala.concurrent.{ExecutionContext, Future} -import scala.util.Random trait TestWebServer extends TracingDirectives { implicit val serialization = native.Serialization implicit val formats = DefaultFormats import Json4sSupport._ - def startServer(interface: String, port: Int)(implicit system: ActorSystem): WebServer = { + def startServer(interface: String, port: Int, https: Boolean = false)(implicit system: ActorSystem): WebServer = { import Endpoints._ implicit val ec: ExecutionContext = system.dispatcher @@ -171,9 +170,44 @@ trait TestWebServer extends TracingDirectives { } } - new WebServer(Http().bindAndHandle(routes, interface, port)) + if(https) + new WebServer(interface, port, "https", Http().bindAndHandleAsync(Route.asyncHandler(routes), interface, port, httpContext())) + else + new WebServer(interface, port, "http", Http().bindAndHandle(routes, interface, port)) + } + + def httpContext() = { + val password = "kamon".toCharArray + val ks = KeyStore.getInstance("PKCS12") + ks.load(getClass.getClassLoader.getResourceAsStream("https/server.p12"), password) + + val keyManagerFactory = KeyManagerFactory.getInstance("SunX509") + keyManagerFactory.init(ks, password) + + val context = SSLContext.getInstance("TLS") + context.init(keyManagerFactory.getKeyManagers, null, new SecureRandom) + + new HttpsConnectionContext(context) } + def clientSSL(): (SSLSocketFactory, X509TrustManager) = { + val certStore = KeyStore.getInstance(KeyStore.getDefaultType) + certStore.load(null, null) + // only do this if you want to accept a custom root CA. Understand what you are doing! + certStore.setCertificateEntry("ca", loadX509Certificate("https/rootCA.crt")) + + val certManagerFactory = TrustManagerFactory.getInstance("SunX509") + certManagerFactory.init(certStore) + + val context = SSLContext.getInstance("TLS") + context.init(null, certManagerFactory.getTrustManagers, new SecureRandom) + + (context.getSocketFactory, certManagerFactory.getTrustManagers.apply(0).asInstanceOf[X509TrustManager]) + } + + def loadX509Certificate(resourceName: String): Certificate = + CertificateFactory.getInstance("X.509").generateCertificate(getClass.getClassLoader.getResourceAsStream(resourceName)) + def samplingDecision(ctx: RequestContext): Trace.SamplingDecision = Kamon.currentSpan().trace.samplingDecision @@ -195,7 +229,7 @@ trait TestWebServer extends TracingDirectives { } } - class WebServer(bindingFuture: Future[Http.ServerBinding])(implicit ec: ExecutionContext) { + class WebServer(val interface: String, val port: Int, val protocol: String, bindingFuture: Future[Http.ServerBinding])(implicit ec: ExecutionContext) { def shutdown(): Future[_] = { bindingFuture.flatMap(binding => binding.unbind()) }