Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configurable Due Slot Keys #274

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public void start() throws Exception {
this.smscPropertiesManagement.getCassandraUser(), this.smscPropertiesManagement.getCassandraPass(),
this.smscPropertiesManagement.getFirstDueDelay(), this.smscPropertiesManagement.getReviseSecondsOnSmscStart(),
this.smscPropertiesManagement.getProcessingSmsSetTimeout(), this.smscPropertiesManagement.getMinMessageId(),
this.smscPropertiesManagement.getMaxMessageId());
this.smscPropertiesManagement.getMaxMessageId(), this.smscPropertiesManagement.getCassandraCurrentDueSlotBase());

// Step 3 SmsSetCashe.start()
SmsSetCache.start(this.smscPropertiesManagement.getCorrelationIdLiveTime(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public class SmscPropertiesManagement implements SmscPropertiesManagementMBean {

private static final String CASSANDRA_USER = "cassandraUser";
private static final String CASSANDRA_PASS = "cassandraPass";
private static final String CASSANDRA_CURRENT_DUE_SLOT_BASE = "cassandraCurrentDueSlotBase";

private static final String TAB_INDENT = "\t";
private static final String CLASS_ATTRIBUTE = "type";
Expand Down Expand Up @@ -205,6 +206,8 @@ public class SmscPropertiesManagement implements SmscPropertiesManagementMBean {
// credential for cassandra
private String cassandraUser = "cassandra";
private String cassandraPass = "cassandra";

private int cassandraCurrentDueSlotBase;

// period of fetching messages from a database for delivering
// private long fetchPeriod = 5000; // that was C1
Expand Down Expand Up @@ -617,6 +620,17 @@ public void setCassandraPass(String pass) throws IllegalArgumentException {
public String getCassandraPass() {
return this.cassandraPass;
}

@Override
public void setCassandraCurrentDueSlotBase(final int aBase) throws IllegalArgumentException {
cassandraCurrentDueSlotBase = aBase;
store();
}

@Override
public int getCassandraCurrentDueSlotBase() {
return cassandraCurrentDueSlotBase;
}

@Override
public int getMaxMessageLengthReducer() {
Expand Down Expand Up @@ -1559,6 +1573,7 @@ public void store() {

writer.write(this.cassandraUser, CASSANDRA_USER, String.class);
writer.write(this.cassandraPass, CASSANDRA_PASS, String.class);
writer.write(this.cassandraCurrentDueSlotBase, CASSANDRA_CURRENT_DUE_SLOT_BASE, Integer.class);

writer.write(this.diameterDestRealm, DIAMETER_DEST_REALM, String.class);
writer.write(this.diameterDestHost, DIAMETER_DEST_HOST, String.class);
Expand Down Expand Up @@ -1891,6 +1906,11 @@ public void load() throws FileNotFoundException {
vals = reader.read(CASSANDRA_PASS, String.class);
if (vals != null)
this.cassandraPass = vals;

val = reader.read(CASSANDRA_CURRENT_DUE_SLOT_BASE, Integer.class);
if (val != null) {
cassandraCurrentDueSlotBase = val;
}

this.diameterDestRealm = reader.read(DIAMETER_DEST_REALM, String.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* TeleStax, Open Source Cloud Communications
* Telestax, Open Source Cloud Communications Copyright 2011-2017,
* Copyright 2012, Telestax Inc and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
Expand Down Expand Up @@ -384,5 +384,20 @@ public interface SmscPropertiesManagementMBean {
public void setCassandraPass(String pass) throws IllegalArgumentException;

public String getCassandraPass();

/**
* Sets the cassandra current due slot base.
*
* @param aBase the new cassandra current due slot base
* @throws IllegalArgumentException the illegal argument exception
*/
void setCassandraCurrentDueSlotBase(int aBase) throws IllegalArgumentException;

/**
* Gets the cassandra current due slot base.
*
* @return the cassandra current due slot base
*/
int getCassandraCurrentDueSlotBase() ;

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@
import com.datastax.driver.core.TableMetadata;
import com.datastax.driver.core.exceptions.InvalidQueryException;


/**
*
* @author sergey vetyutnev
*
*/
public class DBOperations {
private static final Logger logger = Logger.getLogger(DBOperations.class);
Expand Down Expand Up @@ -132,6 +132,8 @@ public class DBOperations {
private long minMessageId;
// Max value of messageId value for SMPP responses
private long maxMessageId;

private int itsCurrentDueSlotBase;

// data for processing

Expand Down Expand Up @@ -213,9 +215,17 @@ protected Cluster getCluster() {
protected Session getSession() {
return this.session;
}

public void start(String hosts, int port, String keyspace, String user, String password, int secondsForwardStoring,
int reviseSecondsOnSmscStart, int processingSmsSetTimeout, long minMessageId, long maxMessageId)
throws Exception {
start(hosts, port, keyspace, user, password, secondsForwardStoring, reviseSecondsOnSmscStart,
processingSmsSetTimeout, minMessageId, maxMessageId, 0);
}

public void start(String hosts, int port, String keyspace, String user, String password, int secondsForwardStoring,
int reviseSecondsOnSmscStart, int processingSmsSetTimeout, long minMessageId, long maxMessageId) throws Exception {
int reviseSecondsOnSmscStart, int processingSmsSetTimeout, long minMessageId, long maxMessageId,
final int aCurrentDueSlotBase) throws Exception {
if (this.started) {
throw new IllegalStateException("DBOperations already started");
}
Expand All @@ -227,6 +237,7 @@ public void start(String hosts, int port, String keyspace, String user, String p
this.processingSmsSetTimeout = processingSmsSetTimeout;
this.minMessageId = minMessageId;
this.maxMessageId = maxMessageId;
itsCurrentDueSlotBase = aCurrentDueSlotBase;

this.pcsDate = null;
currentSessionUUID = UUID.randomUUID();
Expand Down Expand Up @@ -317,7 +328,7 @@ public void start(String hosts, int port, String keyspace, String user, String p
+ "\" LIMIT " + row_count + ";");

try {
currentDueSlot = c2_getCurrentSlotTable(CURRENT_DUE_SLOT);
currentDueSlot = c2_getCurrentSlotTable(getCurrentDueSlotKey());
if (currentDueSlot == 0) {
// not yet set
long l1 = this.c2_getDueSlotForTime(new Date());
Expand All @@ -326,17 +337,17 @@ public void start(String hosts, int port, String keyspace, String user, String p
this.c2_setCurrentDueSlot(currentDueSlot - dueSlotReviseOnSmscStart);
}

messageId = c2_getCurrentSlotTable(NEXT_MESSAGE_ID);
messageId = c2_getCurrentSlotTable(getNextMessageIdDueSlotKey());
messageId += MESSAGE_ID_LAG;
c2_setCurrentSlotTable(NEXT_MESSAGE_ID, messageId);
c2_setCurrentSlotTable(getNextMessageIdDueSlotKey(), messageId);
} catch (Exception e1) {
String msg = "Failed reading a currentDueSlot !";
throw new PersistenceException(msg, e1);
}

this.started = true;
}

public void stop() throws Exception {
if (!this.started)
return;
Expand Down Expand Up @@ -392,7 +403,7 @@ public long c2_getCurrentDueSlot() {
public void c2_setCurrentDueSlot(long newDueSlot) throws PersistenceException {
currentDueSlot = newDueSlot;

c2_setCurrentSlotTable(CURRENT_DUE_SLOT, newDueSlot);
c2_setCurrentSlotTable(getCurrentDueSlotKey(), newDueSlot);
}

/**
Expand All @@ -407,7 +418,7 @@ public synchronized long c2_getNextMessageId() {
messageId = minMessageId;
if (messageId % MESSAGE_ID_LAG == 0 && databaseAvailable) {
try {
c2_setCurrentSlotTable(NEXT_MESSAGE_ID, messageId);
c2_setCurrentSlotTable(getNextMessageIdDueSlotKey(), messageId);
} catch (PersistenceException e) {
logger.error("Exception when storing next messageId to the database: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -2498,6 +2509,14 @@ protected String[] getLiveTableListAsNames(String keyspace) {
return sss;
}

private int getCurrentDueSlotKey() {
return itsCurrentDueSlotBase + CURRENT_DUE_SLOT;
}

private int getNextMessageIdDueSlotKey() {
return itsCurrentDueSlotBase + NEXT_MESSAGE_ID;
}

private String[] getArchiveTableListAsNames(String keyspace) {
String[] ss = this.c2_getTableList(keyspace);

Expand Down