From 58da7e16b3bb44080f28ba815c0c2b12f32bf26f Mon Sep 17 00:00:00 2001 From: Adam Gorak Date: Tue, 19 Dec 2017 13:14:25 +0100 Subject: [PATCH] Current Due Slot to be configurable with its base (specific to SMSCGW instance). --- .../mobicents/smsc/domain/SmscManagement.java | 2 +- .../smsc/domain/SmscPropertiesManagement.java | 20 +++++++++++ .../domain/SmscPropertiesManagementMBean.java | 17 ++++++++- .../smsc/cassandra/DBOperations.java | 35 ++++++++++++++----- 4 files changed, 64 insertions(+), 10 deletions(-) diff --git a/core/domain/src/main/java/org/mobicents/smsc/domain/SmscManagement.java b/core/domain/src/main/java/org/mobicents/smsc/domain/SmscManagement.java index 28522e63d..288055942 100644 --- a/core/domain/src/main/java/org/mobicents/smsc/domain/SmscManagement.java +++ b/core/domain/src/main/java/org/mobicents/smsc/domain/SmscManagement.java @@ -256,7 +256,7 @@ public void start() throws Exception { this.smscPropertiesManagement.getCassandraUser(), this.smscPropertiesManagement.getCassandraPass(), this.smscPropertiesManagement.getFirstDueDelay(), this.smscPropertiesManagement.getReviseSecondsOnSmscStart(), this.smscPropertiesManagement.getProcessingSmsSetTimeout(), this.smscPropertiesManagement.getMinMessageId(), - this.smscPropertiesManagement.getMaxMessageId()); + this.smscPropertiesManagement.getMaxMessageId(), this.smscPropertiesManagement.getCassandraCurrentDueSlotBase()); // Step 3 SmsSetCashe.start() SmsSetCache.start(this.smscPropertiesManagement.getCorrelationIdLiveTime(), diff --git a/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagement.java b/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagement.java index b7c52260b..26456d142 100644 --- a/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagement.java +++ b/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagement.java @@ -140,6 +140,7 @@ public class SmscPropertiesManagement implements SmscPropertiesManagementMBean { private static final String CASSANDRA_USER = "cassandraUser"; private static final String CASSANDRA_PASS = "cassandraPass"; + private static final String CASSANDRA_CURRENT_DUE_SLOT_BASE = "cassandraCurrentDueSlotBase"; private static final String TAB_INDENT = "\t"; private static final String CLASS_ATTRIBUTE = "type"; @@ -205,6 +206,8 @@ public class SmscPropertiesManagement implements SmscPropertiesManagementMBean { // credential for cassandra private String cassandraUser = "cassandra"; private String cassandraPass = "cassandra"; + + private int cassandraCurrentDueSlotBase; // period of fetching messages from a database for delivering // private long fetchPeriod = 5000; // that was C1 @@ -617,6 +620,17 @@ public void setCassandraPass(String pass) throws IllegalArgumentException { public String getCassandraPass() { return this.cassandraPass; } + + @Override + public void setCassandraCurrentDueSlotBase(final int aBase) throws IllegalArgumentException { + cassandraCurrentDueSlotBase = aBase; + store(); + } + + @Override + public int getCassandraCurrentDueSlotBase() { + return cassandraCurrentDueSlotBase; + } @Override public int getMaxMessageLengthReducer() { @@ -1559,6 +1573,7 @@ public void store() { writer.write(this.cassandraUser, CASSANDRA_USER, String.class); writer.write(this.cassandraPass, CASSANDRA_PASS, String.class); + writer.write(this.cassandraCurrentDueSlotBase, CASSANDRA_CURRENT_DUE_SLOT_BASE, Integer.class); writer.write(this.diameterDestRealm, DIAMETER_DEST_REALM, String.class); writer.write(this.diameterDestHost, DIAMETER_DEST_HOST, String.class); @@ -1891,6 +1906,11 @@ public void load() throws FileNotFoundException { vals = reader.read(CASSANDRA_PASS, String.class); if (vals != null) this.cassandraPass = vals; + + val = reader.read(CASSANDRA_CURRENT_DUE_SLOT_BASE, Integer.class); + if (val != null) { + cassandraCurrentDueSlotBase = val; + } this.diameterDestRealm = reader.read(DIAMETER_DEST_REALM, String.class); diff --git a/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagementMBean.java b/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagementMBean.java index 9004630ec..8d531140a 100644 --- a/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagementMBean.java +++ b/core/domain/src/main/java/org/mobicents/smsc/domain/SmscPropertiesManagementMBean.java @@ -1,5 +1,5 @@ /* - * TeleStax, Open Source Cloud Communications + * Telestax, Open Source Cloud Communications Copyright 2011-2017, * Copyright 2012, Telestax Inc and individual contributors * by the @authors tag. See the copyright.txt in the distribution for a * full listing of individual contributors. @@ -384,5 +384,20 @@ public interface SmscPropertiesManagementMBean { public void setCassandraPass(String pass) throws IllegalArgumentException; public String getCassandraPass(); + + /** + * Sets the cassandra current due slot base. + * + * @param aBase the new cassandra current due slot base + * @throws IllegalArgumentException the illegal argument exception + */ + void setCassandraCurrentDueSlotBase(int aBase) throws IllegalArgumentException; + + /** + * Gets the cassandra current due slot base. + * + * @return the cassandra current due slot base + */ + int getCassandraCurrentDueSlotBase() ; } diff --git a/core/smsc-common-library/src/main/java/org/mobicents/smsc/cassandra/DBOperations.java b/core/smsc-common-library/src/main/java/org/mobicents/smsc/cassandra/DBOperations.java index 5005f6bfb..35f200487 100644 --- a/core/smsc-common-library/src/main/java/org/mobicents/smsc/cassandra/DBOperations.java +++ b/core/smsc-common-library/src/main/java/org/mobicents/smsc/cassandra/DBOperations.java @@ -71,10 +71,10 @@ import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.exceptions.InvalidQueryException; + /** * * @author sergey vetyutnev - * */ public class DBOperations { private static final Logger logger = Logger.getLogger(DBOperations.class); @@ -132,6 +132,8 @@ public class DBOperations { private long minMessageId; // Max value of messageId value for SMPP responses private long maxMessageId; + + private int itsCurrentDueSlotBase; // data for processing @@ -213,9 +215,17 @@ protected Cluster getCluster() { protected Session getSession() { return this.session; } + + public void start(String hosts, int port, String keyspace, String user, String password, int secondsForwardStoring, + int reviseSecondsOnSmscStart, int processingSmsSetTimeout, long minMessageId, long maxMessageId) + throws Exception { + start(hosts, port, keyspace, user, password, secondsForwardStoring, reviseSecondsOnSmscStart, + processingSmsSetTimeout, minMessageId, maxMessageId, 0); + } public void start(String hosts, int port, String keyspace, String user, String password, int secondsForwardStoring, - int reviseSecondsOnSmscStart, int processingSmsSetTimeout, long minMessageId, long maxMessageId) throws Exception { + int reviseSecondsOnSmscStart, int processingSmsSetTimeout, long minMessageId, long maxMessageId, + final int aCurrentDueSlotBase) throws Exception { if (this.started) { throw new IllegalStateException("DBOperations already started"); } @@ -227,6 +237,7 @@ public void start(String hosts, int port, String keyspace, String user, String p this.processingSmsSetTimeout = processingSmsSetTimeout; this.minMessageId = minMessageId; this.maxMessageId = maxMessageId; + itsCurrentDueSlotBase = aCurrentDueSlotBase; this.pcsDate = null; currentSessionUUID = UUID.randomUUID(); @@ -317,7 +328,7 @@ public void start(String hosts, int port, String keyspace, String user, String p + "\" LIMIT " + row_count + ";"); try { - currentDueSlot = c2_getCurrentSlotTable(CURRENT_DUE_SLOT); + currentDueSlot = c2_getCurrentSlotTable(getCurrentDueSlotKey()); if (currentDueSlot == 0) { // not yet set long l1 = this.c2_getDueSlotForTime(new Date()); @@ -326,9 +337,9 @@ public void start(String hosts, int port, String keyspace, String user, String p this.c2_setCurrentDueSlot(currentDueSlot - dueSlotReviseOnSmscStart); } - messageId = c2_getCurrentSlotTable(NEXT_MESSAGE_ID); + messageId = c2_getCurrentSlotTable(getNextMessageIdDueSlotKey()); messageId += MESSAGE_ID_LAG; - c2_setCurrentSlotTable(NEXT_MESSAGE_ID, messageId); + c2_setCurrentSlotTable(getNextMessageIdDueSlotKey(), messageId); } catch (Exception e1) { String msg = "Failed reading a currentDueSlot !"; throw new PersistenceException(msg, e1); @@ -336,7 +347,7 @@ public void start(String hosts, int port, String keyspace, String user, String p this.started = true; } - + public void stop() throws Exception { if (!this.started) return; @@ -392,7 +403,7 @@ public long c2_getCurrentDueSlot() { public void c2_setCurrentDueSlot(long newDueSlot) throws PersistenceException { currentDueSlot = newDueSlot; - c2_setCurrentSlotTable(CURRENT_DUE_SLOT, newDueSlot); + c2_setCurrentSlotTable(getCurrentDueSlotKey(), newDueSlot); } /** @@ -407,7 +418,7 @@ public synchronized long c2_getNextMessageId() { messageId = minMessageId; if (messageId % MESSAGE_ID_LAG == 0 && databaseAvailable) { try { - c2_setCurrentSlotTable(NEXT_MESSAGE_ID, messageId); + c2_setCurrentSlotTable(getNextMessageIdDueSlotKey(), messageId); } catch (PersistenceException e) { logger.error("Exception when storing next messageId to the database: " + e.getMessage(), e); } @@ -2498,6 +2509,14 @@ protected String[] getLiveTableListAsNames(String keyspace) { return sss; } + private int getCurrentDueSlotKey() { + return itsCurrentDueSlotBase + CURRENT_DUE_SLOT; + } + + private int getNextMessageIdDueSlotKey() { + return itsCurrentDueSlotBase + NEXT_MESSAGE_ID; + } + private String[] getArchiveTableListAsNames(String keyspace) { String[] ss = this.c2_getTableList(keyspace);