Skip to content

Commit

Permalink
Merge pull request #87 from RI-SE/feature_mqttInCITSModule
Browse files Browse the repository at this point in the history
Feature mqtt in cits module
  • Loading branch information
viktorjo authored Jul 5, 2019
2 parents 970fc1d + a6d924a commit c5d2213
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 21 deletions.
50 changes: 49 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,62 @@ cp -R ../conf/ .
Create a folder for Trajectory files in /build and move one of the existing trajectory files to this folder.
```sh
mkdir traj
cp ../traj/0.traj ./traj/192.168.0.1
cp ../traj/0.traj ./traj/192.168.0.1
```

Start the server
```sh
./TEServer
```

## Building the server with CITS module and mqtt

The CITS module uses PAHO MQTT, which can be found through the following link:
https://www.eclipse.org/paho/

To be able to run the server with the CITS module you must first build and install paho mqtt.

Paho mqtt requires OpenSSL to be able to run. To install OpenSSL do
```sh
apt-get install libssl-dev
```
In order to get and build the documentation for paho mqtt, do the following
```sh
apt-get install doxygen graphviz
```

Now get the latest source code for paho mqtt
```sh
git clone https://github.com/eclipse/paho.mqtt.c.git
```

Go to the root of the cloned git repo and build the documentation by doing
```sh
cd paho.mqtt.c.git
sudo make html
```
This will build the documentation for all the code. Then proceede to build and install paho
```sh
sudo make
sudo make install
```

The server will not bu default build the CITS module. This is to prevent the use of the CITS module when it is not necessary. To enable building of the module, run `cmake` from the `build/` directory
```sh
cmake "Unix Makefiles" -DUSE_CITS:BOOL=TRUE ..
```
then you can build and run the server as normal
```sh
make
./TEServer
```

To disable the CITS module, remake the `cmake` procedure

```sh
cmake "Unix Makefiles" -DUSE_CITS:BOOL=FALSE ..
```

# To communicate with server start program.
./UserControl [IP] [port]

Expand Down
30 changes: 29 additions & 1 deletion server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,19 @@ include(GNUInstallDirs)
#add_library(MaestroLogging SHARED IMPORTED)
#set_property(TARGET MaestroLogging PROPERTY IMPORTED_LOCATION ${MAESTRO_LOGGING_PATH})

SET(USE_CITS FALSE CACHE BOOL "Flag to indicate the use of CITS.")

# MQTT PAHO
if(USE_CITS)
find_package(OpenSSL REQUIRED)
find_library(paho-mqtt3c NAMES libpaho-mqtt3c.so REQUIRED)
add_library(pahomqtt3c SHARED IMPORTED)
set_property(TARGET pahomqtt3c PROPERTY IMPORTED_LOCATION ${paho-mqtt3c})
add_library(cits
src/citscontrol.c
inc/citscontrol.h
)
endif()

