Skip to content

Commit

Permalink
#3737: Use composite buffers where possible
Browse files Browse the repository at this point in the history
  • Loading branch information
Outfluencer authored and md-5 committed Sep 9, 2024
1 parent 477ea59 commit b309e4a
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ public interface BungeeCipher
void cipher(ByteBuf in, ByteBuf out) throws GeneralSecurityException;

ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws GeneralSecurityException;

/*
* This indicates whether the input ByteBuf is allowed to be a CompositeByteBuf.
* If you need access to a memory address, you should not allow composite buffers.
*/
boolean allowComposite();
}
13 changes: 10 additions & 3 deletions native/src/main/java/net/md_5/bungee/jni/cipher/JavaCipher.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
Expand All @@ -12,10 +13,10 @@ public class JavaCipher implements BungeeCipher
{

private final Cipher cipher;
private static final ThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal();
private static final ThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal();
private static final FastThreadLocal<byte[]> heapInLocal = new EmptyByteThreadLocal();
private static final FastThreadLocal<byte[]> heapOutLocal = new EmptyByteThreadLocal();

private static class EmptyByteThreadLocal extends ThreadLocal<byte[]>
private static class EmptyByteThreadLocal extends FastThreadLocal<byte[]>
{

@Override
Expand Down Expand Up @@ -88,4 +89,10 @@ private byte[] bufToByte(ByteBuf in)
in.readBytes( heapIn, 0, readableBytes );
return heapIn;
}

@Override
public boolean allowComposite()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,10 @@ public ByteBuf cipher(ChannelHandlerContext ctx, ByteBuf in) throws GeneralSecur

return heapOut;
}

@Override
public boolean allowComposite()
{
return false;
}
}
8 changes: 8 additions & 0 deletions native/src/main/java/net/md_5/bungee/jni/zlib/BungeeZlib.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
public interface BungeeZlib
{

public static final int OUTPUT_BUFFER_SIZE = 8192;

void init(boolean compress, int level);

void free();

void process(ByteBuf in, ByteBuf out) throws DataFormatException;

/*
* This indicates whether the input ByteBuf is allowed to be a CompositeByteBuf.
* If you need access to a memory address, you should not allow composite buffers.
*/
boolean allowComposite();
}
6 changes: 6 additions & 0 deletions native/src/main/java/net/md_5/bungee/jni/zlib/JavaZlib.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,10 @@ public void process(ByteBuf in, ByteBuf out) throws DataFormatException
inflater.reset();
}
}

@Override
public boolean allowComposite()
{
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void process(ByteBuf in, ByteBuf out) throws DataFormatException

while ( !nativeCompress.finished && ( compress || in.isReadable() ) )
{
out.ensureWritable( 8192 );
out.ensureWritable( OUTPUT_BUFFER_SIZE );

int processed;
try
Expand All @@ -74,4 +74,10 @@ public void process(ByteBuf in, ByteBuf out) throws DataFormatException
nativeCompress.consumed = 0;
nativeCompress.finished = false;
}

@Override
public boolean allowComposite()
{
return false;
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,38 @@
package net.md_5.bungee.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import lombok.Setter;

/**
* Prepend length of the message as a Varint21 by writing length and data to a
* new buffer
*/
@ChannelHandler.Sharable
public class Varint21LengthFieldPrepender extends MessageToByteEncoder<ByteBuf>
public class Varint21LengthFieldPrepender extends MessageToMessageEncoder<ByteBuf>
{

@Setter
private boolean compose = true;

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> list) throws Exception
{
int bodyLen = msg.readableBytes();
int headerLen = varintSize( bodyLen );
out.ensureWritable( headerLen + bodyLen );

DefinedPacket.writeVarInt( bodyLen, out );
out.writeBytes( msg );
if ( compose )
{
ByteBuf buf = ctx.alloc().directBuffer( headerLen );
DefinedPacket.writeVarInt( bodyLen, buf );
list.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, buf, msg.retain() ) );
} else
{
ByteBuf buf = ctx.alloc().directBuffer( headerLen + bodyLen );
DefinedPacket.writeVarInt( bodyLen, buf );
buf.writeBytes( msg );
list.add( buf );
}
}

static int varintSize(int paramInt)
Expand Down
28 changes: 20 additions & 8 deletions proxy/src/main/java/net/md_5/bungee/compress/PacketCompressor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,23 @@

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import java.util.zip.Deflater;
import lombok.Getter;
import lombok.Setter;
import net.md_5.bungee.jni.zlib.BungeeZlib;
import net.md_5.bungee.protocol.DefinedPacket;

public class PacketCompressor extends MessageToByteEncoder<ByteBuf>
public class PacketCompressor extends MessageToMessageEncoder<ByteBuf>
{

@Getter
private final BungeeZlib zlib = CompressFactory.zlib.newInstance();
@Setter
private int threshold = 256;
@Setter
private boolean compose = true;

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
Expand All @@ -28,18 +33,25 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
}

