/* * Copyright (c) 2007, Swedish Institute of Computer Science * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the Institute nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * This file is part of the Contiki operating system. * * $Id: deluge.c,v 1.7 2009/05/15 23:04:15 nvt-se Exp $ */ /** * \file * The Deluge protocol for data dissemination. * (Hui and Culler: The dynamic behavior of a data * dissemination protocol for network programming at scale, * SenSys 2004) * \author * Nicolas Tsiftes <nvt@sics.se> */ #include "contiki.h" #include "net/rime.h" #include "net/rime/ctimer.h" #include "cfs/cfs.h" #include "loader/elfloader.h" #include "lib/crc16.h" #include "lib/random.h" #include "node-id.h" #include "deluge.h" #if NETSIM #include "ether.h" #include <stdio.h> #endif #include "dev/leds.h" #include <stdlib.h> #include <string.h> #define DEBUG 0 #if DEBUG #include <stdio.h> #define PRINTF(...) \ do { \ printf("[Node %02u] ", (unsigned) node_id); \ printf(__VA_ARGS__); \ } while (0) #else #define PRINTF(...) #endif PROCESS(deluge_process, "Deluge process"); static void broadcast_recv(struct broadcast_conn *, rimeaddr_t *); static void unicast_recv(struct unicast_conn *, rimeaddr_t *); /* Implementation-specific variables. */ static struct broadcast_conn deluge_broadcast; static struct unicast_conn deluge_uc; static struct deluge_object current_object; static process_event_t deluge_event; /* Deluge variables. */ static int deluge_state; static int old_summary; static int neighbor_inconsistency; static unsigned r_interval; static unsigned recv_adv; static int broadcast_profile; static struct ctimer rx_timer; static struct ctimer tx_timer; static struct ctimer summary_timer; static struct ctimer profile_timer; static unsigned next_object_id; static const struct broadcast_callbacks broadcast_call = {broadcast_recv}; static const struct unicast_callbacks unicast_call = {unicast_recv}; #if ENERGEST_CONF_ON static long cpu_start_time, tx_start_time, listen_start_time; #endif #if NETSIM static char label[128]; #endif static uint16_t checksum(unsigned char *buf, unsigned len) { unsigned i; uint16_t sum; for(i = sum = 0; i < len; i++) { sum = crc16_add(buf[i], sum); } return sum; } static void transition(int state) { switch (deluge_state) { case DELUGE_STATE_MAINTAIN: ctimer_stop(&summary_timer); ctimer_stop(&profile_timer); break; case DELUGE_STATE_RX: ctimer_stop(&rx_timer); break; case DELUGE_STATE_TX: ctimer_stop(&tx_timer); break; } deluge_state = state; } static int write_page(struct deluge_object *obj, unsigned pagenum, unsigned char *data) { cfs_seek(obj->cfs_fd, pagenum * S_PAGE, CFS_SEEK_SET); return cfs_write(obj->cfs_fd, (char *)data, S_PAGE); } static int read_page(struct deluge_object *obj, unsigned pagenum, unsigned char *buf) { cfs_seek(obj->cfs_fd, pagenum * S_PAGE, CFS_SEEK_SET); return cfs_read(obj->cfs_fd, (char *)buf, S_PAGE); } static void init_page(struct deluge_object *obj, int pagenum, int have) { struct deluge_page *page; unsigned char buf[S_PAGE]; page = &obj->pages[pagenum]; page->flags = 0; page->last_request = 0; page->last_data = 0; if(have) { page->version = obj->version; page->packet_set = ALL_PACKETS; page->flags |= PAGE_COMPLETE; read_page(obj, pagenum, buf); page->crc = checksum(buf, S_PAGE); } else { page->version = 0; page->packet_set = 0; } } static int file_size(const char *file) { int fd, size; fd = cfs_open(file, CFS_READ); if(fd < 0) { return -1; } size = cfs_seek(fd, 0, CFS_SEEK_END); cfs_close(fd); return size; } static int init_object(struct deluge_object *obj, char *filename, unsigned version) { static struct deluge_page *page; int i; obj->cfs_fd = cfs_open(filename, CFS_READ | CFS_WRITE); if(obj->cfs_fd < 0) { return -1; } obj->filename = filename; obj->object_id = next_object_id++; obj->size = file_size(filename); obj->version = obj->update_version = version; obj->current_rx_page = 0; obj->nrequests = 0; obj->tx_set = 0; obj->pages = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof(*obj->pages)); if(obj->pages == NULL) { return -1; } for(i = 0; i < OBJECT_PAGE_COUNT(current_object); i++) { page = ¤t_object.pages[i]; init_page(¤t_object, i, 1); } memset(obj->current_page, 0, sizeof (obj->current_page)); return 0; } static int highest_available_page(struct deluge_object *obj) { int i; for(i = 0; i < OBJECT_PAGE_COUNT(*obj); i++) { if(!(obj->pages[i].flags & PAGE_COMPLETE)) { break; } } return i; } static void send_request(void *arg) { struct deluge_object *obj; struct deluge_msg_request request; obj = (struct deluge_object *)arg; request.object_id = obj->object_id; request.cmd = DELUGE_CMD_REQUEST; request.pagenum = obj->current_rx_page; request.version = obj->pages[request.pagenum].version; request.request_set = ~obj->pages[obj->current_rx_page].packet_set; PRINTF("Sending request for page %d, version %u, request_set %u\n", request.pagenum, request.version, request.request_set); packetbuf_copyfrom((uint8_t *)&request, sizeof (request)); unicast_send(&deluge_uc, &obj->summary_from); /* Deluge R.2 */ if(++obj->nrequests == CONST_LAMBDA) { /* XXX check rate here too. */ obj->nrequests = 0; transition(DELUGE_STATE_MAINTAIN); } else { ctimer_reset(&rx_timer); } } static void advertise_summary(struct deluge_object *obj) { struct deluge_msg_summary summary; if(recv_adv >= CONST_K) { ctimer_stop(&summary_timer); return; } summary.cmd = DELUGE_CMD_SUMMARY; summary.object_id = obj->object_id; summary.version = obj->update_version; summary.highest_available = highest_available_page(obj); PRINTF("Advertising summary for object id %u: version=%u, available=%u\n", (unsigned) obj->object_id, summary.version, summary.highest_available); packetbuf_copyfrom((uint8_t *)&summary, sizeof (summary)); broadcast_send(&deluge_broadcast); } static void handle_summary(struct deluge_msg_summary *msg, rimeaddr_t *sender) { int highest_available, i; clock_time_t oldest_request, oldest_data, now; struct deluge_page *page; highest_available = highest_available_page(¤t_object); if(msg->version != current_object.version || msg->highest_available != highest_available) { neighbor_inconsistency = 1; } else { recv_adv++; } if(msg->version < current_object.version) { old_summary = 1; broadcast_profile = 1; } /* Deluge M.5 */ if(msg->version == current_object.update_version && msg->highest_available > highest_available) { if(msg->highest_available > OBJECT_PAGE_COUNT(current_object)) { PRINTF("Error: highest available is above object page count!\n"); return; } oldest_request = oldest_data = now = clock_time(); for(i = 0; i < msg->highest_available; i++) { page = ¤t_object.pages[i]; if(page->last_request < oldest_request) oldest_request = page->last_request; if(page->last_request < oldest_data) oldest_data = page->last_data; } if(((now - oldest_request) / CLOCK_SECOND) <= 2 * r_interval || ((now - oldest_data) / CLOCK_SECOND) <= r_interval) { return; } rimeaddr_copy(¤t_object.summary_from, sender); transition(DELUGE_STATE_RX); ctimer_set(&rx_timer, CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R), send_request, ¤t_object); } } static void send_page(struct deluge_object *obj, unsigned pagenum) { unsigned char buf[S_PAGE]; struct deluge_msg_packet pkt; unsigned char *cp; pkt.cmd = DELUGE_CMD_PACKET; pkt.object_id = obj->object_id; pkt.pagenum = pagenum; pkt.version = obj->pages[pagenum].version; pkt.packetnum = 0; pkt.crc = 0; read_page(obj, pagenum, buf); /* Divide the page into packets and send them one at a time. */ for(cp = buf; cp + S_PKT <= (unsigned char *)&buf[S_PAGE]; cp += S_PKT) { if(obj->tx_set & (1 << pkt.packetnum)) { pkt.crc = checksum(cp, S_PKT); memcpy(pkt.payload, cp, S_PKT); packetbuf_copyfrom((uint8_t *)&pkt, sizeof (pkt)); broadcast_send(&deluge_broadcast); } pkt.packetnum++; } obj->tx_set = 0; } static void tx_callback(void *arg) { struct deluge_object *obj; obj = (struct deluge_object *)arg; if(obj->current_tx_page >= 0 && obj->tx_set) { send_page(obj, obj->current_tx_page); /* Deluge T.2. */ if(obj->tx_set) { ctimer_reset(&tx_timer); } else { obj->current_tx_page = -1; transition(DELUGE_STATE_MAINTAIN); } } } static void handle_request(struct deluge_msg_request *msg) { int highest_available; if(msg->pagenum >= OBJECT_PAGE_COUNT(current_object)) { return; } if(msg->version != current_object.version) { neighbor_inconsistency = 1; } highest_available = highest_available_page(¤t_object); /* Deluge M.6 */ if(msg->version == current_object.version && msg->pagenum <= highest_available) { current_object.pages[msg->pagenum].last_request = clock_time(); /* Deluge T.1 */ if(msg->pagenum == current_object.current_tx_page) { current_object.tx_set |= msg->request_set; } else { current_object.current_tx_page = msg->pagenum; current_object.tx_set = msg->request_set; } transition(DELUGE_STATE_TX); ctimer_set(&tx_timer, CLOCK_SECOND, tx_callback, ¤t_object); } } static void handle_packet(struct deluge_msg_packet *msg) { struct deluge_page *page; uint16_t crc; #if ENERGEST_CONF_ON long cpu_time, listen_time, tx_time; #endif PRINTF("Incoming packet for object id %u, version %u, page %u, packet num %u!\n", (unsigned) msg->object_id, (unsigned) msg->version, (unsigned) msg->pagenum, (unsigned) msg->packetnum); if(msg->pagenum != current_object.current_rx_page) { return; } if(msg->version != current_object.version) { neighbor_inconsistency = 1; } page = ¤t_object.pages[msg->pagenum]; if(msg->version == page->version && !(page->flags & PAGE_COMPLETE)) { memcpy(¤t_object.current_page[S_PKT * msg->packetnum], msg->payload, S_PKT); crc = checksum(msg->payload, S_PKT); if(msg->crc != crc) { PRINTF("packet crc: %hu, calculated crc: %hu\n", msg->crc, crc); return; } page->last_data = clock_time(); page->packet_set |= (1 << msg->packetnum); #if NETSIM #define RX_FRACTION \ (float) (current_object.current_rx_page + 1) / \ OBJECT_PAGE_COUNT(current_object) snprintf(label, sizeof (label), "v:%u %u%%", msg->version, (unsigned) (100 * RX_FRACTION)); ether_set_text(label); #endif if(page->packet_set == ALL_PACKETS) { write_page(¤t_object, msg->pagenum, current_object.current_page); page->version = msg->version; page->flags = PAGE_COMPLETE; PRINTF("Page %u completed\n", msg->pagenum); current_object.current_rx_page++; if(msg->pagenum == OBJECT_PAGE_COUNT(current_object) - 1) { current_object.version = current_object.update_version; leds_on(LEDS_RED); PRINTF("Update completed for object %u, version %u\n", current_object.object_id, msg->version); #if ENERGEST_CONF_ON cpu_time = energest_type_time(ENERGEST_TYPE_CPU) - cpu_start_time; tx_time = energest_type_time(ENERGEST_TYPE_TRANSMIT) - tx_start_time; listen_time = energest_type_time(ENERGEST_TYPE_LISTEN) - listen_start_time; PRINTF("Time estimation: CPU %ld, TX %ld, Listen %ld\n", cpu_time, tx_time, listen_time); PRINTF("Energy: %lumJ\n", (unsigned long) ((1.8 * cpu_time + 20.0 * listen_time + 17.7 * tx_time) * 3 / RTIMER_SECOND)); #endif } else if(current_object.current_rx_page < OBJECT_PAGE_COUNT(current_object)) { ctimer_set(&rx_timer, CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R), send_request, ¤t_object); } /* Deluge R.3 */ transition(DELUGE_STATE_MAINTAIN); } } } static void unicast_recv(struct unicast_conn *c, rimeaddr_t *sender) { char *msg; int len; msg = packetbuf_dataptr(); len = packetbuf_datalen(); if(len < 5) return; switch (msg[2]) { case DELUGE_CMD_REQUEST: if(len >= sizeof (struct deluge_msg_request)) handle_request((struct deluge_msg_request *)msg); break; default: PRINTF("Incoming packet with unknown command!\n"); } } static void send_profile(struct deluge_object *obj) { struct deluge_msg_profile *msg; unsigned char buf[sizeof (*msg) + OBJECT_PAGE_COUNT(*obj)]; int i; if(broadcast_profile && recv_adv < CONST_K) { broadcast_profile = 0; msg = (struct deluge_msg_profile *)buf; msg->cmd = DELUGE_CMD_PROFILE; msg->object_id = obj->object_id; msg->version = obj->version; msg->npages = OBJECT_PAGE_COUNT(*obj); for(i = 0; i < msg->npages; i++) { msg->version_vector[i] = obj->pages[i].version; } packetbuf_copyfrom(buf, sizeof (buf)); broadcast_send(&deluge_broadcast); } } static void handle_profile(struct deluge_msg_profile *msg) { int i; int npages; struct deluge_object *obj; char *p; obj = ¤t_object; if(msg->version <= current_object.update_version) { return; } #ifdef ENERGEST_CONF_ON cpu_start_time = energest_type_time(ENERGEST_TYPE_CPU); tx_start_time = energest_type_time(ENERGEST_TYPE_TRANSMIT); listen_start_time = energest_type_time(ENERGEST_TYPE_LISTEN); #endif PRINTF("Received profile of version %u with a vector of %u pages.\n", msg->version, msg->npages); leds_off(LEDS_RED); current_object.tx_set = 0; npages = OBJECT_PAGE_COUNT(*obj); obj->size = msg->npages * S_PAGE; p = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof (*obj->pages)); if(p == NULL) { PRINTF("Failed to reallocate memory for pages!\n"); return; } memcpy(p, obj->pages, npages * sizeof (*obj->pages)); free(obj->pages); obj->pages = (struct deluge_page *)p; if(msg->npages < npages) { npages = msg->npages; } for(i = 0; i < npages; i++) { if(msg->version_vector[i] > obj->pages[i].version) { obj->pages[i].packet_set = 0; obj->pages[i].flags &= ~PAGE_COMPLETE; obj->pages[i].version = msg->version_vector[i]; } } for(; i < msg->npages; i++) { init_page(obj, i, 0); } obj->current_rx_page = highest_available_page(obj); obj->update_version = msg->version; transition(DELUGE_STATE_RX); ctimer_set(&rx_timer, CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R), send_request, obj); } static void broadcast_recv(struct broadcast_conn *c, rimeaddr_t *sender) { char *msg; int len; struct deluge_msg_profile *profile; msg = packetbuf_dataptr(); len = packetbuf_datalen(); if(len < 5) return; switch (msg[2]) { case DELUGE_CMD_SUMMARY: if(len >= sizeof (struct deluge_msg_summary)) handle_summary((struct deluge_msg_summary *)msg, sender); break; case DELUGE_CMD_REQUEST: if(len >= sizeof (struct deluge_msg_request)) handle_request((struct deluge_msg_request *)msg); break; case DELUGE_CMD_PACKET: if(len >= sizeof (struct deluge_msg_packet)) handle_packet((struct deluge_msg_packet *)msg); break; case DELUGE_CMD_PROFILE: profile = (struct deluge_msg_profile *)msg; if(len >= sizeof (*profile) && len >= sizeof (*profile) + profile->npages * profile->version_vector[0]) handle_profile((struct deluge_msg_profile *)msg); break; default: PRINTF("Incoming packet with unknown command!\n"); } } int deluge_disseminate(char *file, unsigned version) { if(init_object(¤t_object, file, version) < 0) { return -1; } process_start(&deluge_process, file); return 0; } PROCESS_THREAD(deluge_process, ev, data) { static struct etimer et; static unsigned time_counter; static int r_rand; PROCESS_EXITHANDLER(goto exit); PROCESS_BEGIN(); deluge_event = process_alloc_event(); broadcast_open(&deluge_broadcast, DELUGE_BROADCAST_CHANNEL, &broadcast_call); unicast_open(&deluge_uc, DELUGE_UNICAST_CHANNEL, &unicast_call); r_interval = T_LOW; PRINTF("Maintaining state for object %s of %d pages\n", current_object.filename, OBJECT_PAGE_COUNT(current_object)); deluge_state = DELUGE_STATE_MAINTAIN; for(r_interval = T_LOW;;) { if(neighbor_inconsistency) { /* Deluge M.2 */ r_interval = T_LOW; neighbor_inconsistency = 0; } else { /* Deluge M.3 */ r_interval = (2 * r_interval >= T_HIGH) ? T_HIGH : 2 * r_interval; } r_rand = r_interval / 2 + (random_rand() % (r_interval / 2)); recv_adv = 0; old_summary = 0; /* Deluge M.1 */ ctimer_set(&summary_timer, r_rand * CLOCK_SECOND, (void *)(void *)advertise_summary, ¤t_object); /* Deluge M.4 */ ctimer_set(&profile_timer, r_rand * CLOCK_SECOND, (void *)(void *)send_profile, ¤t_object); LONG_TIMER(et, time_counter, r_interval); } exit: unicast_close(&deluge_uc); broadcast_close(&deluge_broadcast); if(current_object.cfs_fd >= 0) { cfs_close(current_object.cfs_fd); } if(current_object.pages != NULL) { free(current_object.pages); } PROCESS_END(); }