From 6c65bc83b526d03ec64aba0dfda3db0228dd0281 Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Sat, 4 Apr 2015 15:20:41 +0200 Subject: [PATCH 01/13] Added the possibility to disable individual net services by setting the port to 0. --- dump1090.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) mode change 100644 => 100755 dump1090.c diff --git a/dump1090.c b/dump1090.c old mode 100644 new mode 100755 index 7fa5327a3..53469152e --- a/dump1090.c +++ b/dump1090.c @@ -1910,6 +1910,9 @@ void modesInitNet(void) { Modes.maxfd = -1; for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if(modesNetServices[j].port == 0) { + continue; + } int s = anetTcpServer(Modes.aneterr, modesNetServices[j].port, NULL); if (s == -1) { fprintf(stderr, "Error opening the listening port %d (%s): %s\n", @@ -1934,6 +1937,9 @@ void modesAcceptClients(void) { struct client *c; for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if(modesNetServices[j].port == 0) { + continue; + } fd = anetTcpAccept(Modes.aneterr, *modesNetServices[j].socket, NULL, &port); if (fd == -1) { @@ -2385,6 +2391,9 @@ void modesWaitReadableClients(int timeout_ms) { /* Set listening sockets to accept new clients ASAP. */ for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if(modesNetServices[j].port == 0) { + continue; + } int s = *modesNetServices[j].socket; FD_SET(s,&fds); if (s > maxfd) maxfd = s; @@ -2429,10 +2438,10 @@ void showHelp(void) { "--raw Show only messages hex values.\n" "--net Enable networking.\n" "--net-only Enable just networking, no RTL device or file used.\n" -"--net-ro-port TCP listening port for raw output (default: 30002).\n" -"--net-ri-port TCP listening port for raw input (default: 30001).\n" -"--net-http-port HTTP server port (default: 8080).\n" -"--net-sbs-port TCP listening port for BaseStation format output (default: 30003).\n" +"--net-ro-port TCP listening port for raw output (default: 30002, 0 to disable).\n" +"--net-ri-port TCP listening port for raw input (default: 30001, 0 to disable).\n" +"--net-http-port HTTP server port (default: 8080, 0 to disable).\n" +"--net-sbs-port TCP listening port for BaseStation format output (default: 30003, 0 to disable).\n" "--no-fix Disable single-bits error correction using CRC.\n" "--no-crc-check Disable messages with broken CRC (discouraged).\n" "--aggressive More CPU for more messages (two bits fixes, ...).\n" From 52162708afcb987a4c0ba48e4f277cc7d5599f3e Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Sat, 4 Apr 2015 15:27:57 +0200 Subject: [PATCH 02/13] Added possibility to list all available devices without running the whole program. Was hard to try to see which device to choose when a wall of text just started rolling with ADS-B data, hence adding this flag so a user can just list the available devices and then rerun the program with the device-index flag to select the desired device. --- dump1090.c | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/dump1090.c b/dump1090.c index 53469152e..153e5ef45 100755 --- a/dump1090.c +++ b/dump1090.c @@ -331,10 +331,9 @@ void modesInit(void) { /* =============================== RTLSDR handling ========================== */ -void modesInitRTLSDR(void) { +void listDevices(void) { int j; int device_count; - int ppm_error = 0; char vendor[256], product[256], serial[256]; device_count = rtlsdr_get_device_count(); @@ -349,6 +348,12 @@ void modesInitRTLSDR(void) { fprintf(stderr, "%d: %s, %s, SN: %s %s\n", j, vendor, product, serial, (j == Modes.dev_index) ? "(currently selected)" : ""); } +} + +void modesInitRTLSDR(void) { + int ppm_error = 0; + + listDevices(); if (rtlsdr_open(&Modes.dev, Modes.dev_index) < 0) { fprintf(stderr, "Error opening the RTLSDR device: %s\n", @@ -2450,6 +2455,7 @@ void showHelp(void) { "--metric Use metric units (meters, km/h, ...).\n" "--snip Strip IQ file removing samples < level.\n" "--debug Debug mode (verbose), see README for details.\n" +"--list-devices Lists available devices and then exit.\n" "--help Show this help.\n" "\n" "Debug mode flags: d = Log frames decoded with errors\n" @@ -2495,7 +2501,10 @@ int main(int argc, char **argv) { if (!strcmp(argv[j],"--device-index") && more) { Modes.dev_index = atoi(argv[++j]); - } else if (!strcmp(argv[j],"--gain") && more) { + } else if(!strcmp(argv[j],"--list-devices")) { + listDevices(); + return 0; + } else if (!strcmp(argv[j],"--gain") && more) { Modes.gain = atof(argv[++j])*10; /* Gain is in tens of DBs */ } else if (!strcmp(argv[j],"--enable-agc")) { Modes.enable_agc++; From 73a3c6f81c1a4bedbe3f035dfdd9e3605dafe4b0 Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Sat, 4 Apr 2015 15:40:35 +0200 Subject: [PATCH 03/13] Added a quiet flag to stop dump1090 spamming the console in case one is running with the --net flag. --- dump1090.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/dump1090.c b/dump1090.c index 153e5ef45..59f97c8ef 100755 --- a/dump1090.c +++ b/dump1090.c @@ -168,6 +168,7 @@ struct { int onlyaddr; /* Print only ICAO addresses. */ int metric; /* Use metric units. */ int aggressive; /* Aggressive detection algorithm. */ + int quiet; /* Quiet mode, not printing any human readable messages */ /* Interactive mode */ struct aircraft *aircrafts; @@ -274,6 +275,7 @@ void modesInitConfig(void) { Modes.interactive_ttl = MODES_INTERACTIVE_TTL; Modes.aggressive = 0; Modes.interactive_rows = getTermRows(); + Modes.quiet = 0; } void modesInit(void) { @@ -1553,7 +1555,7 @@ void useModesMessage(struct modesMessage *mm) { if (a && Modes.stat_sbs_connections > 0) modesSendSBSOutput(mm, a); /* Feed SBS output clients. */ } /* In non-interactive way, display messages on standard output. */ - if (!Modes.interactive) { + if (!Modes.interactive && !Modes.quiet) { displayModesMessage(mm); if (!Modes.raw && !Modes.onlyaddr) printf("\n"); } @@ -2456,6 +2458,7 @@ void showHelp(void) { "--snip Strip IQ file removing samples < level.\n" "--debug Debug mode (verbose), see README for details.\n" "--list-devices Lists available devices and then exit.\n" +"--quiet Makes the console quiet, for use with the --net flag.\n" "--help Show this help.\n" "\n" "Debug mode flags: d = Log frames decoded with errors\n" @@ -2566,6 +2569,8 @@ int main(int argc, char **argv) { } else if (!strcmp(argv[j],"--snip") && more) { snipMode(atoi(argv[++j])); exit(0); + } else if (!strcmp(argv[j],"--quiet")) { + Modes.quiet = 1; } else if (!strcmp(argv[j],"--help")) { showHelp(); exit(0); From 2d5bc759698ecdd70cec5c6cda620145b4d2d4e4 Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Sun, 19 Apr 2015 11:28:33 +0200 Subject: [PATCH 04/13] Switched to always use aggressive mode as performance impact is very low (measured on rPi). Also added MQTT support based on Paho https://www.eclipse.org/paho/clients/c/. --- Makefile | 6 +- dump1090.c | 51 ++++++++++------- mqtt.c | 157 +++++++++++++++++++++++++++++++++++++++++++++++++++++ mqtt.h | 16 ++++++ 4 files changed, 208 insertions(+), 22 deletions(-) create mode 100644 mqtt.c create mode 100644 mqtt.h diff --git a/Makefile b/Makefile index f9637b7eb..35b40c3ee 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr) -LDLIBS+=$(shell pkg-config --libs librtlsdr) -lpthread -lm +LDLIBS+=$(shell pkg-config --libs librtlsdr) -lpthread -lm -lpaho-mqtt3c CC?=gcc PROGNAME=dump1090 @@ -8,8 +8,8 @@ all: dump1090 %.o: %.c $(CC) $(CFLAGS) -c $< -dump1090: dump1090.o anet.o - $(CC) -g -o dump1090 dump1090.o anet.o $(LDFLAGS) $(LDLIBS) +dump1090: dump1090.o anet.o mqtt.o + $(CC) -g -o dump1090 dump1090.o anet.o mqtt.o $(LDFLAGS) $(LDLIBS) clean: rm -f *.o dump1090 diff --git a/dump1090.c b/dump1090.c index 59f97c8ef..f73d2409b 100755 --- a/dump1090.c +++ b/dump1090.c @@ -45,6 +45,7 @@ #include #include "rtl-sdr.h" #include "anet.h" +#include "mqtt.h" #define MODES_DEFAULT_RATE 2000000 #define MODES_DEFAULT_FREQ 1090000000 @@ -167,9 +168,13 @@ struct { int stats; /* Print stats at exit in --ifile mode. */ int onlyaddr; /* Print only ICAO addresses. */ int metric; /* Use metric units. */ - int aggressive; /* Aggressive detection algorithm. */ int quiet; /* Quiet mode, not printing any human readable messages */ + /* Mqtt configuration */ + char* mqtt_uri; + char* mqtt_username; + char* mqtt_password; + /* Interactive mode */ struct aircraft *aircrafts; long long interactive_last_update; /* Last screen update in milliseconds */ @@ -273,7 +278,6 @@ void modesInitConfig(void) { Modes.interactive = 0; Modes.interactive_rows = MODES_INTERACTIVE_ROWS; Modes.interactive_ttl = MODES_INTERACTIVE_TTL; - Modes.aggressive = 0; Modes.interactive_rows = getTermRows(); Modes.quiet = 0; } @@ -960,7 +964,7 @@ void decodeModesMessage(struct modesMessage *mm, unsigned char *msg) { if ((mm->errorbit = fixSingleBitErrors(msg,mm->msgbits)) != -1) { mm->crc = modesChecksum(msg,mm->msgbits); mm->crcok = 1; - } else if (Modes.aggressive && mm->msgtype == 17 && + } else if (mm->msgtype == 17 && (mm->errorbit = fixTwoBitsErrors(msg,mm->msgbits)) != -1) { mm->crc = modesChecksum(msg,mm->msgbits); @@ -1475,7 +1479,7 @@ void detectModeS(uint16_t *m, uint32_t mlen) { /* If we reached this point, and error is zero, we are very likely * with a Mode S message in our hands, but it may still be broken * and CRC may not be correct. This is handled by the next layer. */ - if (errors == 0 || (Modes.aggressive && errors < 3)) { + if (errors == 0 || (errors < 3)) { struct modesMessage mm; /* Decode the received message and update statistics */ @@ -1560,7 +1564,7 @@ void useModesMessage(struct modesMessage *mm) { if (!Modes.raw && !Modes.onlyaddr) printf("\n"); } /* Send data to connected clients. */ - if (Modes.net) { + if (Modes.net || Modes.mqtt_uri) { modesSendRawOutput(mm); /* Feed raw output clients. */ } } @@ -2023,17 +2027,21 @@ void modesSendAllClients(int service, void *msg, int len) { /* Write raw output to TCP clients. */ void modesSendRawOutput(struct modesMessage *mm) { - char msg[128], *p = msg; - int j; - - *p++ = '*'; - for (j = 0; j < mm->msgbits/8; j++) { - sprintf(p, "%02X", mm->msg[j]); - p += 2; - } - *p++ = ';'; - *p++ = '\n'; - modesSendAllClients(Modes.ros, msg, p-msg); + char msg[(mm->msgbits/4)+3], *p = msg; + int j; + *p++ = '*'; + for (j = 0; j < mm->msgbits/8; j++) { + sprintf(p, "%02X", mm->msg[j]); + p += 2; + } + *p++ = ';'; + *p++ = '\n'; + if (Modes.mqtt_uri) { + addRawMessageToMq(msg, p-msg); // TODO: Check if MQ really should be used, perhaps as a compile flag for both include and sending. + } + if (Modes.net) { + modesSendAllClients(Modes.ros, msg, p-msg); + } } @@ -2451,7 +2459,6 @@ void showHelp(void) { "--net-sbs-port TCP listening port for BaseStation format output (default: 30003, 0 to disable).\n" "--no-fix Disable single-bits error correction using CRC.\n" "--no-crc-check Disable messages with broken CRC (discouraged).\n" -"--aggressive More CPU for more messages (two bits fixes, ...).\n" "--stats With --ifile print stats at exit. No other output.\n" "--onlyaddr Show only ICAO addresses (testing purposes).\n" "--metric Use metric units (meters, km/h, ...).\n" @@ -2538,8 +2545,6 @@ int main(int argc, char **argv) { Modes.onlyaddr = 1; } else if (!strcmp(argv[j],"--metric")) { Modes.metric = 1; - } else if (!strcmp(argv[j],"--aggressive")) { - Modes.aggressive++; } else if (!strcmp(argv[j],"--interactive")) { Modes.interactive = 1; } else if (!strcmp(argv[j],"--interactive-rows")) { @@ -2571,6 +2576,12 @@ int main(int argc, char **argv) { exit(0); } else if (!strcmp(argv[j],"--quiet")) { Modes.quiet = 1; + } else if (!strcmp(argv[j],"--mqtt-uri") && more) { + Modes.mqtt_uri = argv[++j]; + } else if (!strcmp(argv[j],"--mqtt-username") && more) { + Modes.mqtt_username = argv[++j]; + } else if (!strcmp(argv[j],"--mqtt-password") && more) { + Modes.mqtt_password = argv[++j]; } else if (!strcmp(argv[j],"--help")) { showHelp(); exit(0); @@ -2588,6 +2599,7 @@ int main(int argc, char **argv) { /* Initialization */ modesInit(); + initMqConnection(Modes.mqtt_uri, Modes.mqtt_username, Modes.mqtt_password); if (Modes.net_only) { fprintf(stderr,"Net-only mode, no RTL device or file open.\n"); } else if (Modes.filename == NULL) { @@ -2653,6 +2665,7 @@ int main(int argc, char **argv) { } rtlsdr_close(Modes.dev); + shutdownMqConnection(); return 0; } diff --git a/mqtt.c b/mqtt.c new file mode 100644 index 000000000..0d39978fc --- /dev/null +++ b/mqtt.c @@ -0,0 +1,157 @@ +/* + * Mqtt.c + * + * Created on: Apr 18, 2015 + * Author: borax + */ +#include +#include +#include +#include +#include +#include "mqtt.h" +#include "MQTTClient.h" + +#define CLIENTID "Dump1090" +#define TOPIC "adsb/data/raw" +#define QOS 0 +#define TIMEOUT 10000L + +struct { + // Threading + pthread_t sender_thread; + pthread_mutex_t thread_lock; + pthread_cond_t thread_signal; + + // Internal message list + volatile int message_count; + struct queue_message *first_message; + struct queue_message *last_message; + + // MQTT connection options + char* uri; + char* username; + char* password; + MQTTClient client; + MQTTClient_deliveryToken token; +} Mqtt; + +struct queue_message { + char *message; + int length; + struct queue_message *next; +}; + +struct queue_message *popFirstMessageInQueue(); +void sendMessagesToMq(); +void sendMessageToMq(struct queue_message *msg); + +/* Initialize things */ +int initMqConnection(char* uri, char* username, char* password) { + printf("MQ connection settings: %s %s %s\n", uri, username, password); + Mqtt.uri = uri; + Mqtt.username = username; + Mqtt.password = password; + Mqtt.first_message = NULL; + Mqtt.message_count = 0; + + + pthread_mutex_init(&Mqtt.thread_lock,NULL); + pthread_cond_init(&Mqtt.thread_signal,NULL); + pthread_create(&Mqtt.sender_thread, NULL, sendMessagesToMq, NULL); + + return 0; +} + +void shutdownMqConnection() { + printf("Closing MQ connection\n"); + +} + +/* Queue functionality */ +void addRawMessageToMq(char *data, int length) { + if(!Mqtt.uri) { + return; + } + struct queue_message *tail = Mqtt.last_message; + struct queue_message *curr = malloc(sizeof(struct queue_message)); + curr->message = malloc(length+1); + memcpy(curr->message, data, length); + curr->message[length] = '\0'; + curr->length = length; + if(tail) { + tail->next = curr; + } else { + Mqtt.first_message = curr; + } + Mqtt.last_message = curr; + Mqtt.message_count++; + pthread_cond_signal(&Mqtt.thread_signal); +} + +struct queue_message *popFirstMessageInQueue() { + struct queue_message *msg = Mqtt.first_message; + Mqtt.first_message = Mqtt.first_message->next; + Mqtt.message_count--; + if(Mqtt.message_count == 0) { + Mqtt.first_message = 0; + Mqtt.last_message = 0; + } + return msg; +} + +/* Mqtt */ +void sendMessagesToMq() { + while(1) { + if (Mqtt.message_count == 0) { + pthread_cond_wait(&Mqtt.thread_signal,&Mqtt.thread_lock); + continue; + } + else if(Mqtt.message_count > 0) { + struct queue_message *msg = popFirstMessageInQueue(); + sendMessageToMq(msg); + free(msg); + } + } +} + +void initiateConnection() { + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + int rc; + + MQTTClient_create(&Mqtt.client, Mqtt.uri, CLIENTID, + MQTTCLIENT_PERSISTENCE_NONE, NULL); + conn_opts.keepAliveInterval = 20; + conn_opts.cleansession = 1; + + if ((rc = MQTTClient_connect(Mqtt.client, &conn_opts)) != MQTTCLIENT_SUCCESS) + { + printf("Failed to connect, return code %d\n", rc); + } +} + +void destroyConnection() { + MQTTClient_disconnect(Mqtt.client, 10000); + MQTTClient_destroy(&Mqtt.client); +} + +void sendMessageToMq(struct queue_message *msg) { + if(!Mqtt.client) { + initiateConnection(); + } + + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = msg->message; + pubmsg.payloadlen = msg->length; + pubmsg.qos = QOS; + pubmsg.retained = 0; + + if(MQTTClient_publishMessage(Mqtt.client, TOPIC, &pubmsg, &Mqtt.token) != MQTTCLIENT_SUCCESS) { + destroyConnection(); + return; + } + if(MQTTClient_waitForCompletion(Mqtt.client, Mqtt.token, TIMEOUT) != MQTTCLIENT_SUCCESS) { + destroyConnection(); + return; + } +} diff --git a/mqtt.h b/mqtt.h new file mode 100644 index 000000000..2105da855 --- /dev/null +++ b/mqtt.h @@ -0,0 +1,16 @@ +/* + * Mqtt.h + * + * Created on: Apr 18, 2015 + * Author: borax + */ + +#ifndef MQTT_H_ +#define MQTT_H_ + +int initMqConnection(char* uri, char* username, char* password); +void shutdownMqConnection(); +void addRawMessageToMq(char *data, int length); + + +#endif /* MQTT_H_ */ From b0b025543181f7611076e49395e77c05f80fa6ca Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Sun, 19 Apr 2015 11:41:59 +0200 Subject: [PATCH 05/13] Updated readme and added username/password MQTT functionality. --- README.md | 15 +++++++++++++++ mqtt.c | 5 +++++ 2 files changed, 20 insertions(+) diff --git a/README.md b/README.md index d18eb90e9..0dc09ce7b 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ The main features are: * CPR coordinates decoding and track calculation from velocity. * TCP server streaming and receiving raw data to/from connected clients (using --net). +* MQTT support. While from time to time I still add / fix stuff in my fork, I target minimalism of the implementation. However there is a @@ -30,8 +31,14 @@ available, developed by MalcolmRobb. Installation --- +Make sure that you have librtlsdr, libusb, libssl-dev and Paho MQTT lib +to be able to make this project. Google for them to find them. This has +been tested on Ubuntu and Raspberry Pi. + Type "make". + + Normal usage --- @@ -169,6 +176,14 @@ similar to: This can be used to feed data to various sharing sites without the need to use another decoder. +MQTT +--- +For basic operation, just specify the URI to send the data to with (no need for the --net flag): +./dump1090 --mqtt-uri "your-uri" + +You could also specify username and password if needed by using: +./dump1090 --mqtt-uri "your-uri" --mqtt-username "username" --mqtt-password "password" + Antenna --- diff --git a/mqtt.c b/mqtt.c index 0d39978fc..c66113a2e 100644 --- a/mqtt.c +++ b/mqtt.c @@ -124,6 +124,11 @@ void initiateConnection() { conn_opts.keepAliveInterval = 20; conn_opts.cleansession = 1; + if(Mqtt.username) { + conn_opts.username = Mqtt.username; + conn_opts.password = Mqtt.password; + } + if ((rc = MQTTClient_connect(Mqtt.client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); From 4c1b2d80f42c92636076e1779f900be198716caf Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Sun, 19 Apr 2015 11:45:41 +0200 Subject: [PATCH 06/13] Updated readme with some more details. --- README.md | 23 +++++++---------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 0dc09ce7b..3e8b61010 100644 --- a/README.md +++ b/README.md @@ -179,10 +179,12 @@ This can be used to feed data to various sharing sites without the need to use a MQTT --- For basic operation, just specify the URI to send the data to with (no need for the --net flag): -./dump1090 --mqtt-uri "your-uri" + + ./dump1090 --mqtt-uri "your-uri" You could also specify username and password if needed by using: -./dump1090 --mqtt-uri "your-uri" --mqtt-username "username" --mqtt-password "password" + + ./dump1090 --mqtt-uri "your-uri" --mqtt-username "username" --mqtt-password "password" Antenna --- @@ -208,20 +210,9 @@ resources: Aggressive mode --- -With --aggressive it is possible to activate the *aggressive mode* that is a -modified version of the Mode S packet detection and decoding. -THe aggresive mode uses more CPU usually (especially if there are many planes -sending DF17 packets), but can detect a few more messages. - -The algorithm in aggressive mode is modified in the following ways: - -* Up to two demodulation errors are tolerated (adjacent entires in the magnitude - vector with the same eight). Normally only messages without errors are - checked. -* It tries to fix DF17 messages trying every two bits combination. - -The use of aggressive mdoe is only advised in places where there is low traffic -in order to have a chance to capture some more messages. +This release always has aggressive mode active. I did some performance +measurements on an Raspberry PI and when receiving around 50 ADSB messages +per second the CPU-load just increased 3-4%. Debug mode --- From 92b2b39b7bad5eff76db93151509dcc56461823e Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Tue, 8 Sep 2015 07:28:38 +0000 Subject: [PATCH 07/13] Removed message count variable to make the code safer --- Makefile | 4 ++++ mqtt.c | 41 +++++++++++++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/Makefile b/Makefile index 35b40c3ee..f6065da32 100644 --- a/Makefile +++ b/Makefile @@ -13,3 +13,7 @@ dump1090: dump1090.o anet.o mqtt.o clean: rm -f *.o dump1090 + +release: + $(CC) -o dump1090 dump1090.o anet.o mqtt.o $(LDFLAGS) $(LDLIBS) + diff --git a/mqtt.c b/mqtt.c index c66113a2e..187abfc4c 100644 --- a/mqtt.c +++ b/mqtt.c @@ -9,6 +9,7 @@ #include #include #include +#include #include "mqtt.h" #include "MQTTClient.h" @@ -24,7 +25,7 @@ struct { pthread_cond_t thread_signal; // Internal message list - volatile int message_count; +// volatile int message_count; struct queue_message *first_message; struct queue_message *last_message; @@ -48,12 +49,13 @@ void sendMessageToMq(struct queue_message *msg); /* Initialize things */ int initMqConnection(char* uri, char* username, char* password) { + printf("MQ connection settings: %s %s %s\n", uri, username, password); Mqtt.uri = uri; Mqtt.username = username; Mqtt.password = password; Mqtt.first_message = NULL; - Mqtt.message_count = 0; +// Mqtt.message_count = 0; pthread_mutex_init(&Mqtt.thread_lock,NULL); @@ -75,9 +77,26 @@ void addRawMessageToMq(char *data, int length) { } struct queue_message *tail = Mqtt.last_message; struct queue_message *curr = malloc(sizeof(struct queue_message)); + +struct timeval tv; + +gettimeofday(&tv, NULL); + +unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; + + data[length-1] = '\0'; + +// sprintf(curr->message, "{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }", millisecondsSinceEpoch, data); + + curr->message = malloc(length+1); memcpy(curr->message, data, length); curr->message[length] = '\0'; + + + curr->length = length; if(tail) { tail->next = curr; @@ -85,32 +104,38 @@ void addRawMessageToMq(char *data, int length) { Mqtt.first_message = curr; } Mqtt.last_message = curr; - Mqtt.message_count++; +// Mqtt.message_count++; pthread_cond_signal(&Mqtt.thread_signal); + +// printf("MessageCount: %d \r", Mqtt.message_count); } struct queue_message *popFirstMessageInQueue() { struct queue_message *msg = Mqtt.first_message; - Mqtt.first_message = Mqtt.first_message->next; + //if(Mqtt.first_message != 0) { + Mqtt.first_message = Mqtt.first_message->next; +// } +/* Mqtt.message_count--; if(Mqtt.message_count == 0) { Mqtt.first_message = 0; Mqtt.last_message = 0; } +*/ return msg; } /* Mqtt */ void sendMessagesToMq() { while(1) { - if (Mqtt.message_count == 0) { + if (Mqtt.first_message == 0) { // (Mqtt.message_count == 0) { pthread_cond_wait(&Mqtt.thread_signal,&Mqtt.thread_lock); continue; } - else if(Mqtt.message_count > 0) { + else { //if(Mqtt.message_count > 0) { struct queue_message *msg = popFirstMessageInQueue(); - sendMessageToMq(msg); - free(msg); + sendMessageToMq(msg); + free(msg); } } } From d552e6728690912e92a63b00292160003f446da1 Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Tue, 8 Sep 2015 09:07:25 +0000 Subject: [PATCH 08/13] Removed dead code and added some debugging --- mqtt.c | 35 +++++++++-------------------------- 1 file changed, 9 insertions(+), 26 deletions(-) diff --git a/mqtt.c b/mqtt.c index 187abfc4c..d19e9c9f9 100644 --- a/mqtt.c +++ b/mqtt.c @@ -55,8 +55,6 @@ int initMqConnection(char* uri, char* username, char* password) { Mqtt.username = username; Mqtt.password = password; Mqtt.first_message = NULL; -// Mqtt.message_count = 0; - pthread_mutex_init(&Mqtt.thread_lock,NULL); pthread_cond_init(&Mqtt.thread_signal,NULL); @@ -78,25 +76,22 @@ void addRawMessageToMq(char *data, int length) { struct queue_message *tail = Mqtt.last_message; struct queue_message *curr = malloc(sizeof(struct queue_message)); -struct timeval tv; + struct timeval tv; -gettimeofday(&tv, NULL); + gettimeofday(&tv, NULL); -unsigned long long millisecondsSinceEpoch = - (unsigned long long)(tv.tv_sec) * 1000 + - (unsigned long long)(tv.tv_usec) / 1000; + unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; data[length-1] = '\0'; -// sprintf(curr->message, "{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }", millisecondsSinceEpoch, data); - + printf("{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }\n", millisecondsSinceEpoch, data); curr->message = malloc(length+1); memcpy(curr->message, data, length); curr->message[length] = '\0'; - - curr->length = length; if(tail) { tail->next = curr; @@ -104,35 +99,23 @@ unsigned long long millisecondsSinceEpoch = Mqtt.first_message = curr; } Mqtt.last_message = curr; -// Mqtt.message_count++; pthread_cond_signal(&Mqtt.thread_signal); - -// printf("MessageCount: %d \r", Mqtt.message_count); } struct queue_message *popFirstMessageInQueue() { struct queue_message *msg = Mqtt.first_message; - //if(Mqtt.first_message != 0) { - Mqtt.first_message = Mqtt.first_message->next; -// } -/* - Mqtt.message_count--; - if(Mqtt.message_count == 0) { - Mqtt.first_message = 0; - Mqtt.last_message = 0; - } -*/ + Mqtt.first_message = Mqtt.first_message->next; return msg; } /* Mqtt */ void sendMessagesToMq() { while(1) { - if (Mqtt.first_message == 0) { // (Mqtt.message_count == 0) { + if (Mqtt.first_message == 0) { pthread_cond_wait(&Mqtt.thread_signal,&Mqtt.thread_lock); continue; } - else { //if(Mqtt.message_count > 0) { + else { struct queue_message *msg = popFirstMessageInQueue(); sendMessageToMq(msg); free(msg); From f2b690a36ac08db7f542969247bb875e3d0d60ce Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Thu, 10 Sep 2015 08:56:30 +0000 Subject: [PATCH 09/13] Working version in need of code cleanup. --- mqtt.c | 70 +++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/mqtt.c b/mqtt.c index d19e9c9f9..a82fc8b79 100644 --- a/mqtt.c +++ b/mqtt.c @@ -22,10 +22,14 @@ struct { // Threading pthread_t sender_thread; pthread_mutex_t thread_lock; + pthread_mutex_t thread_lock2; pthread_cond_t thread_signal; + pthread_cond_t thread_new_message; + + int testa; + int testb; // Internal message list -// volatile int message_count; struct queue_message *first_message; struct queue_message *last_message; @@ -57,9 +61,14 @@ int initMqConnection(char* uri, char* username, char* password) { Mqtt.first_message = NULL; pthread_mutex_init(&Mqtt.thread_lock,NULL); + pthread_mutex_init(&Mqtt.thread_lock2,NULL); pthread_cond_init(&Mqtt.thread_signal,NULL); + pthread_cond_init(&Mqtt.thread_new_message,NULL); pthread_create(&Mqtt.sender_thread, NULL, sendMessagesToMq, NULL); + Mqtt.testa = 0; + Mqtt.testb = 0; + return 0; } @@ -73,13 +82,10 @@ void addRawMessageToMq(char *data, int length) { if(!Mqtt.uri) { return; } - struct queue_message *tail = Mqtt.last_message; struct queue_message *curr = malloc(sizeof(struct queue_message)); - struct timeval tv; gettimeofday(&tv, NULL); - unsigned long long millisecondsSinceEpoch = (unsigned long long)(tv.tv_sec) * 1000 + (unsigned long long)(tv.tv_usec) / 1000; @@ -91,35 +97,61 @@ void addRawMessageToMq(char *data, int length) { curr->message = malloc(length+1); memcpy(curr->message, data, length); curr->message[length] = '\0'; - curr->length = length; - if(tail) { - tail->next = curr; - } else { - Mqtt.first_message = curr; - } - Mqtt.last_message = curr; - pthread_cond_signal(&Mqtt.thread_signal); + + addMessageToQueue(curr); } +void addMessageToQueue(struct queue_message *msg) { + pthread_mutex_lock(&Mqtt.thread_lock); +// pthread_cond_wait(&Mqtt.thread_signal,&Mqtt.thread_lock); + struct queue_message *tail = Mqtt.last_message; + if(tail) { + tail->next = msg; + } else { + Mqtt.first_message = msg; + } + Mqtt.last_message = msg; + pthread_mutex_unlock(&Mqtt.thread_lock); +// pthread_cond_broadcast(&Mqtt.thread_new_message); +} + + struct queue_message *popFirstMessageInQueue() { + pthread_mutex_lock(&Mqtt.thread_lock); +/* while(Mqtt.first_message == 0) { + pthread_cond_wait(&Mqtt.thread_new_message,&Mqtt.thread_lock); + }*/ struct queue_message *msg = Mqtt.first_message; - Mqtt.first_message = Mqtt.first_message->next; +// Mqtt.first_message = Mqtt.first_message->next; + if(Mqtt.first_message == Mqtt.last_message) { + Mqtt.first_message = 0; + Mqtt.last_message = 0; + } else { + Mqtt.first_message = Mqtt.first_message->next; + } + pthread_mutex_unlock(&Mqtt.thread_lock); +// pthread_cond_broadcast(&Mqtt.thread_signal); return msg; } /* Mqtt */ void sendMessagesToMq() { while(1) { - if (Mqtt.first_message == 0) { - pthread_cond_wait(&Mqtt.thread_signal,&Mqtt.thread_lock); - continue; - } - else { + if (Mqtt.first_message != NULL) { struct queue_message *msg = popFirstMessageInQueue(); + if(msg != NULL) { sendMessageToMq(msg); free(msg); + } +// usleep(50); } + usleep(50); +/* else { + printf("Waiting for data\n"); + sleep(1); + } +*/ } } @@ -153,6 +185,8 @@ void sendMessageToMq(struct queue_message *msg) { initiateConnection(); } + printf("Sending: %s \n", msg->message); + MQTTClient_message pubmsg = MQTTClient_message_initializer; pubmsg.payload = msg->message; pubmsg.payloadlen = msg->length; From 9e910df29e1d7b19c446b42bad528959880e669b Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Thu, 10 Sep 2015 09:20:18 +0000 Subject: [PATCH 10/13] More cleanup to come... --- mqtt.c | 42 ++++++++---------------------------------- 1 file changed, 8 insertions(+), 34 deletions(-) diff --git a/mqtt.c b/mqtt.c index a82fc8b79..df6522044 100644 --- a/mqtt.c +++ b/mqtt.c @@ -22,12 +22,6 @@ struct { // Threading pthread_t sender_thread; pthread_mutex_t thread_lock; - pthread_mutex_t thread_lock2; - pthread_cond_t thread_signal; - pthread_cond_t thread_new_message; - - int testa; - int testb; // Internal message list struct queue_message *first_message; @@ -61,14 +55,8 @@ int initMqConnection(char* uri, char* username, char* password) { Mqtt.first_message = NULL; pthread_mutex_init(&Mqtt.thread_lock,NULL); - pthread_mutex_init(&Mqtt.thread_lock2,NULL); - pthread_cond_init(&Mqtt.thread_signal,NULL); - pthread_cond_init(&Mqtt.thread_new_message,NULL); pthread_create(&Mqtt.sender_thread, NULL, sendMessagesToMq, NULL); - Mqtt.testa = 0; - Mqtt.testb = 0; - return 0; } @@ -104,7 +92,6 @@ void addRawMessageToMq(char *data, int length) { void addMessageToQueue(struct queue_message *msg) { pthread_mutex_lock(&Mqtt.thread_lock); -// pthread_cond_wait(&Mqtt.thread_signal,&Mqtt.thread_lock); struct queue_message *tail = Mqtt.last_message; if(tail) { tail->next = msg; @@ -113,17 +100,12 @@ void addMessageToQueue(struct queue_message *msg) { } Mqtt.last_message = msg; pthread_mutex_unlock(&Mqtt.thread_lock); -// pthread_cond_broadcast(&Mqtt.thread_new_message); } struct queue_message *popFirstMessageInQueue() { pthread_mutex_lock(&Mqtt.thread_lock); -/* while(Mqtt.first_message == 0) { - pthread_cond_wait(&Mqtt.thread_new_message,&Mqtt.thread_lock); - }*/ struct queue_message *msg = Mqtt.first_message; -// Mqtt.first_message = Mqtt.first_message->next; if(Mqtt.first_message == Mqtt.last_message) { Mqtt.first_message = 0; Mqtt.last_message = 0; @@ -131,27 +113,24 @@ struct queue_message *popFirstMessageInQueue() { Mqtt.first_message = Mqtt.first_message->next; } pthread_mutex_unlock(&Mqtt.thread_lock); -// pthread_cond_broadcast(&Mqtt.thread_signal); return msg; } /* Mqtt */ void sendMessagesToMq() { while(1) { - if (Mqtt.first_message != NULL) { + if(!Mqtt.client) { + initiateConnection(); + } + if (Mqtt.first_message && Mqtt.client) { struct queue_message *msg = popFirstMessageInQueue(); - if(msg != NULL) { + if(msg) { sendMessageToMq(msg); free(msg); + continue; } -// usleep(50); - } - usleep(50); -/* else { - printf("Waiting for data\n"); - sleep(1); } -*/ + usleep(250); } } @@ -181,12 +160,6 @@ void destroyConnection() { } void sendMessageToMq(struct queue_message *msg) { - if(!Mqtt.client) { - initiateConnection(); - } - - printf("Sending: %s \n", msg->message); - MQTTClient_message pubmsg = MQTTClient_message_initializer; pubmsg.payload = msg->message; pubmsg.payloadlen = msg->length; @@ -201,4 +174,5 @@ void sendMessageToMq(struct queue_message *msg) { destroyConnection(); return; } + printf("Sent: %s \n", msg->message); } From 9052a484be28df221b40fa1e7f1e4405c0376b15 Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Thu, 10 Sep 2015 09:53:00 +0000 Subject: [PATCH 11/13] Code cleaned and seems to be working just fine. --- mqtt.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/mqtt.c b/mqtt.c index df6522044..790097622 100644 --- a/mqtt.c +++ b/mqtt.c @@ -79,13 +79,10 @@ void addRawMessageToMq(char *data, int length) { (unsigned long long)(tv.tv_usec) / 1000; data[length-1] = '\0'; - - printf("{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }\n", millisecondsSinceEpoch, data); - - curr->message = malloc(length+1); - memcpy(curr->message, data, length); - curr->message[length] = '\0'; - curr->length = length; + char *buf = malloc(100); + sprintf(buf, "{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }", millisecondsSinceEpoch, data); + curr->message = buf; + curr->length = strlen(curr->message); addMessageToQueue(curr); } @@ -126,6 +123,7 @@ void sendMessagesToMq() { struct queue_message *msg = popFirstMessageInQueue(); if(msg) { sendMessageToMq(msg); + free(msg->message); free(msg); continue; } @@ -150,7 +148,7 @@ void initiateConnection() { if ((rc = MQTTClient_connect(Mqtt.client, &conn_opts)) != MQTTCLIENT_SUCCESS) { - printf("Failed to connect, return code %d\n", rc); + //printf("Failed to connect, return code %d\n", rc); } } @@ -174,5 +172,4 @@ void sendMessageToMq(struct queue_message *msg) { destroyConnection(); return; } - printf("Sent: %s \n", msg->message); } From c3c8cc82a4d8639da72dd4988e43553eefa3bb1d Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Thu, 10 Sep 2015 12:40:36 +0000 Subject: [PATCH 12/13] Updated gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 0984522d7..d6300499f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ testfiles/*.bin misc frames.js .*.swp +start.sh From 3acc90919542b082bbded90d9c1b654e540cdb47 Mon Sep 17 00:00:00 2001 From: Ola Andersson Date: Thu, 4 Feb 2016 10:47:08 +0000 Subject: [PATCH 13/13] Changed from using MQTT to using REST instead. --- Makefile | 10 +-- dump1090.c | 25 +++----- mqtt.c | 175 ----------------------------------------------------- mqtt.h | 16 ----- rest.c | 154 ++++++++++++++++++++++++++++++++++++++++++++++ rest.h | 15 +++++ 6 files changed, 183 insertions(+), 212 deletions(-) delete mode 100644 mqtt.c delete mode 100644 mqtt.h create mode 100644 rest.c create mode 100644 rest.h diff --git a/Makefile b/Makefile index f6065da32..5f1c361ee 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr) -LDLIBS+=$(shell pkg-config --libs librtlsdr) -lpthread -lm -lpaho-mqtt3c +CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr json-c) +LDLIBS+=$(shell pkg-config --libs librtlsdr json-c ) -lpthread -lm -lcurl CC?=gcc PROGNAME=dump1090 @@ -8,12 +8,12 @@ all: dump1090 %.o: %.c $(CC) $(CFLAGS) -c $< -dump1090: dump1090.o anet.o mqtt.o - $(CC) -g -o dump1090 dump1090.o anet.o mqtt.o $(LDFLAGS) $(LDLIBS) +dump1090: dump1090.o anet.o rest.o + $(CC) -g -o dump1090 dump1090.o anet.o rest.o $(LDFLAGS) $(LDLIBS) clean: rm -f *.o dump1090 release: - $(CC) -o dump1090 dump1090.o anet.o mqtt.o $(LDFLAGS) $(LDLIBS) + $(CC) -o dump1090 dump1090.o anet.o rest.o $(LDFLAGS) $(LDLIBS) diff --git a/dump1090.c b/dump1090.c index f73d2409b..dfac44a47 100755 --- a/dump1090.c +++ b/dump1090.c @@ -45,7 +45,7 @@ #include #include "rtl-sdr.h" #include "anet.h" -#include "mqtt.h" +#include "rest.h" #define MODES_DEFAULT_RATE 2000000 #define MODES_DEFAULT_FREQ 1090000000 @@ -170,10 +170,8 @@ struct { int metric; /* Use metric units. */ int quiet; /* Quiet mode, not printing any human readable messages */ - /* Mqtt configuration */ - char* mqtt_uri; - char* mqtt_username; - char* mqtt_password; + /* REST configuration */ + char* rest_uri; /* Interactive mode */ struct aircraft *aircrafts; @@ -1564,7 +1562,7 @@ void useModesMessage(struct modesMessage *mm) { if (!Modes.raw && !Modes.onlyaddr) printf("\n"); } /* Send data to connected clients. */ - if (Modes.net || Modes.mqtt_uri) { + if (Modes.net || Modes.rest_uri) { modesSendRawOutput(mm); /* Feed raw output clients. */ } } @@ -2036,8 +2034,8 @@ void modesSendRawOutput(struct modesMessage *mm) { } *p++ = ';'; *p++ = '\n'; - if (Modes.mqtt_uri) { - addRawMessageToMq(msg, p-msg); // TODO: Check if MQ really should be used, perhaps as a compile flag for both include and sending. + if (Modes.rest_uri) { + addRawMessageToRestQueue(msg, p-msg); } if (Modes.net) { modesSendAllClients(Modes.ros, msg, p-msg); @@ -2576,12 +2574,8 @@ int main(int argc, char **argv) { exit(0); } else if (!strcmp(argv[j],"--quiet")) { Modes.quiet = 1; - } else if (!strcmp(argv[j],"--mqtt-uri") && more) { - Modes.mqtt_uri = argv[++j]; - } else if (!strcmp(argv[j],"--mqtt-username") && more) { - Modes.mqtt_username = argv[++j]; - } else if (!strcmp(argv[j],"--mqtt-password") && more) { - Modes.mqtt_password = argv[++j]; + } else if (!strcmp(argv[j],"--rest-uri") && more) { + Modes.rest_uri = argv[++j]; } else if (!strcmp(argv[j],"--help")) { showHelp(); exit(0); @@ -2599,7 +2593,7 @@ int main(int argc, char **argv) { /* Initialization */ modesInit(); - initMqConnection(Modes.mqtt_uri, Modes.mqtt_username, Modes.mqtt_password); + initRestConnection(Modes.rest_uri); if (Modes.net_only) { fprintf(stderr,"Net-only mode, no RTL device or file open.\n"); } else if (Modes.filename == NULL) { @@ -2665,7 +2659,6 @@ int main(int argc, char **argv) { } rtlsdr_close(Modes.dev); - shutdownMqConnection(); return 0; } diff --git a/mqtt.c b/mqtt.c deleted file mode 100644 index 790097622..000000000 --- a/mqtt.c +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Mqtt.c - * - * Created on: Apr 18, 2015 - * Author: borax - */ -#include -#include -#include -#include -#include -#include -#include "mqtt.h" -#include "MQTTClient.h" - -#define CLIENTID "Dump1090" -#define TOPIC "adsb/data/raw" -#define QOS 0 -#define TIMEOUT 10000L - -struct { - // Threading - pthread_t sender_thread; - pthread_mutex_t thread_lock; - - // Internal message list - struct queue_message *first_message; - struct queue_message *last_message; - - // MQTT connection options - char* uri; - char* username; - char* password; - MQTTClient client; - MQTTClient_deliveryToken token; -} Mqtt; - -struct queue_message { - char *message; - int length; - struct queue_message *next; -}; - -struct queue_message *popFirstMessageInQueue(); -void sendMessagesToMq(); -void sendMessageToMq(struct queue_message *msg); - -/* Initialize things */ -int initMqConnection(char* uri, char* username, char* password) { - - printf("MQ connection settings: %s %s %s\n", uri, username, password); - Mqtt.uri = uri; - Mqtt.username = username; - Mqtt.password = password; - Mqtt.first_message = NULL; - - pthread_mutex_init(&Mqtt.thread_lock,NULL); - pthread_create(&Mqtt.sender_thread, NULL, sendMessagesToMq, NULL); - - return 0; -} - -void shutdownMqConnection() { - printf("Closing MQ connection\n"); - -} - -/* Queue functionality */ -void addRawMessageToMq(char *data, int length) { - if(!Mqtt.uri) { - return; - } - struct queue_message *curr = malloc(sizeof(struct queue_message)); - struct timeval tv; - - gettimeofday(&tv, NULL); - unsigned long long millisecondsSinceEpoch = - (unsigned long long)(tv.tv_sec) * 1000 + - (unsigned long long)(tv.tv_usec) / 1000; - - data[length-1] = '\0'; - char *buf = malloc(100); - sprintf(buf, "{ \"timeSinceEpochUTC\":%llu, \"message\":\"%s\" }", millisecondsSinceEpoch, data); - curr->message = buf; - curr->length = strlen(curr->message); - - addMessageToQueue(curr); -} - -void addMessageToQueue(struct queue_message *msg) { - pthread_mutex_lock(&Mqtt.thread_lock); - struct queue_message *tail = Mqtt.last_message; - if(tail) { - tail->next = msg; - } else { - Mqtt.first_message = msg; - } - Mqtt.last_message = msg; - pthread_mutex_unlock(&Mqtt.thread_lock); -} - - -struct queue_message *popFirstMessageInQueue() { - pthread_mutex_lock(&Mqtt.thread_lock); - struct queue_message *msg = Mqtt.first_message; - if(Mqtt.first_message == Mqtt.last_message) { - Mqtt.first_message = 0; - Mqtt.last_message = 0; - } else { - Mqtt.first_message = Mqtt.first_message->next; - } - pthread_mutex_unlock(&Mqtt.thread_lock); - return msg; -} - -/* Mqtt */ -void sendMessagesToMq() { - while(1) { - if(!Mqtt.client) { - initiateConnection(); - } - if (Mqtt.first_message && Mqtt.client) { - struct queue_message *msg = popFirstMessageInQueue(); - if(msg) { - sendMessageToMq(msg); - free(msg->message); - free(msg); - continue; - } - } - usleep(250); - } -} - -void initiateConnection() { - MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; - int rc; - - MQTTClient_create(&Mqtt.client, Mqtt.uri, CLIENTID, - MQTTCLIENT_PERSISTENCE_NONE, NULL); - conn_opts.keepAliveInterval = 20; - conn_opts.cleansession = 1; - - if(Mqtt.username) { - conn_opts.username = Mqtt.username; - conn_opts.password = Mqtt.password; - } - - if ((rc = MQTTClient_connect(Mqtt.client, &conn_opts)) != MQTTCLIENT_SUCCESS) - { - //printf("Failed to connect, return code %d\n", rc); - } -} - -void destroyConnection() { - MQTTClient_disconnect(Mqtt.client, 10000); - MQTTClient_destroy(&Mqtt.client); -} - -void sendMessageToMq(struct queue_message *msg) { - MQTTClient_message pubmsg = MQTTClient_message_initializer; - pubmsg.payload = msg->message; - pubmsg.payloadlen = msg->length; - pubmsg.qos = QOS; - pubmsg.retained = 0; - - if(MQTTClient_publishMessage(Mqtt.client, TOPIC, &pubmsg, &Mqtt.token) != MQTTCLIENT_SUCCESS) { - destroyConnection(); - return; - } - if(MQTTClient_waitForCompletion(Mqtt.client, Mqtt.token, TIMEOUT) != MQTTCLIENT_SUCCESS) { - destroyConnection(); - return; - } -} diff --git a/mqtt.h b/mqtt.h deleted file mode 100644 index 2105da855..000000000 --- a/mqtt.h +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Mqtt.h - * - * Created on: Apr 18, 2015 - * Author: borax - */ - -#ifndef MQTT_H_ -#define MQTT_H_ - -int initMqConnection(char* uri, char* username, char* password); -void shutdownMqConnection(); -void addRawMessageToMq(char *data, int length); - - -#endif /* MQTT_H_ */ diff --git a/rest.c b/rest.c new file mode 100644 index 000000000..fd65e0f16 --- /dev/null +++ b/rest.c @@ -0,0 +1,154 @@ +/* + * rest.c + * + * Created on: Apr 18, 2015 + * Author: borax + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include "rest.h" + +#define TIMEOUT 10000L + +struct { + // Threading + pthread_t sender_thread; + pthread_mutex_t thread_lock; + + // Internal message list + struct queue_message *first_message; + struct queue_message *last_message; + + char* uri; + + CURL *curl; +} Rest; + +struct queue_message { + json_object *message; + struct queue_message *next; +}; + +struct queue_message *popFirstMessageInQueue(); +void sendMessagesToRestInterface(); +void sendMessage(json_object *json); + +/* Initialize things */ +int initRestConnection(char* uri) { + + printf("Rest connection settings: %s\n", uri); + Rest.uri = uri; + Rest.first_message = NULL; + + pthread_mutex_init(&Rest.thread_lock,NULL); + pthread_create(&Rest.sender_thread, NULL, sendMessagesToRestInterface, NULL); + + Rest.curl = curl_easy_init(); + if(Rest.curl) { + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Accept: application/json"); + headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(Rest.curl, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(Rest.curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(Rest.curl, CURLOPT_URL, uri); +// curl_easy_setopt(Rest.curl, CURLOPT_VERBOSE, 1L); + } + + return 0; +} + +/* Queue functionality */ +void addRawMessageToRestQueue(char *data, int length) { + if(!Rest.uri) { + return; + } + struct queue_message *curr = malloc(sizeof(struct queue_message)); + struct timeval tv; + + gettimeofday(&tv, NULL); + unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; + + data[length-1] = '\0'; + json_object *json = json_object_new_object(); + json_object *adsb_data = json_object_new_string(data); + json_object *timestamp = json_object_new_int64(millisecondsSinceEpoch); + + json_object_object_add(json, "message", adsb_data); + json_object_object_add(json, "timestamp", timestamp); + + curr->message = json; + + addMessageToQueue(curr); +} + +void addMessageToQueue(struct queue_message *msg) { + pthread_mutex_lock(&Rest.thread_lock); + struct queue_message *tail = Rest.last_message; + if(tail) { + tail->next = msg; + } else { + Rest.first_message = msg; + } + Rest.last_message = msg; + pthread_mutex_unlock(&Rest.thread_lock); +} + + +struct queue_message *popFirstMessageInQueue() { + pthread_mutex_lock(&Rest.thread_lock); + struct queue_message *msg = Rest.first_message; + if(Rest.first_message == Rest.last_message) { + Rest.first_message = 0; + Rest.last_message = 0; + } else { + Rest.first_message = Rest.first_message->next; + } + pthread_mutex_unlock(&Rest.thread_lock); + return msg; +} + +/* REST */ +void sendMessagesToRestInterface() { + json_object *json = json_object_new_object(); + int message_counter = 1; + int wait_counter = 0; + while(1) { + // Send batch in case we either have 100 messages in queue or we have waited 5s and have messages to send + if(message_counter > 100 || (message_counter > 1 && wait_counter > 100)) { + sendMessage(json); + message_counter = 1; + wait_counter = 0; + json_object_put(json); + json = json_object_new_object(); + } + else if (Rest.first_message) { + struct queue_message *msg = popFirstMessageInQueue(); + if(msg) { + char buf[9]; + sprintf(buf, "entry-%d", message_counter); + json_object_object_add(json, buf, msg->message); + message_counter++; + free(msg); + continue; + } + } else { + // Sleep for 50ms + usleep(50000); + wait_counter++; + } + } +} + +void sendMessage(json_object *json) { + curl_easy_setopt(Rest.curl, CURLOPT_POSTFIELDS, json_object_to_json_string(json)); + curl_easy_perform(Rest.curl); +// printf("\nSending message %s\n", json_object_to_json_string(json)); +} diff --git a/rest.h b/rest.h new file mode 100644 index 000000000..d6ef68204 --- /dev/null +++ b/rest.h @@ -0,0 +1,15 @@ +/* + * Mqtt.h + * + * Created on: Apr 18, 2015 + * Author: borax + */ + +#ifndef REST_H_ +#define REST_H_ + +int initRestConnection(char* uri); +void addRawMessageToRestQueue(char *data, int length); + + +#endif /* REST_H_ */