From b309e4ac50ca44f2cfc06e8d27de33a1f0c77549 Mon Sep 17 00:00:00 2001 From: Outfluencer Date: Mon, 9 Sep 2024 21:01:19 +1000 Subject: [PATCH] #3737: Use composite buffers where possible --- .../md_5/bungee/jni/cipher/BungeeCipher.java | 6 ++++ .../md_5/bungee/jni/cipher/JavaCipher.java | 13 +++++-- .../md_5/bungee/jni/cipher/NativeCipher.java | 6 ++++ .../net/md_5/bungee/jni/zlib/BungeeZlib.java | 8 +++++ .../net/md_5/bungee/jni/zlib/JavaZlib.java | 6 ++++ .../net/md_5/bungee/jni/zlib/NativeZlib.java | 8 ++++- .../Varint21LengthFieldPrepender.java | 29 ++++++++++----- .../bungee/compress/PacketCompressor.java | 28 ++++++++++----- .../bungee/connection/InitialHandler.java | 2 ++ .../net/md_5/bungee/netty/ChannelWrapper.java | 35 +++++++++++++++++++ .../net/md_5/bungee/netty/PipelineUtils.java | 3 +- .../bungee/netty/cipher/CipherEncoder.java | 2 ++ 12 files changed, 123 insertions(+), 23 deletions(-) diff --git a/native/src/main/java/net/md_5/bungee/jni/cipher/BungeeCipher.java b/native/src/main/java/net/md_5/bungee/jni/cipher/BungeeCipher.java index f71cf9c9d4..50178dd6f0 100644 --- a/native/src/main/java/net/md_5/bungee/jni/cipher/BungeeCipher.java +++ b/native/src/main/java/net/md_5/bungee/jni/cipher/BungeeCipher.java @@ -18,4 +18,10 @@ public interface BungeeCipher void cipher(ByteBuf in, ByteBuf out) throws GeneralSecurityException; ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws GeneralSecurityException; + + /* + * This indicates whether the input ByteBuf is allowed to be a CompositeByteBuf. + * If you need access to a memory address, you should not allow composite buffers. + */ + boolean allowComposite(); } diff --git a/native/src/main/java/net/md_5/bungee/jni/cipher/JavaCipher.java b/native/src/main/java/net/md_5/bungee/jni/cipher/JavaCipher.java index 0e27c2d7e5..94d0269136 100644 --- a/native/src/main/java/net/md_5/bungee/jni/cipher/JavaCipher.java +++ b/native/src/main/java/net/md_5/bungee/jni/cipher/JavaCipher.java @@ -2,6 +2,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; +import io.netty.util.concurrent.FastThreadLocal; import java.security.GeneralSecurityException; import javax.crypto.Cipher; import javax.crypto.SecretKey; @@ -12,10 +13,10 @@ public class JavaCipher implements BungeeCipher { private final Cipher cipher; - private static final ThreadLocal heapInLocal = new EmptyByteThreadLocal(); - private static final ThreadLocal heapOutLocal = new EmptyByteThreadLocal(); + private static final FastThreadLocal heapInLocal = new EmptyByteThreadLocal(); + private static final FastThreadLocal heapOutLocal = new EmptyByteThreadLocal(); - private static class EmptyByteThreadLocal extends ThreadLocal + private static class EmptyByteThreadLocal extends FastThreadLocal { @Override @@ -88,4 +89,10 @@ private byte[] bufToByte(ByteBuf in) in.readBytes( heapIn, 0, readableBytes ); return heapIn; } + + @Override + public boolean allowComposite() + { + return true; + } } diff --git a/native/src/main/java/net/md_5/bungee/jni/cipher/NativeCipher.java b/native/src/main/java/net/md_5/bungee/jni/cipher/NativeCipher.java index 7797a46b27..614b8b2992 100644 --- a/native/src/main/java/net/md_5/bungee/jni/cipher/NativeCipher.java +++ b/native/src/main/java/net/md_5/bungee/jni/cipher/NativeCipher.java @@ -71,4 +71,10 @@ public ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws GeneralSecur return heapOut; } + + @Override + public boolean allowComposite() + { + return false; + } } diff --git a/native/src/main/java/net/md_5/bungee/jni/zlib/BungeeZlib.java b/native/src/main/java/net/md_5/bungee/jni/zlib/BungeeZlib.java index 0c98ad4073..50701454d3 100644 --- a/native/src/main/java/net/md_5/bungee/jni/zlib/BungeeZlib.java +++ b/native/src/main/java/net/md_5/bungee/jni/zlib/BungeeZlib.java @@ -6,9 +6,17 @@ public interface BungeeZlib { + public static final int OUTPUT_BUFFER_SIZE = 8192; + void init(boolean compress, int level); void free(); void process(ByteBuf in, ByteBuf out) throws DataFormatException; + + /* + * This indicates whether the input ByteBuf is allowed to be a CompositeByteBuf. + * If you need access to a memory address, you should not allow composite buffers. + */ + boolean allowComposite(); } diff --git a/native/src/main/java/net/md_5/bungee/jni/zlib/JavaZlib.java b/native/src/main/java/net/md_5/bungee/jni/zlib/JavaZlib.java index 10da5d95c3..0d65522cec 100644 --- a/native/src/main/java/net/md_5/bungee/jni/zlib/JavaZlib.java +++ b/native/src/main/java/net/md_5/bungee/jni/zlib/JavaZlib.java @@ -73,4 +73,10 @@ public void process(ByteBuf in, ByteBuf out) throws DataFormatException inflater.reset(); } } + + @Override + public boolean allowComposite() + { + return true; + } } diff --git a/native/src/main/java/net/md_5/bungee/jni/zlib/NativeZlib.java b/native/src/main/java/net/md_5/bungee/jni/zlib/NativeZlib.java index 96e3077713..bad84bc444 100644 --- a/native/src/main/java/net/md_5/bungee/jni/zlib/NativeZlib.java +++ b/native/src/main/java/net/md_5/bungee/jni/zlib/NativeZlib.java @@ -55,7 +55,7 @@ public void process(ByteBuf in, ByteBuf out) throws DataFormatException while ( !nativeCompress.finished && ( compress || in.isReadable() ) ) { - out.ensureWritable( 8192 ); + out.ensureWritable( OUTPUT_BUFFER_SIZE ); int processed; try @@ -74,4 +74,10 @@ public void process(ByteBuf in, ByteBuf out) throws DataFormatException nativeCompress.consumed = 0; nativeCompress.finished = false; } + + @Override + public boolean allowComposite() + { + return false; + } } diff --git a/protocol/src/main/java/net/md_5/bungee/protocol/Varint21LengthFieldPrepender.java b/protocol/src/main/java/net/md_5/bungee/protocol/Varint21LengthFieldPrepender.java index 1adfbfcd74..3f0a866fc3 100644 --- a/protocol/src/main/java/net/md_5/bungee/protocol/Varint21LengthFieldPrepender.java +++ b/protocol/src/main/java/net/md_5/bungee/protocol/Varint21LengthFieldPrepender.java @@ -1,27 +1,38 @@ package net.md_5.bungee.protocol; import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.MessageToMessageEncoder; +import java.util.List; +import lombok.Setter; /** * Prepend length of the message as a Varint21 by writing length and data to a * new buffer */ -@ChannelHandler.Sharable -public class Varint21LengthFieldPrepender extends MessageToByteEncoder +public class Varint21LengthFieldPrepender extends MessageToMessageEncoder { + @Setter + private boolean compose = true; + @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List list) throws Exception { int bodyLen = msg.readableBytes(); int headerLen = varintSize( bodyLen ); - out.ensureWritable( headerLen + bodyLen ); - - DefinedPacket.writeVarInt( bodyLen, out ); - out.writeBytes( msg ); + if ( compose ) + { + ByteBuf buf = ctx.alloc().directBuffer( headerLen ); + DefinedPacket.writeVarInt( bodyLen, buf ); + list.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, buf, msg.retain() ) ); + } else + { + ByteBuf buf = ctx.alloc().directBuffer( headerLen + bodyLen ); + DefinedPacket.writeVarInt( bodyLen, buf ); + buf.writeBytes( msg ); + list.add( buf ); + } } static int varintSize(int paramInt) diff --git a/proxy/src/main/java/net/md_5/bungee/compress/PacketCompressor.java b/proxy/src/main/java/net/md_5/bungee/compress/PacketCompressor.java index d07cf46274..314cd00a8d 100644 --- a/proxy/src/main/java/net/md_5/bungee/compress/PacketCompressor.java +++ b/proxy/src/main/java/net/md_5/bungee/compress/PacketCompressor.java @@ -2,18 +2,23 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.MessageToMessageEncoder; +import java.util.List; import java.util.zip.Deflater; +import lombok.Getter; import lombok.Setter; import net.md_5.bungee.jni.zlib.BungeeZlib; import net.md_5.bungee.protocol.DefinedPacket; -public class PacketCompressor extends MessageToByteEncoder +public class PacketCompressor extends MessageToMessageEncoder { + @Getter private final BungeeZlib zlib = CompressFactory.zlib.newInstance(); @Setter private int threshold = 256; + @Setter + private boolean compose = true; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception @@ -28,18 +33,25 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception } @Override - protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { int origSize = msg.readableBytes(); if ( origSize < threshold ) { - DefinedPacket.writeVarInt( 0, out ); - out.writeBytes( msg ); + if ( compose ) + { + // create a virtual buffer to avoid copying of data + out.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, ctx.alloc().directBuffer( 1 ).writeByte( 0 ), msg.retain() ) ); + } else + { + out.add( ctx.alloc().directBuffer( origSize + 1 ).writeByte( 0 ).writeBytes( msg ) ); + } } else { - DefinedPacket.writeVarInt( origSize, out ); - - zlib.process( msg, out ); + ByteBuf buf = ctx.alloc().directBuffer( BungeeZlib.OUTPUT_BUFFER_SIZE ); + DefinedPacket.writeVarInt( origSize, buf ); + zlib.process( msg, buf ); + out.add( buf ); } } } diff --git a/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java b/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java index 0d745b91b5..12396c9a7b 100644 --- a/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java +++ b/proxy/src/main/java/net/md_5/bungee/connection/InitialHandler.java @@ -508,6 +508,8 @@ public void handle(final EncryptionResponse encryptResponse) throws Exception ch.addBefore( PipelineUtils.FRAME_DECODER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) ); BungeeCipher encrypt = EncryptionUtil.getCipher( true, sharedKey ); ch.addBefore( PipelineUtils.FRAME_PREPENDER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) ); + // disable use of composite buffers if we use natives + ch.updateComposite(); String encName = URLEncoder.encode( InitialHandler.this.getName(), "UTF-8" ); diff --git a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java index e8d5ed1969..682bb80734 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java @@ -7,15 +7,19 @@ import io.netty.channel.ChannelHandlerContext; import java.net.SocketAddress; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; import lombok.Getter; import lombok.Setter; +import net.md_5.bungee.api.ProxyServer; import net.md_5.bungee.compress.PacketCompressor; import net.md_5.bungee.compress.PacketDecompressor; +import net.md_5.bungee.netty.cipher.CipherEncoder; import net.md_5.bungee.protocol.DefinedPacket; import net.md_5.bungee.protocol.MinecraftDecoder; import net.md_5.bungee.protocol.MinecraftEncoder; import net.md_5.bungee.protocol.PacketWrapper; import net.md_5.bungee.protocol.Protocol; +import net.md_5.bungee.protocol.Varint21LengthFieldPrepender; import net.md_5.bungee.protocol.packet.Kick; public class ChannelWrapper @@ -187,5 +191,36 @@ public void setCompressionThreshold(int compressionThreshold) { ch.pipeline().remove( "decompress" ); } + // disable use of composite buffers if we use natives + updateComposite(); + } + + /* + * Should be called on encryption add and on compressor add or remove + */ + public void updateComposite() + { + CipherEncoder cipherEncoder = ch.pipeline().get( CipherEncoder.class ); + PacketCompressor packetCompressor = ch.pipeline().get( PacketCompressor.class ); + Varint21LengthFieldPrepender prepender = ch.pipeline().get( Varint21LengthFieldPrepender.class ); + boolean compressorCompose = cipherEncoder == null || cipherEncoder.getCipher().allowComposite(); + boolean prependerCompose = compressorCompose && ( packetCompressor == null || packetCompressor.getZlib().allowComposite() ); + + if ( prepender != null ) + { + ProxyServer.getInstance().getLogger().log( Level.FINE, "set prepender compose to {0} for {1}", new Object[] + { + prependerCompose, ch + } ); + prepender.setCompose( prependerCompose ); + } + if ( packetCompressor != null ) + { + ProxyServer.getInstance().getLogger().log( Level.FINE, "set packetCompressor compose to {0} for {1}", new Object[] + { + compressorCompose, ch + } ); + packetCompressor.setCompose( compressorCompose ); + } } } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java b/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java index 25f045bebe..2446f48919 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java @@ -93,7 +93,6 @@ protected void initChannel(Channel ch) throws Exception public static final Base BASE = new Base( false ); public static final Base BASE_SERVERSIDE = new Base( true ); private static final KickStringWriter legacyKicker = new KickStringWriter(); - private static final Varint21LengthFieldPrepender framePrepender = new Varint21LengthFieldPrepender(); private static final Varint21LengthFieldExtraBufPrepender serverFramePrepender = new Varint21LengthFieldExtraBufPrepender(); public static final String TIMEOUT_HANDLER = "timeout"; public static final String PACKET_DECODER = "packet-decoder"; @@ -202,7 +201,7 @@ public void initChannel(Channel ch) throws Exception ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) ); // No encryption bungee -> server, therefore use extra buffer to avoid copying everything for length prepending // Not used bungee -> client as header would need to be encrypted separately through expensive JNI call - ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : framePrepender ); + ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : new Varint21LengthFieldPrepender() ); ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() ); } diff --git a/proxy/src/main/java/net/md_5/bungee/netty/cipher/CipherEncoder.java b/proxy/src/main/java/net/md_5/bungee/netty/cipher/CipherEncoder.java index fc19ded01e..e89c52d6c2 100644 --- a/proxy/src/main/java/net/md_5/bungee/netty/cipher/CipherEncoder.java +++ b/proxy/src/main/java/net/md_5/bungee/netty/cipher/CipherEncoder.java @@ -3,6 +3,7 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; +import lombok.Getter; import lombok.RequiredArgsConstructor; import net.md_5.bungee.jni.cipher.BungeeCipher; @@ -10,6 +11,7 @@ public class CipherEncoder extends MessageToByteEncoder { + @Getter private final BungeeCipher cipher; @Override