From 31bc37a0439530de0a814f3b3b72c55f8fd7e65c Mon Sep 17 00:00:00 2001 From: nvt-se Date: Wed, 25 Feb 2009 17:00:00 +0000 Subject: [PATCH] Added Deluge and tests for it. --- apps/deluge/Makefile.deluge | 1 + apps/deluge/deluge.c | 717 ++++++++++++++++++++++ apps/deluge/deluge.h | 170 +++++ examples/sky/test-deluge.c | 108 ++++ tools/cooja/contiki_tests/sky_deluge.csc | 98 +++ tools/cooja/contiki_tests/sky_deluge.info | 1 + tools/cooja/contiki_tests/sky_deluge.js | 9 + tools/cooja/contiki_tests/sky_deluge.log | 5 + 8 files changed, 1109 insertions(+) create mode 100644 apps/deluge/Makefile.deluge create mode 100644 apps/deluge/deluge.c create mode 100644 apps/deluge/deluge.h create mode 100644 examples/sky/test-deluge.c create mode 100644 tools/cooja/contiki_tests/sky_deluge.csc create mode 100644 tools/cooja/contiki_tests/sky_deluge.info create mode 100644 tools/cooja/contiki_tests/sky_deluge.js create mode 100644 tools/cooja/contiki_tests/sky_deluge.log diff --git a/apps/deluge/Makefile.deluge b/apps/deluge/Makefile.deluge new file mode 100644 index 000000000..9cb02423d --- /dev/null +++ b/apps/deluge/Makefile.deluge @@ -0,0 +1 @@ +deluge_src += deluge.c diff --git a/apps/deluge/deluge.c b/apps/deluge/deluge.c new file mode 100644 index 000000000..e1f79706f --- /dev/null +++ b/apps/deluge/deluge.c @@ -0,0 +1,717 @@ +/* + * Copyright (c) 2007, Swedish Institute of Computer Science + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the Institute nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * This file is part of the Contiki operating system. + * + * $Id: deluge.c,v 1.1 2009/02/25 17:00:41 nvt-se Exp $ + */ + +/** + * \file + * The Deluge protocol for data dissemination. + * (Hui and Culler: The dynamic behavior of a data + * dissemination protocol for network programming at scale, + * SenSys 2004) + * \author + * Nicolas Tsiftes + */ + +#define DEBUG 0 + +#include "contiki.h" +#include "net/rime.h" +#include "net/rime/ctimer.h" +#include "cfs/cfs.h" +#include "loader/elfloader.h" +#include "lib/crc16.h" +#include "lib/random.h" +#include "node-id.h" +#include "deluge.h" + +#if NETSIM +#include "ether.h" +#include +#endif + +#include "dev/leds.h" +#include +#include + +PROCESS(deluge_process, "Deluge process"); + +static void broadcast_recv(struct broadcast_conn *, rimeaddr_t *); +static void unicast_recv(struct unicast_conn *, rimeaddr_t *); + +/* Implementation-specific variables. */ +static struct broadcast_conn deluge_broadcast; +static struct unicast_conn deluge_uc; +static struct deluge_object current_object; +static process_event_t deluge_event; + +/* Deluge variables. */ +static int deluge_state; +static int old_summary; +static int neighbor_inconsistency; +static unsigned r_interval; +static unsigned recv_adv; +static int broadcast_profile; + +static struct ctimer rx_timer; +static struct ctimer tx_timer; +static struct ctimer summary_timer; +static struct ctimer profile_timer; + +static unsigned next_object_id; + +static const struct broadcast_callbacks broadcast_call = {broadcast_recv}; +static const struct unicast_callbacks unicast_call = {unicast_recv}; + +#if ENERGEST_CONF_ON +static long cpu_start_time, tx_start_time, listen_start_time; +#endif + +#if NETSIM +static char label[128]; +#endif + +static uint16_t +checksum(unsigned char *buf, unsigned len) +{ + unsigned i; + uint16_t sum; + + for(i = sum = 0; i < len; i++) { + sum = crc16_add(buf[i], sum); + } + + return sum; +} + +static void +transition(int state) +{ + switch (deluge_state) { + case DELUGE_STATE_MAINTAIN: + ctimer_stop(&summary_timer); + ctimer_stop(&profile_timer); + break; + case DELUGE_STATE_RX: + ctimer_stop(&rx_timer); + break; + case DELUGE_STATE_TX: + ctimer_stop(&tx_timer); + break; + } + + deluge_state = state; +} + +static int +write_page(struct deluge_object *obj, unsigned pagenum, unsigned char *data) +{ + cfs_seek(obj->cfs_fd, pagenum * S_PAGE); + return cfs_write(obj->cfs_fd, (char *) data, + S_PAGE); +} + +static int +read_page(struct deluge_object *obj, unsigned pagenum, unsigned char *buf) +{ + cfs_seek(obj->cfs_fd, pagenum * S_PAGE); + return cfs_read(obj->cfs_fd, (char *) buf, S_PAGE); +} + +static void +init_page(struct deluge_object *obj, int pagenum, int have) +{ + struct deluge_page *page; + unsigned char buf[S_PAGE]; + + page = &obj->pages[pagenum]; + + page->flags = 0; + page->last_request = 0; + page->last_data = 0; + + if(have) { + page->version = obj->version; + page->packet_set = ALL_PACKETS; + page->flags |= PAGE_COMPLETE; + read_page(obj, pagenum, buf); + page->crc = checksum(buf, S_PAGE); + } else { + page->version = 0; + page->packet_set = 0; + } +} + +static int +file_size(const char *file) +{ + int fd, r, size; + char buf[32]; + + fd = cfs_open(file, CFS_READ); + if(fd < 0) { + return -1; + } + + size = 0; + do { + r = cfs_read(fd, buf, sizeof(buf)); + if(r < 0) { + cfs_close(fd); + return -1; + } + size += r; + } while(r > 0); + + cfs_close(fd); + return size; +} + +static int +init_object(struct deluge_object *obj, char *filename, unsigned version) +{ + static struct deluge_page *page; + int i; + + obj->cfs_fd = cfs_open(filename, CFS_READ | CFS_WRITE); + if(obj->cfs_fd < 0) { + return -1; + } + + obj->filename = filename; + obj->object_id = next_object_id++; + obj->size = file_size(filename); + obj->version = obj->update_version = version; + obj->current_rx_page = 0; + obj->nrequests = 0; + obj->tx_set = 0; + + obj->pages = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof(*obj->pages)); + if(obj->pages == NULL) { + return -1; + } + + for(i = 0; i < OBJECT_PAGE_COUNT(current_object); i++) { + page = ¤t_object.pages[i]; + init_page(¤t_object, i, 1); + } + + memset(obj->current_page, 0, sizeof (obj->current_page)); + + return 0; +} + +static int +highest_available_page(struct deluge_object *obj) +{ + int i; + + for(i = 0; i < OBJECT_PAGE_COUNT(*obj); i++) { + if(!(obj->pages[i].flags & PAGE_COMPLETE)) { + break; + } + } + + return i; +} + +static void +send_request(void *arg) +{ + struct deluge_object *obj; + struct deluge_msg_request request; + + obj = (struct deluge_object *) arg; + + request.object_id = obj->object_id; + request.cmd = DELUGE_CMD_REQUEST; + request.pagenum = obj->current_rx_page; + request.version = obj->pages[request.pagenum].version; + request.request_set = ~obj->pages[obj->current_rx_page].packet_set; + + PRINTF("Sending request for page %d, version %u, request_set %u\n", + request.pagenum, request.version, request.request_set); + rimebuf_copyfrom((uint8_t *) &request, sizeof (request)); + unicast_send(&deluge_uc, &obj->summary_from); + + /* Deluge R.2 */ + if(++obj->nrequests == CONST_LAMBDA) { + /* XXX check rate here too. */ + obj->nrequests = 0; + transition(DELUGE_STATE_MAINTAIN); + } else { + ctimer_reset(&rx_timer); + } +} + +static void +advertise_summary(struct deluge_object *obj) +{ + struct deluge_msg_summary summary; + + if(recv_adv >= CONST_K) { + ctimer_stop(&summary_timer); + return; + } + + summary.cmd = DELUGE_CMD_SUMMARY; + summary.object_id = obj->object_id; + summary.version = obj->update_version; + summary.highest_available = highest_available_page(obj); + + PRINTF("Advertising summary for object id %u: version=%u, available=%u\n", + (unsigned) obj->object_id, summary.version, summary.highest_available); + + rimebuf_copyfrom((uint8_t *) &summary, sizeof (summary)); + broadcast_send(&deluge_broadcast); +} + +static void +handle_summary(struct deluge_msg_summary *msg, rimeaddr_t *sender) +{ + int highest_available, i; + clock_time_t oldest_request, oldest_data, now; + struct deluge_page *page; + + highest_available = highest_available_page(¤t_object); + + if(msg->version != current_object.version || + msg->highest_available != highest_available) { + neighbor_inconsistency = 1; + } else { + recv_adv++; + } + + if(msg->version < current_object.version) { + old_summary = 1; + broadcast_profile = 1; + } + + /* Deluge M.5 */ + if(msg->version == current_object.update_version && + msg->highest_available > highest_available) { + if(msg->highest_available > OBJECT_PAGE_COUNT(current_object)) { + PRINTF("Error: highest available is above object page count!\n"); + return; + } + + oldest_request = oldest_data = now = clock_time(); + for(i = 0; i < msg->highest_available; i++) { + page = ¤t_object.pages[i]; + if(page->last_request < oldest_request) + oldest_request = page->last_request; + if(page->last_request < oldest_data) + oldest_data = page->last_data; + } + + if(((now - oldest_request) / CLOCK_SECOND) <= 2 * r_interval || + ((now - oldest_data) / CLOCK_SECOND) <= r_interval) { + return; + } + + rimeaddr_copy(¤t_object.summary_from, sender); + transition(DELUGE_STATE_RX); + ctimer_set(&rx_timer, + CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R), + send_request, ¤t_object); + } +} + +static void +send_page(struct deluge_object *obj, unsigned pagenum) +{ + unsigned char buf[S_PAGE]; + struct deluge_msg_packet pkt; + unsigned char *cp; + + pkt.cmd = DELUGE_CMD_PACKET; + pkt.object_id = obj->object_id; + pkt.pagenum = pagenum; + pkt.version = obj->pages[pagenum].version; + pkt.packetnum = 0; + pkt.crc = 0; + + read_page(obj, pagenum, buf); + + /* Divide the page into packets and send them one at a time. */ + for(cp = buf; cp + S_PKT <= (unsigned char *) &buf[S_PAGE]; cp += S_PKT) { + pkt.crc = checksum(cp, S_PKT); + memcpy(pkt.payload, cp, S_PKT); + rimebuf_copyfrom((uint8_t *) &pkt, sizeof (pkt)); + broadcast_send(&deluge_broadcast); + + obj->tx_set &= ~(1 << pkt.packetnum++); + } +} + +static void +tx_callback(void *arg) +{ + struct deluge_object *obj; + + obj = (struct deluge_object *) arg; + if(obj->current_tx_page >= 0 && obj->tx_set) { + send_page(obj, obj->current_tx_page); + /* Deluge T.2. */ + if(obj->tx_set) { + ctimer_reset(&tx_timer); + } else { + obj->current_tx_page = -1; + transition(DELUGE_STATE_MAINTAIN); + } + } +} + +static void +handle_request(struct deluge_msg_request *msg) +{ + int highest_available; + + if(msg->pagenum >= OBJECT_PAGE_COUNT(current_object)) { + return; + } + + if(msg->version != current_object.version) { + neighbor_inconsistency = 1; + } + + highest_available = highest_available_page(¤t_object); + + /* Deluge M.6 */ + if(msg->version == current_object.version && + msg->pagenum <= highest_available) { + current_object.pages[msg->pagenum].last_request = clock_time(); + + /* Deluge T.1 */ + if(msg->pagenum == current_object.current_tx_page) { + current_object.tx_set |= msg->request_set; + } else { + current_object.current_tx_page = msg->pagenum; + current_object.tx_set = msg->request_set; + } + + transition(DELUGE_STATE_TX); + ctimer_set(&tx_timer, CLOCK_SECOND, tx_callback, ¤t_object); + } +} + +static void +handle_packet(struct deluge_msg_packet *msg) +{ + struct deluge_page *page; + uint16_t crc; +#if ENERGEST_CONF_ON + long cpu_time, listen_time, tx_time; +#endif + + PRINTF("Incoming packet for object id %u, version %u, page %u, packet num %u!\n", + (unsigned) msg->object_id, (unsigned) msg->version, + (unsigned) msg->pagenum, (unsigned) msg->packetnum); + + if(msg->pagenum != current_object.current_rx_page) { + return; + } + + if(msg->version != current_object.version) { + neighbor_inconsistency = 1; + } + + page = ¤t_object.pages[msg->pagenum]; + if(msg->version == page->version && !(page->flags & PAGE_COMPLETE)) { + memcpy(¤t_object.current_page[S_PKT * msg->packetnum], + msg->payload, S_PKT); + + crc = checksum(msg->payload, S_PKT); + if(msg->crc != crc) { + PRINTF("packet crc: %hu, calculated crc: %hu\n", msg->crc, crc); + return; + } + + page->last_data = clock_time(); + page->packet_set |= (1 << msg->packetnum); + +#if NETSIM +#define RX_FRACTION \ + (float) (current_object.current_rx_page + 1) / \ + OBJECT_PAGE_COUNT(current_object) + snprintf(label, sizeof (label), "v:%u %u%%", msg->version, + (unsigned) (100 * RX_FRACTION)); + ether_set_text(label); +#endif + + if(page->packet_set == ALL_PACKETS) { + write_page(¤t_object, msg->pagenum, current_object.current_page); + page->version = msg->version; + page->flags = PAGE_COMPLETE; + PRINTF("Page %u completed\n", msg->pagenum); + + current_object.current_rx_page++; + + if(msg->pagenum == OBJECT_PAGE_COUNT(current_object) - 1) { + current_object.version = current_object.update_version; + leds_on(LEDS_RED); + PRINTF("Update completed for object %u, version %u\n", + current_object.object_id, msg->version); +#if ENERGEST_CONF_ON + cpu_time = energest_type_time(ENERGEST_TYPE_CPU) - cpu_start_time; + tx_time = energest_type_time(ENERGEST_TYPE_TRANSMIT) - tx_start_time; + listen_time = energest_type_time(ENERGEST_TYPE_LISTEN) - listen_start_time; + PRINTF("Time estimation: CPU %ld, TX %ld, Listen %ld\n", + cpu_time, tx_time, listen_time); + PRINTF("Energy: %lumJ\n", + (unsigned long) ((1.8 * cpu_time + 20.0 * listen_time + + 17.7 * tx_time) * 3 / RTIMER_SECOND)); +#endif + } else if(current_object.current_rx_page < OBJECT_PAGE_COUNT(current_object)) { + ctimer_set(&rx_timer, + CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R), + send_request, ¤t_object); + } + /* Deluge R.3 */ + transition(DELUGE_STATE_MAINTAIN); + } + } +} + +static void +unicast_recv(struct unicast_conn *c, rimeaddr_t *sender) +{ + char *msg; + int len; + + msg = rimebuf_dataptr(); + len = rimebuf_datalen(); + if(len < 5) + return; + + switch (msg[2]) { + case DELUGE_CMD_REQUEST: + if(len >= sizeof (struct deluge_msg_request)) + handle_request((struct deluge_msg_request *) msg); + break; + default: + PRINTF("Incoming packet with unknown command!\n"); + } +} + +static void +send_profile(struct deluge_object *obj) +{ + struct deluge_msg_profile *msg; + unsigned char buf[sizeof (*msg) + OBJECT_PAGE_COUNT(*obj)]; + int i; + + if(broadcast_profile && recv_adv < CONST_K) { + broadcast_profile = 0; + + msg = (struct deluge_msg_profile *) buf; + msg->cmd = DELUGE_CMD_PROFILE; + msg->object_id = obj->object_id; + msg->version = obj->version; + msg->npages = OBJECT_PAGE_COUNT(*obj); + for(i = 0; i < msg->npages; i++) { + msg->version_vector[i] = obj->pages[i].version; + } + + rimebuf_copyfrom(buf, sizeof (buf)); + broadcast_send(&deluge_broadcast); + } +} + +static void +handle_profile(struct deluge_msg_profile *msg) +{ + int i; + int npages; + struct deluge_object *obj; + char *p; + + obj = ¤t_object; + if(msg->version <= current_object.update_version) { + return; + } + +#ifdef ENERGEST_CONF_ON + cpu_start_time = energest_type_time(ENERGEST_TYPE_CPU); + tx_start_time = energest_type_time(ENERGEST_TYPE_TRANSMIT); + listen_start_time = energest_type_time(ENERGEST_TYPE_LISTEN); +#endif + + PRINTF("Received profile of version %u with a vector of %u pages.\n", + msg->version, msg->npages); + + leds_off(LEDS_RED); + current_object.tx_set = 0; + + npages = OBJECT_PAGE_COUNT(*obj); + obj->size = msg->npages * S_PAGE; + + p = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof (*obj->pages)); + if(p == NULL) { + PRINTF("Failed to reallocate memory for pages!\n"); + return; + } + + memcpy(p, obj->pages, npages * sizeof (*obj->pages)); + free(obj->pages); + obj->pages = (struct deluge_page *) p; + + if(msg->npages < npages) { + npages = msg->npages; + } + + for(i = 0; i < npages; i++) { + if(msg->version_vector[i] > obj->pages[i].version) { + obj->pages[i].packet_set = 0; + obj->pages[i].flags &= ~PAGE_COMPLETE; + obj->pages[i].version = msg->version_vector[i]; + } + } + + for(; i < msg->npages; i++) { + init_page(obj, i, 0); + } + + obj->current_rx_page = highest_available_page(obj); + obj->update_version = msg->version; + + transition(DELUGE_STATE_RX); + + ctimer_set(&rx_timer, + CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R), + send_request, obj); +} + +static void +broadcast_recv(struct broadcast_conn *c, rimeaddr_t *sender) +{ + char *msg; + int len; + struct deluge_msg_profile *profile; + + msg = rimebuf_dataptr(); + len = rimebuf_datalen(); + if(len < 5) + return; + + switch (msg[2]) { + case DELUGE_CMD_SUMMARY: + if(len >= sizeof (struct deluge_msg_summary)) + handle_summary((struct deluge_msg_summary *) msg, sender); + break; + case DELUGE_CMD_REQUEST: + if(len >= sizeof (struct deluge_msg_request)) + handle_request((struct deluge_msg_request *) msg); + break; + case DELUGE_CMD_PACKET: + if(len >= sizeof (struct deluge_msg_packet)) + handle_packet((struct deluge_msg_packet *) msg); + break; + case DELUGE_CMD_PROFILE: + profile = (struct deluge_msg_profile *) msg; + if(len >= sizeof (*profile) && + len >= sizeof (*profile) + profile->npages * profile->version_vector[0]) + handle_profile((struct deluge_msg_profile *) msg); + break; + default: + PRINTF("Incoming packet with unknown command!\n"); + } +} + +int +deluge_disseminate(char *file, unsigned version) +{ + if(init_object(¤t_object, file, version) < 0) { + return -1; + } + process_start(&deluge_process, file); + + return 0; +} + +PROCESS_THREAD(deluge_process, ev, data) +{ + static struct etimer et; + static unsigned time_counter; + static int r_rand; + + PROCESS_EXITHANDLER(goto exit); + + PROCESS_BEGIN(); + + deluge_event = process_alloc_event(); + + broadcast_open(&deluge_broadcast, DELUGE_BROADCAST_CHANNEL, &broadcast_call); + unicast_open(&deluge_uc, DELUGE_UNICAST_CHANNEL, &unicast_call); + r_interval = T_LOW; + + PRINTF("Maintaining state for object %s of %d pages\n", + current_object.filename, OBJECT_PAGE_COUNT(current_object)); + + deluge_state = DELUGE_STATE_MAINTAIN; + + for(r_interval = T_LOW;;) { + if(neighbor_inconsistency) { + /* Deluge M.2 */ + r_interval = T_LOW; + neighbor_inconsistency = 0; + } else { + /* Deluge M.3 */ + r_interval = (2 * r_interval >= T_HIGH) ? T_HIGH : 2 * r_interval; + } + + r_rand = r_interval / 2 + (random_rand() % (r_interval / 2)); + recv_adv = 0; + old_summary = 0; + + /* Deluge M.1 */ + ctimer_set(&summary_timer, r_rand * CLOCK_SECOND, + (void *)(void *)advertise_summary, ¤t_object); + + /* Deluge M.4 */ + ctimer_set(&profile_timer, r_rand * CLOCK_SECOND, + (void *)(void *) send_profile, ¤t_object); + + LONG_TIMER(et, time_counter, r_interval); + } + +exit: + unicast_close(&deluge_uc); + broadcast_close(&deluge_broadcast); + if(current_object.cfs_fd >= 0) { + cfs_close(current_object.cfs_fd); + } + if(current_object.pages != NULL) { + free(current_object.pages); + } + + PROCESS_END(); +} diff --git a/apps/deluge/deluge.h b/apps/deluge/deluge.h new file mode 100644 index 000000000..dcd79163f --- /dev/null +++ b/apps/deluge/deluge.h @@ -0,0 +1,170 @@ +/* + * Copyright (c) 2007, Swedish Institute of Computer Science + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the Institute nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * This file is part of the Contiki operating system. + * + * $Id: deluge.h,v 1.1 2009/02/25 17:00:41 nvt-se Exp $ + */ + +/** + * \file + * Header for Deluge. + * \author + * Nicolas Tsiftes + */ + +#ifndef DELUGE_H +#define DELUGE_H + +#include "net/rime.h" + +PROCESS_NAME(deluge_process); + +#if DEBUG +#include +#define PRINTF(...) \ + do { \ + printf("[Node %02u] ", (unsigned) node_id); \ + printf(__VA_ARGS__); \ + } while (0) +#else +#define PRINTF(...) +#endif + +#define LONG_TIMER(et, counter, time) \ + do { \ + for (counter = 0; counter < time; counter++) { \ + etimer_set(&et, CLOCK_SECOND); \ + PROCESS_WAIT_EVENT_UNTIL(etimer_expired(&et)); \ + } \ + } while (0) + +#define DELUGE_UNICAST_CHANNEL 55 +#define DELUGE_BROADCAST_CHANNEL 56 + +/* All the packets in a page have been received. */ +#define PAGE_COMPLETE 1 +/* All pages up to, and including, this page are complete. */ +#define PAGE_AVAILABLE 1 + +#define S_PKT 32 /* Deluge packet size. */ +#define N_PKT 4 /* Packets per page. */ +#define S_PAGE (S_PKT * N_PKT) /* Fixed page size. */ + +/* Bounds for the round time in seconds. */ +#define T_LOW 2 +#define T_HIGH 32 + +/* Random interval for request transmissions in jiffies. */ +#define T_R (CLOCK_SECOND / 2) + +/* Bound for the number of advertisements. */ +#define CONST_K 1 + +/* The number of pages in this object. */ +#define OBJECT_PAGE_COUNT(obj) (((obj).size + (S_PAGE - 1)) / S_PAGE) + +#define ALL_PACKETS ((1 << N_PKT) - 1) + +#define DELUGE_CMD_SUMMARY 1 +#define DELUGE_CMD_REQUEST 2 +#define DELUGE_CMD_PACKET 3 +#define DELUGE_CMD_PROFILE 4 + +#define DELUGE_STATE_MAINTAIN 1 +#define DELUGE_STATE_RX 2 +#define DELUGE_STATE_TX 3 + +#define CONST_LAMBDA 2 +#define CONST_ALPHA 0.5 + +#define CONST_OMEGA 8 +#define ESTIMATED_TX_TIME (CLOCK_SECOND / 8) + +struct deluge_msg_summary { + uint16_t object_id; + uint8_t cmd; + uint8_t version; + uint8_t highest_available; +} __attribute__((packed)); + +struct deluge_msg_request { + uint16_t object_id; + uint8_t cmd; + uint8_t version; + uint8_t pagenum; + unsigned request_set:N_PKT; +} __attribute__((packed)); + +struct deluge_msg_packet { + uint16_t object_id; + uint8_t cmd; + uint8_t version; + uint8_t pagenum; + uint8_t packetnum; + uint16_t crc; + unsigned char payload[S_PKT]; +} __attribute__((packed)); + +struct deluge_msg_profile { + uint16_t object_id; + uint8_t cmd; + uint8_t version; + uint8_t npages; + uint8_t version_vector[]; +} __attribute__((packed)); + +struct deluge_object { + char *filename; + uint16_t object_id; + uint16_t size; + uint8_t version; + uint8_t update_version; + struct deluge_page *pages; + uint8_t current_rx_page; + int8_t current_tx_page; + uint8_t nrequests; + uint8_t current_page[S_PAGE]; + unsigned tx_set:N_PKT; + int cfs_fd; + rimeaddr_t summary_from; +}; + +struct deluge_page { + uint16_t crc; + clock_time_t last_request; + clock_time_t last_data; + uint8_t flags; + uint8_t version; + unsigned packet_set:N_PKT; +}; + +int deluge_disseminate(char *file, unsigned version); + + +#endif diff --git a/examples/sky/test-deluge.c b/examples/sky/test-deluge.c new file mode 100644 index 000000000..dd03dbb0a --- /dev/null +++ b/examples/sky/test-deluge.c @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2007, Swedish Institute of Computer Science. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the Institute nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * This file is part of the Contiki operating system. + * + * $Id: test-deluge.c,v 1.1 2009/02/25 17:00:00 nvt-se Exp $ + */ + +/** + * \file + * A test program for Deluge. + * \author + * Nicolas Tsiftes + */ + +#include "contiki.h" +#include "cfs/cfs.h" +#include "deluge.h" +#include "node-id.h" + +#include +#include + +#ifndef SINK_ID +#define SINK_ID 1 +#endif + +PROCESS(cfs_process, "Test CFS process"); +AUTOSTART_PROCESSES(&cfs_process); +/*---------------------------------------------------------------------------*/ +PROCESS_THREAD(cfs_process, ev, data) +{ + int fd, r; + char buf[32]; + static struct etimer et; + + PROCESS_BEGIN(); + + memset(buf, 0, sizeof(buf)); + if(node_id == SINK_ID) { + strcpy(buf, "This is version 1 of the file"); + } else { + strcpy(buf, "This is version 0 of the file"); + } + + cfs_remove("test"); + fd = cfs_open("test", CFS_WRITE); + if(fd < 0) { + process_exit(NULL); + } + if(cfs_write(fd, buf, sizeof(buf)) != sizeof(buf)) { + cfs_close(fd); + process_exit(NULL); + } + cfs_close(fd); + + deluge_disseminate("test", node_id == SINK_ID); + + etimer_set(&et, CLOCK_SECOND); + for(;;) { + PROCESS_WAIT_EVENT_UNTIL(etimer_expired(&et)); + if(node_id != SINK_ID) { + fd = cfs_open("test", CFS_READ); + if(fd < 0) { + printf("failed to open the test file\n"); + } else { + r = cfs_read(fd, buf, sizeof(buf)); + buf[sizeof(buf) - 1] = '\0'; + if(r <= 0) { + printf("failed to read data from the file\n"); + } else { + printf("File contents: %s\n", buf); + } + cfs_close(fd); + } + } + etimer_reset(&et); + } + + + PROCESS_END(); +} +/*---------------------------------------------------------------------------*/ diff --git a/tools/cooja/contiki_tests/sky_deluge.csc b/tools/cooja/contiki_tests/sky_deluge.csc new file mode 100644 index 000000000..f0920d5fe --- /dev/null +++ b/tools/cooja/contiki_tests/sky_deluge.csc @@ -0,0 +1,98 @@ + + + + Deluge + 0 + 1 + 123456 + 1000 + + se.sics.cooja.radiomediums.UDGM + 50.0 + 100.0 + 1.0 + 1.0 + + + se.sics.cooja.mspmote.SkyMoteType + sky1 + Sky Mote Type #1 + ../../../examples/sky/test-deluge.c + make test-deluge.sky TARGET=sky + + + se.sics.cooja.mspmote.SkyMote + sky1 + + se.sics.cooja.interfaces.Position + 22.464792491653174 + 11.3235347656354 + 0.0 + + + se.sics.cooja.mspmote.interfaces.MspMoteID + 1 + + + + se.sics.cooja.mspmote.SkyMote + sky1 + + se.sics.cooja.interfaces.Position + 16.167564578306468 + 29.89745599030348 + 0.0 + + + se.sics.cooja.mspmote.interfaces.MspMoteID + 3 + + + + se.sics.cooja.mspmote.SkyMote + sky1 + + se.sics.cooja.interfaces.Position + 63.42409596590043 + 12.470791515046386 + 0.0 + + + se.sics.cooja.mspmote.interfaces.MspMoteID + 5 + + + + + se.sics.cooja.plugins.SimControl + 282 + 2 + 187 + 0 + 0 + false + + + se.sics.cooja.plugins.VisState + 300 + 3 + 300 + 390 + 0 + false + + + se.sics.cooja.plugins.LogListener + + version + 256 + + 690 + 1 + 328 + 0 + 319 + false + + + diff --git a/tools/cooja/contiki_tests/sky_deluge.info b/tools/cooja/contiki_tests/sky_deluge.info new file mode 100644 index 000000000..c2dabbe46 --- /dev/null +++ b/tools/cooja/contiki_tests/sky_deluge.info @@ -0,0 +1 @@ +Three Sky nodes running a Deluge test. examples/sky/test-deluge.c diff --git a/tools/cooja/contiki_tests/sky_deluge.js b/tools/cooja/contiki_tests/sky_deluge.js new file mode 100644 index 000000000..ce40273e8 --- /dev/null +++ b/tools/cooja/contiki_tests/sky_deluge.js @@ -0,0 +1,9 @@ +TIMEOUT(100000, log.log("last msg: " + msg + "\n")); /* print last msg at timeout */ + +WAIT_UNTIL(id == 3 && msg.contains("version 1")); +log.log("Node 3 got version 1\n"); + +WAIT_UNTIL(id == 5 && msg.contains("version 1")); +log.log("Node 5 got version 1\n"); + +log.testOK(); /* Report test success and quit */ diff --git a/tools/cooja/contiki_tests/sky_deluge.log b/tools/cooja/contiki_tests/sky_deluge.log new file mode 100644 index 000000000..71a10ff78 --- /dev/null +++ b/tools/cooja/contiki_tests/sky_deluge.log @@ -0,0 +1,5 @@ +[sky_deluge.log] + +Node 3 got version 1 +Node 5 got version 1 +TEST OK