Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

analyze flaky TimerTest #345

Draft
wants to merge 51 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
db07b40
- added logging to analyze flaky test
chrjohn Dec 17, 2020
a34b0ea
- added stack dump after two seconds
chrjohn Dec 17, 2020
b60173c
- start thread to create thread dump after Logon
chrjohn Dec 21, 2020
dc4bf97
Merge branch 'master' into flaky-timer-test
chrjohn Dec 22, 2020
97bb50d
- add information on started test to SocketAcceptorTest
chrjohn Dec 28, 2020
0a05a2a
- added some ugly debug logging
chrjohn Dec 31, 2020
5117d1d
Merge branch 'master' into flaky-timer-test
chrjohn Apr 17, 2021
20149e1
Update maven.yml
chrjohn Apr 18, 2021
00d4113
Merge branch 'master' into flaky-timer-test
chrjohn Apr 18, 2021
d0e9940
Merge branch 'master' into flaky-timer-test
chrjohn Apr 18, 2021
c979cd7
code cleanup
chrjohn Apr 19, 2021
420f0f1
Merge branch 'master' into flaky-timer-test
chrjohn Apr 20, 2021
b7431bd
AcceptanceTest changes
chrjohn Apr 20, 2021
c2dfadc
Merge branch 'master' into flaky-timer-test
chrjohn Jul 1, 2021
11c3c70
bogus change to test commit - please ignore
chrjohn Jul 1, 2021
3ae5967
Make isHeartBeatNeeded() use System.nanoTime()
chrjohn Jul 1, 2021
ae5848a
replaced Timer by ScheduledExecutorService
chrjohn Jul 1, 2021
a59c1e4
make timeSinceLastReceivedMessage use System.nanoTime()
chrjohn Jul 1, 2021
a967699
added support for using nanoseconds to SystemTime(Source)
chrjohn Jul 2, 2021
93a11d3
Set state.setLastReceivedTimeNanos in Session
chrjohn Jul 3, 2021
1f075b7
Update maven.yml
chrjohn Jul 4, 2021
e3c489b
Only call initializeHeader() (and setLastSentTime()) once per message
chrjohn Jul 4, 2021
c8131ad
Merge branch 'flaky-timer-test' of https://github.com/quickfix-j/quic…
chrjohn Jul 4, 2021
6f0e17e
made `failed` volatile since it is read/written by different threads
chrjohn Jul 5, 2021
c15d0d8
corrected sending of delayed message
chrjohn Jul 5, 2021
cca9208
corrected usage of setLastSentTimeNanos
chrjohn Jul 5, 2021
47c0184
added conversion from nano to millis
chrjohn Jul 5, 2021
090bb5b
debug logging in failing test
chrjohn Jul 5, 2021
31fdf84
improved test
chrjohn Jul 5, 2021
e155cc9
again debug logging
chrjohn Jul 5, 2021
2b7a212
added missing synchronization around lastReceivedTimeNanos
chrjohn Jul 5, 2021
eac3519
increased logging on maven build
chrjohn Jul 6, 2021
bd01a0e
changed SessionConnector to use nanotime in waitForLogout()
chrjohn Jul 6, 2021
96201ce
Merge branch 'flaky-timer-test' of https://github.com/quickfix-j/quic…
chrjohn Jul 6, 2021
ea33e9f
more debug output
chrjohn Jul 7, 2021
d483b72
even more debug logging
chrjohn Jul 7, 2021
255e12f
- removed unneded synchronization in ATApplication
chrjohn Jul 8, 2021
b20c490
- more debug logging on isHeartBeatNeeded()
chrjohn Jul 8, 2021
0529659
- prevent NPE on stopHandlingMessages()
chrjohn Jul 9, 2021
08110c8
ignore Exception on stop since it spams the logs
chrjohn Jul 9, 2021
c267e08
converted to JUnit4 test
chrjohn Jul 9, 2021
a864f76
use free port to avoid using already used addresses
chrjohn Jul 9, 2021
60bb326
prevent throwing RuntimeException, interrupt current thread instead
chrjohn Jul 9, 2021
8f817cb
use ScreenLogFactory to see logging in github Java CI build
chrjohn Jul 9, 2021
ab146f9
increased logging
chrjohn Jul 29, 2021
2db4602
increased logging in ExpectMessageStep
chrjohn Jul 29, 2021
bd1d734
increased logging in ConnectToServerStep
chrjohn Jul 29, 2021
8831e32
increased logging in InitiateMessageStep
chrjohn Jul 29, 2021
3b8f9bd
Merge branch 'master' into flaky-timer-test
chrjohn Jul 30, 2021
8f5f2ea
Merge branch 'master' into flaky-timer-test
chrjohn Jul 31, 2021
af5c5b0
correct SessionStateTest
chrjohn Jul 31, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@ jobs:
name: Test JDK ${{ matrix.java }}, ${{ matrix.os }}

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2

