-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Raw HTTP input and support for custom authorization header (#20370)
* Add Raw HTTP input * Fix import * Use `/raw` path for new Raw HTTP input * Add change log * Add support for user-defined authorization header and value * Validate config dependency for new auth header fields. * Record config as failure as well for visibility. * Update change log * Remove unneeded failure recorder usage * Finalize path field Co-authored-by: Ryan <[email protected]> --------- Co-authored-by: Ryan <[email protected]>
- Loading branch information
1 parent
c0fed1b
commit 317d528
Showing
9 changed files
with
454 additions
and
163 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
type = "a" | ||
message = "Added new Raw HTTP input. Also added support for custom authorization config option for HTTP Gelf and new HTTP Raw inputs." | ||
|
||
pulls = ["20370"] |
68 changes: 68 additions & 0 deletions
68
graylog2-server/src/main/java/org/graylog2/inputs/raw/http/RawHttpInput.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
/* | ||
* Copyright (C) 2020 Graylog, Inc. | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the Server Side Public License, version 1, | ||
* as published by MongoDB, Inc. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* Server Side Public License for more details. | ||
* | ||
* You should have received a copy of the Server Side Public License | ||
* along with this program. If not, see | ||
* <http://www.mongodb.com/licensing/server-side-public-license>. | ||
*/ | ||
package org.graylog2.inputs.raw.http; | ||
|
||
import com.codahale.metrics.MetricRegistry; | ||
import com.google.inject.assistedinject.Assisted; | ||
import com.google.inject.assistedinject.AssistedInject; | ||
import jakarta.inject.Inject; | ||
import org.graylog2.inputs.codecs.RawCodec; | ||
import org.graylog2.inputs.transports.RawHttpTransport; | ||
import org.graylog2.plugin.LocalMetricRegistry; | ||
import org.graylog2.plugin.ServerStatus; | ||
import org.graylog2.plugin.configuration.Configuration; | ||
import org.graylog2.plugin.inputs.MessageInput; | ||
|
||
public class RawHttpInput extends MessageInput { | ||
|
||
private static final String NAME = "Raw HTTP"; | ||
|
||
@AssistedInject | ||
public RawHttpInput(MetricRegistry metricRegistry, | ||
@Assisted Configuration configuration, | ||
RawHttpTransport.Factory httpTransportFactory, | ||
RawCodec.Factory rawCodecFactory, LocalMetricRegistry localRegistry, Config config, Descriptor descriptor, ServerStatus serverStatus) { | ||
super(metricRegistry, configuration, httpTransportFactory.create(configuration), | ||
localRegistry, | ||
rawCodecFactory.create(configuration), config, descriptor, serverStatus); | ||
} | ||
|
||
public interface Factory extends MessageInput.Factory<RawHttpInput> { | ||
@Override | ||
RawHttpInput create(Configuration configuration); | ||
|
||
@Override | ||
Config getConfig(); | ||
|
||
@Override | ||
Descriptor getDescriptor(); | ||
} | ||
|
||
public static class Descriptor extends MessageInput.Descriptor { | ||
@Inject | ||
public Descriptor() { | ||
super(NAME, false, ""); | ||
} | ||
} | ||
|
||
public static class Config extends MessageInput.Config { | ||
@Inject | ||
public Config(RawHttpTransport.Factory transport, RawCodec.Factory codec) { | ||
super(transport.getConfig(), codec.getConfig()); | ||
} | ||
} | ||
} |
186 changes: 186 additions & 0 deletions
186
graylog2-server/src/main/java/org/graylog2/inputs/transports/AbstractHttpTransport.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
/* | ||
* Copyright (C) 2020 Graylog, Inc. | ||
* | ||
* This program is free software: you can redistribute it and/or modify | ||
* it under the terms of the Server Side Public License, version 1, | ||
* as published by MongoDB, Inc. | ||
* | ||
* This program is distributed in the hope that it will be useful, | ||
* but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
* Server Side Public License for more details. | ||
* | ||
* You should have received a copy of the Server Side Public License | ||
* along with this program. If not, see | ||
* <http://www.mongodb.com/licensing/server-side-public-license>. | ||
*/ | ||
package org.graylog2.inputs.transports; | ||
|
||
import com.github.joschi.jadconfig.util.Size; | ||
import io.netty.channel.ChannelHandler; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.handler.codec.Delimiters; | ||
import io.netty.handler.codec.http.HttpContentDecompressor; | ||
import io.netty.handler.codec.http.HttpObjectAggregator; | ||
import io.netty.handler.codec.http.HttpRequestDecoder; | ||
import io.netty.handler.codec.http.HttpResponseEncoder; | ||
import io.netty.handler.timeout.ReadTimeoutHandler; | ||
import jakarta.annotation.Nullable; | ||
import org.graylog2.configuration.TLSProtocolsConfiguration; | ||
import org.graylog2.inputs.transports.netty.EventLoopGroupFactory; | ||
import org.graylog2.inputs.transports.netty.HttpHandler; | ||
import org.graylog2.inputs.transports.netty.LenientDelimiterBasedFrameDecoder; | ||
import org.graylog2.plugin.InputFailureRecorder; | ||
import org.graylog2.plugin.LocalMetricRegistry; | ||
import org.graylog2.plugin.configuration.Configuration; | ||
import org.graylog2.plugin.configuration.ConfigurationRequest; | ||
import org.graylog2.plugin.configuration.fields.BooleanField; | ||
import org.graylog2.plugin.configuration.fields.ConfigurationField; | ||
import org.graylog2.plugin.configuration.fields.NumberField; | ||
import org.graylog2.plugin.configuration.fields.TextField; | ||
import org.graylog2.plugin.inputs.MessageInput; | ||
import org.graylog2.plugin.inputs.MisfireException; | ||
import org.graylog2.plugin.inputs.annotations.ConfigClass; | ||
import org.graylog2.plugin.inputs.transports.AbstractTcpTransport; | ||
import org.graylog2.plugin.inputs.util.ThroughputCounter; | ||
|
||
import java.util.LinkedHashMap; | ||
import java.util.concurrent.Callable; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.apache.commons.lang3.StringUtils.isBlank; | ||
import static org.apache.commons.lang3.StringUtils.isNotBlank; | ||
import static org.graylog2.shared.utilities.StringUtils.f; | ||
|
||
abstract public class AbstractHttpTransport extends AbstractTcpTransport { | ||
private static final int DEFAULT_MAX_INITIAL_LINE_LENGTH = 4096; | ||
private static final int DEFAULT_MAX_HEADER_SIZE = 8192; | ||
protected static final int DEFAULT_MAX_CHUNK_SIZE = (int) Size.kilobytes(64L).toBytes(); | ||
private static final int DEFAULT_IDLE_WRITER_TIMEOUT = 60; | ||
|
||
static final String CK_ENABLE_BULK_RECEIVING = "enable_bulk_receiving"; | ||
static final String CK_ENABLE_CORS = "enable_cors"; | ||
static final String CK_MAX_CHUNK_SIZE = "max_chunk_size"; | ||
static final String CK_IDLE_WRITER_TIMEOUT = "idle_writer_timeout"; | ||
static final String CK_AUTHORIZATION_HEADER_NAME = "authorization_header_name"; | ||
static final String CK_AUTHORIZATION_HEADER_VALUE = "authorization_header_value"; | ||
private static final String AUTHORIZATION_HEADER_NAME_LABEL = "Authorization Header Name"; | ||
private static final String AUTHORIZATION_HEADER_VALUE_LABEL = "Authorization Header Value"; | ||
|
||
protected final boolean enableBulkReceiving; | ||
protected final boolean enableCors; | ||
protected final int maxChunkSize; | ||
private final int idleWriterTimeout; | ||
private final String authorizationHeader; | ||
private final String authorizationHeaderValue; | ||
private final String path; | ||
|
||
public AbstractHttpTransport(Configuration configuration, | ||
EventLoopGroup eventLoopGroup, | ||
EventLoopGroupFactory eventLoopGroupFactory, | ||
NettyTransportConfiguration nettyTransportConfiguration, | ||
ThroughputCounter throughputCounter, | ||
LocalMetricRegistry localRegistry, | ||
TLSProtocolsConfiguration tlsConfiguration, String path) { | ||
super(configuration, | ||
throughputCounter, | ||
localRegistry, | ||
eventLoopGroup, | ||
eventLoopGroupFactory, | ||
nettyTransportConfiguration, | ||
tlsConfiguration); | ||
this.enableBulkReceiving = configuration.getBoolean(CK_ENABLE_BULK_RECEIVING); | ||
this.enableCors = configuration.getBoolean(CK_ENABLE_CORS); | ||
this.maxChunkSize = parseMaxChunkSize(configuration); | ||
this.idleWriterTimeout = configuration.intIsSet(CK_IDLE_WRITER_TIMEOUT) ? configuration.getInt(CK_IDLE_WRITER_TIMEOUT, DEFAULT_IDLE_WRITER_TIMEOUT) : DEFAULT_IDLE_WRITER_TIMEOUT; | ||
this.authorizationHeader = configuration.getString(CK_AUTHORIZATION_HEADER_NAME); | ||
this.authorizationHeaderValue = configuration.getString(CK_AUTHORIZATION_HEADER_VALUE); | ||
this.path = path; | ||
} | ||
|
||
/** | ||
* @return If the configured Max Chunk Size is less than zero, return {@link AbstractHttpTransport#DEFAULT_MAX_CHUNK_SIZE}. | ||
*/ | ||
protected static int parseMaxChunkSize(Configuration configuration) { | ||
int maxChunkSize = configuration.getInt(CK_MAX_CHUNK_SIZE, DEFAULT_MAX_CHUNK_SIZE); | ||
return maxChunkSize <= 0 ? DEFAULT_MAX_CHUNK_SIZE : maxChunkSize; | ||
} | ||
|
||
@Override | ||
protected LinkedHashMap<String, Callable<? extends ChannelHandler>> getCustomChildChannelHandlers(MessageInput input) { | ||
final LinkedHashMap<String, Callable<? extends ChannelHandler>> handlers = new LinkedHashMap<>(); | ||
if (idleWriterTimeout > 0) { | ||
// Install read timeout handler to close idle connections after a timeout. | ||
// This avoids dangling HTTP connections when the HTTP client does not close the connection properly. | ||
// For details see: https://github.com/Graylog2/graylog2-server/issues/3223#issuecomment-270350500 | ||
handlers.put("read-timeout-handler", () -> new ReadTimeoutHandler(idleWriterTimeout, TimeUnit.SECONDS)); | ||
} | ||
|
||
handlers.put("decoder", () -> new HttpRequestDecoder(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, maxChunkSize)); | ||
handlers.put("decompressor", HttpContentDecompressor::new); | ||
handlers.put("encoder", HttpResponseEncoder::new); | ||
handlers.put("aggregator", () -> new HttpObjectAggregator(maxChunkSize)); | ||
handlers.put("http-handler", () -> new HttpHandler(enableCors, authorizationHeader, authorizationHeaderValue, path)); | ||
if (enableBulkReceiving) { | ||
handlers.put("http-bulk-newline-decoder", | ||
() -> new LenientDelimiterBasedFrameDecoder(maxChunkSize, Delimiters.lineDelimiter())); | ||
} | ||
handlers.putAll(super.getCustomChildChannelHandlers(input)); | ||
return handlers; | ||
} | ||
|
||
@Override | ||
public void launch(MessageInput input, @Nullable InputFailureRecorder inputFailureRecorder) throws MisfireException { | ||
if (isNotBlank(authorizationHeader) && isBlank(authorizationHeaderValue)) { | ||
checkForConfigFieldDependencies(AUTHORIZATION_HEADER_NAME_LABEL, AUTHORIZATION_HEADER_VALUE_LABEL); | ||
} else if (isNotBlank(authorizationHeaderValue) && isBlank(authorizationHeader)) { | ||
checkForConfigFieldDependencies(AUTHORIZATION_HEADER_VALUE_LABEL, AUTHORIZATION_HEADER_NAME_LABEL); | ||
} | ||
super.launch(input, inputFailureRecorder); | ||
} | ||
|
||
private void checkForConfigFieldDependencies(String configParam1, String configParam2) throws MisfireException { | ||
throw new MisfireException(f("The [%s] configuration parameter cannot be used without also specifying a value for [%s].", configParam1, configParam2)); | ||
} | ||
|
||
@ConfigClass | ||
public static class Config extends AbstractTcpTransport.Config { | ||
@Override | ||
public ConfigurationRequest getRequestedConfiguration() { | ||
final ConfigurationRequest r = super.getRequestedConfiguration(); | ||
r.addField(new BooleanField(CK_ENABLE_BULK_RECEIVING, | ||
"Enable Bulk Receiving", | ||
false, | ||
"Enables bulk receiving of messages separated by newlines (\\n or \\r\\n)")); | ||
r.addField(new BooleanField(CK_ENABLE_CORS, | ||
"Enable CORS", | ||
true, | ||
"Input sends CORS headers to satisfy browser security policies")); | ||
r.addField(new NumberField(CK_MAX_CHUNK_SIZE, | ||
"Max. HTTP chunk size", | ||
DEFAULT_MAX_CHUNK_SIZE, | ||
"The maximum HTTP chunk size in bytes (e. g. length of HTTP request body)", | ||
ConfigurationField.Optional.OPTIONAL)); | ||
r.addField(new NumberField(CK_IDLE_WRITER_TIMEOUT, | ||
"Idle writer timeout", | ||
DEFAULT_IDLE_WRITER_TIMEOUT, | ||
"The server closes the connection after the given time in seconds after the last client write request. (use 0 to disable)", | ||
ConfigurationField.Optional.OPTIONAL, | ||
NumberField.Attribute.ONLY_POSITIVE)); | ||
r.addField(new TextField( | ||
CK_AUTHORIZATION_HEADER_NAME, | ||
AUTHORIZATION_HEADER_NAME_LABEL, | ||
"", | ||
"The name for the authorization header to use. If specified, all requests must contain this header with the correct value to authenticate successfully.", | ||
ConfigurationField.Optional.OPTIONAL)); | ||
r.addField(new TextField( | ||
CK_AUTHORIZATION_HEADER_VALUE, | ||
AUTHORIZATION_HEADER_VALUE_LABEL, | ||
"", | ||
"The secret authorization header value which all request must have in order to authenticate successfully. e.g. Bearer: <api-token>N", | ||
ConfigurationField.Optional.OPTIONAL, | ||
TextField.Attribute.IS_PASSWORD)); | ||
return r; | ||
} | ||
} | ||
} |
Oops, something went wrong.