699 lines
18 KiB
C
699 lines
18 KiB
C
/*
|
|
* Copyright (c) 2007, Swedish Institute of Computer Science
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* 3. Neither the name of the Institute nor the names of its contributors
|
|
* may be used to endorse or promote products derived from this software
|
|
* without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
|
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
|
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
* SUCH DAMAGE.
|
|
*
|
|
* This file is part of the Contiki operating system.
|
|
*
|
|
*/
|
|
|
|
/**
|
|
* \file
|
|
* An implementation of the Deluge protocol.
|
|
* (Hui and Culler: The dynamic behavior of a data
|
|
* dissemination protocol for network programming at scale,
|
|
* ACM SenSys 2004)
|
|
* \author
|
|
* Nicolas Tsiftes <nvt@sics.se>
|
|
*/
|
|
|
|
#include "contiki.h"
|
|
#include "net/rime/rime.h"
|
|
#include "cfs/cfs.h"
|
|
#include "loader/elfloader.h"
|
|
#include "lib/crc16.h"
|
|
#include "lib/random.h"
|
|
#include "sys/node-id.h"
|
|
#include "deluge.h"
|
|
|
|
#if NETSIM
|
|
#include "ether.h"
|
|
#include <stdio.h>
|
|
#endif
|
|
|
|
#include "dev/leds.h"
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#define DEBUG 0
|
|
#if DEBUG
|
|
#include <stdio.h>
|
|
#define PRINTF(...) printf(__VA_ARGS__)
|
|
#else
|
|
#define PRINTF(...)
|
|
#endif
|
|
|
|
/* Implementation-specific variables. */
|
|
static struct broadcast_conn deluge_broadcast;
|
|
static struct unicast_conn deluge_uc;
|
|
static struct deluge_object current_object;
|
|
static process_event_t deluge_event;
|
|
|
|
/* Deluge variables. */
|
|
static int deluge_state;
|
|
static int old_summary;
|
|
static int neighbor_inconsistency;
|
|
static unsigned r_interval;
|
|
static unsigned recv_adv;
|
|
static int broadcast_profile;
|
|
|
|
/* Deluge timers. */
|
|
static struct ctimer rx_timer;
|
|
static struct ctimer tx_timer;
|
|
static struct ctimer summary_timer;
|
|
static struct ctimer profile_timer;
|
|
|
|
/* Deluge objects will get an ID that defaults to the current value of
|
|
the next_object_id parameter. */
|
|
static deluge_object_id_t next_object_id;
|
|
|
|
/* Rime callbacks. */
|
|
static void broadcast_recv(struct broadcast_conn *, const rimeaddr_t *);
|
|
static void unicast_recv(struct unicast_conn *, const rimeaddr_t *);
|
|
|
|
static const struct broadcast_callbacks broadcast_call = {broadcast_recv, NULL};
|
|
static const struct unicast_callbacks unicast_call = {unicast_recv, NULL};
|
|
|
|
/* The Deluge process manages the main Deluge timer. */
|
|
PROCESS(deluge_process, "Deluge");
|
|
|
|
static void
|
|
transition(int state)
|
|
{
|
|
if(state != deluge_state) {
|
|
switch(deluge_state) {
|
|
case DELUGE_STATE_MAINTAIN:
|
|
ctimer_stop(&summary_timer);
|
|
ctimer_stop(&profile_timer);
|
|
break;
|
|
case DELUGE_STATE_RX:
|
|
ctimer_stop(&rx_timer);
|
|
break;
|
|
case DELUGE_STATE_TX:
|
|
ctimer_stop(&tx_timer);
|
|
break;
|
|
}
|
|
deluge_state = state;
|
|
}
|
|
}
|
|
|
|
static int
|
|
write_page(struct deluge_object *obj, unsigned pagenum, unsigned char *data)
|
|
{
|
|
cfs_offset_t offset;
|
|
|
|
offset = pagenum * S_PAGE;
|
|
|
|
if(cfs_seek(obj->cfs_fd, offset, CFS_SEEK_SET) != offset) {
|
|
return -1;
|
|
}
|
|
return cfs_write(obj->cfs_fd, (char *)data, S_PAGE);
|
|
}
|
|
|
|
static int
|
|
read_page(struct deluge_object *obj, unsigned pagenum, unsigned char *buf)
|
|
{
|
|
cfs_offset_t offset;
|
|
|
|
offset = pagenum * S_PAGE;
|
|
|
|
if(cfs_seek(obj->cfs_fd, offset, CFS_SEEK_SET) != offset) {
|
|
return -1;
|
|
}
|
|
return cfs_read(obj->cfs_fd, (char *)buf, S_PAGE);
|
|
}
|
|
|
|
static void
|
|
init_page(struct deluge_object *obj, int pagenum, int have)
|
|
{
|
|
struct deluge_page *page;
|
|
unsigned char buf[S_PAGE];
|
|
|
|
page = &obj->pages[pagenum];
|
|
|
|
page->flags = 0;
|
|
page->last_request = 0;
|
|
page->last_data = 0;
|
|
|
|
if(have) {
|
|
page->version = obj->version;
|
|
page->packet_set = ALL_PACKETS;
|
|
page->flags |= PAGE_COMPLETE;
|
|
read_page(obj, pagenum, buf);
|
|
page->crc = crc16_data(buf, S_PAGE, 0);
|
|
} else {
|
|
page->version = 0;
|
|
page->packet_set = 0;
|
|
}
|
|
}
|
|
|
|
static cfs_offset_t
|
|
file_size(const char *file)
|
|
{
|
|
int fd;
|
|
cfs_offset_t size;
|
|
|
|
fd = cfs_open(file, CFS_READ);
|
|
if(fd < 0) {
|
|
return (cfs_offset_t)-1;
|
|
}
|
|
|
|
size = cfs_seek(fd, 0, CFS_SEEK_END);
|
|
cfs_close(fd);
|
|
|
|
return size;
|
|
}
|
|
|
|
static int
|
|
init_object(struct deluge_object *obj, char *filename, unsigned version)
|
|
{
|
|
static struct deluge_page *page;
|
|
int i;
|
|
|
|
obj->cfs_fd = cfs_open(filename, CFS_READ | CFS_WRITE);
|
|
if(obj->cfs_fd < 0) {
|
|
return -1;
|
|
}
|
|
|
|
obj->filename = filename;
|
|
obj->object_id = next_object_id++;
|
|
obj->size = file_size(filename);
|
|
obj->version = obj->update_version = version;
|
|
obj->current_rx_page = 0;
|
|
obj->nrequests = 0;
|
|
obj->tx_set = 0;
|
|
|
|
obj->pages = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof(*obj->pages));
|
|
if(obj->pages == NULL) {
|
|
cfs_close(obj->cfs_fd);
|
|
return -1;
|
|
}
|
|
|
|
for(i = 0; i < OBJECT_PAGE_COUNT(current_object); i++) {
|
|
page = ¤t_object.pages[i];
|
|
init_page(¤t_object, i, 1);
|
|
}
|
|
|
|
memset(obj->current_page, 0, sizeof(obj->current_page));
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
highest_available_page(struct deluge_object *obj)
|
|
{
|
|
int i;
|
|
|
|
for(i = 0; i < OBJECT_PAGE_COUNT(*obj); i++) {
|
|
if(!(obj->pages[i].flags & PAGE_COMPLETE)) {
|
|
break;
|
|
}
|
|
}
|
|
|
|
return i;
|
|
}
|
|
|
|
static void
|
|
send_request(void *arg)
|
|
{
|
|
struct deluge_object *obj;
|
|
struct deluge_msg_request request;
|
|
|
|
obj = (struct deluge_object *)arg;
|
|
|
|
request.cmd = DELUGE_CMD_REQUEST;
|
|
request.pagenum = obj->current_rx_page;
|
|
request.version = obj->pages[request.pagenum].version;
|
|
request.request_set = ~obj->pages[obj->current_rx_page].packet_set;
|
|
request.object_id = obj->object_id;
|
|
|
|
PRINTF("Sending request for page %d, version %u, request_set %u\n",
|
|
request.pagenum, request.version, request.request_set);
|
|
packetbuf_copyfrom(&request, sizeof(request));
|
|
unicast_send(&deluge_uc, &obj->summary_from);
|
|
|
|
/* Deluge R.2 */
|
|
if(++obj->nrequests == CONST_LAMBDA) {
|
|
/* XXX check rate here too. */
|
|
obj->nrequests = 0;
|
|
transition(DELUGE_STATE_MAINTAIN);
|
|
} else {
|
|
ctimer_reset(&rx_timer);
|
|
}
|
|
}
|
|
|
|
static void
|
|
advertise_summary(struct deluge_object *obj)
|
|
{
|
|
struct deluge_msg_summary summary;
|
|
|
|
if(recv_adv >= CONST_K) {
|
|
ctimer_stop(&summary_timer);
|
|
return;
|
|
}
|
|
|
|
summary.cmd = DELUGE_CMD_SUMMARY;
|
|
summary.version = obj->update_version;
|
|
summary.highest_available = highest_available_page(obj);
|
|
summary.object_id = obj->object_id;
|
|
|
|
PRINTF("Advertising summary for object id %u: version=%u, available=%u\n",
|
|
(unsigned)obj->object_id, summary.version, summary.highest_available);
|
|
|
|
packetbuf_copyfrom(&summary, sizeof(summary));
|
|
broadcast_send(&deluge_broadcast);
|
|
}
|
|
|
|
static void
|
|
handle_summary(struct deluge_msg_summary *msg, const rimeaddr_t *sender)
|
|
{
|
|
int highest_available, i;
|
|
clock_time_t oldest_request, oldest_data, now;
|
|
struct deluge_page *page;
|
|
|
|
highest_available = highest_available_page(¤t_object);
|
|
|
|
if(msg->version != current_object.version ||
|
|
msg->highest_available != highest_available) {
|
|
neighbor_inconsistency = 1;
|
|
} else {
|
|
recv_adv++;
|
|
}
|
|
|
|
if(msg->version < current_object.version) {
|
|
old_summary = 1;
|
|
broadcast_profile = 1;
|
|
}
|
|
|
|
/* Deluge M.5 */
|
|
if(msg->version == current_object.update_version &&
|
|
msg->highest_available > highest_available) {
|
|
if(msg->highest_available > OBJECT_PAGE_COUNT(current_object)) {
|
|
PRINTF("Error: highest available is above object page count!\n");
|
|
return;
|
|
}
|
|
|
|
oldest_request = oldest_data = now = clock_time();
|
|
for(i = 0; i < msg->highest_available; i++) {
|
|
page = ¤t_object.pages[i];
|
|
if(page->last_request < oldest_request) {
|
|
oldest_request = page->last_request;
|
|
}
|
|
if(page->last_request < oldest_data) {
|
|
oldest_data = page->last_data;
|
|
}
|
|
}
|
|
|
|
if(((now - oldest_request) / CLOCK_SECOND) <= 2 * r_interval ||
|
|
((now - oldest_data) / CLOCK_SECOND) <= r_interval) {
|
|
return;
|
|
}
|
|
|
|
rimeaddr_copy(¤t_object.summary_from, sender);
|
|
transition(DELUGE_STATE_RX);
|
|
|
|
if(ctimer_expired(&rx_timer)) {
|
|
ctimer_set(&rx_timer,
|
|
CONST_OMEGA * ESTIMATED_TX_TIME + ((unsigned)random_rand() % T_R),
|
|
send_request, ¤t_object);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
send_page(struct deluge_object *obj, unsigned pagenum)
|
|
{
|
|
unsigned char buf[S_PAGE];
|
|
struct deluge_msg_packet pkt;
|
|
unsigned char *cp;
|
|
|
|
pkt.cmd = DELUGE_CMD_PACKET;
|
|
pkt.pagenum = pagenum;
|
|
pkt.version = obj->pages[pagenum].version;
|
|
pkt.packetnum = 0;
|
|
pkt.object_id = obj->object_id;
|
|
pkt.crc = 0;
|
|
|
|
read_page(obj, pagenum, buf);
|
|
|
|
/* Divide the page into packets and send them one at a time. */
|
|
for(cp = buf; cp + S_PKT <= (unsigned char *)&buf[S_PAGE]; cp += S_PKT) {
|
|
if(obj->tx_set & (1 << pkt.packetnum)) {
|
|
pkt.crc = crc16_data(cp, S_PKT, 0);
|
|
memcpy(pkt.payload, cp, S_PKT);
|
|
packetbuf_copyfrom(&pkt, sizeof(pkt));
|
|
broadcast_send(&deluge_broadcast);
|
|
}
|
|
pkt.packetnum++;
|
|
}
|
|
obj->tx_set = 0;
|
|
}
|
|
|
|
static void
|
|
tx_callback(void *arg)
|
|
{
|
|
struct deluge_object *obj;
|
|
|
|
obj = (struct deluge_object *)arg;
|
|
if(obj->current_tx_page >= 0 && obj->tx_set) {
|
|
send_page(obj, obj->current_tx_page);
|
|
/* Deluge T.2. */
|
|
if(obj->tx_set) {
|
|
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
|
|
PACKETBUF_ATTR_PACKET_TYPE_STREAM);
|
|
ctimer_reset(&tx_timer);
|
|
} else {
|
|
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
|
|
PACKETBUF_ATTR_PACKET_TYPE_STREAM_END);
|
|
obj->current_tx_page = -1;
|
|
transition(DELUGE_STATE_MAINTAIN);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
handle_request(struct deluge_msg_request *msg)
|
|
{
|
|
int highest_available;
|
|
|
|
if(msg->pagenum >= OBJECT_PAGE_COUNT(current_object)) {
|
|
return;
|
|
}
|
|
|
|
if(msg->version != current_object.version) {
|
|
neighbor_inconsistency = 1;
|
|
}
|
|
|
|
highest_available = highest_available_page(¤t_object);
|
|
|
|
/* Deluge M.6 */
|
|
if(msg->version == current_object.version &&
|
|
msg->pagenum <= highest_available) {
|
|
current_object.pages[msg->pagenum].last_request = clock_time();
|
|
|
|
/* Deluge T.1 */
|
|
if(msg->pagenum == current_object.current_tx_page) {
|
|
current_object.tx_set |= msg->request_set;
|
|
} else {
|
|
current_object.current_tx_page = msg->pagenum;
|
|
current_object.tx_set = msg->request_set;
|
|
}
|
|
|
|
transition(DELUGE_STATE_TX);
|
|
ctimer_set(&tx_timer, CLOCK_SECOND, tx_callback, ¤t_object);
|
|
}
|
|
}
|
|
|
|
static void
|
|
handle_packet(struct deluge_msg_packet *msg)
|
|
{
|
|
struct deluge_page *page;
|
|
uint16_t crc;
|
|
struct deluge_msg_packet packet;
|
|
|
|
memcpy(&packet, msg, sizeof(packet));
|
|
|
|
PRINTF("Incoming packet for object id %u, version %u, page %u, packet num %u!\n",
|
|
(unsigned)packet.object_id, (unsigned)packet.version,
|
|
(unsigned)packet.pagenum, (unsigned)packet.packetnum);
|
|
|
|
if(packet.pagenum != current_object.current_rx_page) {
|
|
return;
|
|
}
|
|
|
|
if(packet.version != current_object.version) {
|
|
neighbor_inconsistency = 1;
|
|
}
|
|
|
|
page = ¤t_object.pages[packet.pagenum];
|
|
if(packet.version == page->version && !(page->flags & PAGE_COMPLETE)) {
|
|
memcpy(¤t_object.current_page[S_PKT * packet.packetnum],
|
|
packet.payload, S_PKT);
|
|
|
|
crc = crc16_data(packet.payload, S_PKT, 0);
|
|
if(packet.crc != crc) {
|
|
PRINTF("packet crc: %hu, calculated crc: %hu\n", packet.crc, crc);
|
|
return;
|
|
}
|
|
|
|
page->last_data = clock_time();
|
|
page->packet_set |= (1 << packet.packetnum);
|
|
|
|
if(page->packet_set == ALL_PACKETS) {
|
|
/* This is the last packet of the requested page; stop streaming. */
|
|
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
|
|
PACKETBUF_ATTR_PACKET_TYPE_STREAM_END);
|
|
|
|
write_page(¤t_object, packet.pagenum, current_object.current_page);
|
|
page->version = packet.version;
|
|
page->flags = PAGE_COMPLETE;
|
|
PRINTF("Page %u completed\n", packet.pagenum);
|
|
|
|
current_object.current_rx_page++;
|
|
|
|
if(packet.pagenum == OBJECT_PAGE_COUNT(current_object) - 1) {
|
|
current_object.version = current_object.update_version;
|
|
leds_on(LEDS_RED);
|
|
PRINTF("Update completed for object %u, version %u\n",
|
|
(unsigned)current_object.object_id, packet.version);
|
|
} else if(current_object.current_rx_page < OBJECT_PAGE_COUNT(current_object)) {
|
|
if(ctimer_expired(&rx_timer)) {
|
|
ctimer_set(&rx_timer,
|
|
CONST_OMEGA * ESTIMATED_TX_TIME + (random_rand() % T_R),
|
|
send_request, ¤t_object);
|
|
}
|
|
}
|
|
/* Deluge R.3 */
|
|
transition(DELUGE_STATE_MAINTAIN);
|
|
} else {
|
|
/* More packets to come. Put lower layers in streaming mode. */
|
|
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE,
|
|
PACKETBUF_ATTR_PACKET_TYPE_STREAM);
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
send_profile(struct deluge_object *obj)
|
|
{
|
|
struct deluge_msg_profile *msg;
|
|
unsigned char buf[sizeof(*msg) + OBJECT_PAGE_COUNT(*obj)];
|
|
int i;
|
|
|
|
if(broadcast_profile && recv_adv < CONST_K) {
|
|
broadcast_profile = 0;
|
|
|
|
msg = (struct deluge_msg_profile *)buf;
|
|
msg->cmd = DELUGE_CMD_PROFILE;
|
|
msg->version = obj->version;
|
|
msg->npages = OBJECT_PAGE_COUNT(*obj);
|
|
msg->object_id = obj->object_id;
|
|
for(i = 0; i < msg->npages; i++) {
|
|
msg->version_vector[i] = obj->pages[i].version;
|
|
}
|
|
|
|
packetbuf_copyfrom(buf, sizeof(buf));
|
|
broadcast_send(&deluge_broadcast);
|
|
}
|
|
}
|
|
|
|
static void
|
|
handle_profile(struct deluge_msg_profile *msg)
|
|
{
|
|
int i;
|
|
int npages;
|
|
struct deluge_object *obj;
|
|
char *p;
|
|
|
|
obj = ¤t_object;
|
|
if(msg->version <= current_object.update_version) {
|
|
return;
|
|
}
|
|
|
|
PRINTF("Received profile of version %u with a vector of %u pages.\n",
|
|
msg->version, msg->npages);
|
|
|
|
leds_off(LEDS_RED);
|
|
current_object.tx_set = 0;
|
|
|
|
npages = OBJECT_PAGE_COUNT(*obj);
|
|
obj->size = msg->npages * S_PAGE;
|
|
|
|
p = malloc(OBJECT_PAGE_COUNT(*obj) * sizeof(*obj->pages));
|
|
if(p == NULL) {
|
|
PRINTF("Failed to reallocate memory for pages!\n");
|
|
return;
|
|
}
|
|
|
|
memcpy(p, obj->pages, npages * sizeof(*obj->pages));
|
|
free(obj->pages);
|
|
obj->pages = (struct deluge_page *)p;
|
|
|
|
if(msg->npages < npages) {
|
|
npages = msg->npages;
|
|
}
|
|
|
|
for(i = 0; i < npages; i++) {
|
|
if(msg->version_vector[i] > obj->pages[i].version) {
|
|
obj->pages[i].packet_set = 0;
|
|
obj->pages[i].flags &= ~PAGE_COMPLETE;
|
|
obj->pages[i].version = msg->version_vector[i];
|
|
}
|
|
}
|
|
|
|
for(; i < msg->npages; i++) {
|
|
init_page(obj, i, 0);
|
|
}
|
|
|
|
obj->current_rx_page = highest_available_page(obj);
|
|
obj->update_version = msg->version;
|
|
|
|
transition(DELUGE_STATE_RX);
|
|
|
|
ctimer_set(&rx_timer,
|
|
CONST_OMEGA * ESTIMATED_TX_TIME + ((unsigned)random_rand() % T_R),
|
|
send_request, obj);
|
|
}
|
|
|
|
static void
|
|
command_dispatcher(const rimeaddr_t *sender)
|
|
{
|
|
char *msg;
|
|
int len;
|
|
struct deluge_msg_profile *profile;
|
|
|
|
msg = packetbuf_dataptr();
|
|
len = packetbuf_datalen();
|
|
if(len < 1)
|
|
return;
|
|
|
|
switch(msg[0]) {
|
|
case DELUGE_CMD_SUMMARY:
|
|
if(len >= sizeof(struct deluge_msg_summary))
|
|
handle_summary((struct deluge_msg_summary *)msg, sender);
|
|
break;
|
|
case DELUGE_CMD_REQUEST:
|
|
if(len >= sizeof(struct deluge_msg_request))
|
|
handle_request((struct deluge_msg_request *)msg);
|
|
break;
|
|
case DELUGE_CMD_PACKET:
|
|
if(len >= sizeof(struct deluge_msg_packet))
|
|
handle_packet((struct deluge_msg_packet *)msg);
|
|
break;
|
|
case DELUGE_CMD_PROFILE:
|
|
profile = (struct deluge_msg_profile *)msg;
|
|
if(len >= sizeof(*profile) &&
|
|
len >= sizeof(*profile) + profile->npages * profile->version_vector[0])
|
|
handle_profile((struct deluge_msg_profile *)msg);
|
|
break;
|
|
default:
|
|
PRINTF("Incoming packet with unknown command: %d\n", msg[0]);
|
|
}
|
|
}
|
|
|
|
static void
|
|
unicast_recv(struct unicast_conn *c, const rimeaddr_t *sender)
|
|
{
|
|
command_dispatcher(sender);
|
|
}
|
|
|
|
static void
|
|
broadcast_recv(struct broadcast_conn *c, const rimeaddr_t *sender)
|
|
{
|
|
command_dispatcher(sender);
|
|
}
|
|
|
|
int
|
|
deluge_disseminate(char *file, unsigned version)
|
|
{
|
|
/* This implementation disseminates at most one object. */
|
|
if(next_object_id > 0 || init_object(¤t_object, file, version) < 0) {
|
|
return -1;
|
|
}
|
|
process_start(&deluge_process, file);
|
|
|
|
return 0;
|
|
}
|
|
|
|
PROCESS_THREAD(deluge_process, ev, data)
|
|
{
|
|
static struct etimer et;
|
|
static unsigned time_counter;
|
|
static unsigned r_rand;
|
|
|
|
PROCESS_EXITHANDLER(goto exit);
|
|
|
|
PROCESS_BEGIN();
|
|
|
|
deluge_event = process_alloc_event();
|
|
|
|
broadcast_open(&deluge_broadcast, DELUGE_BROADCAST_CHANNEL, &broadcast_call);
|
|
unicast_open(&deluge_uc, DELUGE_UNICAST_CHANNEL, &unicast_call);
|
|
r_interval = T_LOW;
|
|
|
|
PRINTF("Maintaining state for object %s of %d pages\n",
|
|
current_object.filename, OBJECT_PAGE_COUNT(current_object));
|
|
|
|
deluge_state = DELUGE_STATE_MAINTAIN;
|
|
|
|
for(r_interval = T_LOW;;) {
|
|
if(neighbor_inconsistency) {
|
|
/* Deluge M.2 */
|
|
r_interval = T_LOW;
|
|
neighbor_inconsistency = 0;
|
|
} else {
|
|
/* Deluge M.3 */
|
|
r_interval = (2 * r_interval >= T_HIGH) ? T_HIGH : 2 * r_interval;
|
|
}
|
|
|
|
r_rand = r_interval / 2 + ((unsigned)random_rand() % (r_interval / 2));
|
|
recv_adv = 0;
|
|
old_summary = 0;
|
|
|
|
/* Deluge M.1 */
|
|
ctimer_set(&summary_timer, r_rand * CLOCK_SECOND,
|
|
(void *)(void *)advertise_summary, ¤t_object);
|
|
|
|
/* Deluge M.4 */
|
|
ctimer_set(&profile_timer, r_rand * CLOCK_SECOND,
|
|
(void *)(void *)send_profile, ¤t_object);
|
|
|
|
LONG_TIMER(et, time_counter, r_interval);
|
|
}
|
|
|
|
exit:
|
|
unicast_close(&deluge_uc);
|
|
broadcast_close(&deluge_broadcast);
|
|
if(current_object.cfs_fd >= 0) {
|
|
cfs_close(current_object.cfs_fd);
|
|
}
|
|
if(current_object.pages != NULL) {
|
|
free(current_object.pages);
|
|
}
|
|
|
|
PROCESS_END();
|
|
}
|