diff --git a/.gitignore b/.gitignore index 0984522d7..d6300499f 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ testfiles/*.bin misc frames.js .*.swp +start.sh diff --git a/Makefile b/Makefile index f9637b7eb..5f1c361ee 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ -CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr) -LDLIBS+=$(shell pkg-config --libs librtlsdr) -lpthread -lm +CFLAGS?=-O2 -g -Wall -W $(shell pkg-config --cflags librtlsdr json-c) +LDLIBS+=$(shell pkg-config --libs librtlsdr json-c ) -lpthread -lm -lcurl CC?=gcc PROGNAME=dump1090 @@ -8,8 +8,12 @@ all: dump1090 %.o: %.c $(CC) $(CFLAGS) -c $< -dump1090: dump1090.o anet.o - $(CC) -g -o dump1090 dump1090.o anet.o $(LDFLAGS) $(LDLIBS) +dump1090: dump1090.o anet.o rest.o + $(CC) -g -o dump1090 dump1090.o anet.o rest.o $(LDFLAGS) $(LDLIBS) clean: rm -f *.o dump1090 + +release: + $(CC) -o dump1090 dump1090.o anet.o rest.o $(LDFLAGS) $(LDLIBS) + diff --git a/README.md b/README.md index d18eb90e9..3e8b61010 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ The main features are: * CPR coordinates decoding and track calculation from velocity. * TCP server streaming and receiving raw data to/from connected clients (using --net). +* MQTT support. While from time to time I still add / fix stuff in my fork, I target minimalism of the implementation. However there is a @@ -30,8 +31,14 @@ available, developed by MalcolmRobb. Installation --- +Make sure that you have librtlsdr, libusb, libssl-dev and Paho MQTT lib +to be able to make this project. Google for them to find them. This has +been tested on Ubuntu and Raspberry Pi. + Type "make". + + Normal usage --- @@ -169,6 +176,16 @@ similar to: This can be used to feed data to various sharing sites without the need to use another decoder. +MQTT +--- +For basic operation, just specify the URI to send the data to with (no need for the --net flag): + + ./dump1090 --mqtt-uri "your-uri" + +You could also specify username and password if needed by using: + + ./dump1090 --mqtt-uri "your-uri" --mqtt-username "username" --mqtt-password "password" + Antenna --- @@ -193,20 +210,9 @@ resources: Aggressive mode --- -With --aggressive it is possible to activate the *aggressive mode* that is a -modified version of the Mode S packet detection and decoding. -THe aggresive mode uses more CPU usually (especially if there are many planes -sending DF17 packets), but can detect a few more messages. - -The algorithm in aggressive mode is modified in the following ways: - -* Up to two demodulation errors are tolerated (adjacent entires in the magnitude - vector with the same eight). Normally only messages without errors are - checked. -* It tries to fix DF17 messages trying every two bits combination. - -The use of aggressive mdoe is only advised in places where there is low traffic -in order to have a chance to capture some more messages. +This release always has aggressive mode active. I did some performance +measurements on an Raspberry PI and when receiving around 50 ADSB messages +per second the CPU-load just increased 3-4%. Debug mode --- diff --git a/dump1090.c b/dump1090.c old mode 100644 new mode 100755 index 7fa5327a3..dfac44a47 --- a/dump1090.c +++ b/dump1090.c @@ -45,6 +45,7 @@ #include #include "rtl-sdr.h" #include "anet.h" +#include "rest.h" #define MODES_DEFAULT_RATE 2000000 #define MODES_DEFAULT_FREQ 1090000000 @@ -167,7 +168,10 @@ struct { int stats; /* Print stats at exit in --ifile mode. */ int onlyaddr; /* Print only ICAO addresses. */ int metric; /* Use metric units. */ - int aggressive; /* Aggressive detection algorithm. */ + int quiet; /* Quiet mode, not printing any human readable messages */ + + /* REST configuration */ + char* rest_uri; /* Interactive mode */ struct aircraft *aircrafts; @@ -272,8 +276,8 @@ void modesInitConfig(void) { Modes.interactive = 0; Modes.interactive_rows = MODES_INTERACTIVE_ROWS; Modes.interactive_ttl = MODES_INTERACTIVE_TTL; - Modes.aggressive = 0; Modes.interactive_rows = getTermRows(); + Modes.quiet = 0; } void modesInit(void) { @@ -331,10 +335,9 @@ void modesInit(void) { /* =============================== RTLSDR handling ========================== */ -void modesInitRTLSDR(void) { +void listDevices(void) { int j; int device_count; - int ppm_error = 0; char vendor[256], product[256], serial[256]; device_count = rtlsdr_get_device_count(); @@ -349,6 +352,12 @@ void modesInitRTLSDR(void) { fprintf(stderr, "%d: %s, %s, SN: %s %s\n", j, vendor, product, serial, (j == Modes.dev_index) ? "(currently selected)" : ""); } +} + +void modesInitRTLSDR(void) { + int ppm_error = 0; + + listDevices(); if (rtlsdr_open(&Modes.dev, Modes.dev_index) < 0) { fprintf(stderr, "Error opening the RTLSDR device: %s\n", @@ -953,7 +962,7 @@ void decodeModesMessage(struct modesMessage *mm, unsigned char *msg) { if ((mm->errorbit = fixSingleBitErrors(msg,mm->msgbits)) != -1) { mm->crc = modesChecksum(msg,mm->msgbits); mm->crcok = 1; - } else if (Modes.aggressive && mm->msgtype == 17 && + } else if (mm->msgtype == 17 && (mm->errorbit = fixTwoBitsErrors(msg,mm->msgbits)) != -1) { mm->crc = modesChecksum(msg,mm->msgbits); @@ -1468,7 +1477,7 @@ void detectModeS(uint16_t *m, uint32_t mlen) { /* If we reached this point, and error is zero, we are very likely * with a Mode S message in our hands, but it may still be broken * and CRC may not be correct. This is handled by the next layer. */ - if (errors == 0 || (Modes.aggressive && errors < 3)) { + if (errors == 0 || (errors < 3)) { struct modesMessage mm; /* Decode the received message and update statistics */ @@ -1548,12 +1557,12 @@ void useModesMessage(struct modesMessage *mm) { if (a && Modes.stat_sbs_connections > 0) modesSendSBSOutput(mm, a); /* Feed SBS output clients. */ } /* In non-interactive way, display messages on standard output. */ - if (!Modes.interactive) { + if (!Modes.interactive && !Modes.quiet) { displayModesMessage(mm); if (!Modes.raw && !Modes.onlyaddr) printf("\n"); } /* Send data to connected clients. */ - if (Modes.net) { + if (Modes.net || Modes.rest_uri) { modesSendRawOutput(mm); /* Feed raw output clients. */ } } @@ -1910,6 +1919,9 @@ void modesInitNet(void) { Modes.maxfd = -1; for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if(modesNetServices[j].port == 0) { + continue; + } int s = anetTcpServer(Modes.aneterr, modesNetServices[j].port, NULL); if (s == -1) { fprintf(stderr, "Error opening the listening port %d (%s): %s\n", @@ -1934,6 +1946,9 @@ void modesAcceptClients(void) { struct client *c; for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if(modesNetServices[j].port == 0) { + continue; + } fd = anetTcpAccept(Modes.aneterr, *modesNetServices[j].socket, NULL, &port); if (fd == -1) { @@ -2010,17 +2025,21 @@ void modesSendAllClients(int service, void *msg, int len) { /* Write raw output to TCP clients. */ void modesSendRawOutput(struct modesMessage *mm) { - char msg[128], *p = msg; - int j; - - *p++ = '*'; - for (j = 0; j < mm->msgbits/8; j++) { - sprintf(p, "%02X", mm->msg[j]); - p += 2; - } - *p++ = ';'; - *p++ = '\n'; - modesSendAllClients(Modes.ros, msg, p-msg); + char msg[(mm->msgbits/4)+3], *p = msg; + int j; + *p++ = '*'; + for (j = 0; j < mm->msgbits/8; j++) { + sprintf(p, "%02X", mm->msg[j]); + p += 2; + } + *p++ = ';'; + *p++ = '\n'; + if (Modes.rest_uri) { + addRawMessageToRestQueue(msg, p-msg); + } + if (Modes.net) { + modesSendAllClients(Modes.ros, msg, p-msg); + } } @@ -2385,6 +2404,9 @@ void modesWaitReadableClients(int timeout_ms) { /* Set listening sockets to accept new clients ASAP. */ for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if(modesNetServices[j].port == 0) { + continue; + } int s = *modesNetServices[j].socket; FD_SET(s,&fds); if (s > maxfd) maxfd = s; @@ -2429,18 +2451,19 @@ void showHelp(void) { "--raw Show only messages hex values.\n" "--net Enable networking.\n" "--net-only Enable just networking, no RTL device or file used.\n" -"--net-ro-port TCP listening port for raw output (default: 30002).\n" -"--net-ri-port TCP listening port for raw input (default: 30001).\n" -"--net-http-port HTTP server port (default: 8080).\n" -"--net-sbs-port TCP listening port for BaseStation format output (default: 30003).\n" +"--net-ro-port TCP listening port for raw output (default: 30002, 0 to disable).\n" +"--net-ri-port TCP listening port for raw input (default: 30001, 0 to disable).\n" +"--net-http-port HTTP server port (default: 8080, 0 to disable).\n" +"--net-sbs-port TCP listening port for BaseStation format output (default: 30003, 0 to disable).\n" "--no-fix Disable single-bits error correction using CRC.\n" "--no-crc-check Disable messages with broken CRC (discouraged).\n" -"--aggressive More CPU for more messages (two bits fixes, ...).\n" "--stats With --ifile print stats at exit. No other output.\n" "--onlyaddr Show only ICAO addresses (testing purposes).\n" "--metric Use metric units (meters, km/h, ...).\n" "--snip Strip IQ file removing samples < level.\n" "--debug Debug mode (verbose), see README for details.\n" +"--list-devices Lists available devices and then exit.\n" +"--quiet Makes the console quiet, for use with the --net flag.\n" "--help Show this help.\n" "\n" "Debug mode flags: d = Log frames decoded with errors\n" @@ -2486,7 +2509,10 @@ int main(int argc, char **argv) { if (!strcmp(argv[j],"--device-index") && more) { Modes.dev_index = atoi(argv[++j]); - } else if (!strcmp(argv[j],"--gain") && more) { + } else if(!strcmp(argv[j],"--list-devices")) { + listDevices(); + return 0; + } else if (!strcmp(argv[j],"--gain") && more) { Modes.gain = atof(argv[++j])*10; /* Gain is in tens of DBs */ } else if (!strcmp(argv[j],"--enable-agc")) { Modes.enable_agc++; @@ -2517,8 +2543,6 @@ int main(int argc, char **argv) { Modes.onlyaddr = 1; } else if (!strcmp(argv[j],"--metric")) { Modes.metric = 1; - } else if (!strcmp(argv[j],"--aggressive")) { - Modes.aggressive++; } else if (!strcmp(argv[j],"--interactive")) { Modes.interactive = 1; } else if (!strcmp(argv[j],"--interactive-rows")) { @@ -2548,6 +2572,10 @@ int main(int argc, char **argv) { } else if (!strcmp(argv[j],"--snip") && more) { snipMode(atoi(argv[++j])); exit(0); + } else if (!strcmp(argv[j],"--quiet")) { + Modes.quiet = 1; + } else if (!strcmp(argv[j],"--rest-uri") && more) { + Modes.rest_uri = argv[++j]; } else if (!strcmp(argv[j],"--help")) { showHelp(); exit(0); @@ -2565,6 +2593,7 @@ int main(int argc, char **argv) { /* Initialization */ modesInit(); + initRestConnection(Modes.rest_uri); if (Modes.net_only) { fprintf(stderr,"Net-only mode, no RTL device or file open.\n"); } else if (Modes.filename == NULL) { diff --git a/rest.c b/rest.c new file mode 100644 index 000000000..fd65e0f16 --- /dev/null +++ b/rest.c @@ -0,0 +1,154 @@ +/* + * rest.c + * + * Created on: Apr 18, 2015 + * Author: borax + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include "rest.h" + +#define TIMEOUT 10000L + +struct { + // Threading + pthread_t sender_thread; + pthread_mutex_t thread_lock; + + // Internal message list + struct queue_message *first_message; + struct queue_message *last_message; + + char* uri; + + CURL *curl; +} Rest; + +struct queue_message { + json_object *message; + struct queue_message *next; +}; + +struct queue_message *popFirstMessageInQueue(); +void sendMessagesToRestInterface(); +void sendMessage(json_object *json); + +/* Initialize things */ +int initRestConnection(char* uri) { + + printf("Rest connection settings: %s\n", uri); + Rest.uri = uri; + Rest.first_message = NULL; + + pthread_mutex_init(&Rest.thread_lock,NULL); + pthread_create(&Rest.sender_thread, NULL, sendMessagesToRestInterface, NULL); + + Rest.curl = curl_easy_init(); + if(Rest.curl) { + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Accept: application/json"); + headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(Rest.curl, CURLOPT_CUSTOMREQUEST, "POST"); + curl_easy_setopt(Rest.curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(Rest.curl, CURLOPT_URL, uri); +// curl_easy_setopt(Rest.curl, CURLOPT_VERBOSE, 1L); + } + + return 0; +} + +/* Queue functionality */ +void addRawMessageToRestQueue(char *data, int length) { + if(!Rest.uri) { + return; + } + struct queue_message *curr = malloc(sizeof(struct queue_message)); + struct timeval tv; + + gettimeofday(&tv, NULL); + unsigned long long millisecondsSinceEpoch = + (unsigned long long)(tv.tv_sec) * 1000 + + (unsigned long long)(tv.tv_usec) / 1000; + + data[length-1] = '\0'; + json_object *json = json_object_new_object(); + json_object *adsb_data = json_object_new_string(data); + json_object *timestamp = json_object_new_int64(millisecondsSinceEpoch); + + json_object_object_add(json, "message", adsb_data); + json_object_object_add(json, "timestamp", timestamp); + + curr->message = json; + + addMessageToQueue(curr); +} + +void addMessageToQueue(struct queue_message *msg) { + pthread_mutex_lock(&Rest.thread_lock); + struct queue_message *tail = Rest.last_message; + if(tail) { + tail->next = msg; + } else { + Rest.first_message = msg; + } + Rest.last_message = msg; + pthread_mutex_unlock(&Rest.thread_lock); +} + + +struct queue_message *popFirstMessageInQueue() { + pthread_mutex_lock(&Rest.thread_lock); + struct queue_message *msg = Rest.first_message; + if(Rest.first_message == Rest.last_message) { + Rest.first_message = 0; + Rest.last_message = 0; + } else { + Rest.first_message = Rest.first_message->next; + } + pthread_mutex_unlock(&Rest.thread_lock); + return msg; +} + +/* REST */ +void sendMessagesToRestInterface() { + json_object *json = json_object_new_object(); + int message_counter = 1; + int wait_counter = 0; + while(1) { + // Send batch in case we either have 100 messages in queue or we have waited 5s and have messages to send + if(message_counter > 100 || (message_counter > 1 && wait_counter > 100)) { + sendMessage(json); + message_counter = 1; + wait_counter = 0; + json_object_put(json); + json = json_object_new_object(); + } + else if (Rest.first_message) { + struct queue_message *msg = popFirstMessageInQueue(); + if(msg) { + char buf[9]; + sprintf(buf, "entry-%d", message_counter); + json_object_object_add(json, buf, msg->message); + message_counter++; + free(msg); + continue; + } + } else { + // Sleep for 50ms + usleep(50000); + wait_counter++; + } + } +} + +void sendMessage(json_object *json) { + curl_easy_setopt(Rest.curl, CURLOPT_POSTFIELDS, json_object_to_json_string(json)); + curl_easy_perform(Rest.curl); +// printf("\nSending message %s\n", json_object_to_json_string(json)); +} diff --git a/rest.h b/rest.h new file mode 100644 index 000000000..d6ef68204 --- /dev/null +++ b/rest.h @@ -0,0 +1,15 @@ +/* + * Mqtt.h + * + * Created on: Apr 18, 2015 + * Author: borax + */ + +#ifndef REST_H_ +#define REST_H_ + +int initRestConnection(char* uri); +void addRawMessageToRestQueue(char *data, int length); + + +#endif /* REST_H_ */