diff --git a/apps/mqtt/mqtt.c b/apps/mqtt/mqtt.c index 0be8e83bc..0a803c761 100644 --- a/apps/mqtt/mqtt.c +++ b/apps/mqtt/mqtt.c @@ -130,6 +130,7 @@ typedef enum { /*---------------------------------------------------------------------------*/ /* Protothread send macros */ #define PT_MQTT_WRITE_BYTES(conn, data, len) \ + conn->out_write_pos = 0; \ while(write_bytes(conn, data, len)) { \ PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ } @@ -147,15 +148,19 @@ typedef enum { */ #define PT_MQTT_WAIT_SEND() \ do { \ - process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL); \ - PROCESS_WAIT_EVENT(); \ - if(ev == mqtt_abort_now_event) { \ - conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ - PT_EXIT(&conn->out_proto_thread); \ - process_post(PROCESS_CURRENT(), ev, data); \ - } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ - process_post(PROCESS_CURRENT(), ev, data); \ - } \ + if (PROCESS_ERR_OK == \ + process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \ + do { \ + PROCESS_WAIT_EVENT(); \ + if(ev == mqtt_abort_now_event) { \ + conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ + PT_INIT(&conn->out_proto_thread); \ + process_post(PROCESS_CURRENT(), ev, data); \ + } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ + process_post(PROCESS_CURRENT(), ev, data); \ + } \ + } while (ev != mqtt_continue_send_event); \ + } \ } while(0) /*---------------------------------------------------------------------------*/ static process_event_t mqtt_do_connect_tcp_event; @@ -1188,8 +1193,8 @@ PROCESS_THREAD(mqtt_process, ev, data) if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) { if(conn->out_buffer_sent == 1) { PT_INIT(&conn->out_proto_thread); - while(disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED && - conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) { + while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE && + disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } abort_connection(conn); @@ -1206,8 +1211,8 @@ PROCESS_THREAD(mqtt_process, ev, data) if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); - while(pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED && - conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && + pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } @@ -1219,8 +1224,8 @@ PROCESS_THREAD(mqtt_process, ev, data) if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); - while(subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED && - conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && + subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } @@ -1232,8 +1237,8 @@ PROCESS_THREAD(mqtt_process, ev, data) if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); - while(unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED && - conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && + unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } @@ -1245,8 +1250,8 @@ PROCESS_THREAD(mqtt_process, ev, data) if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); - while(publish_pt(&conn->out_proto_thread, conn) < PT_EXITED && - conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && + publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } diff --git a/apps/mqtt/mqtt.h b/apps/mqtt/mqtt.h index 4b27fa30c..1802b6c31 100644 --- a/apps/mqtt/mqtt.h +++ b/apps/mqtt/mqtt.h @@ -196,6 +196,7 @@ typedef enum { MQTT_CONN_STATE_ERROR, MQTT_CONN_STATE_DNS_ERROR, MQTT_CONN_STATE_DISCONNECTING, + MQTT_CONN_STATE_ABORT_IMMEDIATE, MQTT_CONN_STATE_NOT_CONNECTED, MQTT_CONN_STATE_DNS_LOOKUP, @@ -204,7 +205,6 @@ typedef enum { MQTT_CONN_STATE_CONNECTING_TO_BROKER, MQTT_CONN_STATE_CONNECTED_TO_BROKER, MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT, - MQTT_CONN_STATE_ABORT_IMMEDIATE, } mqtt_conn_state_t; /*---------------------------------------------------------------------------*/ struct mqtt_string {