From ddd245ed15877952a498ab23ba12424008521c3f Mon Sep 17 00:00:00 2001 From: Nicolas Tsiftes Date: Tue, 14 Jun 2011 17:12:15 +0200 Subject: [PATCH] Added streaming mode for page updates. Rewrote and removed some legacy code. Improved error checks and comments. --- apps/deluge/deluge.c | 165 ++++++++++++++++++++++--------------------- apps/deluge/deluge.h | 18 ++--- 2 files changed, 95 insertions(+), 88 deletions(-) diff --git a/apps/deluge/deluge.c b/apps/deluge/deluge.c index 6699f19a9..8aaed5a90 100644 --- a/apps/deluge/deluge.c +++ b/apps/deluge/deluge.c @@ -33,10 +33,10 @@ /** * \file - * The Deluge protocol for data dissemination. + * An implementation of the Deluge protocol. * (Hui and Culler: The dynamic behavior of a data * dissemination protocol for network programming at scale, - * SenSys 2004) + * ACM SenSys 2004) * \author * Nicolas Tsiftes */ @@ -67,11 +67,6 @@ #define PRINTF(...) #endif -PROCESS(deluge_process, "Deluge process"); - -static void broadcast_recv(struct broadcast_conn *, const rimeaddr_t *); -static void unicast_recv(struct unicast_conn *, const rimeaddr_t *); - /* Implementation-specific variables. */ static struct broadcast_conn deluge_broadcast; static struct unicast_conn deluge_uc; @@ -86,32 +81,25 @@ static unsigned r_interval; static unsigned recv_adv; static int broadcast_profile; +/* Deluge timers. */ static struct ctimer rx_timer; static struct ctimer tx_timer; static struct ctimer summary_timer; static struct ctimer profile_timer; -static unsigned next_object_id; +/* Deluge objects will get an ID that defaults to the current value of + the next_object_id parameter. */ +static deluge_object_id_t next_object_id; + +/* Rime callbacks. */ +static void broadcast_recv(struct broadcast_conn *, const rimeaddr_t *); +static void unicast_recv(struct unicast_conn *, const rimeaddr_t *); static const struct broadcast_callbacks broadcast_call = {broadcast_recv, NULL}; static const struct unicast_callbacks unicast_call = {unicast_recv, NULL}; -#if NETSIM -static char label[128]; -#endif - -static uint16_t -checksum(unsigned char *buf, unsigned len) -{ - unsigned i; - uint16_t sum; - - for(i = sum = 0; i < len; i++) { - sum = crc16_add(buf[i], sum); - } - - return sum; -} +/* The Deluge process manages the main Deluge timer. */ +PROCESS(deluge_process, "Deluge"); static void transition(int state) @@ -136,15 +124,26 @@ transition(int state) static int write_page(struct deluge_object *obj, unsigned pagenum, unsigned char *data) { - cfs_seek(obj->cfs_fd, pagenum * S_PAGE, CFS_SEEK_SET); - return cfs_write(obj->cfs_fd, (char *)data, - S_PAGE); + cfs_offset_t offset; + + offset = pagenum * S_PAGE; + + if(cfs_seek(obj->cfs_fd, offset, CFS_SEEK_SET) != offset) { + return -1; + } + return cfs_write(obj->cfs_fd, (char *)data, S_PAGE); } static int read_page(struct deluge_object *obj, unsigned pagenum, unsigned char *buf) { - cfs_seek(obj->cfs_fd, pagenum * S_PAGE, CFS_SEEK_SET); + cfs_offset_t offset; + + offset = pagenum * S_PAGE; + + if(cfs_seek(obj->cfs_fd, offset, CFS_SEEK_SET) != offset) { + return -1; + } return cfs_read(obj->cfs_fd, (char *)buf, S_PAGE); } @@ -165,26 +164,27 @@ init_page(struct deluge_object *obj, int pagenum, int have) page->packet_set = ALL_PACKETS; page->flags |= PAGE_COMPLETE; read_page(obj, pagenum, buf); - page->crc = checksum(buf, S_PAGE); + page->crc = crc16_data(buf, S_PAGE, 0); } else { page->version = 0; page->packet_set = 0; } } -static int +static cfs_offset_t file_size(const char *file) { - int fd, size; + int fd; + cfs_offset_t size; fd = cfs_open(file, CFS_READ); if(fd < 0) { - return -1; + return (cfs_offset_t)-1; } size = cfs_seek(fd, 0, CFS_SEEK_END); - cfs_close(fd); + return size; } @@ -209,6 +209,7 @@ init_object(struct deluge_object *obj, char *filename, unsigned version) obj->pages = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof(*obj->pages)); if(obj->pages == NULL) { + cfs_close(obj->cfs_fd); return -1; } @@ -217,7 +218,7 @@ init_object(struct deluge_object *obj, char *filename, unsigned version) init_page(¤t_object, i, 1); } - memset(obj->current_page, 0, sizeof (obj->current_page)); + memset(obj->current_page, 0, sizeof(obj->current_page)); return 0; } @@ -244,15 +245,15 @@ send_request(void *arg) obj = (struct deluge_object *)arg; - request.object_id = obj->object_id; request.cmd = DELUGE_CMD_REQUEST; request.pagenum = obj->current_rx_page; request.version = obj->pages[request.pagenum].version; request.request_set = ~obj->pages[obj->current_rx_page].packet_set; + request.object_id = obj->object_id; PRINTF("Sending request for page %d, version %u, request_set %u\n", request.pagenum, request.version, request.request_set); - packetbuf_copyfrom((uint8_t *)&request, sizeof (request)); + packetbuf_copyfrom(&request, sizeof(request)); unicast_send(&deluge_uc, &obj->summary_from); /* Deluge R.2 */ @@ -276,14 +277,14 @@ advertise_summary(struct deluge_object *obj) } summary.cmd = DELUGE_CMD_SUMMARY; - summary.object_id = obj->object_id; summary.version = obj->update_version; summary.highest_available = highest_available_page(obj); + summary.object_id = obj->object_id; PRINTF("Advertising summary for object id %u: version=%u, available=%u\n", - (unsigned) obj->object_id, summary.version, summary.highest_available); + (unsigned)obj->object_id, summary.version, summary.highest_available); - packetbuf_copyfrom((uint8_t *)&summary, sizeof (summary)); + packetbuf_copyfrom(&summary, sizeof(summary)); broadcast_send(&deluge_broadcast); } @@ -351,10 +352,10 @@ send_page(struct deluge_object *obj, unsigned pagenum) unsigned char *cp; pkt.cmd = DELUGE_CMD_PACKET; - pkt.object_id = obj->object_id; pkt.pagenum = pagenum; pkt.version = obj->pages[pagenum].version; pkt.packetnum = 0; + pkt.object_id = obj->object_id; pkt.crc = 0; read_page(obj, pagenum, buf); @@ -362,9 +363,9 @@ send_page(struct deluge_object *obj, unsigned pagenum) /* Divide the page into packets and send them one at a time. */ for(cp = buf; cp + S_PKT <= (unsigned char *)&buf[S_PAGE]; cp += S_PKT) { if(obj->tx_set & (1 << pkt.packetnum)) { - pkt.crc = checksum(cp, S_PKT); + pkt.crc = crc16_data(cp, S_PKT, 0); memcpy(pkt.payload, cp, S_PKT); - packetbuf_copyfrom((uint8_t *)&pkt, sizeof (pkt)); + packetbuf_copyfrom(&pkt, sizeof(pkt)); broadcast_send(&deluge_broadcast); } pkt.packetnum++; @@ -382,8 +383,12 @@ tx_callback(void *arg) send_page(obj, obj->current_tx_page); /* Deluge T.2. */ if(obj->tx_set) { + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, + PACKETBUF_ATTR_PACKET_TYPE_STREAM); ctimer_reset(&tx_timer); } else { + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, + PACKETBUF_ATTR_PACKET_TYPE_STREAM_END); obj->current_tx_page = -1; transition(DELUGE_STATE_MAINTAIN); } @@ -449,7 +454,7 @@ handle_packet(struct deluge_msg_packet *msg) memcpy(¤t_object.current_page[S_PKT * packet.packetnum], packet.payload, S_PKT); - crc = checksum(packet.payload, S_PKT); + crc = crc16_data(packet.payload, S_PKT, 0); if(packet.crc != crc) { PRINTF("packet crc: %hu, calculated crc: %hu\n", packet.crc, crc); return; @@ -459,6 +464,10 @@ handle_packet(struct deluge_msg_packet *msg) page->packet_set |= (1 << packet.packetnum); if(page->packet_set == ALL_PACKETS) { + /* This is the last packet of the requested page; stop streaming. */ + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, + PACKETBUF_ATTR_PACKET_TYPE_STREAM_END); + write_page(¤t_object, packet.pagenum, current_object.current_page); page->version = packet.version; page->flags = PAGE_COMPLETE; @@ -470,7 +479,7 @@ handle_packet(struct deluge_msg_packet *msg) current_object.version = current_object.update_version; leds_on(LEDS_RED); PRINTF("Update completed for object %u, version %u\n", - current_object.object_id, packet.version); + (unsigned)current_object.object_id, packet.version); } else if(current_object.current_rx_page < OBJECT_PAGE_COUNT(current_object)) { if(ctimer_expired(&rx_timer)) { ctimer_set(&rx_timer, @@ -480,36 +489,19 @@ handle_packet(struct deluge_msg_packet *msg) } /* Deluge R.3 */ transition(DELUGE_STATE_MAINTAIN); + } else { + /* More packets to come. Put lower layers in streaming mode. */ + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, + PACKETBUF_ATTR_PACKET_TYPE_STREAM); } } } -static void -unicast_recv(struct unicast_conn *c, const rimeaddr_t *sender) -{ - char *msg; - int len; - - msg = packetbuf_dataptr(); - len = packetbuf_datalen(); - if(len < 5) - return; - - switch(msg[2]) { - case DELUGE_CMD_REQUEST: - if(len >= sizeof (struct deluge_msg_request)) - handle_request((struct deluge_msg_request *)msg); - break; - default: - PRINTF("Incoming packet with unknown command!\n"); - } -} - static void send_profile(struct deluge_object *obj) { struct deluge_msg_profile *msg; - unsigned char buf[sizeof (*msg) + OBJECT_PAGE_COUNT(*obj)]; + unsigned char buf[sizeof(*msg) + OBJECT_PAGE_COUNT(*obj)]; int i; if(broadcast_profile && recv_adv < CONST_K) { @@ -517,14 +509,14 @@ send_profile(struct deluge_object *obj) msg = (struct deluge_msg_profile *)buf; msg->cmd = DELUGE_CMD_PROFILE; - msg->object_id = obj->object_id; msg->version = obj->version; msg->npages = OBJECT_PAGE_COUNT(*obj); + msg->object_id = obj->object_id; for(i = 0; i < msg->npages; i++) { msg->version_vector[i] = obj->pages[i].version; } - packetbuf_copyfrom(buf, sizeof (buf)); + packetbuf_copyfrom(buf, sizeof(buf)); broadcast_send(&deluge_broadcast); } } @@ -551,13 +543,13 @@ handle_profile(struct deluge_msg_profile *msg) npages = OBJECT_PAGE_COUNT(*obj); obj->size = msg->npages * S_PAGE; - p = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof (*obj->pages)); + p = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof(*obj->pages)); if(p == NULL) { PRINTF("Failed to reallocate memory for pages!\n"); return; } - memcpy(p, obj->pages, npages * sizeof (*obj->pages)); + memcpy(p, obj->pages, npages * sizeof(*obj->pages)); free(obj->pages); obj->pages = (struct deluge_page *)p; @@ -588,7 +580,7 @@ handle_profile(struct deluge_msg_profile *msg) } static void -broadcast_recv(struct broadcast_conn *c, const rimeaddr_t *sender) +command_dispatcher(const rimeaddr_t *sender) { char *msg; int len; @@ -596,37 +588,50 @@ broadcast_recv(struct broadcast_conn *c, const rimeaddr_t *sender) msg = packetbuf_dataptr(); len = packetbuf_datalen(); - if(len < 5) + if(len < 1) return; - switch(msg[2]) { + switch(msg[0]) { case DELUGE_CMD_SUMMARY: - if(len >= sizeof (struct deluge_msg_summary)) + if(len >= sizeof(struct deluge_msg_summary)) handle_summary((struct deluge_msg_summary *)msg, sender); break; case DELUGE_CMD_REQUEST: - if(len >= sizeof (struct deluge_msg_request)) + if(len >= sizeof(struct deluge_msg_request)) handle_request((struct deluge_msg_request *)msg); break; case DELUGE_CMD_PACKET: - if(len >= sizeof (struct deluge_msg_packet)) + if(len >= sizeof(struct deluge_msg_packet)) handle_packet((struct deluge_msg_packet *)msg); break; case DELUGE_CMD_PROFILE: profile = (struct deluge_msg_profile *)msg; - if(len >= sizeof (*profile) && - len >= sizeof (*profile) + profile->npages * profile->version_vector[0]) + if(len >= sizeof(*profile) && + len >= sizeof(*profile) + profile->npages * profile->version_vector[0]) handle_profile((struct deluge_msg_profile *)msg); break; default: - PRINTF("Incoming packet with unknown command!\n"); + PRINTF("Incoming packet with unknown command: %d\n", msg[0]); } } +static void +unicast_recv(struct unicast_conn *c, const rimeaddr_t *sender) +{ + command_dispatcher(sender); +} + +static void +broadcast_recv(struct broadcast_conn *c, const rimeaddr_t *sender) +{ + command_dispatcher(sender); +} + int deluge_disseminate(char *file, unsigned version) { - if(init_object(¤t_object, file, version) < 0) { + /* This implementation disseminates at most one object. */ + if(next_object_id > 0 || init_object(¤t_object, file, version) < 0) { return -1; } process_start(&deluge_process, file); diff --git a/apps/deluge/deluge.h b/apps/deluge/deluge.h index ea965c6d5..ec68cdfce 100644 --- a/apps/deluge/deluge.h +++ b/apps/deluge/deluge.h @@ -95,38 +95,40 @@ PROCESS_NAME(deluge_process); #define CONST_OMEGA 8 #define ESTIMATED_TX_TIME (CLOCK_SECOND) +typedef uint8_t deluge_object_id_t; + struct deluge_msg_summary { - uint16_t object_id; uint8_t cmd; uint8_t version; uint8_t highest_available; -} __attribute__((packed)); + deluge_object_id_t object_id; +}; struct deluge_msg_request { - uint16_t object_id; uint8_t cmd; uint8_t version; uint8_t pagenum; uint8_t request_set; -} __attribute__((packed)); + deluge_object_id_t object_id; +}; struct deluge_msg_packet { - uint16_t object_id; uint8_t cmd; uint8_t version; uint8_t pagenum; uint8_t packetnum; uint16_t crc; + deluge_object_id_t object_id; unsigned char payload[S_PKT]; -} __attribute__((packed)); +}; struct deluge_msg_profile { - uint16_t object_id; uint8_t cmd; uint8_t version; uint8_t npages; + deluge_object_id_t object_id; uint8_t version_vector[]; -} __attribute__((packed)); +}; struct deluge_object { char *filename;