# Create library
add_library(MaestroLogging
Expand All @@ -40,6 +53,8 @@ add_library(MQBus
../util/C/MQBus/mqbus.c
)



# Create library
add_library(util
src/util.c
Expand All @@ -56,6 +71,13 @@ add_executable(UserControl
src/usercontrol.c
)

# TODO: Make citscontrol into a separate program that is started when the server starts
#add_executable(CitsControl
# inc/citscontrol.h
# src/citscontrol.c
#)


# add the executable
add_executable(TEServer
src/main.c
Expand All @@ -70,7 +92,6 @@ add_executable(TEServer
inc/supervision.h
inc/systemcontrol.h
inc/timecontrol.h

inc/supervisorcontrol.h
)

Expand All @@ -86,4 +107,11 @@ target_link_libraries(util MaestroLogging MaestroTime MQBus)
target_link_libraries(UserControl util objctrl MaestroLogging MaestroTime)

target_link_libraries(TEServer util)

if(USE_CITS)
target_link_libraries(cits pahomqtt3c)
target_link_libraries(TEServer cits)
endif()


target_link_libraries(TEServer MaestroLogging MaestroTime MQBus)
7 changes: 4 additions & 3 deletions server/inc/citscontrol.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
#ifndef __CITSCONTROL_H_INCLUDED__
#define __CITSCONTROL_H_INCLUDED__


#include "util.h"
#include "logging.h"

/*------------------------------------------------------------
-- Function declarations.
------------------------------------------------------------*/
int citscontrol_task(TimeType *GPSTime, GSDType *GSD);
void citscontrol_task(TimeType *GPSTime, GSDType *GSD, LOG_LEVEL logLevel);


#endif
#endif
184 changes: 168 additions & 16 deletions server/src/citscontrol.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
/*------------------------------------------------------------
-- Include files.
------------------------------------------------------------*/


#include <sys/time.h>
#include <stdio.h>
#include <string.h>
Expand All @@ -25,48 +27,198 @@
#include <poll.h>
#include <netdb.h>

#include "util.h"
#include "logger.h"
#include "MQTTClient.h"
#include "citscontrol.h"



#define CITS_CONTROL_CONF_FILE_PATH "conf/test.conf"
#define CITS_CONTROL_BUFFER_SIZE_20 20
#define CITS_CONTROL_BUFFER_SIZE_52 52
#define CITS_CONTROL_TASK_PERIOD_MS 1

#define DEFAULT_MQTT_ADDRESS "tcp://localhost:1883"
#define ERICSSON_MQTT_ADDRESS "tcp://10.130.100.18:1883"
#define DEFAULT_MQTT_CLIENTID "ExampleClientPub1"
#define DEFAULT_MQTT_TOPIC "CLIENT/CAM/CS01/1/AZ12B"
#define DEFAULT_MQTT_PAYLOAD "Hello World!"
#define DEFAULT_MQTT_QOS 1
#define DEFAULT_MQTT_TIMEOUT 10000L

#define MODULE_NAME "CitsControl"

/*------------------------------------------------------------
-- Function declarations.
------------------------------------------------------------*/

void init_mqtt(char* ip_addr, char * clientid);
int connect_mqtt();
int publish_mqtt(char *payload, int payload_len, char *topic);
void delivered_mqtt(void *context, MQTTClient_deliveryToken dt);
int msgarrvd_mqtt(void *context, char *topicName, int topicLen, MQTTClient_message *message);
void connlost_mqtt(void *context, char *cause);



/*------------------------------------------------------------
-- Private variables
------------------------------------------------------------*/

enum CITS_STATE {
INIT,
DISCONNECTED,
CONNECTED,
SENDING
};

static int state = INIT;
static volatile int pending_state = INIT;

static MQTTClient client;
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static volatile MQTTClient_deliveryToken deliveredtoken = 0;
static MQTTClient_deliveryToken sendtoken = 0;
/*------------------------------------------------------------
-- The main function.
------------------------------------------------------------*/
int citscontrol_task(TimeType *GPSTime, GSDType *GSD)
void citscontrol_task(TimeType *GPSTime, GSDType *GSD, LOG_LEVEL logLevel)
{

I32 iExit = 0, iCommand;
C8 MqRecvBuffer[MQ_MAX_MESSAGE_LENGTH];
(void)iCommInit(IPC_RECV_SEND,MQ_LG,0);
I32 iExit = 0;
char busReceiveBuffer[MBUS_MAX_DATALEN]; //!< Buffer for receiving from message bus
enum COMMAND command;
int mqtt_response_code = 0;

LogInit(MODULE_NAME,LOG_LEVEL_DEBUG);
LogMessage(LOG_LEVEL_INFO, "CITS running with PID: %i", getpid());

(void)iCommInit();

(void)init_mqtt(ERICSSON_MQTT_ADDRESS,DEFAULT_MQTT_CLIENTID);

MQTTClient_setCallbacks(client, NULL, connlost_mqtt, msgarrvd_mqtt, delivered_mqtt);


LogMessage(LOG_LEVEL_INFO,"Starting cits control...\n");
while(!iExit)
{

// Handle states specific things
state = pending_state;

//LogMessage(LOG_LEVEL_DEBUG,"CITS state %d",state);
switch (state) {
case INIT:

if (!connect_mqtt()){
LogMessage(LOG_LEVEL_INFO,"Connected!");
//MQTTClient_subscribe(client,DEFAULT_MQTT_TOPIC,DEFAULT_MQTT_QOS);
pending_state = CONNECTED;
LogMessage(LOG_LEVEL_DEBUG,"CITS state change from %d to %d",state,pending_state);
}
break;
case CONNECTED:

if ((mqtt_response_code = publish_mqtt(DEFAULT_MQTT_PAYLOAD,strlen(DEFAULT_MQTT_PAYLOAD),DEFAULT_MQTT_TOPIC))) {
LogMessage(LOG_LEVEL_ERROR,"Could not publish message, error code %d", mqtt_response_code);
}
else {
pending_state = SENDING;
}

break;
case SENDING:
if (sendtoken == deliveredtoken) {
pending_state = CONNECTED;
LogMessage(LOG_LEVEL_DEBUG,"CITS state change from %d to %d",state,pending_state);
}
break;
}

// Handle MQ messages

bzero(busReceiveBuffer, sizeof(busReceiveBuffer));
(void)iCommRecv(&command,busReceiveBuffer, sizeof(busReceiveBuffer), NULL);

if(command == COMM_EXIT)
{
iExit = 1;
printf("citscontrol exiting.\n");
(void)iCommClose();
}
//usleep(100000);
}
}

printf("Starting cits control...\n");
while(!iExit)
{
void init_mqtt(char* ip_addr, char * clientid){


bzero(MqRecvBuffer,MQ_MAX_MESSAGE_LENGTH);
(void)iCommRecv(&iCommand,MqRecvBuffer,MQ_MAX_MESSAGE_LENGTH, NULL);
MQTTClient_create(&client, ip_addr, clientid,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
}

if(iCommand == COMM_EXIT)
int connect_mqtt(){
int return_code;
if ((return_code = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
iExit = 1;
printf("citscontrol exiting.\n");
(void)iCommClose();
LogMessage(LOG_LEVEL_ERROR,"Failed to connect, return code %d\n", return_code);
return 1;
}
return 0;
}

int publish_mqtt(char *payload, int payload_len, char *topic){

}
pubmsg.payload = payload;
pubmsg.payloadlen = payload_len;
pubmsg.qos = DEFAULT_MQTT_QOS;
pubmsg.retained = 0;
deliveredtoken = 0;

int retval = MQTTClient_publishMessage(client, topic, &pubmsg, &sendtoken);
if (!retval) {
LogMessage(LOG_LEVEL_INFO,"Waiting for publication of %s\n on topic %s for client with ClitentID %s",payload, topic, DEFAULT_MQTT_CLIENTID );
}
return retval;
}

int msgarrvd_mqtt(void *context, char *topicName, int topicLen, MQTTClient_message *message){
(void)context;
(void)topicLen;
int i;
char* payloadptr;
LogMessage(LOG_LEVEL_DEBUG,"Message arrived! Length=%d",message->payloadlen);
//if (message->payloadlen == 0) return 1;
//else if (topicLen == 0) return 2;
//if(message->payloadlen > 0) {
// LogMessage(LOG_LEVEL_DEBUG,"\n\tTopic: %s\n\tmessage: %s",topicName,message->payload);
//}
//printf("Message arrived\n");
//printf(" topic: %s\n", topicName);
//printf(" message: ");
//payloadptr = message->payload;
/*
for(i=0; i<message->payloadlen; i++)
{
putchar(*payloadptr++);
}
putchar('\n');
*/
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 0;
}

void delivered_mqtt(void *context, MQTTClient_deliveryToken dt)
{
LogMessage(LOG_LEVEL_DEBUG,"Message delivered with token: %d .", dt);
deliveredtoken = dt;
}

void connlost_mqtt(void *context, char *cause){
LogMessage(LOG_LEVEL_DEBUG,"Connection Lost.\n Cause: %s",cause);
printf("Connection lost \n");
}
Loading

0 comments on commit c5d2213

Please sign in to comment.