Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix oom. if config resendRequestChunkSize. Loading data is batch #777

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -2333,8 +2333,13 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre
}

private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo)
throws IOException, InvalidMessage, FieldNotFound {

throws IOException, FieldNotFound {
// Prevent a one-time load of data that is too large, resulting in memory OOM。 if config resendRequestChunkSize。
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please check that you don't use strange characters in the source code, e.g. as after

resendRequestChunkSize。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, what I optimized was the problem of overloading at one time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please check that you don't use strange characters in the source code, e.g. as after

resendRequestChunkSize。

Are you asking me to remove comments from the code?

int lastEndSeqNoSent = resendRequestChunkSize == 0 ? endSeqNo : beginSeqNo + resendRequestChunkSize - 1;
if (lastEndSeqNoSent > endSeqNo) {
lastEndSeqNoSent = endSeqNo;
}
endSeqNo = lastEndSeqNoSent;
final ArrayList<String> messages = new ArrayList<>();
try {
state.get(beginSeqNo, endSeqNo, messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ public static Session createSession(SessionID sessionID, Application application
.build();
}

public static Session createSession(SessionID sessionID, Application application,
boolean isInitiator, boolean resetOnLogon,
boolean validateSequenceNumbers,
int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum) {
return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator)
.setResetOnLogon(resetOnLogon).setValidateSequenceNumbers(validateSequenceNumbers)
.setPersistMessages(true)
.setResendRequestChunkSize(resendRequestChunkSize)
.setEnableNextExpectedMsgSeqNum(enableNextExpectedMsgSeqNum)
.build();
}


public static Session createSession(SessionID sessionID, Application application,
boolean isInitiator, boolean resetOnLogon, boolean validateSequenceNumbers,
boolean useDataDictionary, DefaultApplVerID senderDefaultApplVerID) {
Expand Down Expand Up @@ -108,7 +121,7 @@ public static final class Builder {
private final boolean forceResendWhenCorruptedStore = false;
private final Set<InetAddress> allowedRemoteAddresses = null;
private final boolean validateIncomingMessage = true;
private final int resendRequestChunkSize = 0;
private int resendRequestChunkSize = 0;
private boolean enableNextExpectedMsgSeqNum = false;
private final boolean enableLastMsgSeqNumProcessed = false;
private final boolean validateChecksum = true;
Expand Down Expand Up @@ -239,5 +252,10 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs
this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum;
return this;
}

public Builder setResendRequestChunkSize(final int resendRequestChunkSize) {
this.resendRequestChunkSize = resendRequestChunkSize;
return this;
}
}
}
43 changes: 43 additions & 0 deletions quickfixj-core/src/test/java/quickfix/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3145,4 +3145,47 @@ public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupCon
assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD));
assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD));
}


@Test
Copy link
Member

@chrjohn chrjohn Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this test do if you roll back your changes in the Session class? To me it looks like it just tests your session builder. But to be honest I am only looking at it on my mobile not IDE, so maybe I'm missing something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what you mean

public void testSessionWithResendRequestChunkSizeAndEnableNextExpectedMsgSeqNum() throws Exception {
final UnitTestApplication application = new UnitTestApplication();

final SessionID sessionID = new SessionID(
FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
try (Session session = createSession(sessionID, application, true,
true, false,10,true)) {

final UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);

session.logon();
session.next();

final Message logonRequest = new Message(responder.sentMessageData);
Message logonResponse = createLogonResponse(sessionID, logonRequest, 1);
// config Field NextExpectedMsgSeqNum=1
logonResponse.setField(new NextExpectedMsgSeqNum(1));
session.next(logonResponse);

assertEquals(
1,
application.lastToAdminMessage().getHeader()
.getInt(MsgSeqNum.FIELD));
assertEquals(2, session.getStore().getNextTargetMsgSeqNum());
assertEquals(2, session.getStore().getNextSenderMsgSeqNum());

session.next(createHeartbeatMessage(1002));
assertNotEquals(ResendRequest.MSGTYPE, application
.lastToAdminMessage().getHeader().getString(MsgType.FIELD));

session.next(createHeartbeatMessage(1003));
assertNotEquals(ResendRequest.MSGTYPE, application
.lastToAdminMessage().getHeader().getString(MsgType.FIELD));

session.next(createHeartbeatMessage(1001));
assertNotEquals(ResendRequest.MSGTYPE, application
.lastToAdminMessage().getHeader().getString(MsgType.FIELD));
}
}
}