diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 16d661570b..482f20a13d 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -17,12 +17,21 @@ jobs: name: Test JDK ${{ matrix.java }}, ${{ matrix.os }} steps: - - uses: actions/checkout@v1 + - uses: actions/checkout@v2 + - name: Set up JDK - uses: actions/setup-java@v1 + uses: actions/setup-java@v2 with: + distribution: 'adopt' java-version: ${{ matrix.java }} + + - name: Disable NTP on macOS (https://github.com/actions/virtual-environments/issues/820) + run: | + sudo systemsetup -setusingnetworktime off + sudo rm -rf /etc/ntp.conf + if: runner.os == 'macOS' + - name: Test with Maven env: MAVEN_OPTS: "-Xms3g -Xmx3g" - run: mvn test -B -V -D"java.util.logging.config.file"="logging.properties" -D"http.keepAlive"="false" -D"maven.wagon.http.pool"="false" -D"maven.wagon.httpconnectionManager.ttlSeconds"="120" + run: mvn test -B -V -D"http.keepAlive"="false" -D"maven.wagon.http.pool"="false" -D"maven.wagon.httpconnectionManager.ttlSeconds"="120" diff --git a/README.md b/README.md index 95c3b7bb42..b03533034c 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ QuickFIX/J [![Total alerts](https://img.shields.io/lgtm/alerts/g/quickfix-j/quickfixj.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/quickfix-j/quickfixj/alerts/) [![Language grade: Java](https://img.shields.io/lgtm/grade/java/g/quickfix-j/quickfixj.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/quickfix-j/quickfixj/context:java) + This is the official QuickFIX/J project repository. ## intro diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 3369a989aa..b8bdc6f6db 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -760,7 +760,7 @@ private void setEnabled(boolean enabled) { } private void initializeHeader(Message.Header header) { - state.setLastSentTime(SystemTime.currentTimeMillis()); +// state.setLastSentTime(SystemTime.currentTimeMillis()); move to sendRaw() header.setString(BeginString.FIELD, sessionID.getBeginString()); header.setString(SenderCompID.FIELD, sessionID.getSenderCompID()); optionallySetID(header, SenderSubID.FIELD, sessionID.getSenderSubID()); @@ -1753,6 +1753,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow) UnsupportedMessageType, IOException { state.setLastReceivedTime(SystemTime.currentTimeMillis()); + state.setLastReceivedTimeNanos(SystemTime.currentTimeMillisFromNanos()); state.clearTestRequestCounter(); String msgType; @@ -1975,11 +1976,11 @@ public void next() throws IOException { LOG.warn("Heartbeat failure detected but deactivated"); } } else { - if (state.isTestRequestNeeded()) { + if (state.isTestRequestNeeded(sessionID)) { generateTestRequest("TEST"); getLog().onEvent("Sent test request TEST"); stateListener.onMissedHeartBeat(); - } else if (state.isHeartBeatNeeded()) { + } else if (state.isHeartBeatNeeded(sessionID)) { generateHeartbeat(); } } @@ -2036,6 +2037,7 @@ private boolean generateLogon() throws IOException { logon.setBoolean(ResetSeqNumFlag.FIELD, true); } state.setLastReceivedTime(SystemTime.currentTimeMillis()); + state.setLastReceivedTimeNanos(SystemTime.currentTimeMillisFromNanos()); state.clearTestRequestCounter(); state.setLogonSent(true); logonAttempts++; @@ -2336,6 +2338,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN generateSequenceReset(receivedMessage, begin, msgSeqNum); } getLog().onEvent("Resending message: " + msgSeqNum); + state.setLastSentTime(SystemTime.currentTimeMillis()); send(msg.toString()); begin = 0; appMessageJustSent = true; @@ -2591,8 +2594,8 @@ private boolean sendRaw(Message message, int num) { final Message.Header header = message.getHeader(); final String msgType = header.getString(MsgType.FIELD); - initializeHeader(header); - + initializeHeader(header); // TODO still duplicate to generateXXX methods + state.setLastSentTime(SystemTime.currentTimeMillis()); if (num > 0) { header.setInt(MsgSeqNum.FIELD, num); } @@ -2647,7 +2650,6 @@ private boolean sendRaw(Message message, int num) { result = send(messageString); } } - return result; } catch (final IOException e) { logThrowable(getLog(), "Error reading/writing in MessageStore", e); diff --git a/quickfixj-core/src/main/java/quickfix/SessionState.java b/quickfixj-core/src/main/java/quickfix/SessionState.java index 39f71ee1e6..3965209da1 100644 --- a/quickfixj-core/src/main/java/quickfix/SessionState.java +++ b/quickfixj-core/src/main/java/quickfix/SessionState.java @@ -54,7 +54,9 @@ public final class SessionState { private boolean logoutReceived = false; private int testRequestCounter; private long lastSentTime; + private long lastSentTimeNanos; private long lastReceivedTime; + private long lastReceivedTimeNanos; private final double testRequestDelayMultiplier; private final double heartBeatTimeoutMultiplier; private long heartBeatMillis = Long.MAX_VALUE; @@ -112,10 +114,23 @@ long getHeartBeatMillis() { } } - public boolean isHeartBeatNeeded() { - long millisSinceLastSentTime = SystemTime.currentTimeMillis() - getLastSentTime(); + SessionID sessionID1 = new SessionID(FixVersions.BEGINSTRING_FIX44, "ISLD", "TW"); +// SessionID sessionID2 = new SessionID(FixVersions.BEGINSTRING_FIX44, "TW", "ISLD"); + public boolean isHeartBeatNeeded(SessionID sessionID) { + long currentTimeMillisFromNanos = SystemTime.currentTimeMillisFromNanos(); + long lastSentTimeNanos = getLastSentTimeNanos(); + long millisSinceLastSentTime = TimeUnit.NANOSECONDS.toMillis(currentTimeMillisFromNanos - lastSentTimeNanos); +// long millisSinceLastSentTime = SystemTime.currentTimeMillis() - getLastSentTime(); + boolean name = millisSinceLastSentTime + 10 > getHeartBeatMillis() && getTestRequestCounter() == 0; // QFJ-448: allow 10 ms leeway since exact comparison causes skipped heartbeats occasionally - return millisSinceLastSentTime + 10 > getHeartBeatMillis() && getTestRequestCounter() == 0; + if (sessionID1.getSenderCompID().equals(sessionID.getSenderCompID()) + || sessionID1.getSenderCompID().equals(sessionID.getTargetCompID()) + || "ACCEPTOR-1".equals(sessionID.getTargetCompID())) { + getLog().onEvent("isHeartBeatNeeded() = " + name + " current=" + currentTimeMillisFromNanos + + " - lastSentTime=" + lastSentTimeNanos + + "= millisSinceLastSent=" + millisSinceLastSentTime); + } + return name; } public boolean isInitiator() { @@ -128,21 +143,40 @@ public long getLastReceivedTime() { } } + public long getLastReceivedTimeNanos() { + synchronized (lock) { + return lastReceivedTimeNanos; + } + } + public void setLastReceivedTime(long lastReceivedTime) { synchronized (lock) { this.lastReceivedTime = lastReceivedTime; } } + public void setLastReceivedTimeNanos(long lastReceivedTimeNanos) { + synchronized (lock) { + this.lastReceivedTimeNanos = lastReceivedTimeNanos; + } + } + public long getLastSentTime() { synchronized (lock) { return lastSentTime; } } + public long getLastSentTimeNanos() { + synchronized (lock) { + return lastSentTimeNanos; + } + } + public void setLastSentTime(long lastSentTime) { synchronized (lock) { this.lastSentTime = lastSentTime; + this.lastSentTimeNanos = SystemTime.currentTimeMillisFromNanos(); } } @@ -184,7 +218,8 @@ public void setLogonSent(boolean logonSent) { public boolean isLogonTimedOut() { synchronized (lock) { - return isLogonSent() && SystemTime.currentTimeMillis() - getLastReceivedTime() >= getLogonTimeoutMs(); +// return isLogonSent() && SystemTime.currentTimeMillis() - getLastReceivedTime() >= getLogonTimeoutMs(); + return isLogonSent() && TimeUnit.NANOSECONDS.toMillis(SystemTime.currentTimeMillisFromNanos() - getLastReceivedTimeNanos()) >= getLogonTimeoutMs(); } } @@ -282,14 +317,22 @@ public void incrementTestRequestCounter() { } } - public boolean isTestRequestNeeded() { + public boolean isTestRequestNeeded(SessionID sessionID) { long millisSinceLastReceivedTime = timeSinceLastReceivedMessage(); + if (sessionID.getSenderCompID().equals(sessionID1.getSenderCompID()) + || sessionID.getTargetCompID().equals(sessionID1.getSenderCompID()) + || "ACCEPTOR-1".equals(sessionID.getTargetCompID())) { + getLog().onEvent("isTestRequestNeeded() - millisSinceLastReceived = " + millisSinceLastReceivedTime); + } return millisSinceLastReceivedTime >= ((1 + testRequestDelayMultiplier) * (getTestRequestCounter() + 1)) * getHeartBeatMillis(); } private long timeSinceLastReceivedMessage() { - return SystemTime.currentTimeMillis() - getLastReceivedTime(); + long lastReceivedTimeNanos1 = getLastReceivedTimeNanos(); + long currentTimeMillisFromNanos = SystemTime.currentTimeMillisFromNanos(); + long timeSinceLastReceivedMessage = TimeUnit.NANOSECONDS.toMillis(currentTimeMillisFromNanos - lastReceivedTimeNanos1); + return timeSinceLastReceivedMessage; } public boolean isTimedOut() { diff --git a/quickfixj-core/src/main/java/quickfix/SystemTime.java b/quickfixj-core/src/main/java/quickfix/SystemTime.java index 85d73c47bc..fb577f8332 100644 --- a/quickfixj-core/src/main/java/quickfix/SystemTime.java +++ b/quickfixj-core/src/main/java/quickfix/SystemTime.java @@ -37,6 +37,11 @@ public long getTime() { return System.currentTimeMillis(); } + @Override + public long getTimeFromNanos() { + return System.nanoTime(); + } + @Override public LocalDateTime getNow() { return LocalDateTime.now(ZoneOffset.UTC); @@ -48,6 +53,10 @@ public LocalDateTime getNow() { public static long currentTimeMillis() { return systemTimeSource.getTime(); } + + public static long currentTimeMillisFromNanos() { + return systemTimeSource.getTimeFromNanos(); + } public static LocalDateTime now() { return systemTimeSource.getNow(); diff --git a/quickfixj-core/src/main/java/quickfix/SystemTimeSource.java b/quickfixj-core/src/main/java/quickfix/SystemTimeSource.java index c14af90f8f..179a6b2ccb 100644 --- a/quickfixj-core/src/main/java/quickfix/SystemTimeSource.java +++ b/quickfixj-core/src/main/java/quickfix/SystemTimeSource.java @@ -33,6 +33,8 @@ public interface SystemTimeSource { * @return current (possible simulated) time */ long getTime(); + + long getTimeFromNanos(); /** * Obtain current LocalDateTime. diff --git a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java index 29c09d7cba..49fc8225e1 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java @@ -53,6 +53,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.mina.core.future.CloseFuture; import org.apache.mina.core.service.IoService; +import quickfix.SystemTime; /** * An abstract base class for acceptors and initiators. Provides support for common functionality and also serves as an @@ -265,7 +266,7 @@ protected void logoutAllSessions(boolean forceDisconnect) { } protected void waitForLogout() { - long start = System.currentTimeMillis(); + long start = SystemTime.currentTimeMillisFromNanos(); Set loggedOnSessions; while (!(loggedOnSessions = getLoggedOnSessions()).isEmpty()) { try { @@ -273,7 +274,7 @@ protected void waitForLogout() { } catch (InterruptedException e) { log.error(e.getMessage(), e); } - final long elapsed = System.currentTimeMillis() - start; + final long elapsed = TimeUnit.NANOSECONDS.toMillis(SystemTime.currentTimeMillisFromNanos() - start); Iterator sessionItr = loggedOnSessions.iterator(); while (sessionItr.hasNext()) { Session session = sessionItr.next(); diff --git a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java index 8d2e88d3ea..e5174de215 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java @@ -76,7 +76,7 @@ public void onMessage(Session quickfixSession, Message message) { queueTracker.put(new SessionMessageEvent(quickfixSession, message)); } catch (InterruptedException e) { isStopped = true; - throw new RuntimeException(e); + Thread.currentThread().interrupt(); } } @@ -194,7 +194,9 @@ public void stopHandlingMessages(boolean join) { stopHandlingMessages(); if (join) { try { - messageProcessingThread.join(); + if (messageProcessingThread != null) { + messageProcessingThread.join(); + } } catch (InterruptedException e) { sessionConnector.log.error("{} interrupted.", MESSAGE_PROCESSOR_THREAD_NAME); } diff --git a/quickfixj-core/src/test/java/quickfix/MockSystemTimeSource.java b/quickfixj-core/src/test/java/quickfix/MockSystemTimeSource.java index 3c9dea46c4..768bb6e3ea 100644 --- a/quickfixj-core/src/test/java/quickfix/MockSystemTimeSource.java +++ b/quickfixj-core/src/test/java/quickfix/MockSystemTimeSource.java @@ -23,9 +23,12 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.Calendar; +import java.util.concurrent.TimeUnit; public class MockSystemTimeSource implements SystemTimeSource { - private long[] systemTimes = { System.currentTimeMillis() }; + + private long[] systemTimes = {System.currentTimeMillis()}; + private long[] systemTimesNanos = {System.nanoTime()}; private int offset; public MockSystemTimeSource() { @@ -34,6 +37,7 @@ public MockSystemTimeSource() { public MockSystemTimeSource(long time) { setSystemTimes(time); + setSystemTimesNanos(TimeUnit.MILLISECONDS.toNanos(time)); } public void setSystemTimes(long[] times) { @@ -41,7 +45,11 @@ public void setSystemTimes(long[] times) { } void setSystemTimes(long time) { - systemTimes = new long[] { time }; + systemTimes = new long[]{time}; + } + + void setSystemTimesNanos(long time) { + systemTimesNanos = new long[]{time}; } public void setTime(Calendar c) { @@ -56,10 +64,21 @@ public long getTime() { return systemTimes[offset]; } + @Override + public long getTimeFromNanos() { + if (systemTimesNanos.length - offset > 1) { + offset++; + } + return systemTimesNanos[offset]; + } + public void increment(long delta) { if (systemTimes.length - offset == 1) { systemTimes[offset] += delta; } + if (systemTimesNanos.length - offset == 1) { + systemTimesNanos[offset] += TimeUnit.MILLISECONDS.toNanos(delta); + } } @Override diff --git a/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java b/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java index 815d5676d9..d097f9269c 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionDisconnectConcurrentlyTest.java @@ -19,7 +19,6 @@ package quickfix; -import org.junit.After; import org.junit.Assert; import org.junit.Test; import quickfix.field.BeginString; @@ -31,6 +30,7 @@ import quickfix.field.SendingTime; import quickfix.field.TargetCompID; import quickfix.field.TestReqID; +import quickfix.fix42.Heartbeat; import quickfix.fix42.TestRequest; import quickfix.mina.ProtocolFactory; @@ -50,20 +50,32 @@ import static org.junit.Assert.fail; public class SessionDisconnectConcurrentlyTest { - private TestAcceptorApplication testAcceptorApplication; - @After - public void tearDown() throws Exception { - if (testAcceptorApplication != null) { - testAcceptorApplication.tearDown(); + + @Test + public void main() { + for (int i = 0; i < 100_000_000; i++) { + timeInvocation(); } } + private static void timeInvocation() { + final long start = System.nanoTime(); + final long end = System.nanoTime(); + checkResult(end - start); + } + + private static void checkResult(final long l) { + if (l < 0) { + System.out.println("should not get here " + l); // removing reference to l parameter here "fixes" the bug + fail(); + } + } // QFJ-738 @Test(timeout = 15000) public void testConcurrentDisconnection() throws Exception { - testAcceptorApplication = new TestAcceptorApplication(1); - final Acceptor acceptor = createAcceptor(); + TestAcceptorApplication testAcceptorApplication = new TestAcceptorApplication(1); + final Acceptor acceptor = createAcceptor(testAcceptorApplication); final Initiator initiator = createInitiator(); try { acceptor.start(); @@ -71,7 +83,7 @@ public void testConcurrentDisconnection() throws Exception { testAcceptorApplication.waitForLogon(); - doSessionDispatchingTest(1); + doSessionDispatchingTest(1, testAcceptorApplication); } finally { MyThread thread = new MyThread(); thread.setDaemon(true); @@ -84,13 +96,11 @@ public void testConcurrentDisconnection() throws Exception { } } - private void doSessionDispatchingTest(int i) throws SessionNotFound, InterruptedException, + private void doSessionDispatchingTest(int i, TestAcceptorApplication testAcceptorApplication) throws SessionNotFound, InterruptedException, FieldNotFound { TestRequest message = new TestRequest(); message.set(new TestReqID("TEST" + i)); SessionID sessionID = getSessionIDForClient(i); - - testAcceptorApplication.setMessageLatch(new CountDownLatch(1)); Session.sendToTarget(message, sessionID); testAcceptorApplication.waitForMessages(); @@ -104,10 +114,11 @@ private SessionID getSessionIDForClient(int i) { private static class TestAcceptorApplication extends ApplicationAdapter { private final HashMap sessionMessages = new HashMap<>(); private final CountDownLatch logonLatch; - private CountDownLatch messageLatch; + private final CountDownLatch messageLatch; public TestAcceptorApplication(int countDown) { logonLatch = new CountDownLatch(countDown); + messageLatch = new CountDownLatch(1); } public void onLogon(SessionID sessionId) { @@ -117,8 +128,8 @@ public void onLogon(SessionID sessionId) { public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, RejectLogon { - sessionMessages.put(sessionId, message); - if (messageLatch != null) { + if (message instanceof Heartbeat) { + sessionMessages.put(sessionId, message); messageLatch.countDown(); } } @@ -127,6 +138,7 @@ public void assertTestRequestOnSession(String text, SessionID sessionID) throws FieldNotFound { Message testRequest = sessionMessages.get(sessionID); assertNotNull("no message", testRequest); + System.out.println("XXXXXX " + testRequest); assertEquals("wrong message", text, testRequest.getString(TestReqID.FIELD)); } @@ -138,11 +150,7 @@ public void waitForLogon() { } } - public synchronized void setMessageLatch(CountDownLatch messageLatch) { - this.messageLatch = messageLatch; - } - - public synchronized void waitForMessages() { + public void waitForMessages() { try { if (!messageLatch.await(10, TimeUnit.SECONDS)) { fail("Timed out waiting for message"); @@ -151,10 +159,6 @@ public synchronized void waitForMessages() { fail(e.getMessage()); } } - - public void tearDown() { - sessionMessages.clear(); - } } private Initiator createInitiator() throws ConfigError { @@ -187,7 +191,7 @@ private void configureInitiatorForSession(SessionSettings settings, int i, int p settings.setString(sessionID, "SocketConnectPort", Integer.toString(port)); } - private Acceptor createAcceptor() throws ConfigError { + private Acceptor createAcceptor(TestAcceptorApplication testAcceptorApplication) throws ConfigError { SessionSettings settings = new SessionSettings(); HashMap defaults = new HashMap<>(); defaults.put("ConnectionType", "acceptor"); diff --git a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java index a19e48f949..52406d191f 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionStateTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionStateTest.java @@ -19,6 +19,7 @@ package quickfix; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -46,6 +47,7 @@ public void testTimeoutDefaultsAreNonzero() { SessionState state = new SessionState(new Object(), null, 0, false, null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); state.setLastReceivedTime(900); + state.setLastReceivedTimeNanos(TimeUnit.MILLISECONDS.toNanos(900)); assertFalse("logon timeout not init'ed", state.isLogonTimedOut()); state.setLogoutSent(true); @@ -55,23 +57,25 @@ public void testTimeoutDefaultsAreNonzero() { @Test public void testTestRequestTiming() { + SessionID sessionID1 = new SessionID(FixVersions.BEGINSTRING_FIX44, "ISLD", "TW"); SessionState state = new SessionState(new Object(), null, 0, false, null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); - state.setLastReceivedTime(950); + state.setLastReceivedTimeNanos(TimeUnit.MILLISECONDS.toNanos(950)); state.setHeartBeatInterval(50); - assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded()); + assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded(sessionID1)); for (int i = 0; i < 5; i++) { state.incrementTestRequestCounter(); } - assertFalse("testRequest should be needed", state.isTestRequestNeeded()); + assertFalse("testRequest should be needed", state.isTestRequestNeeded(sessionID1)); // set the heartbeat interval to something small and we shouldn't need it again state.setHeartBeatInterval(3); - assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded()); + assertFalse("testRequest shouldn't be needed yet", state.isTestRequestNeeded(sessionID1)); } @Test public void testHeartbeatTiming() { + SessionID sessionID1 = new SessionID(FixVersions.BEGINSTRING_FIX44, "ISLD", "TW"); // we set a HB interval of 2 seconds = 2000ms SessionState state = new SessionState(new Object(), null, 2 /* HB interval */, false, null, Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); @@ -79,12 +83,12 @@ public void testHeartbeatTiming() { long now = System.currentTimeMillis(); timeSource.setSystemTimes(now); state.setLastSentTime(now); - assertFalse("heartbeat shouldn't be needed yet", state.isHeartBeatNeeded()); + assertFalse("heartbeat shouldn't be needed yet", state.isHeartBeatNeeded(sessionID1)); timeSource.increment(1000); - assertFalse("heartbeat shouldn't be needed yet", state.isHeartBeatNeeded()); + assertFalse("heartbeat shouldn't be needed yet", state.isHeartBeatNeeded(sessionID1)); timeSource.increment(1000); // current time is now 2000ms further since the start, i.e. the HB interval has elapsed - assertTrue("heartbeat should be needed", state.isHeartBeatNeeded()); + assertTrue("heartbeat should be needed", state.isHeartBeatNeeded(sessionID1)); } @Test @@ -93,15 +97,15 @@ public void testSessionTimeout() { Session.DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, Session.DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER); // session should timeout after 2.4 * 30 = 72 seconds - state.setLastReceivedTime(950_000); + state.setLastReceivedTimeNanos(TimeUnit.MILLISECONDS.toNanos(950_000)); - timeSource.setSystemTimes(1_000_000L); + timeSource.setSystemTimesNanos(TimeUnit.MILLISECONDS.toNanos(1_000_000L)); assertFalse("session is still valid", state.isTimedOut()); - timeSource.setSystemTimes(1_021_999L); + timeSource.setSystemTimesNanos(TimeUnit.MILLISECONDS.toNanos(1_021_999L)); assertFalse("session is still valid", state.isTimedOut()); - timeSource.setSystemTimes(1_022_000L); + timeSource.setSystemTimesNanos(TimeUnit.MILLISECONDS.toNanos(1_022_000L)); assertTrue("session timed out", state.isTimedOut()); } } diff --git a/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java b/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java index 0637d3c774..6d4a1eadd1 100644 --- a/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java +++ b/quickfixj-core/src/test/java/quickfix/SocketAcceptorTest.java @@ -40,6 +40,12 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TestRule; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -47,6 +53,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + /** * QFJ-643: Unable to restart a stopped acceptor (SocketAcceptor) * @@ -64,6 +71,20 @@ public class SocketAcceptorTest { private final SessionID initiatorSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "INITIATOR", "ACCEPTOR"); + + @Rule + public TestRule watcher = new TestWatcher() { + @Override + protected void starting(Description description) { + System.out.println("Starting test: " + description.getMethodName()); + } + }; + + @Before + public void check() { + checkThreads(ManagementFactory.getThreadMXBean(), 0); + } + @After public void cleanup() { try { @@ -386,7 +407,7 @@ private Acceptor createAcceptor(TestAcceptorApplication testAcceptorApplication) SessionSettings settings = createAcceptorSettings(); MessageStoreFactory factory = new MemoryStoreFactory(); - quickfix.LogFactory logFactory = new SLF4JLogFactory(new SessionSettings()); + quickfix.LogFactory logFactory = new ScreenLogFactory(true, true, true, true); return new SocketAcceptor(testAcceptorApplication, factory, settings, logFactory, new DefaultMessageFactory()); } @@ -397,7 +418,7 @@ private Acceptor createAcceptorThreaded(TestAcceptorApplication testAcceptorAppl SessionSettings settings = createAcceptorSettings(); MessageStoreFactory factory = new MemoryStoreFactory(); - quickfix.LogFactory logFactory = new SLF4JLogFactory(new SessionSettings()); + quickfix.LogFactory logFactory = new ScreenLogFactory(true, true, true, true); return new ThreadedSocketAcceptor(testAcceptorApplication, factory, settings, logFactory, new DefaultMessageFactory()); } diff --git a/quickfixj-core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java b/quickfixj-core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java index f2012aea84..f54f70e3d4 100644 --- a/quickfixj-core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java +++ b/quickfixj-core/src/test/java/quickfix/mina/ssl/SecureSocketTest.java @@ -19,9 +19,9 @@ package quickfix.mina.ssl; -import junit.framework.TestCase; import org.apache.mina.core.filterchain.IoFilterAdapter; import org.apache.mina.core.session.IoSession; +import org.apache.mina.util.AvailablePortFinder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import quickfix.ApplicationAdapter; @@ -40,26 +40,35 @@ import quickfix.test.util.ExpectedTestFailure; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -public class SecureSocketTest extends TestCase { +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; + +public class SecureSocketTest { private final Logger log = LoggerFactory.getLogger(getClass()); private final int transportProtocol = ProtocolFactory.SOCKET; - protected void setUp() throws Exception { + @Before + public void setUp() throws Exception { SystemTime.setTimeSource(null); } + @Test public void testLogonWithBadCertificate() throws Exception { - ServerThread serverThread = new ServerThread("nonexistent", "pwd"); + int freePort = AvailablePortFinder.getNextAvailable(); + ServerThread serverThread = new ServerThread("nonexistent", "pwd", freePort); try { serverThread.setDaemon(true); serverThread.start(); serverThread.waitForInitialization(); SessionID clientSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "TW", "ISLD"); - SessionSettings settings = getClientSessionSettings(clientSessionID); + SessionSettings settings = getClientSessionSettings(clientSessionID, freePort); ClientApplication clientApplication = new ClientApplication(); ThreadedSocketInitiator initiator = new ThreadedSocketInitiator(clientApplication, new MemoryStoreFactory(), settings, new DefaultMessageFactory()); @@ -85,10 +94,12 @@ public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable } } + @Test public void testLogonWithDefaultCertificate() throws Exception { doLogonTest(null, null); } + @Test public void testLogonWithCustomCertificate() throws Exception { doLogonTest("test.keystore", "quickfixjtestpw"); } @@ -103,9 +114,11 @@ public void testLogonWithCustomCertificate() throws Exception { * so that it's not cached by another test so that there are no false failures. * The test-client.keystore key store is just a copy of test.keystore under a different name. */ + @Test public void testLogonWithBadCertificateOnInitiatorSide() throws Exception { + int freePort = AvailablePortFinder.getNextAvailable(); SessionID clientSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "TW", "ISLD"); - SessionSettings settings = getClientSessionSettings(clientSessionID); + SessionSettings settings = getClientSessionSettings(clientSessionID, freePort); // reset client side to invalid certs settings.setString(SSLSupport.SETTING_KEY_STORE_NAME, "test-client.keystore"); settings.setString(SSLSupport.SETTING_KEY_STORE_PWD, "wrong-pwd"); @@ -122,14 +135,15 @@ protected void execute() throws Throwable { } private void doLogonTest(String keyStoreName, String keyStorePassword) throws InterruptedException, ConfigError { - ServerThread serverThread = new ServerThread(keyStoreName, keyStorePassword); + int freePort = AvailablePortFinder.getNextAvailable(); + ServerThread serverThread = new ServerThread(keyStoreName, keyStorePassword, freePort); try { serverThread.setDaemon(true); serverThread.start(); serverThread.waitForInitialization(); SessionID clientSessionID = new SessionID(FixVersions.BEGINSTRING_FIX42, "TW", "ISLD"); - SessionSettings settings = getClientSessionSettings(clientSessionID); + SessionSettings settings = getClientSessionSettings(clientSessionID, freePort); ClientApplication clientApplication = new ClientApplication(); ThreadedSocketInitiator initiator = new ThreadedSocketInitiator(clientApplication, new MemoryStoreFactory(), settings, new DefaultMessageFactory()); @@ -149,14 +163,14 @@ private void doLogonTest(String keyStoreName, String keyStorePassword) throws In } } - private SessionSettings getClientSessionSettings(SessionID clientSessionID) { + private SessionSettings getClientSessionSettings(SessionID clientSessionID, int freePort) { SessionSettings settings = new SessionSettings(); - HashMap defaults = new HashMap<>(); + Map defaults = new HashMap<>(); defaults.put("ConnectionType", "initiator"); defaults.put("SocketConnectProtocol", ProtocolFactory.getTypeString(transportProtocol)); defaults.put("SocketUseSSL", "Y"); defaults.put("SocketConnectHost", "localhost"); - defaults.put("SocketConnectPort", "9877"); + defaults.put("SocketConnectPort", String.valueOf(freePort)); defaults.put("StartTime", "00:00:00"); defaults.put("EndTime", "00:00:00"); defaults.put("HeartBtInt", "30"); @@ -205,9 +219,9 @@ public void onLogon(SessionID sessionId) { private class ServerThread extends Thread { private final ATServer server; - public ServerThread(String keyStoreName, String keyStorePassword) { + public ServerThread(String keyStoreName, String keyStorePassword, int freePort) { super("test server"); - server = new ATServer(); + server = new ATServer(freePort, transportProtocol); server.setUseSSL(true); server.setKeyStoreName(keyStoreName); server.setKeyStorePassword(keyStorePassword); diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/ATApplication.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/ATApplication.java index 0d994ed124..f51a94919f 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/ATApplication.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/ATApplication.java @@ -19,6 +19,7 @@ package quickfix.test.acceptance; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; import quickfix.Application; @@ -36,17 +37,17 @@ public class ATApplication implements Application { private final ATMessageCracker inboundCracker = new ATMessageCracker(); private final MessageCracker outboundCracker = new MessageCracker(new Object()); - private boolean isLoggedOn; + private AtomicBoolean isLoggedOn = new AtomicBoolean(false); public void onCreate(SessionID sessionID) { assertNoSessionLock(sessionID); Session.lookupSession(sessionID).reset(); } - public synchronized void onLogon(SessionID sessionID) { + public void onLogon(SessionID sessionID) { assertNoSessionLock(sessionID); - Assert.assertFalse("Already logged on", isLoggedOn); - isLoggedOn = true; + Assert.assertFalse("Already logged on", isLoggedOn.get()); + isLoggedOn.set(true); } private void assertNoSessionLock(SessionID sessionID) { @@ -56,11 +57,11 @@ private void assertNoSessionLock(SessionID sessionID) { Thread.holdsLock(session)); } - public synchronized void onLogout(SessionID sessionID) { + public void onLogout(SessionID sessionID) { assertNoSessionLock(sessionID); inboundCracker.reset(); - Assert.assertTrue("No logged on when logout is received", isLoggedOn); - isLoggedOn = false; + Assert.assertTrue("Not logged on when logout is received", isLoggedOn.get()); + isLoggedOn.set(false); } public void toAdmin(Message message, SessionID sessionID) { diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/ATServer.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/ATServer.java index cbed9a2bac..073fee1226 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/ATServer.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/ATServer.java @@ -30,7 +30,6 @@ import quickfix.MemoryStoreFactory; import quickfix.MessageStoreFactory; import quickfix.RuntimeError; -import quickfix.SLF4JLogFactory; import quickfix.SessionID; import quickfix.SessionSettings; import quickfix.SocketAcceptor; @@ -53,6 +52,8 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import quickfix.ScreenLogFactory; +import quickfix.Session; public class ATServer implements Runnable { private final Logger log = LoggerFactory.getLogger(ATServer.class); @@ -107,6 +108,7 @@ public void run() { defaults.put("SocketTcpNoDelay", "Y"); defaults.put("StartTime", "00:00:00"); defaults.put("EndTime", "00:00:00"); + defaults.put(Session.SETTING_NON_STOP_SESSION, "Y"); defaults.put("SenderCompID", "ISLD"); defaults.put("TargetCompID", "TW"); defaults.put("JdbcDriver", "com.mysql.jdbc.Driver"); @@ -162,10 +164,7 @@ public void run() { MessageStoreFactory factory = usingMemoryStore ? new MemoryStoreFactory() : new FileStoreFactory(settings); - //MessageStoreFactory factory = new JdbcStoreFactory(settings); - //LogFactory logFactory = new CommonsLogFactory(settings); - quickfix.LogFactory logFactory = new SLF4JLogFactory(new SessionSettings()); - //quickfix.LogFactory logFactory = new JdbcLogFactory(settings); + quickfix.LogFactory logFactory = new ScreenLogFactory(true, true, true, true); if (threaded) { acceptor = new ThreadedSocketAcceptor(application, factory, settings, logFactory, new DefaultMessageFactory()); @@ -229,7 +228,7 @@ public void run() { acceptor.stop(true); } } catch (RuntimeException e) { - log.warn("Encountered Exception on stop", e); + // ignore on stop } finally { tearDownLatch.countDown(); } diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java index 72317fb7bd..32b0143454 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/AcceptanceTestSuite.java @@ -38,7 +38,7 @@ public class AcceptanceTestSuite extends TestSuite { private static final String acceptanceTestBaseDir = AcceptanceTestSuite.class.getClassLoader().getResource(acceptanceTestResourcePath).getPath(); private static int transportType = ProtocolFactory.SOCKET; - private static int port = 9887; + private static int port = AvailablePortFinder.getNextAvailable(); private final boolean skipSlowTests; private final boolean multithreaded; @@ -247,7 +247,6 @@ protected void tearDown() throws Exception { public static Test suite() { transportType = ProtocolFactory.getTransportType(System.getProperty(ATEST_TRANSPORT_KEY, ProtocolFactory.getTypeString(ProtocolFactory.SOCKET))); - port = AvailablePortFinder.getNextAvailable(port); TestSuite acceptanceTests = new TestSuite(AcceptanceTestSuite.class.getSimpleName()); // default server acceptanceTests.addTest(new AcceptanceTestServerSetUp(new AcceptanceTestSuite("server", false))); diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java index f64e7a084d..dc40f3c002 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/ConnectToServerStep.java @@ -55,7 +55,7 @@ public void run(TestResult result, TestConnection connection) { } else { Assert.fail("incorrect connect command: " + command); } - log.debug("connecting to client {}", clientId); + log.info("connecting to client {}", clientId); long reconnectDelay = Long.getLong("atest.reconnectDelay", 50L); if (reconnectDelay > 0) { try { diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/ExpectMessageStep.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/ExpectMessageStep.java index ba4e3ac33e..9a821e40b3 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/ExpectMessageStep.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/ExpectMessageStep.java @@ -73,7 +73,7 @@ private Map simpleParse(String data) { } public void run(TestResult result, final TestConnection connection) throws InterruptedException { - log.debug("expecting from client " + clientId + ": " + data + " " + expectedFields); + log.info("expecting from client " + clientId + ": " + data + " " + expectedFields); CharSequence message = connection.readMessage(clientId, TIMEOUT_IN_MS); if (message == null) { log.info("Dumping threads due to timeout when expecting a message..."); diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/InitiateMessageStep.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/InitiateMessageStep.java index 030b3e5e74..3d4b0f25dd 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/InitiateMessageStep.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/InitiateMessageStep.java @@ -96,7 +96,7 @@ public void run(TestResult result, TestConnection connection) { if (!message.contains("\00110=")) { message += "10=" + CHECKSUM_FORMAT.format(MessageUtils.checksum(message)) + '\001'; } - log.debug("sending to client " + clientId + ": " + message); + log.info("sending to client " + clientId + ": " + message); try { connection.sendMessage(clientId, message); } catch (IOException e) { diff --git a/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java b/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java index 36fc4f0403..e31fb55148 100644 --- a/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java +++ b/quickfixj-core/src/test/java/quickfix/test/acceptance/TestConnection.java @@ -44,16 +44,19 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import quickfix.mina.SessionConnector; public class TestConnection { - private static final HashMap connectors = new HashMap<>(); + private static final Map connectors = new HashMap<>(); private final Logger log = LoggerFactory.getLogger(getClass()); - private final HashMap ioHandlers = new HashMap<>(); + private final ConcurrentMap ioHandlers = new ConcurrentHashMap<>(); public void sendMessage(int clientId, String message) throws IOException { TestIoHandler handler = getIoHandler(clientId); @@ -61,15 +64,16 @@ public void sendMessage(int clientId, String message) throws IOException { } private TestIoHandler getIoHandler(int clientId) { - synchronized (ioHandlers) { - return ioHandlers.get(clientId); - } + return ioHandlers.get(clientId); } public void tearDown() { for (TestIoHandler testIoHandler : ioHandlers.values()) { - CloseFuture closeFuture = testIoHandler.getSession().closeNow(); - closeFuture.awaitUninterruptibly(); + IoSession session = testIoHandler.getSession(); + if (session != null) { + CloseFuture closeFuture = session.closeNow(); + closeFuture.awaitUninterruptibly(); + } } ioHandlers.clear(); } @@ -102,13 +106,11 @@ public void connect(int clientId, int transportType, int port) connectors.put(Integer.toString(clientId), connector); TestIoHandler testIoHandler = new TestIoHandler(); - synchronized (ioHandlers) { - ioHandlers.put(clientId, testIoHandler); - connector.setHandler(testIoHandler); - ConnectFuture future = connector.connect(address); - future.awaitUninterruptibly(5000L); - Assert.assertTrue("connection to server failed", future.isConnected()); - } + ioHandlers.put(clientId, testIoHandler); + connector.setHandler(testIoHandler); + ConnectFuture future = connector.connect(address); + future.awaitUninterruptibly(5000L); + Assert.assertTrue("connection to server failed", future.isConnected()); } private class TestIoHandler extends IoHandlerAdapter { diff --git a/quickfixj-core/src/test/resources/logging.properties b/quickfixj-core/src/test/resources/logging.properties index 059a6959f2..353d97cee2 100644 --- a/quickfixj-core/src/test/resources/logging.properties +++ b/quickfixj-core/src/test/resources/logging.properties @@ -1,5 +1,5 @@ handlers=java.util.logging.ConsoleHandler -java.util.logging.ConsoleHandler.level=WARN +java.util.logging.ConsoleHandler.level=ALL java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter -.level=WARN \ No newline at end of file +.level=ALL