From 765abb7be933046333cf922a8b8d8d6bc50ac2de Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 3 Mar 2021 10:23:11 +0000 Subject: [PATCH] DefaultPayload#create properly copies ByteBuf content Closes gh-970 Signed-off-by: Rossen Stoyanchev --- .../java/io/rsocket/util/DefaultPayload.java | 13 +++++++++-- .../io/rsocket/core/RSocketConnectorTest.java | 22 +++++++++++++++++-- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index d59b9fe97..08b8b2fb7 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -100,7 +100,7 @@ public static Payload create(ByteBuf data) { public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { try { - return create(data.nioBuffer(), metadata == null ? null : metadata.nioBuffer()); + return create(toBytes(data), metadata != null ? toBytes(metadata) : null); } finally { data.release(); if (metadata != null) { @@ -110,7 +110,16 @@ public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { } public static Payload create(Payload payload) { - return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null); + return create( + toBytes(payload.data()), payload.hasMetadata() ? toBytes(payload.metadata()) : null); + } + + private static byte[] toBytes(ByteBuf byteBuf) { + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.markReaderIndex(); + byteBuf.readBytes(bytes); + byteBuf.resetReaderIndex(); + return bytes; } @Override diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java index 468a13505..ec4adb8ad 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java @@ -73,9 +73,14 @@ public void ensuresThatSetupPayloadCanBeRetained() { @Test public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() { Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata"); - Assertions.assertThat(setupPayload.refCnt()).isOne(); + // Keep the data and metadata around so we can try changing them independently + ByteBuf dataBuf = setupPayload.data(); + ByteBuf metadataBuf = setupPayload.metadata(); + dataBuf.retain(); + metadataBuf.retain(); + TestClientTransport testClientTransport = new TestClientTransport(); Mono connectionMono = RSocketConnector.create().setupPayload(setupPayload).connect(testClientTransport); @@ -92,6 +97,15 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions .expectComplete() .verify(Duration.ofMillis(100)); + // Changing the original data and metadata should not impact the SetupPayload + dataBuf.writerIndex(dataBuf.readerIndex()); + dataBuf.writeChar('d'); + dataBuf.release(); + + metadataBuf.writerIndex(metadataBuf.readerIndex()); + metadataBuf.writeChar('m'); + metadataBuf.release(); + Assertions.assertThat(testClientTransport.testConnection().getSent()) .hasSize(2) .allMatch( @@ -100,7 +114,11 @@ public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions return payload.getDataUtf8().equals("TestData") && payload.getMetadataUtf8().equals("TestMetadata"); }) - .allMatch(ReferenceCounted::release); + .allMatch( + byteBuf -> { + System.out.println("calling release " + byteBuf.refCnt()); + return byteBuf.release(); + }); Assertions.assertThat(setupPayload.refCnt()).isZero(); }