- name: Set up JDK
uses: actions/setup-java@v1
uses: actions/setup-java@v2
with:
distribution: 'adopt'
java-version: ${{ matrix.java }}

- name: Disable NTP on macOS (https://github.com/actions/virtual-environments/issues/820)
run: |
sudo systemsetup -setusingnetworktime off
sudo rm -rf /etc/ntp.conf
if: runner.os == 'macOS'

- name: Test with Maven
env:
MAVEN_OPTS: "-Xms3g -Xmx3g"
run: mvn test -B -V -D"java.util.logging.config.file"="logging.properties" -D"http.keepAlive"="false" -D"maven.wagon.http.pool"="false" -D"maven.wagon.httpconnectionManager.ttlSeconds"="120"
run: mvn test -B -V -D"http.keepAlive"="false" -D"maven.wagon.http.pool"="false" -D"maven.wagon.httpconnectionManager.ttlSeconds"="120"
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ QuickFIX/J
[![Total alerts](https://img.shields.io/lgtm/alerts/g/quickfix-j/quickfixj.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/quickfix-j/quickfixj/alerts/)
[![Language grade: Java](https://img.shields.io/lgtm/grade/java/g/quickfix-j/quickfixj.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/quickfix-j/quickfixj/context:java)


This is the official QuickFIX/J project repository.

## intro
Expand Down
14 changes: 8 additions & 6 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ private void setEnabled(boolean enabled) {
}

private void initializeHeader(Message.Header header) {
state.setLastSentTime(SystemTime.currentTimeMillis());
// state.setLastSentTime(SystemTime.currentTimeMillis()); move to sendRaw()
header.setString(BeginString.FIELD, sessionID.getBeginString());
header.setString(SenderCompID.FIELD, sessionID.getSenderCompID());
optionallySetID(header, SenderSubID.FIELD, sessionID.getSenderSubID());
Expand Down Expand Up @@ -1753,6 +1753,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow)
UnsupportedMessageType, IOException {

state.setLastReceivedTime(SystemTime.currentTimeMillis());
state.setLastReceivedTimeNanos(SystemTime.currentTimeMillisFromNanos());
state.clearTestRequestCounter();

String msgType;
Expand Down Expand Up @@ -1975,11 +1976,11 @@ public void next() throws IOException {
LOG.warn("Heartbeat failure detected but deactivated");
}
} else {
if (state.isTestRequestNeeded()) {
if (state.isTestRequestNeeded(sessionID)) {
generateTestRequest("TEST");
getLog().onEvent("Sent test request TEST");
stateListener.onMissedHeartBeat();
} else if (state.isHeartBeatNeeded()) {
} else if (state.isHeartBeatNeeded(sessionID)) {
generateHeartbeat();
}
}
Expand Down Expand Up @@ -2036,6 +2037,7 @@ private boolean generateLogon() throws IOException {
logon.setBoolean(ResetSeqNumFlag.FIELD, true);
}
state.setLastReceivedTime(SystemTime.currentTimeMillis());
state.setLastReceivedTimeNanos(SystemTime.currentTimeMillisFromNanos());
state.clearTestRequestCounter();
state.setLogonSent(true);
logonAttempts++;
Expand Down Expand Up @@ -2336,6 +2338,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN
generateSequenceReset(receivedMessage, begin, msgSeqNum);
}
getLog().onEvent("Resending message: " + msgSeqNum);
state.setLastSentTime(SystemTime.currentTimeMillis());
send(msg.toString());
begin = 0;
appMessageJustSent = true;
Expand Down Expand Up @@ -2591,8 +2594,8 @@ private boolean sendRaw(Message message, int num) {
final Message.Header header = message.getHeader();
final String msgType = header.getString(MsgType.FIELD);

initializeHeader(header);

initializeHeader(header); // TODO still duplicate to generateXXX methods
state.setLastSentTime(SystemTime.currentTimeMillis());
if (num > 0) {
header.setInt(MsgSeqNum.FIELD, num);
}
Expand Down Expand Up @@ -2647,7 +2650,6 @@ private boolean sendRaw(Message message, int num) {
result = send(messageString);
}
}

return result;
} catch (final IOException e) {
logThrowable(getLog(), "Error reading/writing in MessageStore", e);
Expand Down
55 changes: 49 additions & 6 deletions quickfixj-core/src/main/java/quickfix/SessionState.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public final class SessionState {
private boolean logoutReceived = false;
private int testRequestCounter;
private long lastSentTime;
private long lastSentTimeNanos;
private long lastReceivedTime;
private long lastReceivedTimeNanos;
private final double testRequestDelayMultiplier;
private final double heartBeatTimeoutMultiplier;
private long heartBeatMillis = Long.MAX_VALUE;
Expand Down Expand Up @@ -112,10 +114,23 @@ long getHeartBeatMillis() {
}
}

public boolean isHeartBeatNeeded() {
long millisSinceLastSentTime = SystemTime.currentTimeMillis() - getLastSentTime();
SessionID sessionID1 = new SessionID(FixVersions.BEGINSTRING_FIX44, "ISLD", "TW");
// SessionID sessionID2 = new SessionID(FixVersions.BEGINSTRING_FIX44, "TW", "ISLD");
public boolean isHeartBeatNeeded(SessionID sessionID) {
long currentTimeMillisFromNanos = SystemTime.currentTimeMillisFromNanos();
long lastSentTimeNanos = getLastSentTimeNanos();
long millisSinceLastSentTime = TimeUnit.NANOSECONDS.toMillis(currentTimeMillisFromNanos - lastSentTimeNanos);
// long millisSinceLastSentTime = SystemTime.currentTimeMillis() - getLastSentTime();
boolean name = millisSinceLastSentTime + 10 > getHeartBeatMillis() && getTestRequestCounter() == 0;
// QFJ-448: allow 10 ms leeway since exact comparison causes skipped heartbeats occasionally
return millisSinceLastSentTime + 10 > getHeartBeatMillis() && getTestRequestCounter() == 0;
if (sessionID1.getSenderCompID().equals(sessionID.getSenderCompID())
|| sessionID1.getSenderCompID().equals(sessionID.getTargetCompID())
|| "ACCEPTOR-1".equals(sessionID.getTargetCompID())) {
getLog().onEvent("isHeartBeatNeeded() = " + name + " current=" + currentTimeMillisFromNanos
+ " - lastSentTime=" + lastSentTimeNanos
+ "= millisSinceLastSent=" + millisSinceLastSentTime);
}
return name;
}

public boolean isInitiator() {
Expand All @@ -128,21 +143,40 @@ public long getLastReceivedTime() {
}
}

public long getLastReceivedTimeNanos() {
synchronized (lock) {
return lastReceivedTimeNanos;
}
}

public void setLastReceivedTime(long lastReceivedTime) {
synchronized (lock) {
this.lastReceivedTime = lastReceivedTime;
}
}

public void setLastReceivedTimeNanos(long lastReceivedTimeNanos) {
synchronized (lock) {
this.lastReceivedTimeNanos = lastReceivedTimeNanos;
}
}

public long getLastSentTime() {
synchronized (lock) {
return lastSentTime;
}
}

public long getLastSentTimeNanos() {
synchronized (lock) {
return lastSentTimeNanos;
}
}

public void setLastSentTime(long lastSentTime) {
synchronized (lock) {
this.lastSentTime = lastSentTime;
this.lastSentTimeNanos = SystemTime.currentTimeMillisFromNanos();
}
}

Expand Down Expand Up @@ -184,7 +218,8 @@ public void setLogonSent(boolean logonSent) {

public boolean isLogonTimedOut() {
synchronized (lock) {
return isLogonSent() && SystemTime.currentTimeMillis() - getLastReceivedTime() >= getLogonTimeoutMs();
// return isLogonSent() && SystemTime.currentTimeMillis() - getLastReceivedTime() >= getLogonTimeoutMs();
return isLogonSent() && TimeUnit.NANOSECONDS.toMillis(SystemTime.currentTimeMillisFromNanos() - getLastReceivedTimeNanos()) >= getLogonTimeoutMs();
}
}

Expand Down Expand Up @@ -282,14 +317,22 @@ public void incrementTestRequestCounter() {
}
}

public boolean isTestRequestNeeded() {
public boolean isTestRequestNeeded(SessionID sessionID) {
long millisSinceLastReceivedTime = timeSinceLastReceivedMessage();
if (sessionID.getSenderCompID().equals(sessionID1.getSenderCompID())
|| sessionID.getTargetCompID().equals(sessionID1.getSenderCompID())
|| "ACCEPTOR-1".equals(sessionID.getTargetCompID())) {
getLog().onEvent("isTestRequestNeeded() - millisSinceLastReceived = " + millisSinceLastReceivedTime);
}
return millisSinceLastReceivedTime >= ((1 + testRequestDelayMultiplier) * (getTestRequestCounter() + 1))
* getHeartBeatMillis();
}

private long timeSinceLastReceivedMessage() {
return SystemTime.currentTimeMillis() - getLastReceivedTime();
long lastReceivedTimeNanos1 = getLastReceivedTimeNanos();
long currentTimeMillisFromNanos = SystemTime.currentTimeMillisFromNanos();
long timeSinceLastReceivedMessage = TimeUnit.NANOSECONDS.toMillis(currentTimeMillisFromNanos - lastReceivedTimeNanos1);
return timeSinceLastReceivedMessage;
}

public boolean isTimedOut() {
Expand Down
9 changes: 9 additions & 0 deletions quickfixj-core/src/main/java/quickfix/SystemTime.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public long getTime() {
return System.currentTimeMillis();
}

@Override
public long getTimeFromNanos() {
return System.nanoTime();
}

@Override
public LocalDateTime getNow() {
return LocalDateTime.now(ZoneOffset.UTC);
Expand All @@ -48,6 +53,10 @@ public LocalDateTime getNow() {
public static long currentTimeMillis() {
return systemTimeSource.getTime();
}

public static long currentTimeMillisFromNanos() {
return systemTimeSource.getTimeFromNanos();
}

public static LocalDateTime now() {
return systemTimeSource.getNow();
Expand Down
2 changes: 2 additions & 0 deletions quickfixj-core/src/main/java/quickfix/SystemTimeSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public interface SystemTimeSource {
* @return current (possible simulated) time
*/
long getTime();

long getTimeFromNanos();

/**
* Obtain current LocalDateTime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.mina.core.future.CloseFuture;
import org.apache.mina.core.service.IoService;
import quickfix.SystemTime;

/**
* An abstract base class for acceptors and initiators. Provides support for common functionality and also serves as an
Expand Down Expand Up @@ -265,15 +266,15 @@ protected void logoutAllSessions(boolean forceDisconnect) {
}

protected void waitForLogout() {
long start = System.currentTimeMillis();
long start = SystemTime.currentTimeMillisFromNanos();
Set<Session> loggedOnSessions;
while (!(loggedOnSessions = getLoggedOnSessions()).isEmpty()) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
final long elapsed = System.currentTimeMillis() - start;
final long elapsed = TimeUnit.NANOSECONDS.toMillis(SystemTime.currentTimeMillisFromNanos() - start);
Iterator<Session> sessionItr = loggedOnSessions.iterator();
while (sessionItr.hasNext()) {
Session session = sessionItr.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void onMessage(Session quickfixSession, Message message) {
queueTracker.put(new SessionMessageEvent(quickfixSession, message));
} catch (InterruptedException e) {
isStopped = true;
throw new RuntimeException(e);
Thread.currentThread().interrupt();
}
}

Expand Down Expand Up @@ -194,7 +194,9 @@ public void stopHandlingMessages(boolean join) {
stopHandlingMessages();
if (join) {
try {
messageProcessingThread.join();
if (messageProcessingThread != null) {
messageProcessingThread.join();
}
} catch (InterruptedException e) {
sessionConnector.log.error("{} interrupted.", MESSAGE_PROCESSOR_THREAD_NAME);
}
Expand Down
23 changes: 21 additions & 2 deletions quickfixj-core/src/test/java/quickfix/MockSystemTimeSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;

public class MockSystemTimeSource implements SystemTimeSource {
private long[] systemTimes = { System.currentTimeMillis() };

private long[] systemTimes = {System.currentTimeMillis()};
private long[] systemTimesNanos = {System.nanoTime()};
private int offset;

public MockSystemTimeSource() {
Expand All @@ -34,14 +37,19 @@ public MockSystemTimeSource() {

public MockSystemTimeSource(long time) {
setSystemTimes(time);
setSystemTimesNanos(TimeUnit.MILLISECONDS.toNanos(time));
}

public void setSystemTimes(long[] times) {
systemTimes = times;
}

void setSystemTimes(long time) {
systemTimes = new long[] { time };
systemTimes = new long[]{time};
}

void setSystemTimesNanos(long time) {
systemTimesNanos = new long[]{time};
}

public void setTime(Calendar c) {
Expand All @@ -56,10 +64,21 @@ public long getTime() {
return systemTimes[offset];
}

@Override
public long getTimeFromNanos() {
if (systemTimesNanos.length - offset > 1) {
offset++;
}
return systemTimesNanos[offset];
}

public void increment(long delta) {
if (systemTimes.length - offset == 1) {
systemTimes[offset] += delta;
}
if (systemTimesNanos.length - offset == 1) {
systemTimesNanos[offset] += TimeUnit.MILLISECONDS.toNanos(delta);
}
}

@Override
Expand Down
Loading