@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception
{
int origSize = msg.readableBytes();
if ( origSize < threshold )
{
DefinedPacket.writeVarInt( 0, out );
out.writeBytes( msg );
if ( compose )
{
// create a virtual buffer to avoid copying of data
out.add( ctx.alloc().compositeDirectBuffer( 2 ).addComponents( true, ctx.alloc().directBuffer( 1 ).writeByte( 0 ), msg.retain() ) );
} else
{
out.add( ctx.alloc().directBuffer( origSize + 1 ).writeByte( 0 ).writeBytes( msg ) );
}
} else
{
DefinedPacket.writeVarInt( origSize, out );

zlib.process( msg, out );
ByteBuf buf = ctx.alloc().directBuffer( BungeeZlib.OUTPUT_BUFFER_SIZE );
DefinedPacket.writeVarInt( origSize, buf );
zlib.process( msg, buf );
out.add( buf );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@ public void handle(final EncryptionResponse encryptResponse) throws Exception
ch.addBefore( PipelineUtils.FRAME_DECODER, PipelineUtils.DECRYPT_HANDLER, new CipherDecoder( decrypt ) );
BungeeCipher encrypt = EncryptionUtil.getCipher( true, sharedKey );
ch.addBefore( PipelineUtils.FRAME_PREPENDER, PipelineUtils.ENCRYPT_HANDLER, new CipherEncoder( encrypt ) );
// disable use of composite buffers if we use natives
ch.updateComposite();

String encName = URLEncoder.encode( InitialHandler.this.getName(), "UTF-8" );

Expand Down
35 changes: 35 additions & 0 deletions proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
import io.netty.channel.ChannelHandlerContext;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import lombok.Getter;
import lombok.Setter;
import net.md_5.bungee.api.ProxyServer;
import net.md_5.bungee.compress.PacketCompressor;
import net.md_5.bungee.compress.PacketDecompressor;
import net.md_5.bungee.netty.cipher.CipherEncoder;
import net.md_5.bungee.protocol.DefinedPacket;
import net.md_5.bungee.protocol.MinecraftDecoder;
import net.md_5.bungee.protocol.MinecraftEncoder;
import net.md_5.bungee.protocol.PacketWrapper;
import net.md_5.bungee.protocol.Protocol;
import net.md_5.bungee.protocol.Varint21LengthFieldPrepender;
import net.md_5.bungee.protocol.packet.Kick;

public class ChannelWrapper
Expand Down Expand Up @@ -187,5 +191,36 @@ public void setCompressionThreshold(int compressionThreshold)
{
ch.pipeline().remove( "decompress" );
}
// disable use of composite buffers if we use natives
updateComposite();
}

/*
* Should be called on encryption add and on compressor add or remove
*/
public void updateComposite()
{
CipherEncoder cipherEncoder = ch.pipeline().get( CipherEncoder.class );
PacketCompressor packetCompressor = ch.pipeline().get( PacketCompressor.class );
Varint21LengthFieldPrepender prepender = ch.pipeline().get( Varint21LengthFieldPrepender.class );
boolean compressorCompose = cipherEncoder == null || cipherEncoder.getCipher().allowComposite();
boolean prependerCompose = compressorCompose && ( packetCompressor == null || packetCompressor.getZlib().allowComposite() );

if ( prepender != null )
{
ProxyServer.getInstance().getLogger().log( Level.FINE, "set prepender compose to {0} for {1}", new Object[]
{
prependerCompose, ch
} );
prepender.setCompose( prependerCompose );
}
if ( packetCompressor != null )
{
ProxyServer.getInstance().getLogger().log( Level.FINE, "set packetCompressor compose to {0} for {1}", new Object[]
{
compressorCompose, ch
} );
packetCompressor.setCompose( compressorCompose );
}
}
}
3 changes: 1 addition & 2 deletions proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ protected void initChannel(Channel ch) throws Exception
public static final Base BASE = new Base( false );
public static final Base BASE_SERVERSIDE = new Base( true );
private static final KickStringWriter legacyKicker = new KickStringWriter();
private static final Varint21LengthFieldPrepender framePrepender = new Varint21LengthFieldPrepender();
private static final Varint21LengthFieldExtraBufPrepender serverFramePrepender = new Varint21LengthFieldExtraBufPrepender();
public static final String TIMEOUT_HANDLER = "timeout";
public static final String PACKET_DECODER = "packet-decoder";
Expand Down Expand Up @@ -202,7 +201,7 @@ public void initChannel(Channel ch) throws Exception
ch.pipeline().addLast( TIMEOUT_HANDLER, new ReadTimeoutHandler( BungeeCord.getInstance().config.getTimeout(), TimeUnit.MILLISECONDS ) );
// No encryption bungee -> server, therefore use extra buffer to avoid copying everything for length prepending
// Not used bungee -> client as header would need to be encrypted separately through expensive JNI call
ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : framePrepender );
ch.pipeline().addLast( FRAME_PREPENDER, ( toServer ) ? serverFramePrepender : new Varint21LengthFieldPrepender() );

ch.pipeline().addLast( BOSS_HANDLER, new HandlerBoss() );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import net.md_5.bungee.jni.cipher.BungeeCipher;

@RequiredArgsConstructor
public class CipherEncoder extends MessageToByteEncoder<ByteBuf>
{

@Getter
private final BungeeCipher cipher;

@Override
Expand Down

0 comments on commit b309e4a

Please sign in to comment.