diff --git a/test-tools/src/main/java/org/terracotta/utilities/test/GcTracker.java b/test-tools/src/main/java/org/terracotta/utilities/test/GcTracker.java new file mode 100644 index 0000000..afb45a9 --- /dev/null +++ b/test-tools/src/main/java/org/terracotta/utilities/test/GcTracker.java @@ -0,0 +1,285 @@ +/* + * Copyright 2024 Terracotta, Inc., a Software AG company. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.terracotta.utilities.test; + +import java.lang.ref.PhantomReference; +import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.time.Duration; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static java.time.Duration.ofMillis; +import static java.time.Duration.ofMinutes; +import static java.time.Duration.ofSeconds; + +/** + * Tool supporting enforced garbage collection of {@code MappedByteBuffer} instances. + *

+ * This class is meant to aid testing involving off-heap buffer instances. Like heap memory, + * management of off-heap buffers relies on JVM garbage collection (GC). Unfortunately, GC + * cycles are initiated by heap pressure with no regard for the usage of off-heap buffers. + * {@code GcTracker} can be used to encourage collection of off-heap buffers by + * repeatedly invoking {@code System.gc} until the tracked buffer instances are collected. + *

+ * The following is a sample usage pattern: + *

{@code
+ *     GcTracker tracker = new GcTracker();
+ *     try {
+ *       ByteBuffer buffer = // allocate an off-heap buffer
+ *       tracker.add(buffer);
+ *
+ *       // Work with the off-heap buffer
+ *
+ *       // Nullify the buffer reference; without nullification, the JVM can retain the reference
+ *       buffer = null;
+ *     } finally {
+ *       // Await collection of the buffer
+ *       tracker.awaitGc();
+ *     }
+ * }
+ * When working with file-based {@code MappedByteBuffer} instances, the following pattern is effective: + *

+ *  GcTracker tracker = new GcTracker();
+ *   try {
+ *     try (FileInputStream fis = new FileInputStream(file);
+ *          FileChannel fc = fis.getChannel()) {
+ *
+ *       /*
+ *        * Create a buffer mapping the file content.  This opens an additional
+ *        * file descriptor for the file that is not closed/release until the
+ *        * buffer is garbage collected -- closure of the FileInputStream and
+ *        * FileChannel does not release this buffer and prevents deletion or
+ *        * re-use of the file on Windows.
+ *        */
+ *       int sz = (int)fc.size();
+ *       MappedByteBuffer bb = fc.map(FileChannel.MapMode.READ_ONLY, 0, sz);
+ *       tracker.add(bb);
+ *
+ *       // Work with the MappedByteBuffer
+ *
+ *       // Nullify the buffer reference
+ *       bb = null;    // Encourages GC
+ *     }
+ *   } finally {
+ *     tracker.awaitGc();
+ *   }
+ * 
+ * + * Note, in each example, the assignment of the buffer reference to {@code null}. Without nullification, + * the JVM can retain a reference to the buffer and prevent garbage collection even when the reference + * appears out of scope. + */ +public class GcTracker { + + private final Set> trackedRefs = new LinkedHashSet<>(); + private final ReferenceQueue refQueue = new ReferenceQueue<>(); + + private final long pollingMillis; + + /** + * The maximum amount of time the non-timed {@code awaitGc} methods will poll/wait + * for collection of the registered objects. + */ + private final long maxWaitTime; + + /** + * The minimum amount of time to wait for a {@code ReferenceQueue.remove(long)} operation. + * The value, {@code 15} milliseconds, is based on the "normal" Windows timer interrupt frequency. + */ + private static final long MINIMUM_REMOVE_WAIT_MILLIS = 15L; + + /** + * Creates a {@code GcTracker} instance with a 100ms polling interval and a 1-minute maximum wait time. + * Equivalent to calling {@code new GcTracker(Duration.ofMillis(100L), Duration.ofMinutes(1L))}. + */ + public GcTracker() { + this(ofMillis(100L), ofMinutes(1L)); + } + + /** + * Creates a {@code GcTracker} instance using the specified polling interval. + * The {@code pollingInterval} is converted to milliseconds for use. + * + * @param pollingInterval the duration expressing the polling interval used in the + * {@code awaitGc} polling loops; must between 100ms and + * 5s, inclusive + * @param maxWaitTime the duration expressing the maximum amount of time a + * {@code awaitGc} will pool/wait; must be between 5s and + * 5m, inclusive + */ + public GcTracker(Duration pollingInterval, Duration maxWaitTime) { + Objects.requireNonNull(pollingInterval, "pollingInterval"); + Objects.requireNonNull(maxWaitTime, "maxWaitTime"); + if (pollingInterval.compareTo(ofMillis(100L)) < 0 || 0 < pollingInterval.compareTo(ofSeconds(5L))) { + throw new IllegalArgumentException("pollingInterval must be at least 100ms and not more than 5s"); + } + if (maxWaitTime.compareTo(ofSeconds(5L)) < 0 || 0 < maxWaitTime.compareTo(ofMinutes(5L))) { + throw new IllegalArgumentException("maxWaitTime must be at least 5s and not more than 5m"); + } + this.pollingMillis = pollingInterval.toMillis(); + this.maxWaitTime = maxWaitTime.toMillis(); + } + + /** + * Adds a {@code MappedByteBuffer} to the collection of tracked objects. + * @param buffer the buffer to track; if the buffer instance is not a + * {@code MappedByteBuffer}, it is silently ignored + * @param the buffer type + */ + public void add(B buffer) { + if (buffer instanceof MappedByteBuffer) { + trackedRefs.add(new PhantomReference<>((MappedByteBuffer)buffer, refQueue)); + } + } + + /** + * Awaits garbage collection of all buffers added to this tracker. + * This method uses a polling loop that is not interruptible. + * @throws WaitTimeExhaustedException if the configured maximum wait time is exhausted + */ + public void awaitGc() { + try { + awaitGc(ofMillis(maxWaitTime)); + } catch (TimeoutException e) { + throw new WaitTimeExhaustedException("Maximum wait time of " + ofMillis(maxWaitTime) + " exhausted leaving " + + trackedRefs.size() + " buffers uncollected." + + "\n If this is not expected, explicitly set the buffer reference(s) to null before calling awaitGc();" + + "\n variable scoping may not be sufficient for allowing buffers to be reclaimed by garbage collection", e); + } + } + + /** + * Awaits garbage collection of all buffers added to this tracker. + * This method uses a polling loop that is not interruptible. + * @param maxWaitTime the maximum amount of time to wait for garbage collection; + * this value overrides the value specified with + * {@link #GcTracker(Duration,Duration) GcTracker(pollingInterval,maxWaitTime)} + * @throws TimeoutException if the wait duration/unit time is exhausted + */ + public void awaitGc(Duration maxWaitTime) throws TimeoutException { + Objects.requireNonNull(maxWaitTime, "maxWaitTime"); + if (maxWaitTime.isNegative() || maxWaitTime.isZero()) { + throw new IllegalArgumentException("maxWaitTime must be more than zero"); + } + + try { + awaitGcInternal(maxWaitTime, false); + } catch (InterruptedException e) { + throw new IllegalStateException("Unexpected InterruptionException", e); + } + } + + /** + * Awaits garbage collection of all buffers added to this tracker. + * This method uses an interruptible polling loop which may be + * resumed following an interruption by repeating the call to this method. + * @throws InterruptedException if an interrupt is raised on the current thread + * @throws WaitTimeExhaustedException if the configured maximum wait time is exhausted + */ + public void awaitGcInterruptibly() throws InterruptedException { + try { + awaitGcInterruptibly(ofMillis(maxWaitTime)); + } catch (TimeoutException e) { + throw new WaitTimeExhaustedException("Maximum wait time of " + ofMillis(maxWaitTime) + " exhausted leaving " + + trackedRefs.size() + " buffers uncollected." + + "\n If this is not expected, explicitly set the buffer reference(s) to null before calling awaitGc();" + + "\n variable scoping may not be sufficient for allowing buffers to be reclaimed by garbage collection", e); + } + } + + /** + * Awaits garbage collection of all buffers added to this tracker. + * This method uses an interruptible polling loop which may be + * resumed following an interruption by repeating the call to this method. + * @param maxWaitTime the maximum amount of time to wait for garbage collection; + * this value overrides the value specified with + * {@link #GcTracker(Duration,Duration) GcTracker(pollingInterval,maxWaitTime)} + * @throws InterruptedException if an interrupt is raised on the current thread + * @throws TimeoutException if the wait duration/unit time is exhausted + */ + public void awaitGcInterruptibly(Duration maxWaitTime) throws InterruptedException, TimeoutException { + Objects.requireNonNull(maxWaitTime, "maxWaitTime"); + if (maxWaitTime.isNegative() || maxWaitTime.isZero()) { + throw new IllegalArgumentException("maxWaitTime must be more than zero"); + } + + awaitGcInternal(maxWaitTime, true); + } + + @SuppressWarnings("removal") + private void awaitGcInternal(Duration maxWaitTime, boolean isInterruptible) + throws TimeoutException, InterruptedException { + long pollingNanos = TimeUnit.MILLISECONDS.toNanos(pollingMillis); + + boolean interrupted = (!isInterruptible && Thread.interrupted()); + long deadlineNanos = System.nanoTime() + maxWaitTime.toNanos(); + try { + while (!trackedRefs.isEmpty()) { + long nanosRemaining = deadlineNanos - System.nanoTime(); + if (nanosRemaining <= 0) { + throw new TimeoutException("Wait duration exhausted leaving " + trackedRefs.size() + " buffers uncollected"); + } + + long waitMillis = Math.max(TimeUnit.NANOSECONDS.toMillis( + Math.min(nanosRemaining, pollingNanos)), MINIMUM_REMOVE_WAIT_MILLIS); + Reference queuedRef = null; + try { + queuedRef = refQueue.remove(waitMillis); + if (queuedRef != null) { + if (!trackedRefs.remove(queuedRef)) { + throw new IllegalStateException("Unexpected reference obtained from queue"); + } + } + } catch (InterruptedException e) { + if (isInterruptible) { + InterruptedException ie = + new InterruptedException("Interrupted leaving " + trackedRefs.size() + " buffers uncollected"); + ie.initCause(e); + throw ie; + } else { + interrupted = true; + } + } + // queuedRef == null means the remove time limit expired or was interrupted; GC before trying again + if (queuedRef == null) { + System.gc(); + System.runFinalization(); // Marked for removal beyond Java 21 + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + /** + * Indicates that the maximum time permitted for {@link GcTracker#awaitGc()} is exhausted. + */ + public static final class WaitTimeExhaustedException extends RuntimeException { + private static final long serialVersionUID = -3483070765958796042L; + + public WaitTimeExhaustedException(String message, Throwable cause) { + super(message, cause); + } + } +} diff --git a/test-tools/src/test/java/org/terracotta/utilities/test/GcTrackerTest.java b/test-tools/src/test/java/org/terracotta/utilities/test/GcTrackerTest.java new file mode 100644 index 0000000..0430e66 --- /dev/null +++ b/test-tools/src/test/java/org/terracotta/utilities/test/GcTrackerTest.java @@ -0,0 +1,206 @@ +/* + * Copyright 2024 Terracotta, Inc., a Software AG company. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.terracotta.utilities.test; + +import org.junit.After; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.terracotta.utilities.test.matchers.Eventually.within; +import static org.terracotta.utilities.test.matchers.ThrowsMatcher.threw; + +/** + * Tests for {@link GcTracker}. + */ +@SuppressWarnings("UnusedAssignment") +public class GcTrackerTest { + + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + @After + public void cleanup() { + scheduler.shutdownNow(); + } + + @Test + public void testCtor0() { + new GcTracker(); + } + + @Test + public void testCtor1() { + assertThat(() -> new GcTracker(null, Duration.ofMinutes(1L)), threw(instanceOf(NullPointerException.class))); + assertThat(() -> new GcTracker(Duration.ofMillis(100L), null), threw(instanceOf(NullPointerException.class))); + + assertThat(() -> new GcTracker(Duration.ofMillis(99L), Duration.ofMinutes(1L)), threw(instanceOf(IllegalArgumentException.class))); + assertThat(() -> new GcTracker(Duration.ofMillis(TimeUnit.SECONDS.toMillis(5L) + 1), Duration.ofMinutes(1L)), threw(instanceOf(IllegalArgumentException.class))); + + assertThat(() -> new GcTracker(Duration.ofMillis(100L), Duration.ofMillis(TimeUnit.SECONDS.toMillis(5L) - 1)), threw(instanceOf(IllegalArgumentException.class))); + assertThat(() -> new GcTracker(Duration.ofMillis(100L), Duration.ofMillis(TimeUnit.MINUTES.toMillis(5L) + 1)), threw(instanceOf(IllegalArgumentException.class))); + + new GcTracker(Duration.ofMillis(100L), Duration.ofSeconds(30L)); + new GcTracker(Duration.ofSeconds(5L), Duration.ofMinutes(5L)); + } + + @Test + public void testNoBuffers() { + GcTracker tracker = new GcTracker(); + ByteBuffer buffer; + try { + buffer = ByteBuffer.allocate(4096); + tracker.add(buffer); + } finally { + tracker.awaitGc(); + } + } + + @Test + public void testSingleBuffer() { + GcTracker tracker = new GcTracker(); + try { + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + + Thread currentThread = Thread.currentThread(); + scheduler.schedule(currentThread::interrupt, 500L, TimeUnit.MILLISECONDS); + assertThat(() -> tracker.awaitGc(Duration.ofMillis(1500L)), threw(instanceOf(TimeoutException.class))); + assertThat(Thread.currentThread().isInterrupted(), is(true)); + + buffer = null; + } finally { + tracker.awaitGc(); + assertThat(Thread.currentThread().isInterrupted(), is(true)); + } + } + + @Test + public void testMaxWaitDuration() { + GcTracker tracker = new GcTracker(Duration.ofMillis(100L), Duration.ofSeconds(5L)); + try { + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + } finally { + within(Duration.ofSeconds(6L)).runsCleanly( + () -> assertThat(tracker::awaitGc, threw(instanceOf(GcTracker.WaitTimeExhaustedException.class)))); + } + } + + @Test + public void testDoubleBuffer() { + GcTracker tracker = new GcTracker(); + try { + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + + buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + buffer = null; + } finally { + tracker.awaitGc(); + } + } + + @Test + public void testBadDuration() { + GcTracker tracker = new GcTracker(); + assertThat(() -> tracker.awaitGc(Duration.ofSeconds(-1L)), threw(instanceOf(IllegalArgumentException.class))); + assertThat(() -> tracker.awaitGc(Duration.ZERO), threw(instanceOf(IllegalArgumentException.class))); + } + + @Test + public void testInterruptibleSingleBuffer() throws InterruptedException { + GcTracker tracker = new GcTracker(); + try { + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + + Thread currentThread = Thread.currentThread(); + scheduler.schedule(currentThread::interrupt, 1L, TimeUnit.SECONDS); + assertThat(tracker::awaitGcInterruptibly, threw(instanceOf(InterruptedException.class))); + + buffer = null; + } finally { + tracker.awaitGcInterruptibly(); + } + } + + @Test + public void testInterruptibleMaxWaitDuration() { + GcTracker tracker = new GcTracker(Duration.ofMillis(100L), Duration.ofSeconds(5L)); + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + + Thread currentThread = Thread.currentThread(); + scheduler.schedule(currentThread::interrupt, 1L, TimeUnit.SECONDS); + assertThat(tracker::awaitGcInterruptibly, threw(instanceOf(InterruptedException.class))); + + within(Duration.ofSeconds(6L)).runsCleanly( + () -> assertThat(tracker::awaitGcInterruptibly, threw(instanceOf(GcTracker.WaitTimeExhaustedException.class)))); + } + + @Test + public void testInterruptibleTimeout() { + GcTracker tracker = new GcTracker(Duration.ofMillis(100L), Duration.ofSeconds(5L)); + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + within(Duration.ofSeconds(3L)).runsCleanly( + () -> assertThat(() -> tracker.awaitGcInterruptibly(Duration.ofSeconds(1L)), threw(instanceOf(TimeoutException.class)))); + } + + @Test + public void testInterruptibleRepeat() { + GcTracker tracker = new GcTracker(Duration.ofMillis(100L), Duration.ofSeconds(5L)); + ByteBuffer buffer = ByteBuffer.allocateDirect(4096); + tracker.add(buffer); + + Thread currentThread = Thread.currentThread(); + ScheduledFuture interruptFuture = + scheduler.scheduleAtFixedRate(currentThread::interrupt, 500L, 500L, TimeUnit.MILLISECONDS); + + int i = 0; + try { + while (true) { + try { + tracker.awaitGcInterruptibly(); + break; + } catch (InterruptedException e) { + if (++i >= 8) { + buffer = null; + } + } + } + } finally { + interruptFuture.cancel(true); + } + } + + @Test + public void testInterruptibleBadDuration() { + GcTracker tracker = new GcTracker(); + assertThat(() -> tracker.awaitGcInterruptibly(Duration.ofSeconds(-1L)), threw(instanceOf(IllegalArgumentException.class))); + assertThat(() -> tracker.awaitGcInterruptibly(Duration.ZERO), threw(instanceOf(IllegalArgumentException.class))); + } +} \ No newline at end of file