From 17a17745227d0f130817eb83fe4219037495a7a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Mon, 18 Mar 2024 18:49:27 +0800 Subject: [PATCH 1/7] fix oom. if config resendRequestChunkSize. Loading data is batch --- .../src/main/java/quickfix/Session.java | 92 ++++++++++--------- 1 file changed, 48 insertions(+), 44 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 06a88b50d9..5175c2d0f2 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -460,7 +460,6 @@ public class Session implements Closeable { public static final double DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER = 1.4; private static final String ENCOUNTERED_END_OF_STREAM = "Encountered END_OF_STREAM"; - private static final int BAD_COMPID_REJ_REASON = SessionRejectReason.COMPID_PROBLEM; private static final String BAD_COMPID_TEXT = new FieldException(BAD_COMPID_REJ_REASON).getMessage(); private static final int BAD_TIME_REJ_REASON = SessionRejectReason.SENDINGTIME_ACCURACY_PROBLEM; @@ -475,10 +474,10 @@ public class Session implements Closeable { DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval) { this(application, messageStoreFactory, sessionID, dataDictionaryProvider, sessionSchedule, logFactory, - messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, - false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, - false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[]{5}, + false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); } Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, @@ -499,10 +498,10 @@ public class Session implements Closeable { boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier, boolean allowPossDup) { this(application, messageStoreFactory, new InMemoryMessageQueueFactory(), sessionID, dataDictionaryProvider, sessionSchedule, logFactory, - messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, - false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, - false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup); + messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[]{5}, + false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup); } Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory, @@ -574,7 +573,7 @@ public class Session implements Closeable { } state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0, - messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); + messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); registerSession(this); @@ -667,7 +666,7 @@ public static boolean sendToTarget(Message message) throws SessionNotFound { * identifiers. The session qualifier is used to distinguish sessions with * the same target identifiers. * - * @param message a FIX message + * @param message a FIX message * @param qualifier a session qualifier * @return true is send was successful, false otherwise * @throws SessionNotFound if session could not be located @@ -695,7 +694,7 @@ private static String getSenderCompIDFromMessage(final Message message) throws F * ID. The sender company ID is provided as an argument rather than from the * message. * - * @param message a FIX message + * @param message a FIX message * @param senderCompID the sender's company ID * @param targetCompID the target's company ID * @return true is send was successful, false otherwise @@ -712,15 +711,15 @@ public static boolean sendToTarget(Message message, String senderCompID, String * message. The session qualifier is used to distinguish sessions with the * same target identifiers. * - * @param message a FIX message + * @param message a FIX message * @param senderCompID the sender's company ID * @param targetCompID the target's company ID - * @param qualifier a session qualifier + * @param qualifier a session qualifier * @return true is send was successful, false otherwise * @throws SessionNotFound if session could not be located */ public static boolean sendToTarget(Message message, String senderCompID, String targetCompID, - String qualifier) throws SessionNotFound { + String qualifier) throws SessionNotFound { try { return sendToTarget(message, new SessionID(message.getHeader().getString(BeginString.FIELD), senderCompID, @@ -735,7 +734,7 @@ public static boolean sendToTarget(Message message, String senderCompID, String /** * Send a message to the session specified by the provided session ID. * - * @param message a FIX message + * @param message a FIX message * @param sessionID the target SessionID * @return true is send was successful, false otherwise * @throws SessionNotFound if session could not be located @@ -855,7 +854,7 @@ public boolean isEnabled() { /** * Predicate indicating whether a logon message has been sent. - * + *

* (QF Compatibility) * * @return true if logon message was sent, false otherwise. @@ -866,7 +865,7 @@ public boolean sentLogon() { /** * Predicate indicating whether a logon message has been received. - * + *

* (QF Compatibility) * * @return true if logon message was received, false otherwise. @@ -877,7 +876,7 @@ public boolean receivedLogon() { /** * Predicate indicating whether a logout message has been sent. - * + *

* (QF Compatibility) * * @return true if logout message was sent, false otherwise. @@ -1340,8 +1339,8 @@ private void nextResendRequest(Message resendRequest) throws IOException, Reject * A Gap has been request to be filled by either a resend request or on a logon message * * @param messageOutSync the message that caused the gap to be filled - * @param beginSeqNo the seqNum of the first missing message - * @param endSeqNo the seqNum of the last missing message + * @param beginSeqNo the seqNum of the first missing message + * @param endSeqNo the seqNum of the last missing message * @throws FieldNotFound * @throws IOException * @throws InvalidMessage @@ -1705,7 +1704,7 @@ private void setRejectReason(Message reject, String reason) { } private void setRejectReason(Message reject, int field, String reason, - boolean includeFieldInReason) { + boolean includeFieldInReason) { boolean isRejectMessage; try { isRejectMessage = MsgType.REJECT.equals(reject.getHeader().getString(MsgType.FIELD)); @@ -1718,7 +1717,7 @@ private void setRejectReason(Message reject, int field, String reason, reject.setString(Text.FIELD, reason); } else { String rejectReason = reason; - if (includeFieldInReason && !rejectReason.endsWith("" + field) ) { + if (includeFieldInReason && !rejectReason.endsWith("" + field)) { rejectReason = rejectReason + ", field=" + field; } reject.setString(Text.FIELD, rejectReason); @@ -1817,7 +1816,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow) return false; } - if(!state.isResetReceived()){ + if (!state.isResetReceived()) { if (checkTooHigh && isTargetTooHigh(msgSeqNum)) { doTargetTooHigh(msg); return false; @@ -2092,13 +2091,13 @@ private boolean generateLogon() throws IOException { /** * Logs out from session and closes the network connection. - * + *

* This method should not be called from user-code since it is likely * to deadlock when called from a different thread than the Session thread * and messages are sent/received concurrently. * Instead the logout() method should be used where possible. * - * @param reason the reason why the session is disconnected + * @param reason the reason why the session is disconnected * @param logError set to true if this disconnection is an error * @throws IOException IO error */ @@ -2211,7 +2210,6 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre resetState(); } - // reset logout messages state.setLogoutReceived(false); state.setLogoutSent(false); @@ -2333,8 +2331,13 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre } private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo) - throws IOException, InvalidMessage, FieldNotFound { - + throws IOException, FieldNotFound { + // Prevent a one-time load of data that is too large, resulting in memory OOM。 if config resendRequestChunkSize + int lastEndSeqNoSent = resendRequestChunkSize == 0 ? endSeqNo : beginSeqNo + resendRequestChunkSize - 1; + if (lastEndSeqNoSent > endSeqNo) { + lastEndSeqNoSent = endSeqNo; + } + endSeqNo = lastEndSeqNoSent; final ArrayList messages = new ArrayList<>(); try { state.get(beginSeqNo, endSeqNo, messages); @@ -2368,7 +2371,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN } catch (final Exception e) { getLog().onErrorEvent( "Error handling ResendRequest: failed to parse message (" + e.getMessage() - + "): " + message); + + "): " + message); // Note: a SequenceReset message will be generated to fill the gap continue; } @@ -2415,7 +2418,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN * may not have been realistic to production on the other hand. * Apart from the else */ - generateSequenceResetIfNeeded(receivedMessage, newBegin, endSeqNo, msgSeqNum); + generateSequenceResetIfNeeded(receivedMessage, newBegin, endSeqNo, msgSeqNum); } } else { if (begin != 0) { @@ -2516,6 +2519,7 @@ private void generateResendRequest(String beginString, int msgSeqNum) { /** * Sends a resend request + * * @param beginString The begin string of the session. * FIX 4.1 and earlier get sent 999999 as the upper bound for unbounded requests. * FIX 4.2 and later get sent 0 @@ -2585,7 +2589,7 @@ private boolean isTargetTooHigh(int sequence) throws IOException { /** * Outgoing Logon in response to Logon received * - * @param otherLogon the one we are responding to with a Logon (response) + * @param otherLogon the one we are responding to with a Logon (response) * @param expectedTargetNum value for 789 tag (used only if enabled in properties) * @throws FieldNotFound expected message field of Logon not present. */ @@ -2619,20 +2623,20 @@ private void generateLogon(Message otherLogon, int expectedTargetNum) throws Fie } private void persist(Header header, String messageString, int num) throws IOException, FieldNotFound { - if (num == 0) { - if (persistMessages) { - final int msgSeqNum = header.getInt(MsgSeqNum.FIELD); - state.set(msgSeqNum, messageString); - } - state.incrNextSenderMsgSeqNum(); - } + if (num == 0) { + if (persistMessages) { + final int msgSeqNum = header.getInt(MsgSeqNum.FIELD); + state.set(msgSeqNum, messageString); + } + state.incrNextSenderMsgSeqNum(); + } } /** * Send the message * * @param message is the message to send - * @param num is the seq num of the message to send, if 0, the next expected sender seqnum is used. + * @param num is the seq num of the message to send, if 0, the next expected sender seqnum is used. * @return */ private boolean sendRaw(Message message, int num) { @@ -2736,7 +2740,7 @@ private void resetState() { * Send a message to a counterparty. Sequence numbers and information about the sender * and target identification will be added automatically (or overwritten if that * information already is present). - * + *

* The returned status flag is included for * compatibility with the JNI API but its usefulness is questionable. * In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean @@ -2754,15 +2758,15 @@ public boolean send(Message message) { * Send a message to a counterparty. Sequence numbers and information about the sender * and target identification will be added automatically (or overwritten if that * information already is present). - * + *

* The returned status flag is included for * compatibility with the JNI API but its usefulness is questionable. * In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean * only indicates the message was successfully queued for transmission. An error could still * occur before the message data is actually sent. * - * @param message the message to send - * @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message + * @param message the message to send + * @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message * @return a status flag indicating whether the write to the network layer was successful. */ public boolean send(Message message, boolean allowPosDup) { From 628baa0c6163c39baa483f17d8c897b8e2467de8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Mon, 18 Mar 2024 18:55:29 +0800 Subject: [PATCH 2/7] fix oom. if config resendRequestChunkSize. Loading data is batch --- .../src/main/java/quickfix/Session.java | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 5175c2d0f2..03bf56ee54 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -460,6 +460,7 @@ public class Session implements Closeable { public static final double DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER = 1.4; private static final String ENCOUNTERED_END_OF_STREAM = "Encountered END_OF_STREAM"; + private static final int BAD_COMPID_REJ_REASON = SessionRejectReason.COMPID_PROBLEM; private static final String BAD_COMPID_TEXT = new FieldException(BAD_COMPID_REJ_REASON).getMessage(); private static final int BAD_TIME_REJ_REASON = SessionRejectReason.SENDINGTIME_ACCURACY_PROBLEM; @@ -475,7 +476,7 @@ public class Session implements Closeable { MessageFactory messageFactory, int heartbeatInterval) { this(application, messageStoreFactory, sessionID, dataDictionaryProvider, sessionSchedule, logFactory, messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, - false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[]{5}, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); } @@ -499,7 +500,7 @@ public class Session implements Closeable { boolean allowPossDup) { this(application, messageStoreFactory, new InMemoryMessageQueueFactory(), sessionID, dataDictionaryProvider, sessionSchedule, logFactory, messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, - false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[]{5}, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup); } @@ -666,7 +667,7 @@ public static boolean sendToTarget(Message message) throws SessionNotFound { * identifiers. The session qualifier is used to distinguish sessions with * the same target identifiers. * - * @param message a FIX message + * @param message a FIX message * @param qualifier a session qualifier * @return true is send was successful, false otherwise * @throws SessionNotFound if session could not be located @@ -694,7 +695,7 @@ private static String getSenderCompIDFromMessage(final Message message) throws F * ID. The sender company ID is provided as an argument rather than from the * message. * - * @param message a FIX message + * @param message a FIX message * @param senderCompID the sender's company ID * @param targetCompID the target's company ID * @return true is send was successful, false otherwise @@ -711,10 +712,10 @@ public static boolean sendToTarget(Message message, String senderCompID, String * message. The session qualifier is used to distinguish sessions with the * same target identifiers. * - * @param message a FIX message + * @param message a FIX message * @param senderCompID the sender's company ID * @param targetCompID the target's company ID - * @param qualifier a session qualifier + * @param qualifier a session qualifier * @return true is send was successful, false otherwise * @throws SessionNotFound if session could not be located */ @@ -734,7 +735,7 @@ public static boolean sendToTarget(Message message, String senderCompID, String /** * Send a message to the session specified by the provided session ID. * - * @param message a FIX message + * @param message a FIX message * @param sessionID the target SessionID * @return true is send was successful, false otherwise * @throws SessionNotFound if session could not be located @@ -854,7 +855,7 @@ public boolean isEnabled() { /** * Predicate indicating whether a logon message has been sent. - *

+ * * (QF Compatibility) * * @return true if logon message was sent, false otherwise. @@ -865,7 +866,7 @@ public boolean sentLogon() { /** * Predicate indicating whether a logon message has been received. - *

+ * * (QF Compatibility) * * @return true if logon message was received, false otherwise. @@ -876,7 +877,7 @@ public boolean receivedLogon() { /** * Predicate indicating whether a logout message has been sent. - *

+ * * (QF Compatibility) * * @return true if logout message was sent, false otherwise. @@ -1339,8 +1340,8 @@ private void nextResendRequest(Message resendRequest) throws IOException, Reject * A Gap has been request to be filled by either a resend request or on a logon message * * @param messageOutSync the message that caused the gap to be filled - * @param beginSeqNo the seqNum of the first missing message - * @param endSeqNo the seqNum of the last missing message + * @param beginSeqNo the seqNum of the first missing message + * @param endSeqNo the seqNum of the last missing message * @throws FieldNotFound * @throws IOException * @throws InvalidMessage @@ -1717,7 +1718,7 @@ private void setRejectReason(Message reject, int field, String reason, reject.setString(Text.FIELD, reason); } else { String rejectReason = reason; - if (includeFieldInReason && !rejectReason.endsWith("" + field)) { + if (includeFieldInReason && !rejectReason.endsWith("" + field) ) { rejectReason = rejectReason + ", field=" + field; } reject.setString(Text.FIELD, rejectReason); @@ -1816,7 +1817,7 @@ private boolean verify(Message msg, boolean checkTooHigh, boolean checkTooLow) return false; } - if (!state.isResetReceived()) { + if(!state.isResetReceived()){ if (checkTooHigh && isTargetTooHigh(msgSeqNum)) { doTargetTooHigh(msg); return false; @@ -2091,13 +2092,13 @@ private boolean generateLogon() throws IOException { /** * Logs out from session and closes the network connection. - *

+ * * This method should not be called from user-code since it is likely * to deadlock when called from a different thread than the Session thread * and messages are sent/received concurrently. * Instead the logout() method should be used where possible. * - * @param reason the reason why the session is disconnected + * @param reason the reason why the session is disconnected * @param logError set to true if this disconnection is an error * @throws IOException IO error */ @@ -2210,6 +2211,7 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre resetState(); } + // reset logout messages state.setLogoutReceived(false); state.setLogoutSent(false); @@ -2332,7 +2334,7 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo) throws IOException, FieldNotFound { - // Prevent a one-time load of data that is too large, resulting in memory OOM。 if config resendRequestChunkSize + // Prevent a one-time load of data that is too large, resulting in memory OOM。 if config resendRequestChunkSize。 int lastEndSeqNoSent = resendRequestChunkSize == 0 ? endSeqNo : beginSeqNo + resendRequestChunkSize - 1; if (lastEndSeqNoSent > endSeqNo) { lastEndSeqNoSent = endSeqNo; @@ -2519,7 +2521,6 @@ private void generateResendRequest(String beginString, int msgSeqNum) { /** * Sends a resend request - * * @param beginString The begin string of the session. * FIX 4.1 and earlier get sent 999999 as the upper bound for unbounded requests. * FIX 4.2 and later get sent 0 @@ -2589,7 +2590,7 @@ private boolean isTargetTooHigh(int sequence) throws IOException { /** * Outgoing Logon in response to Logon received * - * @param otherLogon the one we are responding to with a Logon (response) + * @param otherLogon the one we are responding to with a Logon (response) * @param expectedTargetNum value for 789 tag (used only if enabled in properties) * @throws FieldNotFound expected message field of Logon not present. */ @@ -2636,7 +2637,7 @@ private void persist(Header header, String messageString, int num) throws IOExce * Send the message * * @param message is the message to send - * @param num is the seq num of the message to send, if 0, the next expected sender seqnum is used. + * @param num is the seq num of the message to send, if 0, the next expected sender seqnum is used. * @return */ private boolean sendRaw(Message message, int num) { @@ -2740,7 +2741,7 @@ private void resetState() { * Send a message to a counterparty. Sequence numbers and information about the sender * and target identification will be added automatically (or overwritten if that * information already is present). - *

+ * * The returned status flag is included for * compatibility with the JNI API but its usefulness is questionable. * In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean @@ -2758,15 +2759,15 @@ public boolean send(Message message) { * Send a message to a counterparty. Sequence numbers and information about the sender * and target identification will be added automatically (or overwritten if that * information already is present). - *

+ * * The returned status flag is included for * compatibility with the JNI API but its usefulness is questionable. * In QuickFIX/J, the message is transmitted using asynchronous network I/O so the boolean * only indicates the message was successfully queued for transmission. An error could still * occur before the message data is actually sent. * - * @param message the message to send - * @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message + * @param message the message to send + * @param allowPosDup whether to allow PossDupFlag and OrigSendingTime in the message * @return a status flag indicating whether the write to the network layer was successful. */ public boolean send(Message message, boolean allowPosDup) { From e0c1d9ef730b524fe11953b459bebb629823680a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Mon, 18 Mar 2024 18:58:26 +0800 Subject: [PATCH 3/7] fix oom. if config resendRequestChunkSize. Loading data is batch rollback code style --- .../src/main/java/quickfix/Session.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 03bf56ee54..9921811dc6 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -475,10 +475,10 @@ public class Session implements Closeable { DataDictionaryProvider dataDictionaryProvider, SessionSchedule sessionSchedule, LogFactory logFactory, MessageFactory messageFactory, int heartbeatInterval) { this(application, messageStoreFactory, sessionID, dataDictionaryProvider, sessionSchedule, logFactory, - messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, - false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, - false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); + messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, + false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, false); } Session(Application application, MessageStoreFactory messageStoreFactory, SessionID sessionID, @@ -499,10 +499,10 @@ public class Session implements Closeable { boolean validateChecksum, List logonTags, double heartBeatTimeoutMultiplier, boolean allowPossDup) { this(application, messageStoreFactory, new InMemoryMessageQueueFactory(), sessionID, dataDictionaryProvider, sessionSchedule, logFactory, - messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, - false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, - false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, - false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup); + messageFactory, heartbeatInterval, true, DEFAULT_MAX_LATENCY, UtcTimestampPrecision.MILLIS, false, false, + false, false, true, false, true, false, DEFAULT_TEST_REQUEST_DELAY_MULTIPLIER, null, true, new int[] {5}, + false, false, false, false, true, false, true, false, null, true, DEFAULT_RESEND_RANGE_CHUNK_SIZE, false, + false, false, new ArrayList(), DEFAULT_HEARTBEAT_TIMEOUT_MULTIPLIER, allowPossDup); } Session(Application application, MessageStoreFactory messageStoreFactory, MessageQueueFactory messageQueueFactory, @@ -574,7 +574,7 @@ public class Session implements Closeable { } state = new SessionState(this, engineLog, heartbeatInterval, heartbeatInterval != 0, - messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); + messageStore, messageQueue, testRequestDelayMultiplier, heartBeatTimeoutMultiplier); registerSession(this); @@ -720,7 +720,7 @@ public static boolean sendToTarget(Message message, String senderCompID, String * @throws SessionNotFound if session could not be located */ public static boolean sendToTarget(Message message, String senderCompID, String targetCompID, - String qualifier) throws SessionNotFound { + String qualifier) throws SessionNotFound { try { return sendToTarget(message, new SessionID(message.getHeader().getString(BeginString.FIELD), senderCompID, @@ -1705,7 +1705,7 @@ private void setRejectReason(Message reject, String reason) { } private void setRejectReason(Message reject, int field, String reason, - boolean includeFieldInReason) { + boolean includeFieldInReason) { boolean isRejectMessage; try { isRejectMessage = MsgType.REJECT.equals(reject.getHeader().getString(MsgType.FIELD)); @@ -2373,7 +2373,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN } catch (final Exception e) { getLog().onErrorEvent( "Error handling ResendRequest: failed to parse message (" + e.getMessage() - + "): " + message); + + "): " + message); // Note: a SequenceReset message will be generated to fill the gap continue; } @@ -2420,7 +2420,7 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN * may not have been realistic to production on the other hand. * Apart from the else */ - generateSequenceResetIfNeeded(receivedMessage, newBegin, endSeqNo, msgSeqNum); + generateSequenceResetIfNeeded(receivedMessage, newBegin, endSeqNo, msgSeqNum); } } else { if (begin != 0) { @@ -2624,13 +2624,13 @@ private void generateLogon(Message otherLogon, int expectedTargetNum) throws Fie } private void persist(Header header, String messageString, int num) throws IOException, FieldNotFound { - if (num == 0) { - if (persistMessages) { - final int msgSeqNum = header.getInt(MsgSeqNum.FIELD); - state.set(msgSeqNum, messageString); - } - state.incrNextSenderMsgSeqNum(); - } + if (num == 0) { + if (persistMessages) { + final int msgSeqNum = header.getInt(MsgSeqNum.FIELD); + state.set(msgSeqNum, messageString); + } + state.incrNextSenderMsgSeqNum(); + } } /** From e47e651ac3e7c2f6c2f729efedb20a516ccd2891 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Tue, 19 Mar 2024 13:30:22 +0800 Subject: [PATCH 4/7] add case test --- .../quickfix/SessionFactoryTestSupport.java | 20 ++++++++- .../src/test/java/quickfix/SessionTest.java | 45 +++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index 35356fb9e5..ac596d2d00 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -44,6 +44,19 @@ public static Session createSession(SessionID sessionID, Application application .build(); } + public static Session createSession(SessionID sessionID, Application application, + boolean isInitiator, boolean resetOnLogon, + boolean validateSequenceNumbers, + int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum) { + return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator) + .setResetOnLogon(resetOnLogon).setValidateSequenceNumbers(validateSequenceNumbers) + .setPersistMessages(true) + .setResendRequestChunkSize(resendRequestChunkSize) + .setEnableNextExpectedMsgSeqNum(enableNextExpectedMsgSeqNum) + .build(); + } + + public static Session createSession(SessionID sessionID, Application application, boolean isInitiator, boolean resetOnLogon, boolean validateSequenceNumbers, boolean useDataDictionary, DefaultApplVerID senderDefaultApplVerID) { @@ -108,7 +121,7 @@ public static final class Builder { private final boolean forceResendWhenCorruptedStore = false; private final Set allowedRemoteAddresses = null; private final boolean validateIncomingMessage = true; - private final int resendRequestChunkSize = 0; + private int resendRequestChunkSize = 0; private boolean enableNextExpectedMsgSeqNum = false; private final boolean enableLastMsgSeqNumProcessed = false; private final boolean validateChecksum = true; @@ -239,5 +252,10 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum; return this; } + + public Builder setResendRequestChunkSize(final int resendRequestChunkSize) { + this.resendRequestChunkSize = resendRequestChunkSize; + return this; + } } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index daadf31032..c9bef562c2 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -3145,4 +3145,49 @@ public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupCon assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); } + + + @Test + public void testSessionWithResendRequestChunkSizeAndEnableNextExpectedMsgSeqNum() throws Exception { + final UnitTestApplication application = new UnitTestApplication(); + + final SessionID sessionID = new SessionID( + FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); + try (Session session = createSession(sessionID, application, true, + true, false,10,true)) { + + final UnitTestResponder responder = new UnitTestResponder(); + session.setResponder(responder); + + session.logon(); + session.next(); + + final Message logonRequest = new Message(responder.sentMessageData); + logonRequest.getHeader().setField(new NextExpectedMsgSeqNum(1)); + + Message logonResponse = createLogonResponse(sessionID, logonRequest, 1); + // config NextExpectedMsgSeqNum=1 + logonResponse.getHeader().setField(new NextExpectedMsgSeqNum(1)); + session.next(logonResponse); + + assertEquals( + 1, + application.lastToAdminMessage().getHeader() + .getInt(MsgSeqNum.FIELD)); + assertEquals(2, session.getStore().getNextTargetMsgSeqNum()); + assertEquals(2, session.getStore().getNextSenderMsgSeqNum()); + + session.next(createHeartbeatMessage(1002)); + assertNotEquals(ResendRequest.MSGTYPE, application + .lastToAdminMessage().getHeader().getString(MsgType.FIELD)); + + session.next(createHeartbeatMessage(1003)); + assertNotEquals(ResendRequest.MSGTYPE, application + .lastToAdminMessage().getHeader().getString(MsgType.FIELD)); + + session.next(createHeartbeatMessage(1001)); + assertNotEquals(ResendRequest.MSGTYPE, application + .lastToAdminMessage().getHeader().getString(MsgType.FIELD)); + } + } } From 5a18e398ed6af48a4bfd8516c9f0aa7333d2e3c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Tue, 19 Mar 2024 13:33:04 +0800 Subject: [PATCH 5/7] add case test --- quickfixj-core/src/test/java/quickfix/SessionTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index c9bef562c2..0e85d2a897 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -3163,11 +3163,9 @@ public void testSessionWithResendRequestChunkSizeAndEnableNextExpectedMsgSeqNum( session.next(); final Message logonRequest = new Message(responder.sentMessageData); - logonRequest.getHeader().setField(new NextExpectedMsgSeqNum(1)); - Message logonResponse = createLogonResponse(sessionID, logonRequest, 1); - // config NextExpectedMsgSeqNum=1 - logonResponse.getHeader().setField(new NextExpectedMsgSeqNum(1)); + // config Field NextExpectedMsgSeqNum=1 + logonResponse.setField(new NextExpectedMsgSeqNum(1)); session.next(logonResponse); assertEquals( From b96e9b35bf7c9820d5fb1c2d898eb9b8d5e9d899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Sun, 31 Mar 2024 19:19:40 +0800 Subject: [PATCH 6/7] add case test --- quickfixj-core/src/main/java/quickfix/Session.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/quickfixj-core/src/main/java/quickfix/Session.java b/quickfixj-core/src/main/java/quickfix/Session.java index 9921811dc6..742fa7d835 100644 --- a/quickfixj-core/src/main/java/quickfix/Session.java +++ b/quickfixj-core/src/main/java/quickfix/Session.java @@ -2333,8 +2333,7 @@ private void nextLogon(Message logon) throws FieldNotFound, RejectLogon, Incorre } private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqNo) - throws IOException, FieldNotFound { - // Prevent a one-time load of data that is too large, resulting in memory OOM。 if config resendRequestChunkSize。 + throws IOException, InvalidMessage, FieldNotFound { int lastEndSeqNoSent = resendRequestChunkSize == 0 ? endSeqNo : beginSeqNo + resendRequestChunkSize - 1; if (lastEndSeqNoSent > endSeqNo) { lastEndSeqNoSent = endSeqNo; From 074f73cccc80a23977ef17d29fd8dab7a86d28fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=8B=A4=E5=A5=8B?= Date: Sat, 25 May 2024 15:03:00 +0800 Subject: [PATCH 7/7] rollback case test --- .../quickfix/SessionFactoryTestSupport.java | 20 +-------- .../src/test/java/quickfix/SessionTest.java | 43 ------------------- 2 files changed, 1 insertion(+), 62 deletions(-) diff --git a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java index ac596d2d00..35356fb9e5 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java +++ b/quickfixj-core/src/test/java/quickfix/SessionFactoryTestSupport.java @@ -44,19 +44,6 @@ public static Session createSession(SessionID sessionID, Application application .build(); } - public static Session createSession(SessionID sessionID, Application application, - boolean isInitiator, boolean resetOnLogon, - boolean validateSequenceNumbers, - int resendRequestChunkSize, boolean enableNextExpectedMsgSeqNum) { - return new Builder().setSessionId(sessionID).setApplication(application).setIsInitiator(isInitiator) - .setResetOnLogon(resetOnLogon).setValidateSequenceNumbers(validateSequenceNumbers) - .setPersistMessages(true) - .setResendRequestChunkSize(resendRequestChunkSize) - .setEnableNextExpectedMsgSeqNum(enableNextExpectedMsgSeqNum) - .build(); - } - - public static Session createSession(SessionID sessionID, Application application, boolean isInitiator, boolean resetOnLogon, boolean validateSequenceNumbers, boolean useDataDictionary, DefaultApplVerID senderDefaultApplVerID) { @@ -121,7 +108,7 @@ public static final class Builder { private final boolean forceResendWhenCorruptedStore = false; private final Set allowedRemoteAddresses = null; private final boolean validateIncomingMessage = true; - private int resendRequestChunkSize = 0; + private final int resendRequestChunkSize = 0; private boolean enableNextExpectedMsgSeqNum = false; private final boolean enableLastMsgSeqNumProcessed = false; private final boolean validateChecksum = true; @@ -252,10 +239,5 @@ public Builder setEnableNextExpectedMsgSeqNum(final boolean enableNextExpectedMs this.enableNextExpectedMsgSeqNum = enableNextExpectedMsgSeqNum; return this; } - - public Builder setResendRequestChunkSize(final int resendRequestChunkSize) { - this.resendRequestChunkSize = resendRequestChunkSize; - return this; - } } } diff --git a/quickfixj-core/src/test/java/quickfix/SessionTest.java b/quickfixj-core/src/test/java/quickfix/SessionTest.java index 0e85d2a897..daadf31032 100644 --- a/quickfixj-core/src/test/java/quickfix/SessionTest.java +++ b/quickfixj-core/src/test/java/quickfix/SessionTest.java @@ -3145,47 +3145,4 @@ public void testSend_ShouldKeepPossDupFlagAndOrigSendingTime_GivenAllowPosDupCon assertTrue(sentMessage.getHeader().isSetField(PossDupFlag.FIELD)); assertTrue(sentMessage.getHeader().isSetField(OrigSendingTime.FIELD)); } - - - @Test - public void testSessionWithResendRequestChunkSizeAndEnableNextExpectedMsgSeqNum() throws Exception { - final UnitTestApplication application = new UnitTestApplication(); - - final SessionID sessionID = new SessionID( - FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET"); - try (Session session = createSession(sessionID, application, true, - true, false,10,true)) { - - final UnitTestResponder responder = new UnitTestResponder(); - session.setResponder(responder); - - session.logon(); - session.next(); - - final Message logonRequest = new Message(responder.sentMessageData); - Message logonResponse = createLogonResponse(sessionID, logonRequest, 1); - // config Field NextExpectedMsgSeqNum=1 - logonResponse.setField(new NextExpectedMsgSeqNum(1)); - session.next(logonResponse); - - assertEquals( - 1, - application.lastToAdminMessage().getHeader() - .getInt(MsgSeqNum.FIELD)); - assertEquals(2, session.getStore().getNextTargetMsgSeqNum()); - assertEquals(2, session.getStore().getNextSenderMsgSeqNum()); - - session.next(createHeartbeatMessage(1002)); - assertNotEquals(ResendRequest.MSGTYPE, application - .lastToAdminMessage().getHeader().getString(MsgType.FIELD)); - - session.next(createHeartbeatMessage(1003)); - assertNotEquals(ResendRequest.MSGTYPE, application - .lastToAdminMessage().getHeader().getString(MsgType.FIELD)); - - session.next(createHeartbeatMessage(1001)); - assertNotEquals(ResendRequest.MSGTYPE, application - .lastToAdminMessage().getHeader().getString(MsgType.FIELD)); - } - } }