diff --git a/INSTALL.md b/INSTALL.md index 0a67260f..01c3bfe5 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -1,3 +1,11 @@ + +## Supported +| | | +|--|--| +|linux | gcc | x86_64-w64-mingw32-gcc | cmake | +|windows | Visual Studio c1 | linux-mingw | cmake | +|mac | gcc | cmake| | + ### esp32 * toolchain: xtensa-esp32-elf copy or softlink gear-lib to esp32 components dir diff --git a/README.cn.md b/README.cn.md index c2746885..544a7b92 100644 --- a/README.cn.md +++ b/README.cn.md @@ -26,7 +26,7 @@ | librtsp: RTSP协议,适合IPCamera和NVR开发 | librtmpc: RTMP协议,适合推流直播 | | libsock: Socket封装 | librpc: 远程过程调用库 | | libipc: 进程间通信 | libp2p: p2p穿透传输 | -| libhomekit: Apple homekit协议库 | | +| libmqttc: MQTT客户端协议 | libhomekit: Apple homekit协议库 | ## 异步 | | | diff --git a/README.md b/README.md index 98829248..50e69a19 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ This is a collection of basic libraries. | librtsp: Real Time Streaming Protocol server for ipcamera or NVR | librtmpc: Real Time Messaging Protocol client for liveshow | | libsock: socket warpper api for easily use | librpc: Remote Procedure Call library | | libipc: Inter-Process Communication, support mqueue/netlink/shm | libp2p: High level p2p punch hole library, easy API to use | -| libhomekit: Apple homekit protocol | +| libmqttc: MQTT client protocol | libhomekit: Apple homekit protocol | ## Async | | | diff --git a/gear-lib/libmqttc/Makefile b/gear-lib/libmqttc/Makefile index 54c23c25..a6ac5318 100644 --- a/gear-lib/libmqttc/Makefile +++ b/gear-lib/libmqttc/Makefile @@ -31,7 +31,7 @@ TGT_LIB_SO = $(LIBNAME).so TGT_LIB_SO_VER = $(TGT_LIB_SO).${VER} TGT_UNIT_TEST = test_$(LIBNAME) -OBJS_LIB = $(LIBNAME).o +OBJS_LIB = $(LIBNAME).o mqttc_socket.o OBJS_UNIT_TEST = test_$(LIBNAME).o ############################################################################### @@ -58,6 +58,7 @@ CFLAGS += -I$(OUTPUT)/include/gear-lib SHARED := -shared LDFLAGS := $($(ARCH)_LDFLAGS) +LDFLAGS += -L$(OUTLIBPATH)/lib/gear-lib -ltime -lsock -lgevent -ldarray -lthread -lringbuffer LDFLAGS += -pthread ############################################################################### diff --git a/gear-lib/libmqttc/README.md b/gear-lib/libmqttc/README.md index 0ee786d9..8eb943c0 100644 --- a/gear-lib/libmqttc/README.md +++ b/gear-lib/libmqttc/README.md @@ -1,6 +1,8 @@ ## libmqttc This is a simple mqtt client library based on paho.mqtt(29ab2aa). +sudo apt install mosquitto + 1.mosquitto_sub -h localhost -t "will topic" -P "testpassword" -u "testuser" -v 2.test_libmqttc diff --git a/gear-lib/libmqttc/libmqttc.c b/gear-lib/libmqttc/libmqttc.c index d16ee785..ddbc2815 100644 --- a/gear-lib/libmqttc/libmqttc.c +++ b/gear-lib/libmqttc/libmqttc.c @@ -28,6 +28,8 @@ #include #include +#define SOCK_API 0 + #define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */ #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4 @@ -51,20 +53,6 @@ #else -#if defined(WIN32) -#define inline __inline -#define FUNC_ENTRY StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MINIMUM) -#define FUNC_ENTRY_NOLOG StackTrace_entry(__FUNCTION__, __LINE__, -1) -#define FUNC_ENTRY_MED StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MEDIUM) -#define FUNC_ENTRY_MAX StackTrace_entry(__FUNCTION__, __LINE__, TRACE_MAXIMUM) -#define FUNC_EXIT StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MINIMUM) -#define FUNC_EXIT_NOLOG StackTrace_exit(__FUNCTION__, __LINE__, -1) -#define FUNC_EXIT_MED StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MEDIUM) -#define FUNC_EXIT_MAX StackTrace_exit(__FUNCTION__, __LINE__, NULL, TRACE_MAXIMUM) -#define FUNC_EXIT_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MINIMUM) -#define FUNC_EXIT_MED_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MEDIUM) -#define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__FUNCTION__, __LINE__, &x, TRACE_MAXIMUM) -#else #define FUNC_ENTRY StackTrace_entry(__func__, __LINE__, TRACE_MINIMUM) #define FUNC_ENTRY_NOLOG StackTrace_entry(__func__, __LINE__, -1) #define FUNC_ENTRY_MED StackTrace_entry(__func__, __LINE__, TRACE_MEDIUM) @@ -77,9 +65,10 @@ #define FUNC_EXIT_MED_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MEDIUM) #define FUNC_EXIT_MAX_RC(x) StackTrace_exit(__func__, __LINE__, &x, TRACE_MAXIMUM) -#endif #endif +extern const struct mqttc_ops mqttc_socket_ops; + static void StackTrace_entry(const char* name, int line, int trace) { printf("%s:%d %d\n", name, line, trace); @@ -105,11 +94,6 @@ static const char* mqtt_pkt_names[] = "UNSUBSCRIBE", "UNSUBACK", "PINGREQ", "PINGRESP", "DISCONNECT" }; -const char* mqtt_pkt_getname(unsigned short packetid) -{ - return mqtt_pkt_names[packetid]; -} - static void TimerInit(Timer* timer) { timer->end_time = (struct timeval){0, 0}; @@ -201,9 +185,8 @@ static int mqtt_stringFormat_subscribe(char* strbuf, int strbuflen, unsigned cha { return snprintf(strbuf, strbuflen, "SUBSCRIBE dup %d, packet id %d count %d topic %.*s qos %d", - dup, packetid, count, - topicFilters[0].lenstring.len, topicFilters[0].lenstring.data, - requestedQoSs[0]); + dup, packetid, count, topicFilters[0].lenstring.len, + topicFilters[0].lenstring.data, requestedQoSs[0]); } static int mqtt_stringFormat_suback(char* strbuf, int strbuflen, unsigned short packetid, int count, int* grantedQoSs) @@ -371,12 +354,6 @@ static void writeCString(unsigned char** pptr, const char* string) *pptr += len; } -int getLenStringLen(char* ptr) -{ - int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1)); - return len; -} - static void writemqtt_string(unsigned char** pptr, mqtt_string mqttstring) { if (mqttstring.lenstring.len > 0) { @@ -1140,28 +1117,11 @@ static void NewMessageData(MessageData* md, mqtt_string* aTopicName, mqtt_msg* a md->message = aMessage; } -static int getNextPacketId(mqtt_client *c) { +static int getNextPacketId(mqtt_client *c) +{ return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1; } -static int sendPacket(mqtt_client* c, int length, Timer* timer) -{ - int rc = MQTT_FAILURE; - int sent = 0; - - while (sent < length && !TimerIsExpired(timer)) { - rc = c->ipstack.write(&c->ipstack, &c->buf[sent], length, TimerLeftMS(timer)); - if (rc < 0) // there was an error writing the data - break; - sent += rc; - } - if (sent == length) { - TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet - rc = MQTT_SUCCESS; - } else - rc = MQTT_FAILURE; - return rc; -} static int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms) { @@ -1201,16 +1161,110 @@ static int linux_write(Network* n, unsigned char* buffer, int len, int timeout_m return rc; } +static int sendPacket(mqtt_client* c, int length, Timer* timer) +{ + int rc = MQTT_FAILURE; + int sent = 0; + + while (sent < length && !TimerIsExpired(timer)) { +#if SOCK_API + rc = c->ops->write(c, &c->buf[sent], length); +#else + rc = c->ipstack.write(&c->ipstack, &c->buf[sent], length, TimerLeftMS(timer)); +#endif + if (rc < 0) { // there was an error writing the data + printf("%s:%d rc = %d\n", __func__, __LINE__, rc); + break; + } + sent += rc; + } + if (sent == length) { + TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet + rc = MQTT_SUCCESS; + } else { + printf("sent = %d, length = %d\n", sent, length); + rc = MQTT_FAILURE; + } + return rc; +} +static int readPacket(mqtt_client* c, Timer* timer) +{ + mqtt_header header = {0}; + int len = 0; + int rem_len = 0; + unsigned char i; + int multiplier = 1; + int tmp_len = 0; + + /* 1. read the header byte. This has the packet type in it */ +#if SOCK_API + int rc = c->ops->read(c, c->readbuf, 1); +#else + int rc = c->ipstack.read(&c->ipstack, c->readbuf, 1, TimerLeftMS(timer)); +#endif + if (rc != 1) { + goto exit; + } + + /* 2. read the remaining length. This is variable in itself */ + do { + rc = MQTTPACKET_READ_ERROR; + if (++tmp_len > MAX_NO_OF_REMAINING_LENGTH_BYTES) { + rc = MQTTPACKET_READ_ERROR; /* bad data */ + break; + } +#if SOCK_API + rc = c->ops->read(c, &i, 1); +#else + rc = c->ipstack.read(&c->ipstack, &i, 1, TimerLeftMS(timer)); +#endif + if (rc != 1) { + printf("read failed rc = %d\n", rc); + break; + } + rem_len += (i & 127) * multiplier; + multiplier *= 128; + } while ((i & 128) != 0); + + len = 1; + len += mqtt_pkt_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ + + if (rem_len > (c->readbuf_size - len)) { + rc = MQTT_BUFFER_OVERFLOW; + goto exit; + } + + /* 3. read the rest of the buffer using a callback to supply the rest of the data */ +#if SOCK_API + if (rem_len > 0 && (rc = c->ops->read(c, c->readbuf + len, rem_len) != rem_len)) { +#else + if (rem_len > 0 && (rc = c->ipstack.read(&c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) { +#endif + rc = 0; + goto exit; + } + + header.byte = c->readbuf[0]; + rc = header.bits.type; + if (c->keepAliveInterval > 0) + TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet +exit: + return rc; +} + //============================================================================= -int mqtt_client_init(mqtt_client* c, const char *host, int port, - uint8_t* sendbuf, size_t sendlen, - uint8_t* readbuf, size_t readlen) +int mqtt_client_init(mqtt_client* c, const char *host, int port) { int i; int command_timeout_ms = 1000; int type = SOCK_STREAM; sa_family_t family = AF_INET; + size_t sendlen = 1024; + size_t readlen = 1024; + + c->ops = &mqttc_socket_ops; + c->priv = c->ops->init(host, port); c->host = strdup(host); c->port = port; @@ -1228,9 +1282,9 @@ int mqtt_client_init(mqtt_client* c, const char *host, int port, for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i) c->messageHandlers[i].topicFilter = 0; c->command_timeout_ms = command_timeout_ms; - c->buf = sendbuf; + c->buf = calloc(1, sendlen); c->buf_size = sendlen; - c->readbuf = readbuf; + c->readbuf = calloc(1, readlen); c->readbuf_size = readlen; c->isconnected = 0; c->cleansession = 0; @@ -1242,71 +1296,9 @@ int mqtt_client_init(mqtt_client* c, const char *host, int port, #if defined(MQTT_TASK) MutexInit(&c->mutex); #endif - return 0; } -static int decodePacket(mqtt_client* c, int* value, int timeout) -{ - unsigned char i; - int multiplier = 1; - int len = 0; - - *value = 0; - do { - int rc = MQTTPACKET_READ_ERROR; - - if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) - { - rc = MQTTPACKET_READ_ERROR; /* bad data */ - goto exit; - } - rc = c->ipstack.read(&c->ipstack, &i, 1, timeout); - if (rc != 1) - goto exit; - *value += (i & 127) * multiplier; - multiplier *= 128; - } while ((i & 128) != 0); -exit: - return len; -} - -static int readPacket(mqtt_client* c, Timer* timer) -{ - mqtt_header header = {0}; - int len = 0; - int rem_len = 0; - - /* 1. read the header byte. This has the packet type in it */ - int rc = c->ipstack.read(&c->ipstack, c->readbuf, 1, TimerLeftMS(timer)); - if (rc != 1) - goto exit; - - len = 1; - /* 2. read the remaining length. This is variable in itself */ - decodePacket(c, &rem_len, TimerLeftMS(timer)); - len += mqtt_pkt_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */ - - if (rem_len > (c->readbuf_size - len)) - { - rc = MQTT_BUFFER_OVERFLOW; - goto exit; - } - - /* 3. read the rest of the buffer using a callback to supply the rest of the data */ - if (rem_len > 0 && (rc = c->ipstack.read(&c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) { - rc = 0; - goto exit; - } - - header.byte = c->readbuf[0]; - rc = header.bits.type; - if (c->keepAliveInterval > 0) - TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet -exit: - return rc; -} - // assume topic filter and name is in correct format // # can only be at end // + and # can only be next to separator @@ -1882,10 +1874,9 @@ static int keepalive(mqtt_client* c) goto exit; if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received)) { - if (c->ping_outstanding) + if (c->ping_outstanding) { rc = MQTT_FAILURE; /* PINGRESP not received in keepalive interval */ - else - { + } else { Timer timer; TimerInit(&timer); TimerCountdownMS(&timer, 1000); @@ -1894,7 +1885,6 @@ static int keepalive(mqtt_client* c) c->ping_outstanding = 1; } } - exit: return rc; } @@ -1906,8 +1896,8 @@ static int cycle(mqtt_client* c, Timer* timer) int packet_type = readPacket(c, timer); /* read the socket, see what work is due */ - switch (packet_type) - { + //printf("packet_type = %d\n", packet_type); + switch (packet_type) { default: /* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */ rc = packet_type; @@ -1919,35 +1909,38 @@ static int cycle(mqtt_client* c, Timer* timer) case SUBACK: case UNSUBACK: break; - case PUBLISH: - { + case PUBLISH: { mqtt_string topicName; mqtt_msg msg; int intQoS; msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */ if (mqtt_deserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName, - (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) + (unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1) { + printf("mqtt_deserialize_publish failed!\n"); goto exit; + } msg.qos = (enum mqtt_qos)intQoS; deliverMessage(c, &topicName, &msg); - if (msg.qos != MQTT_QOS0) - { - if (msg.qos == MQTT_QOS1) + if (msg.qos != MQTT_QOS0) { + if (msg.qos == MQTT_QOS1) { len = mqtt_serialize_ack(c->buf, c->buf_size, PUBACK, 0, msg.id); - else if (msg.qos == MQTT_QOS2) + } else if (msg.qos == MQTT_QOS2) { len = mqtt_serialize_ack(c->buf, c->buf_size, PUBREC, 0, msg.id); - if (len <= 0) + } + if (len <= 0) { rc = MQTT_FAILURE; - else + } else { rc = sendPacket(c, len, timer); - if (rc == MQTT_FAILURE) + } + if (rc == MQTT_FAILURE) { + printf("rc is MQTT_FAILURE\n"); goto exit; // there was a problem + } } break; } case PUBREC: - case PUBREL: - { + case PUBREL: { unsigned short mypacketid; unsigned char dup, type; if (mqtt_deserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1) @@ -1957,11 +1950,12 @@ static int cycle(mqtt_client* c, Timer* timer) rc = MQTT_FAILURE; else if ((rc = sendPacket(c, len, timer)) != MQTT_SUCCESS) // send the PUBREL packet rc = MQTT_FAILURE; // there was a problem - if (rc == MQTT_FAILURE) + if (rc == MQTT_FAILURE) { + printf("rc is MQTT_FAILURE\n"); goto exit; // there was a problem + } break; } - case PUBCOMP: break; case PINGRESP: @@ -1971,6 +1965,7 @@ static int cycle(mqtt_client* c, Timer* timer) if (keepalive(c) != MQTT_SUCCESS) { //check only keepalive MQTT_FAILURE status so that previous MQTT_FAILURE status can be considered as FAULT + printf("rc is MQTT_FAILURE\n"); rc = MQTT_FAILURE; } @@ -2040,10 +2035,15 @@ int mqtt_connect(mqtt_client* c, mqtt_pkt_conn_data* options, mqtt_connack_data* rc = connect(c->fd, (struct sockaddr*)&addr, sizeof(addr)); if (rc == -1) { - printf("%s:%d connect failed!\n", __func__, __LINE__); + printf("%s:%d connect failed: %d\n", __func__, __LINE__, errno); return -1; } + c->ops->connect(c); + if (rc == -1) { + printf("%s:%d connect failed: %d\n", __func__, __LINE__, errno); + return -1; + } TimerInit(&connect_timer); TimerCountdownMS(&connect_timer, c->command_timeout_ms); @@ -2063,12 +2063,14 @@ int mqtt_connect(mqtt_client* c, mqtt_pkt_conn_data* options, mqtt_connack_data* if (waitfor(c, CONNACK, &connect_timer) == CONNACK) { data->rc = 0; data->sessionPresent = 0; - if (mqtt_deserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) + if (mqtt_deserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1) { rc = data->rc; - else + } else { rc = MQTT_FAILURE; - } else + } + } else { rc = MQTT_FAILURE; + } exit: if (rc == MQTT_SUCCESS) { @@ -2094,8 +2096,10 @@ int mqtt_publish(mqtt_client* c, const char* topicName, mqtt_msg* message) #if defined(MQTT_TASK) MutexLock(&c->mutex); #endif - if (!c->isconnected) + if (!c->isconnected) { + printf("is not connected!\n"); goto exit; + } TimerInit(&timer); TimerCountdownMS(&timer, c->command_timeout_ms); @@ -2105,10 +2109,14 @@ int mqtt_publish(mqtt_client* c, const char* topicName, mqtt_msg* message) len = mqtt_serialize_publish(c->buf, c->buf_size, 0, message->qos, message->retained, message->id, topic, (unsigned char*)message->payload, message->payloadlen); - if (len <= 0) + if (len <= 0) { + printf("mqtt_serialize_publish failed!\n"); goto exit; - if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet + } + if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) {// send the subscribe packet + printf("sendPacket failed!\n"); goto exit; // there was a problem + } if (message->qos == MQTT_QOS1) { @@ -2166,17 +2174,15 @@ int mqtt_unsubscribe(mqtt_client* c, const char* topicFilter) if ((rc = sendPacket(c, len, &timer)) != MQTT_SUCCESS) // send the subscribe packet goto exit; // there was a problem - if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) - { + if (waitfor(c, UNSUBACK, &timer) == UNSUBACK) { unsigned short mypacketid; // should be the same as the packetid above - if (mqtt_deserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) - { + if (mqtt_deserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1) { /* remove the subscription message handler associated with this topic, if there is one */ mqtt_set_msg_handler(c, topicFilter, NULL); } - } - else + } else { rc = MQTT_FAILURE; + } exit: if (rc == MQTT_FAILURE) @@ -2304,6 +2310,15 @@ int mqtt_disconnect(mqtt_client* c) #if defined(MQTT_TASK) MutexUnlock(&c->mutex); #endif - close(c->fd); return rc; } + +void mqtt_client_deinit(mqtt_client* c) +{ + if (c) { + close(c->fd); + free(c->buf); + free(c->readbuf); + free(c->host); + } +} diff --git a/gear-lib/libmqttc/libmqttc.h b/gear-lib/libmqttc/libmqttc.h index 3cd36b3a..190739ae 100644 --- a/gear-lib/libmqttc/libmqttc.h +++ b/gear-lib/libmqttc/libmqttc.h @@ -24,6 +24,7 @@ #define LIBMQTTC_VERSION "0.0.1" +#include #include #include #include @@ -32,6 +33,16 @@ extern "C" { #endif +struct mqtt_client; +struct mqttc_ops { + void *(*init)(const char *host, uint16_t port); + void (*deinit)(struct mqtt_client *c); + int (*connect)(struct mqtt_client *c); + int (*disconnect)(struct mqtt_client *c); + int (*read)(struct mqtt_client *c, void *buf, size_t len); + int (*write)(struct mqtt_client *c, const void *buf, size_t len); +}; + typedef struct Timer { struct timeval end_time; } Timer; @@ -233,6 +244,7 @@ typedef void (*messageHandler)(MessageData*); typedef struct mqtt_client { + const struct mqttc_ops *ops; char *host; int port; int fd; @@ -260,6 +272,7 @@ typedef struct mqtt_client Mutex mutex; Thread thread; #endif + void *priv; } mqtt_client; /** @@ -267,9 +280,8 @@ typedef struct mqtt_client * @param client * @param */ -int mqtt_client_init(mqtt_client* c, const char *host, int port, - uint8_t* sendbuf, size_t sendbuf_size, - uint8_t* readbuf, size_t readbuf_size); +int mqtt_client_init(mqtt_client* c, const char *host, int port); +void mqtt_client_deinit(mqtt_client* c); /** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack * The nework object must be connected to the network endpoint before calling this diff --git a/gear-lib/libmqttc/mqttc_socket.c b/gear-lib/libmqttc/mqttc_socket.c new file mode 100644 index 00000000..872ab405 --- /dev/null +++ b/gear-lib/libmqttc/mqttc_socket.c @@ -0,0 +1,74 @@ + +#include "libmqttc.h" +#include +#include +#include + + +struct sock_ctx { + struct sock_client *sc; + struct ringbuffer *rbuf; +}; + +static void on_connect_client(struct sock_client *c, struct sock_connection *conn) +{ + printf("on_connect_client: fd=%d local=%s:%d, remote=%s:%d\n", conn->fd, + conn->local.ip_str, conn->local.port, + conn->remote.ip_str, conn->remote.port); +} + +static void on_recv_buf(struct sock_client *c, void *buf, size_t len) +{ + struct sock_ctx *ctx = (struct sock_ctx *)c->priv; + //printf("%s:%d fd = %d, recv buf len = %ld\n", __func__, __LINE__, c->fd, len); + rb_write(ctx->rbuf, buf, len); +} + +static void *sock_init(const char *host, uint16_t port) +{ + struct sock_ctx *ctx = calloc(1, sizeof(struct sock_ctx)); + if (!ctx) { + printf("malloc sock_ctx failed!\n"); + return NULL; + } + ctx->sc = sock_client_create(host, port, SOCK_TYPE_TCP); + ctx->rbuf = rb_create(1024); + + //sock_set_noblk(ctx->sc->fd, 1); + sock_client_set_callback(ctx->sc, on_connect_client, on_recv_buf, NULL); + ctx->sc->priv = ctx; + return ctx; +} + +static int sock_connect(struct mqtt_client *c) +{ + struct sock_ctx *ctx = (struct sock_ctx *)c->priv; + sock_client_connect(ctx->sc); + return 0; +} + +static int sock_write(struct mqtt_client *c, const void *buf, size_t len) +{ + struct sock_ctx *ctx = (struct sock_ctx *)c->priv; + int ret = 0; + ret = sock_send(ctx->sc->fd, buf, len); + + //printf("%s:%d write len=%d, ret = %d\n", __func__, __LINE__, len, ret); + return ret; +} + +static int sock_read(struct mqtt_client *c, void *buf, size_t len) +{ + struct sock_ctx *ctx = (struct sock_ctx *)c->priv; + usleep(5000); + int ret = rb_read(ctx->rbuf, buf, len); + //printf("%s:%d sock read len = %ld, ret=%d\n", __func__, __LINE__, len, ret); + return ret; +} + +struct mqttc_ops mqttc_socket_ops = { + .init = sock_init, + .connect = sock_connect, + .write = sock_write, + .read = sock_read, +}; diff --git a/gear-lib/libmqttc/test_libmqttc.c b/gear-lib/libmqttc/test_libmqttc.c index 9d968a6c..aaf92e67 100644 --- a/gear-lib/libmqttc/test_libmqttc.c +++ b/gear-lib/libmqttc/test_libmqttc.c @@ -20,22 +20,18 @@ * SOFTWARE. ******************************************************************************/ #include "libmqttc.h" +#include #include #include #include -#include -#include -#include -#include -#include - -#define LOGA_DEBUG 0 -#define LOGA_INFO 1 #include #include #include + +#define LOGA_DEBUG 0 +#define LOGA_INFO 1 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(a[0])) static volatile MessageData* test1_message_data = NULL; @@ -60,8 +56,10 @@ struct Options int iterations; } options = { "localhost", + //"127.0.0.1", 1883, "localhost", + //"127.0.0.1", 1885, 0, //verbose 0, //test_no @@ -130,8 +128,8 @@ void MyLog(int LOGA_level, char* format, ...) struct tm *timeinfo; - if (LOGA_level == LOGA_DEBUG && options.verbose == 0) - return; + //if (LOGA_level == LOGA_DEBUG && options.verbose == 0) + // return; ftime(&ts); timeinfo = localtime(&ts.time); @@ -148,16 +146,6 @@ void MyLog(int LOGA_level, char* format, ...) } -#if defined(WIN32) || defined(_WINDOWS) -#define mqsleep(A) Sleep(1000*A) -#define START_TIME_TYPE DWORD -static DWORD start_time = 0; -START_TIME_TYPE start_clock(void) -{ - return GetTickCount(); -} -#else -#define mqsleep sleep #define START_TIME_TYPE struct timeval /* TODO - unused - remove? static struct timeval start_time; */ START_TIME_TYPE start_clock(void) @@ -166,15 +154,8 @@ START_TIME_TYPE start_clock(void) gettimeofday(&start_time, NULL); return start_time; } -#endif -#if defined(WIN32) -long elapsed(START_TIME_TYPE start_time) -{ - return GetTickCount() - start_time; -} -#else long elapsed(START_TIME_TYPE start_time) { struct timeval now, res; @@ -183,7 +164,6 @@ long elapsed(START_TIME_TYPE start_time) timersub(&now, &start_time, &res); return (res.tv_sec)*1000 + (res.tv_usec)/1000; } -#endif #define assert(a, b, c, d) myassert(__FILE__, __LINE__, a, b, c, d) #define assert1(a, b, c, d, e) myassert(__FILE__, __LINE__, a, b, c, d, e) @@ -192,12 +172,14 @@ int tests = 0; int failures = 0; FILE* xml; START_TIME_TYPE global_start_time; +struct timeval g_start_time; char output[3000]; char* cur_output = output; void write_test_result(void) { - long duration = elapsed(global_start_time); + //long duration = elapsed(global_start_time); + uint64_t duration = time_elapsed_ms(&g_start_time); fprintf(xml, " time=\"%ld.%.3ld\" >\n", duration / 1000, duration % 1000); if (cur_output != output) { @@ -248,7 +230,7 @@ Test1: single-threaded client void test1_sendAndReceive(mqtt_client* c, int qos, char* test_topic) { int i = 0; - int iterations = 50; + int iterations = 5; int rc; int wait_seconds; @@ -263,7 +245,7 @@ void test1_sendAndReceive(mqtt_client* c, int qos, char* test_topic) for (i = 0; i < iterations; ++i) { test1_message_data = NULL; rc = mqtt_publish(c, test_topic, &pubmsg); - assert("Good rc from publish", rc == MQTT_SUCCESS, "rc was %d", rc); + assert("Good rc from publish", rc == MQTT_SUCCESS, "rc was %d\n", rc); /* wait for the message to be received */ wait_seconds = 10; @@ -289,15 +271,14 @@ int test1(struct Options options) mqtt_client c; int rc = 0; char* test_topic = "C client test1"; - unsigned char buf[100]; - unsigned char readbuf[100]; fprintf(xml, "\n"); fclose(xml); return rc;