From 317d528cb19933980c1d05fd1300eab2450d83b9 Mon Sep 17 00:00:00 2001 From: Dan Torrey Date: Fri, 6 Sep 2024 17:22:34 -0500 Subject: [PATCH] Add Raw HTTP input and support for custom authorization header (#20370) * Add Raw HTTP input * Fix import * Use `/raw` path for new Raw HTTP input * Add change log * Add support for user-defined authorization header and value * Validate config dependency for new auth header fields. * Record config as failure as well for visibility. * Update change log * Remove unneeded failure recorder usage * Finalize path field Co-authored-by: Ryan <103449971+ryan-carroll-graylog@users.noreply.github.com> --------- Co-authored-by: Ryan <103449971+ryan-carroll-graylog@users.noreply.github.com> --- changelog/unreleased/pr-20370.toml | 4 + .../inputs/raw/http/RawHttpInput.java | 68 +++++++ .../transports/AbstractHttpTransport.java | 186 ++++++++++++++++++ .../inputs/transports/HttpTransport.java | 110 +---------- .../inputs/transports/RawHttpTransport.java | 62 ++++++ .../inputs/transports/TransportsModule.java | 1 + .../inputs/transports/netty/HttpHandler.java | 21 +- .../shared/bindings/MessageInputBindings.java | 2 + .../transports/netty/HttpHandlerTest.java | 163 +++++++++------ 9 files changed, 454 insertions(+), 163 deletions(-) create mode 100644 changelog/unreleased/pr-20370.toml create mode 100644 graylog2-server/src/main/java/org/graylog2/inputs/raw/http/RawHttpInput.java create mode 100644 graylog2-server/src/main/java/org/graylog2/inputs/transports/AbstractHttpTransport.java create mode 100644 graylog2-server/src/main/java/org/graylog2/inputs/transports/RawHttpTransport.java diff --git a/changelog/unreleased/pr-20370.toml b/changelog/unreleased/pr-20370.toml new file mode 100644 index 000000000000..0069f1f29a32 --- /dev/null +++ b/changelog/unreleased/pr-20370.toml @@ -0,0 +1,4 @@ +type = "a" +message = "Added new Raw HTTP input. Also added support for custom authorization config option for HTTP Gelf and new HTTP Raw inputs." + +pulls = ["20370"] diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/raw/http/RawHttpInput.java b/graylog2-server/src/main/java/org/graylog2/inputs/raw/http/RawHttpInput.java new file mode 100644 index 000000000000..8a45e46f2d44 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/raw/http/RawHttpInput.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.inputs.raw.http; + +import com.codahale.metrics.MetricRegistry; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import jakarta.inject.Inject; +import org.graylog2.inputs.codecs.RawCodec; +import org.graylog2.inputs.transports.RawHttpTransport; +import org.graylog2.plugin.LocalMetricRegistry; +import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.inputs.MessageInput; + +public class RawHttpInput extends MessageInput { + + private static final String NAME = "Raw HTTP"; + + @AssistedInject + public RawHttpInput(MetricRegistry metricRegistry, + @Assisted Configuration configuration, + RawHttpTransport.Factory httpTransportFactory, + RawCodec.Factory rawCodecFactory, LocalMetricRegistry localRegistry, Config config, Descriptor descriptor, ServerStatus serverStatus) { + super(metricRegistry, configuration, httpTransportFactory.create(configuration), + localRegistry, + rawCodecFactory.create(configuration), config, descriptor, serverStatus); + } + + public interface Factory extends MessageInput.Factory { + @Override + RawHttpInput create(Configuration configuration); + + @Override + Config getConfig(); + + @Override + Descriptor getDescriptor(); + } + + public static class Descriptor extends MessageInput.Descriptor { + @Inject + public Descriptor() { + super(NAME, false, ""); + } + } + + public static class Config extends MessageInput.Config { + @Inject + public Config(RawHttpTransport.Factory transport, RawCodec.Factory codec) { + super(transport.getConfig(), codec.getConfig()); + } + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/AbstractHttpTransport.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/AbstractHttpTransport.java new file mode 100644 index 000000000000..257f6b3f33f1 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/AbstractHttpTransport.java @@ -0,0 +1,186 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.inputs.transports; + +import com.github.joschi.jadconfig.util.Size; +import io.netty.channel.ChannelHandler; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.timeout.ReadTimeoutHandler; +import jakarta.annotation.Nullable; +import org.graylog2.configuration.TLSProtocolsConfiguration; +import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; +import org.graylog2.inputs.transports.netty.HttpHandler; +import org.graylog2.inputs.transports.netty.LenientDelimiterBasedFrameDecoder; +import org.graylog2.plugin.InputFailureRecorder; +import org.graylog2.plugin.LocalMetricRegistry; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.configuration.ConfigurationRequest; +import org.graylog2.plugin.configuration.fields.BooleanField; +import org.graylog2.plugin.configuration.fields.ConfigurationField; +import org.graylog2.plugin.configuration.fields.NumberField; +import org.graylog2.plugin.configuration.fields.TextField; +import org.graylog2.plugin.inputs.MessageInput; +import org.graylog2.plugin.inputs.MisfireException; +import org.graylog2.plugin.inputs.annotations.ConfigClass; +import org.graylog2.plugin.inputs.transports.AbstractTcpTransport; +import org.graylog2.plugin.inputs.util.ThroughputCounter; + +import java.util.LinkedHashMap; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.graylog2.shared.utilities.StringUtils.f; + +abstract public class AbstractHttpTransport extends AbstractTcpTransport { + private static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096; + private static final int DEFAULT_MAX_HEADER_SIZE = 8192; + protected static final int DEFAULT_MAX_CHUNK_SIZE = (int) Size.kilobytes(64L).toBytes(); + private static final int DEFAULT_IDLE_WRITER_TIMEOUT = 60; + + static final String CK_ENABLE_BULK_RECEIVING = "enable_bulk_receiving"; + static final String CK_ENABLE_CORS = "enable_cors"; + static final String CK_MAX_CHUNK_SIZE = "max_chunk_size"; + static final String CK_IDLE_WRITER_TIMEOUT = "idle_writer_timeout"; + static final String CK_AUTHORIZATION_HEADER_NAME = "authorization_header_name"; + static final String CK_AUTHORIZATION_HEADER_VALUE = "authorization_header_value"; + private static final String AUTHORIZATION_HEADER_NAME_LABEL = "Authorization Header Name"; + private static final String AUTHORIZATION_HEADER_VALUE_LABEL = "Authorization Header Value"; + + protected final boolean enableBulkReceiving; + protected final boolean enableCors; + protected final int maxChunkSize; + private final int idleWriterTimeout; + private final String authorizationHeader; + private final String authorizationHeaderValue; + private final String path; + + public AbstractHttpTransport(Configuration configuration, + EventLoopGroup eventLoopGroup, + EventLoopGroupFactory eventLoopGroupFactory, + NettyTransportConfiguration nettyTransportConfiguration, + ThroughputCounter throughputCounter, + LocalMetricRegistry localRegistry, + TLSProtocolsConfiguration tlsConfiguration, String path) { + super(configuration, + throughputCounter, + localRegistry, + eventLoopGroup, + eventLoopGroupFactory, + nettyTransportConfiguration, + tlsConfiguration); + this.enableBulkReceiving = configuration.getBoolean(CK_ENABLE_BULK_RECEIVING); + this.enableCors = configuration.getBoolean(CK_ENABLE_CORS); + this.maxChunkSize = parseMaxChunkSize(configuration); + this.idleWriterTimeout = configuration.intIsSet(CK_IDLE_WRITER_TIMEOUT) ? configuration.getInt(CK_IDLE_WRITER_TIMEOUT, DEFAULT_IDLE_WRITER_TIMEOUT) : DEFAULT_IDLE_WRITER_TIMEOUT; + this.authorizationHeader = configuration.getString(CK_AUTHORIZATION_HEADER_NAME); + this.authorizationHeaderValue = configuration.getString(CK_AUTHORIZATION_HEADER_VALUE); + this.path = path; + } + + /** + * @return If the configured Max Chunk Size is less than zero, return {@link AbstractHttpTransport#DEFAULT_MAX_CHUNK_SIZE}. + */ + protected static int parseMaxChunkSize(Configuration configuration) { + int maxChunkSize = configuration.getInt(CK_MAX_CHUNK_SIZE, DEFAULT_MAX_CHUNK_SIZE); + return maxChunkSize <= 0 ? DEFAULT_MAX_CHUNK_SIZE : maxChunkSize; + } + + @Override + protected LinkedHashMap> getCustomChildChannelHandlers(MessageInput input) { + final LinkedHashMap> handlers = new LinkedHashMap<>(); + if (idleWriterTimeout > 0) { + // Install read timeout handler to close idle connections after a timeout. + // This avoids dangling HTTP connections when the HTTP client does not close the connection properly. + // For details see: https://github.com/Graylog2/graylog2-server/issues/3223#issuecomment-270350500 + handlers.put("read-timeout-handler", () -> new ReadTimeoutHandler(idleWriterTimeout, TimeUnit.SECONDS)); + } + + handlers.put("decoder", () -> new HttpRequestDecoder(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, maxChunkSize)); + handlers.put("decompressor", HttpContentDecompressor::new); + handlers.put("encoder", HttpResponseEncoder::new); + handlers.put("aggregator", () -> new HttpObjectAggregator(maxChunkSize)); + handlers.put("http-handler", () -> new HttpHandler(enableCors, authorizationHeader, authorizationHeaderValue, path)); + if (enableBulkReceiving) { + handlers.put("http-bulk-newline-decoder", + () -> new LenientDelimiterBasedFrameDecoder(maxChunkSize, Delimiters.lineDelimiter())); + } + handlers.putAll(super.getCustomChildChannelHandlers(input)); + return handlers; + } + + @Override + public void launch(MessageInput input, @Nullable InputFailureRecorder inputFailureRecorder) throws MisfireException { + if (isNotBlank(authorizationHeader) && isBlank(authorizationHeaderValue)) { + checkForConfigFieldDependencies(AUTHORIZATION_HEADER_NAME_LABEL, AUTHORIZATION_HEADER_VALUE_LABEL); + } else if (isNotBlank(authorizationHeaderValue) && isBlank(authorizationHeader)) { + checkForConfigFieldDependencies(AUTHORIZATION_HEADER_VALUE_LABEL, AUTHORIZATION_HEADER_NAME_LABEL); + } + super.launch(input, inputFailureRecorder); + } + + private void checkForConfigFieldDependencies(String configParam1, String configParam2) throws MisfireException { + throw new MisfireException(f("The [%s] configuration parameter cannot be used without also specifying a value for [%s].", configParam1, configParam2)); + } + + @ConfigClass + public static class Config extends AbstractTcpTransport.Config { + @Override + public ConfigurationRequest getRequestedConfiguration() { + final ConfigurationRequest r = super.getRequestedConfiguration(); + r.addField(new BooleanField(CK_ENABLE_BULK_RECEIVING, + "Enable Bulk Receiving", + false, + "Enables bulk receiving of messages separated by newlines (\\n or \\r\\n)")); + r.addField(new BooleanField(CK_ENABLE_CORS, + "Enable CORS", + true, + "Input sends CORS headers to satisfy browser security policies")); + r.addField(new NumberField(CK_MAX_CHUNK_SIZE, + "Max. HTTP chunk size", + DEFAULT_MAX_CHUNK_SIZE, + "The maximum HTTP chunk size in bytes (e. g. length of HTTP request body)", + ConfigurationField.Optional.OPTIONAL)); + r.addField(new NumberField(CK_IDLE_WRITER_TIMEOUT, + "Idle writer timeout", + DEFAULT_IDLE_WRITER_TIMEOUT, + "The server closes the connection after the given time in seconds after the last client write request. (use 0 to disable)", + ConfigurationField.Optional.OPTIONAL, + NumberField.Attribute.ONLY_POSITIVE)); + r.addField(new TextField( + CK_AUTHORIZATION_HEADER_NAME, + AUTHORIZATION_HEADER_NAME_LABEL, + "", + "The name for the authorization header to use. If specified, all requests must contain this header with the correct value to authenticate successfully.", + ConfigurationField.Optional.OPTIONAL)); + r.addField(new TextField( + CK_AUTHORIZATION_HEADER_VALUE, + AUTHORIZATION_HEADER_VALUE_LABEL, + "", + "The secret authorization header value which all request must have in order to authenticate successfully. e.g. Bearer: N", + ConfigurationField.Optional.OPTIONAL, + TextField.Attribute.IS_PASSWORD)); + return r; + } + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/HttpTransport.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/HttpTransport.java index 920304f48001..c3d9ff4b1a58 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/transports/HttpTransport.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/HttpTransport.java @@ -16,53 +16,20 @@ */ package org.graylog2.inputs.transports; -import com.github.joschi.jadconfig.util.Size; import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.AssistedInject; -import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoopGroup; -import io.netty.handler.codec.Delimiters; -import io.netty.handler.codec.http.HttpContentDecompressor; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpRequestDecoder; -import io.netty.handler.codec.http.HttpResponseEncoder; -import io.netty.handler.timeout.ReadTimeoutHandler; import org.graylog2.configuration.TLSProtocolsConfiguration; import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; -import org.graylog2.inputs.transports.netty.HttpHandler; -import org.graylog2.inputs.transports.netty.LenientDelimiterBasedFrameDecoder; import org.graylog2.plugin.LocalMetricRegistry; import org.graylog2.plugin.configuration.Configuration; -import org.graylog2.plugin.configuration.ConfigurationRequest; -import org.graylog2.plugin.configuration.fields.BooleanField; -import org.graylog2.plugin.configuration.fields.ConfigurationField; -import org.graylog2.plugin.configuration.fields.NumberField; -import org.graylog2.plugin.inputs.MessageInput; import org.graylog2.plugin.inputs.annotations.ConfigClass; import org.graylog2.plugin.inputs.annotations.FactoryClass; -import org.graylog2.plugin.inputs.transports.AbstractTcpTransport; import org.graylog2.plugin.inputs.transports.Transport; import org.graylog2.plugin.inputs.util.ThroughputCounter; -import java.util.LinkedHashMap; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -public class HttpTransport extends AbstractTcpTransport { - private static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096; - private static final int DEFAULT_MAX_HEADER_SIZE = 8192; - protected static final int DEFAULT_MAX_CHUNK_SIZE = (int) Size.kilobytes(64L).toBytes(); - private static final int DEFAULT_IDLE_WRITER_TIMEOUT = 60; - - static final String CK_ENABLE_BULK_RECEIVING = "enable_bulk_receiving"; - static final String CK_ENABLE_CORS = "enable_cors"; - static final String CK_MAX_CHUNK_SIZE = "max_chunk_size"; - static final String CK_IDLE_WRITER_TIMEOUT = "idle_writer_timeout"; - - protected final boolean enableBulkReceiving; - protected final boolean enableCors; - protected final int maxChunkSize; - private final int idleWriterTimeout; +public class HttpTransport extends AbstractHttpTransport { + private static final String PATH = "/gelf"; @AssistedInject public HttpTransport(@Assisted Configuration configuration, @@ -72,49 +39,8 @@ public HttpTransport(@Assisted Configuration configuration, ThroughputCounter throughputCounter, LocalMetricRegistry localRegistry, TLSProtocolsConfiguration tlsConfiguration) { - super(configuration, - throughputCounter, - localRegistry, - eventLoopGroup, - eventLoopGroupFactory, - nettyTransportConfiguration, - tlsConfiguration); - - this.enableBulkReceiving = configuration.getBoolean(CK_ENABLE_BULK_RECEIVING); - this.enableCors = configuration.getBoolean(CK_ENABLE_CORS); - this.maxChunkSize = parseMaxChunkSize(configuration); - this.idleWriterTimeout = configuration.intIsSet(CK_IDLE_WRITER_TIMEOUT) ? configuration.getInt(CK_IDLE_WRITER_TIMEOUT, DEFAULT_IDLE_WRITER_TIMEOUT) : DEFAULT_IDLE_WRITER_TIMEOUT; - } - - /** - * @return If the configured Max Chunk Size is less than zero, return {@link HttpTransport#DEFAULT_MAX_CHUNK_SIZE}. - */ - protected static int parseMaxChunkSize(Configuration configuration) { - int maxChunkSize = configuration.getInt(CK_MAX_CHUNK_SIZE, DEFAULT_MAX_CHUNK_SIZE); - return maxChunkSize <= 0 ? DEFAULT_MAX_CHUNK_SIZE : maxChunkSize; - } - - @Override - protected LinkedHashMap> getCustomChildChannelHandlers(MessageInput input) { - final LinkedHashMap> handlers = new LinkedHashMap<>(); - if (idleWriterTimeout > 0) { - // Install read timeout handler to close idle connections after a timeout. - // This avoids dangling HTTP connections when the HTTP client does not close the connection properly. - // For details see: https://github.com/Graylog2/graylog2-server/issues/3223#issuecomment-270350500 - handlers.put("read-timeout-handler", () -> new ReadTimeoutHandler(idleWriterTimeout, TimeUnit.SECONDS)); - } - - handlers.put("decoder", () -> new HttpRequestDecoder(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, maxChunkSize)); - handlers.put("decompressor", HttpContentDecompressor::new); - handlers.put("encoder", HttpResponseEncoder::new); - handlers.put("aggregator", () -> new HttpObjectAggregator(maxChunkSize)); - handlers.put("http-handler", () -> new HttpHandler(enableCors)); - if (enableBulkReceiving) { - handlers.put("http-bulk-newline-decoder", - () -> new LenientDelimiterBasedFrameDecoder(maxChunkSize, Delimiters.lineDelimiter())); - } - handlers.putAll(super.getCustomChildChannelHandlers(input)); - return handlers; + super(configuration, eventLoopGroup, eventLoopGroupFactory, nettyTransportConfiguration, throughputCounter, + localRegistry, tlsConfiguration, PATH); } @FactoryClass @@ -123,34 +49,10 @@ public interface Factory extends Transport.Factory { HttpTransport create(Configuration configuration); @Override - Config getConfig(); + HttpTransport.Config getConfig(); } @ConfigClass - public static class Config extends AbstractTcpTransport.Config { - @Override - public ConfigurationRequest getRequestedConfiguration() { - final ConfigurationRequest r = super.getRequestedConfiguration(); - r.addField(new BooleanField(CK_ENABLE_BULK_RECEIVING, - "Enable Bulk Receiving", - false, - "Enables bulk receiving of messages separated by newlines (\\n or \\r\\n)")); - r.addField(new BooleanField(CK_ENABLE_CORS, - "Enable CORS", - true, - "Input sends CORS headers to satisfy browser security policies")); - r.addField(new NumberField(CK_MAX_CHUNK_SIZE, - "Max. HTTP chunk size", - DEFAULT_MAX_CHUNK_SIZE, - "The maximum HTTP chunk size in bytes (e. g. length of HTTP request body)", - ConfigurationField.Optional.OPTIONAL)); - r.addField(new NumberField(CK_IDLE_WRITER_TIMEOUT, - "Idle writer timeout", - DEFAULT_IDLE_WRITER_TIMEOUT, - "The server closes the connection after the given time in seconds after the last client write request. (use 0 to disable)", - ConfigurationField.Optional.OPTIONAL, - NumberField.Attribute.ONLY_POSITIVE)); - return r; - } + public static class Config extends AbstractHttpTransport.Config { } } diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/RawHttpTransport.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/RawHttpTransport.java new file mode 100644 index 000000000000..06b08abd7298 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/RawHttpTransport.java @@ -0,0 +1,62 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.inputs.transports; + +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import io.netty.channel.EventLoopGroup; +import org.graylog2.configuration.TLSProtocolsConfiguration; +import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; +import org.graylog2.plugin.LocalMetricRegistry; +import org.graylog2.plugin.configuration.Configuration; +import org.graylog2.plugin.inputs.annotations.ConfigClass; +import org.graylog2.plugin.inputs.annotations.FactoryClass; +import org.graylog2.plugin.inputs.transports.Transport; +import org.graylog2.plugin.inputs.util.ThroughputCounter; + +/** + * Raw version of the HttpTransport which uses the `/raw` path instead of the `/gelf` path. + */ +public class RawHttpTransport extends AbstractHttpTransport { + private static final String PATH = "/raw"; + + @AssistedInject + public RawHttpTransport(@Assisted Configuration configuration, + EventLoopGroup eventLoopGroup, + EventLoopGroupFactory eventLoopGroupFactory, + NettyTransportConfiguration nettyTransportConfiguration, + ThroughputCounter throughputCounter, + LocalMetricRegistry localRegistry, + TLSProtocolsConfiguration tlsConfiguration) { + super(configuration, eventLoopGroup, eventLoopGroupFactory, nettyTransportConfiguration, + throughputCounter, localRegistry, tlsConfiguration, PATH); + + } + + @FactoryClass + public interface Factory extends Transport.Factory { + @Override + RawHttpTransport create(Configuration configuration); + + @Override + Config getConfig(); + } + + @ConfigClass + public static class Config extends AbstractHttpTransport.Config { + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/TransportsModule.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/TransportsModule.java index 59e2d285bc24..b6f385177f92 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/transports/TransportsModule.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/TransportsModule.java @@ -38,6 +38,7 @@ protected void configure() { installTransport(mapBinder, "udp", UdpTransport.class); installTransport(mapBinder, "tcp", TcpTransport.class); installTransport(mapBinder, "http", HttpTransport.class); + installTransport(mapBinder, "http-raw", RawHttpTransport.class); installTransport(mapBinder, "randomhttp", RandomMessageTransport.class); installTransport(mapBinder, "kafka", KafkaTransport.class); installTransport(mapBinder, "amqp", AmqpTransport.class); diff --git a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/HttpHandler.java b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/HttpHandler.java index 6fed1026d597..80542fa21d2e 100644 --- a/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/HttpHandler.java +++ b/graylog2-server/src/main/java/org/graylog2/inputs/transports/netty/HttpHandler.java @@ -33,11 +33,20 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; +import static org.apache.commons.lang.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; + public class HttpHandler extends SimpleChannelInboundHandler { private final boolean enableCors; + private final String authorizationHeader; + private final String authorizationHeaderValue; + private final String path; - public HttpHandler(boolean enableCors) { + public HttpHandler(boolean enableCors, String authorizationHeader, String authorizationHeaderValue, String path) { this.enableCors = enableCors; + this.authorizationHeader = authorizationHeader; + this.authorizationHeaderValue = authorizationHeaderValue; + this.path = path; } @Override @@ -47,6 +56,14 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) thro final HttpVersion httpRequestVersion = request.protocolVersion(); final String origin = request.headers().get(HttpHeaderNames.ORIGIN); + if (isNotBlank(authorizationHeader)) { + // Authentication is required. + final String suppliedAuthHeaderValue = request.headers().get(authorizationHeader); + if (isBlank(suppliedAuthHeaderValue) || !suppliedAuthHeaderValue.equals(authorizationHeaderValue)) { + writeResponse(channel, keepAlive, httpRequestVersion, HttpResponseStatus.UNAUTHORIZED, origin); + } + } + // to allow for future changes, let's be at least a little strict in what we accept here. if (HttpMethod.OPTIONS.equals(request.method())) { writeResponse(channel, keepAlive, httpRequestVersion, HttpResponseStatus.OK, origin); @@ -56,7 +73,7 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpRequest request) thro return; } - final boolean correctPath = "/gelf".equals(request.uri()); + final boolean correctPath = path.equals(request.uri()); if (correctPath && request instanceof FullHttpRequest) { final FullHttpRequest fullHttpRequest = (FullHttpRequest) request; final ByteBuf buffer = fullHttpRequest.content(); diff --git a/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java b/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java index b91c638d6698..2aac72603fa8 100644 --- a/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java +++ b/graylog2-server/src/main/java/org/graylog2/shared/bindings/MessageInputBindings.java @@ -28,6 +28,7 @@ import org.graylog2.inputs.misc.jsonpath.JsonPathInput; import org.graylog2.inputs.random.FakeHttpMessageInput; import org.graylog2.inputs.raw.amqp.RawAMQPInput; +import org.graylog2.inputs.raw.http.RawHttpInput; import org.graylog2.inputs.raw.kafka.RawKafkaInput; import org.graylog2.inputs.raw.tcp.RawTCPInput; import org.graylog2.inputs.raw.udp.RawUDPInput; @@ -50,6 +51,7 @@ protected void configure() { installInput(inputMapBinder, RawTCPInput.class, RawTCPInput.Factory.class); installInput(inputMapBinder, RawUDPInput.class, RawUDPInput.Factory.class); installInput(inputMapBinder, RawAMQPInput.class, RawAMQPInput.Factory.class); + installInput(inputMapBinder, RawHttpInput.class, RawHttpInput.Factory.class); installInput(inputMapBinder, RawKafkaInput.class, RawKafkaInput.Factory.class); installInput(inputMapBinder, SyslogTCPInput.class, SyslogTCPInput.Factory.class); installInput(inputMapBinder, SyslogUDPInput.class, SyslogUDPInput.Factory.class); diff --git a/graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/HttpHandlerTest.java b/graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/HttpHandlerTest.java index 5d99fba32e58..cf0454f6f312 100644 --- a/graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/HttpHandlerTest.java +++ b/graylog2-server/src/test/java/org/graylog2/inputs/transports/netty/HttpHandlerTest.java @@ -20,7 +20,6 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; @@ -28,28 +27,41 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.util.AsciiString; import org.junit.Before; import org.junit.Test; import java.nio.charset.StandardCharsets; +import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS; +import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS; +import static io.netty.handler.codec.http.HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN; +import static io.netty.handler.codec.http.HttpHeaderNames.AUTHORIZATION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH; +import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaderNames.HOST; +import static io.netty.handler.codec.http.HttpHeaderNames.ORIGIN; +import static io.netty.handler.codec.http.HttpResponseStatus.ACCEPTED; +import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED; import static org.assertj.core.api.Assertions.assertThat; public class HttpHandlerTest { private static final byte[] GELF_MESSAGE = "{\"version\":\"1.1\",\"short_message\":\"Foo\",\"host\":\"localhost\"}".getBytes(StandardCharsets.UTF_8); + private static final String BEARER_EXPECTED_TOKEN = "Bearer: expected-token"; private EmbeddedChannel channel; @Before public void setUp() { - channel = new EmbeddedChannel(new HttpHandler(true)); + channel = new EmbeddedChannel(new HttpHandler(true, null, null, "/gelf")); } @Test public void messageReceivedSuccessfullyProcessesPOSTRequest() { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gelf"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.ORIGIN, "http://example.com"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(ORIGIN, "http://example.com"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); httpRequest.content().writeBytes(GELF_MESSAGE); @@ -57,21 +69,20 @@ public void messageReceivedSuccessfullyProcessesPOSTRequest() { channel.finish(); final HttpResponse httpResponse = channel.readOutbound(); - assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.ACCEPTED); + assertThat(httpResponse.status()).isEqualTo(ACCEPTED); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); - assertThat(headers.get(HttpHeaderNames.CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); + assertThat(headers.get(CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); } - @Test public void withKeepalive() { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gelf"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.KEEP_ALIVE); httpRequest.content().writeBytes(GELF_MESSAGE); @@ -79,18 +90,18 @@ public void withKeepalive() { channel.finish(); final HttpResponse httpResponse = channel.readOutbound(); - assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.ACCEPTED); + assertThat(httpResponse.status()).isEqualTo(ACCEPTED); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.CONNECTION)).isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString()); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(CONNECTION)).isEqualTo(HttpHeaderValues.KEEP_ALIVE.toString()); } @Test public void withJSONContentType() { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gelf"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); httpRequest.content().writeBytes(GELF_MESSAGE); @@ -98,18 +109,18 @@ public void withJSONContentType() { channel.finish(); final HttpResponse httpResponse = channel.readOutbound(); - assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.ACCEPTED); + assertThat(httpResponse.status()).isEqualTo(ACCEPTED); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); } @Test public void withCustomContentType() { final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gelf"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.CONTENT_TYPE, "foo/bar"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(CONTENT_TYPE, "foo/bar"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); httpRequest.content().writeBytes(GELF_MESSAGE); @@ -117,18 +128,18 @@ public void withCustomContentType() { channel.finish(); final HttpResponse httpResponse = channel.readOutbound(); - assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.ACCEPTED); + assertThat(httpResponse.status()).isEqualTo(ACCEPTED); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); } @Test public void successfullyProcessOPTIONSRequest() { final HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/gelf"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.ORIGIN, "http://example.com"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(ORIGIN, "http://example.com"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); channel.writeInbound(httpRequest); channel.finish(); @@ -137,17 +148,17 @@ public void successfullyProcessOPTIONSRequest() { assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.OK); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); } @Test public void successfullyProcessOPTIONSRequestWithoutOrigin() { final HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.OPTIONS, "/gelf"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); channel.writeInbound(httpRequest); channel.finish(); @@ -156,18 +167,18 @@ public void successfullyProcessOPTIONSRequestWithoutOrigin() { assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.OK); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.contains(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN)).isFalse(); - assertThat(headers.contains(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS)).isFalse(); - assertThat(headers.contains(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS)).isFalse(); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.contains(ACCESS_CONTROL_ALLOW_ORIGIN)).isFalse(); + assertThat(headers.contains(ACCESS_CONTROL_ALLOW_CREDENTIALS)).isFalse(); + assertThat(headers.contains(ACCESS_CONTROL_ALLOW_HEADERS)).isFalse(); } @Test public void return404ForWrongPath() { final HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.ORIGIN, "http://example.com"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(ORIGIN, "http://example.com"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); channel.writeInbound(httpRequest); channel.finish(); @@ -175,18 +186,18 @@ public void return404ForWrongPath() { final HttpResponse httpResponse = channel.readOutbound(); assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.NOT_FOUND); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); } @Test public void messageReceivedReturns405ForInvalidMethod() { final HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); - httpRequest.headers().add(HttpHeaderNames.HOST, "localhost"); - httpRequest.headers().add(HttpHeaderNames.ORIGIN, "http://example.com"); - httpRequest.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(ORIGIN, "http://example.com"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); channel.writeInbound(httpRequest); channel.finish(); @@ -194,10 +205,48 @@ public void messageReceivedReturns405ForInvalidMethod() { final HttpResponse httpResponse = channel.readOutbound(); assertThat(httpResponse.status()).isEqualTo(HttpResponseStatus.METHOD_NOT_ALLOWED); final HttpHeaders headers = httpResponse.headers(); - assertThat(headers.get(HttpHeaderNames.CONTENT_LENGTH)).isEqualTo("0"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); - assertThat(headers.get(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); + } + + @Test + public void testAuthentication() { + // No auth required - success. + testAuthentication(null, null, null, null, ACCEPTED); + // Auth required - success. + testAuthentication(AUTHORIZATION.toString(), BEARER_EXPECTED_TOKEN, AUTHORIZATION, BEARER_EXPECTED_TOKEN, ACCEPTED); + // Auth required - failures. + testAuthentication(AUTHORIZATION.toString(), BEARER_EXPECTED_TOKEN, AUTHORIZATION, "bad-token", UNAUTHORIZED); + testAuthentication(AUTHORIZATION.toString(), BEARER_EXPECTED_TOKEN, AUTHORIZATION, "", UNAUTHORIZED); + testAuthentication(AUTHORIZATION.toString(), BEARER_EXPECTED_TOKEN, null, "", UNAUTHORIZED); } -} \ No newline at end of file + private void testAuthentication(String expectedAuthHeader, String expectedAuthHeaderValue, AsciiString suppliedAuthHeader, String suppliedAuthHeaderValue, + HttpResponseStatus expectedStatus) { + final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/gelf"); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add(ORIGIN, "http://example.com"); + httpRequest.headers().add(CONNECTION, HttpHeaderValues.CLOSE); + if (suppliedAuthHeader != null) { + httpRequest.headers().add(suppliedAuthHeader, suppliedAuthHeaderValue); + } + + httpRequest.content().writeBytes(GELF_MESSAGE); + + channel = new EmbeddedChannel(new HttpHandler(true, expectedAuthHeader, expectedAuthHeaderValue, "/gelf")); + channel.writeInbound(httpRequest); + channel.finish(); + + final HttpResponse httpResponse = channel.readOutbound(); + // Request should be successful. + assertThat(httpResponse.status()).isEqualTo(expectedStatus); + final HttpHeaders headers = httpResponse.headers(); + assertThat(headers.get(CONTENT_LENGTH)).isEqualTo("0"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_ORIGIN)).isEqualTo("http://example.com"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_CREDENTIALS)).isEqualTo("true"); + assertThat(headers.get(ACCESS_CONTROL_ALLOW_HEADERS)).isEqualTo("Authorization, Content-Type"); + assertThat(headers.get(CONNECTION)).isEqualTo(HttpHeaderValues.CLOSE.toString()); + } +}