Merge pull request #2150 from posjodin/PR2

Several changes to fix and harden mqtt code.
This commit is contained in:
George Oikonomou 2017-04-24 16:04:56 +01:00 committed by GitHub
commit 3c150a5e99
2 changed files with 24 additions and 20 deletions

View file

@ -130,6 +130,7 @@ typedef enum {
/*---------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------*/
/* Protothread send macros */ /* Protothread send macros */
#define PT_MQTT_WRITE_BYTES(conn, data, len) \ #define PT_MQTT_WRITE_BYTES(conn, data, len) \
conn->out_write_pos = 0; \
while(write_bytes(conn, data, len)) { \ while(write_bytes(conn, data, len)) { \
PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
} }
@ -147,14 +148,18 @@ typedef enum {
*/ */
#define PT_MQTT_WAIT_SEND() \ #define PT_MQTT_WAIT_SEND() \
do { \ do { \
process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL); \ if (PROCESS_ERR_OK == \
PROCESS_WAIT_EVENT(); \ process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \
if(ev == mqtt_abort_now_event) { \ do { \
conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ PROCESS_WAIT_EVENT(); \
PT_EXIT(&conn->out_proto_thread); \ if(ev == mqtt_abort_now_event) { \
process_post(PROCESS_CURRENT(), ev, data); \ conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
} else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ PT_INIT(&conn->out_proto_thread); \
process_post(PROCESS_CURRENT(), ev, data); \ process_post(PROCESS_CURRENT(), ev, data); \
} else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
process_post(PROCESS_CURRENT(), ev, data); \
} \
} while (ev != mqtt_continue_send_event); \
} \ } \
} while(0) } while(0)
/*---------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------*/
@ -1188,8 +1193,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) { if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
if(conn->out_buffer_sent == 1) { if(conn->out_buffer_sent == 1) {
PT_INIT(&conn->out_proto_thread); PT_INIT(&conn->out_proto_thread);
while(disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED && while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) { disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
PT_MQTT_WAIT_SEND(); PT_MQTT_WAIT_SEND();
} }
abort_connection(conn); abort_connection(conn);
@ -1206,8 +1211,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
if(conn->out_buffer_sent == 1 && if(conn->out_buffer_sent == 1 &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
PT_INIT(&conn->out_proto_thread); PT_INIT(&conn->out_proto_thread);
while(pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED && while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
PT_MQTT_WAIT_SEND(); PT_MQTT_WAIT_SEND();
} }
} }
@ -1219,8 +1224,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
if(conn->out_buffer_sent == 1 && if(conn->out_buffer_sent == 1 &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
PT_INIT(&conn->out_proto_thread); PT_INIT(&conn->out_proto_thread);
while(subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED && while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
PT_MQTT_WAIT_SEND(); PT_MQTT_WAIT_SEND();
} }
} }
@ -1232,8 +1237,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
if(conn->out_buffer_sent == 1 && if(conn->out_buffer_sent == 1 &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
PT_INIT(&conn->out_proto_thread); PT_INIT(&conn->out_proto_thread);
while(unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED && while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
PT_MQTT_WAIT_SEND(); PT_MQTT_WAIT_SEND();
} }
} }
@ -1245,8 +1250,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
if(conn->out_buffer_sent == 1 && if(conn->out_buffer_sent == 1 &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
PT_INIT(&conn->out_proto_thread); PT_INIT(&conn->out_proto_thread);
while(publish_pt(&conn->out_proto_thread, conn) < PT_EXITED && while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
PT_MQTT_WAIT_SEND(); PT_MQTT_WAIT_SEND();
} }
} }

View file

@ -196,7 +196,7 @@ typedef enum {
MQTT_CONN_STATE_ERROR, MQTT_CONN_STATE_ERROR,
MQTT_CONN_STATE_DNS_ERROR, MQTT_CONN_STATE_DNS_ERROR,
MQTT_CONN_STATE_DISCONNECTING, MQTT_CONN_STATE_DISCONNECTING,
MQTT_CONN_STATE_ABORT_IMMEDIATE,
MQTT_CONN_STATE_NOT_CONNECTED, MQTT_CONN_STATE_NOT_CONNECTED,
MQTT_CONN_STATE_DNS_LOOKUP, MQTT_CONN_STATE_DNS_LOOKUP,
MQTT_CONN_STATE_TCP_CONNECTING, MQTT_CONN_STATE_TCP_CONNECTING,
@ -204,7 +204,6 @@ typedef enum {
MQTT_CONN_STATE_CONNECTING_TO_BROKER, MQTT_CONN_STATE_CONNECTING_TO_BROKER,
MQTT_CONN_STATE_CONNECTED_TO_BROKER, MQTT_CONN_STATE_CONNECTED_TO_BROKER,
MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT, MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT,
MQTT_CONN_STATE_ABORT_IMMEDIATE,
} mqtt_conn_state_t; } mqtt_conn_state_t;
/*---------------------------------------------------------------------------*/ /*---------------------------------------------------------------------------*/
struct mqtt_string { struct mqtt_string {