Significant rework of the Contiki data collection protocol:

* the new version makes use of MAC-layer feedback so that bad paths
  can be identified quicker and then avoided.

* the new code uses transport layer ACKs that contain feedback from
  the collect protocol: when a packet cannot be forwarded due to lack
  of resources, the ACK contains a flag that indicates that the packet
  could not be forwarded. ACKs also contain the routing metric of the
  sender, which improves agility in face of rapid path changes.

* loop detection and management has been improved: with higher path
  metric agility, the system is more prone to short-lived routing
  loops. Instead of dropping looping packets, the new version adjusts
  the routing metric for the routes that exhibit loops so that the
  risk for future loops is reduced.

* make use of packet attributes to inform the MAC layer of how many
  times packets should be retransmitted.
This commit is contained in:
adamdunkels 2010-02-28 09:18:01 +00:00
parent b6b4941871
commit 7165a3866f
2 changed files with 386 additions and 186 deletions

View file

@ -1,6 +1,3 @@
/* XXX: send explicit congestion notification if already forwarding
packet. */
/**
* \addtogroup rimecollect
* @{
@ -36,7 +33,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.c,v 1.33 2010/02/23 18:35:23 adamdunkels Exp $
* $Id: collect.c,v 1.34 2010/02/28 09:18:01 adamdunkels Exp $
*/
/**
@ -56,9 +53,7 @@
#include "dev/radio-sensor.h"
#if CONTIKI_TARGET_NETSIM
#include "ether.h"
#endif
#include "lib/random.h"
#include <string.h>
#include <stdio.h>
@ -78,12 +73,21 @@ struct recent_packet {
uint8_t seqno;
};
struct ack_msg {
uint8_t flags, dummy;
uint16_t rtmetric;
};
#define ACK_FLAGS_CONGESTED 0x80
#define ACK_FLAGS_DROPPED 0x40
#define ACK_FLAGS_LIFETIME_EXCEEDED 0x20
static struct recent_packet recent_packets[NUM_RECENT_PACKETS];
static uint8_t recent_packet_ptr;
#define FORWARD_PACKET_LIFETIME (CLOCK_SECOND * 16)
#define MAX_FORWARDING_QUEUE 6
PACKETQUEUE(forwarding_queue, MAX_FORWARDING_QUEUE);
#define MAX_SENDING_QUEUE 6
PACKETQUEUE(sending_queue, MAX_SENDING_QUEUE);
#define SINK 0
#define RTMETRIC_MAX COLLECT_MAX_DEPTH
@ -104,64 +108,16 @@ PACKETQUEUE(forwarding_queue, MAX_FORWARDING_QUEUE);
#define PRINTF(...)
#endif
/*---------------------------------------------------------------------------*/
static void
send_queued_packet(void)
{
struct queuebuf *q;
struct neighbor *n;
struct packetqueue_item *i;
struct collect_conn *c;
i = packetqueue_first(&forwarding_queue);
if(i == NULL) {
PRINTF("%d.%d: nothing on queue\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
/* No packet on the queue, so there is nothing for us to send. */
return;
}
c = packetqueue_ptr(i);
if(c == NULL) {
/* c should not be NULL, but we check it just to be sure. */
PRINTF("%d.%d: queue, c == NULL!\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
}
if(c->forwarding) {
/* If we are currently forwarding a packet, we wait until the
packet is forwarded and try again then. */
PRINTF("%d.%d: queue, c is forwarding\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
}
#define REXMIT_TIME CLOCK_SECOND * 2
q = packetqueue_queuebuf(i);
if(q != NULL) {
PRINTF("%d.%d: queue, q is on queue\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
queuebuf_to_packetbuf(q);
n = neighbor_best();
/* Don't send to the neighbor if it is the same neighbor that sent
us the packet. */
if(n != NULL && !rimeaddr_cmp(&n->addr, packetbuf_addr(PACKETBUF_ADDR_SENDER))) {
#if CONTIKI_TARGET_NETSIM
ether_set_line(n->addr.u8[0], n->addr.u8[1]);
#include "ether.h"
#endif /* CONTIKI_TARGET_NETSIM */
PRINTF("%d.%d: sending packet to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
c->forwarding = 1;
runicast_send(&c->runicast_conn, &n->addr, packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT));
} else {
PRINTF("%d.%d: did not find any neighbor to forward to\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
}
}
static void send_queued_packet(void);
static void retransmit_callback(void *ptr);
/*---------------------------------------------------------------------------*/
static void
update_rtmetric(struct collect_conn *tc)
@ -191,9 +147,8 @@ update_rtmetric(struct collect_conn *tc)
neighbor_discovery_set_val(&tc->neighbor_discovery_conn, tc->rtmetric);
#endif /* COLLECT_ANNOUNCEMENTS */
} else {
/* We set our rtmetric to the rtmetric of our best neighbor plus
the expected transmissions to reach that neighbor. */
the expected transmissions to reach that neighbor. */
if(n->rtmetric + neighbor_etx(n) != tc->rtmetric) {
uint16_t old_rtmetric = tc->rtmetric;
@ -201,8 +156,6 @@ update_rtmetric(struct collect_conn *tc)
#if ! COLLECT_ANNOUNCEMENTS
/* neighbor_discovery_set_val(&tc->neighbor_discovery_conn, tc->rtmetric);*/
/* If we get a significantly better rtmetric than we had
before, we call neighbor_discovery_start to start a new
period. */
@ -234,7 +187,7 @@ update_rtmetric(struct collect_conn *tc)
if(tc->rtmetric == RTMETRIC_MAX) {
strcpy(buf, " ");
} else {
sprintf(buf, "%.1f", (float)tc->rtmetric / NEIGHBOR_ETX_SCALE);
sPRINTF(buf, "%.1f", (float)tc->rtmetric / NEIGHBOR_ETX_SCALE);
}
ether_set_text(buf);
}
@ -242,134 +195,367 @@ update_rtmetric(struct collect_conn *tc)
}
/*---------------------------------------------------------------------------*/
static void
node_packet_received(struct runicast_conn *c, const rimeaddr_t *from,
uint8_t seqno)
send_queued_packet(void)
{
struct queuebuf *q;
struct neighbor *n;
struct packetqueue_item *i;
struct collect_conn *c;
PRINTF("%d.%d: send_queued_packet queue len %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetqueue_len(&sending_queue));
i = packetqueue_first(&sending_queue);
if(i == NULL) {
PRINTF("%d.%d: nothing on queue\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
/* No packet on the queue, so there is nothing for us to send. */
return;
}
c = packetqueue_ptr(i);
if(c == NULL) {
/* c should not be NULL, but we check it just to be sure. */
PRINTF("%d.%d: queue, c == NULL!\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
}
if(c->sending) {
/* If we are currently sending a packet, we wait until the
packet is forwarded and try again then. */
PRINTF("%d.%d: queue, c is sending\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
}
q = packetqueue_queuebuf(i);
if(q != NULL) {
PRINTF("%d.%d: queue, q is on queue\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
queuebuf_to_packetbuf(q);
n = neighbor_best();
while(n != NULL && rimeaddr_cmp(&n->addr, packetbuf_addr(PACKETBUF_ADDR_SENDER))) {
PRINTF("%d.%d: avoiding fowarding loop to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
neighbor_remove(&n->addr);
update_rtmetric(c);
n = neighbor_best();;
}
/* Don't send to the neighbor if it is the same neighbor that sent
us the packet. */
if(n != NULL) {
clock_time_t time;
uint8_t rexmit_time_scaling;
#if CONTIKI_TARGET_NETSIM
ether_set_line(n->addr.u8[0], n->addr.u8[1]);
#endif /* CONTIKI_TARGET_NETSIM */
PRINTF("%d.%d: sending packet to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
rimeaddr_copy(&c->current_receiver, &n->addr);
c->sending = 1;
c->transmissions = 0;
c->max_rexmits = 8;//packetbuf_attr(PACKETBUF_ATTR_EMAX_REXMIT);
PRINTF("max_rexmits %d\n", c->max_rexmits);
packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 1);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_MAC_REXMIT, 2);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, c->seqno);
unicast_send(&c->unicast_conn, &n->addr);
rexmit_time_scaling = c->transmissions;
if(rexmit_time_scaling > 3) {
rexmit_time_scaling = 3;
}
time = REXMIT_TIME << rexmit_time_scaling;
time = time / 2 + random_rand() % (time / 2);
PRINTF("retransmission time %lu\n", time);
ctimer_set(&c->retransmission_timer, time,
retransmit_callback, c);
} else {
}
}
}
/*---------------------------------------------------------------------------*/
static void
send_next_packet(struct collect_conn *tc)
{
/* Cancel retransmission timer. */
ctimer_stop(&tc->retransmission_timer);
/* Remove the first packet on the queue, the packet that was just sent. */
packetqueue_dequeue(&sending_queue);
tc->seqno = (tc->seqno + 1) % (1 << COLLECT_PACKET_ID_BITS);
tc->sending = 0;
tc->transmissions = 0;
/* Send the next packet in the queue, if any. */
send_queued_packet();
}
/*---------------------------------------------------------------------------*/
static void
handle_ack(struct collect_conn *tc)
{
struct ack_msg *msg;
uint16_t rtmetric;
struct neighbor *n;
if(rimeaddr_cmp(packetbuf_addr(PACKETBUF_ADDR_SENDER),
&tc->current_receiver) &&
packetbuf_attr(PACKETBUF_ATTR_PACKET_ID) == tc->seqno) {
msg = packetbuf_dataptr();
memcpy(&rtmetric, &msg->rtmetric, sizeof(uint16_t));
n = neighbor_find(packetbuf_addr(PACKETBUF_ADDR_SENDER));
if(n != NULL) {
neighbor_update(n, rtmetric);
update_rtmetric(tc);
}
PRINTF("%d.%d: ACK from %d.%d after %d transmissions, flags %02x\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
tc->current_receiver.u8[0], tc->current_receiver.u8[1],
tc->transmissions,
msg->flags);
if(!(msg->flags & ACK_FLAGS_DROPPED)) {
send_next_packet(tc);
}
}
}
/*---------------------------------------------------------------------------*/
static void
send_ack(struct collect_conn *tc, const rimeaddr_t *to, int congestion, int dropped, int ttl)
{
struct ack_msg *ack;
struct queuebuf *q;
uint16_t packet_seqno;
PRINTF("send_ack\n");
packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID);
q = queuebuf_new_from_packetbuf();
if(q != NULL) {
packetbuf_clear();
packetbuf_set_datalen(sizeof(struct ack_msg));
ack = packetbuf_dataptr();
memset(ack, 0, sizeof(struct ack_msg));
ack->rtmetric = tc->rtmetric;
ack->flags = (congestion? ACK_FLAGS_CONGESTED: 0) |
(dropped? ACK_FLAGS_DROPPED: 0) |
(ttl? ACK_FLAGS_LIFETIME_EXCEEDED: 0);
/* XXX: send explicit congestion notification in ACK queue full; add rtmetric to ACK. */
packetbuf_set_addr(PACKETBUF_ADDR_RECEIVER, to);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_PACKET_TYPE_ACK);
packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 0);
packetbuf_set_attr(PACKETBUF_ATTR_ERELIABLE, 0);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, packet_seqno);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, 2);
unicast_send(&tc->unicast_conn, to);
PRINTF("%d.%d: collect: Sending ACK to %d.%d for %d\n",
rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1],
to->u8[0],
to->u8[1],
packet_seqno);
RIMESTATS_ADD(acktx);
queuebuf_to_packetbuf(q);
queuebuf_free(q);
} else {
PRINTF("%d.%d: collect: could not send ACK to %d.%d for %d: no queued buffers\n",
rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1],
to->u8[0], to->u8[1],
packet_seqno);
}
}
/*---------------------------------------------------------------------------*/
static void
node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
{
struct collect_conn *tc = (struct collect_conn *)
((char *)c - offsetof(struct collect_conn, runicast_conn));
((char *)c - offsetof(struct collect_conn, unicast_conn));
int i;
struct neighbor *n;
/* To protect against forwarding duplicate packets, we keep a list
/* To protect against sending duplicate packets, we keep a list
of recently forwarded packet seqnos. If the seqno of the current
packet exists in the list, we drop the packet and increase the
ETX of the neighbor we sent it to in the first place. */
if(packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE) ==
PACKETBUF_ATTR_PACKET_TYPE_DATA) {
rimeaddr_t ack_to;
uint8_t packet_seqno;
for(i = 0; i < NUM_RECENT_PACKETS; i++) {
if(recent_packets[i].seqno == packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID) &&
rimeaddr_cmp(&recent_packets[i].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER))) {
PRINTF("%d.%d: dropping duplicate packet from %d.%d with seqno %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
recent_packets[i].originator.u8[0], recent_packets[i].originator.u8[1],
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID));
n = neighbor_find(&recent_packets[i].sent_to);
if(n != NULL) {
neighbor_update_etx(n, neighbor_etx(n) + NEIGHBOR_ETX_SCALE * 4);
}
/* Drop the packet. */
rimeaddr_copy(&ack_to, packetbuf_addr(PACKETBUF_ADDR_SENDER));
packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID);
if(rimeaddr_cmp(&tc->last_received_addr, packetbuf_addr(PACKETBUF_ADDR_SENDER)) &&
tc->last_received_seqno == packetbuf_attr(PACKETBUF_ATTR_PACKET_ID)) {
/* This is a duplicate of the packet we last received, so we just send an ACK. */
send_ack(tc, &ack_to, 0, 0, 0);
return;
}
}
recent_packets[recent_packet_ptr].seqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID);
rimeaddr_copy(&recent_packets[recent_packet_ptr].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER));
n = neighbor_best();
rimeaddr_copy(&recent_packets[recent_packet_ptr].sent_to,
&n->addr);
rimeaddr_copy(&tc->last_received_addr, packetbuf_addr(PACKETBUF_ADDR_SENDER));
tc->last_received_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID);
recent_packet_ptr = (recent_packet_ptr + 1) % NUM_RECENT_PACKETS;
if(tc->rtmetric == SINK) {
/* If we are the sink, we call the receive function. */
PRINTF("%d.%d: sink received packet from %d.%d via %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1],
from->u8[0], from->u8[1]);
if(tc->cb->recv != NULL) {
tc->cb->recv(packetbuf_addr(PACKETBUF_ADDR_ESENDER),
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID),
packetbuf_attr(PACKETBUF_ATTR_HOPS));
for(i = 0; i < NUM_RECENT_PACKETS; i++) {
if(recent_packets[i].seqno == packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID) &&
rimeaddr_cmp(&recent_packets[i].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER))) {
PRINTF("%d.%d: found duplicate packet from %d.%d with seqno %d, via %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
recent_packets[i].originator.u8[0], recent_packets[i].originator.u8[1],
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID),
packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[0],
packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[1]);
n = neighbor_find(&recent_packets[i].sent_to);
if(n != NULL) {
neighbor_update_etx(n, neighbor_etx(n) / NEIGHBOR_ETX_SCALE + 4);
update_rtmetric(tc);
}
break;
}
}
return;
} else if(packetbuf_attr(PACKETBUF_ATTR_TTL) > 1 &&
tc->rtmetric != RTMETRIC_MAX) {
recent_packets[recent_packet_ptr].seqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID);
rimeaddr_copy(&recent_packets[recent_packet_ptr].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER));
/* n = neighbor_best();*/
/* If we are not the sink, we forward the packet to the best
neighbor. */
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, packetbuf_attr(PACKETBUF_ATTR_HOPS) + 1);
packetbuf_set_attr(PACKETBUF_ATTR_TTL, packetbuf_attr(PACKETBUF_ATTR_TTL) - 1);
PRINTF("%d.%d: packet received from %d.%d via %d.%d, forwarding %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1],
from->u8[0], from->u8[1], tc->forwarding);
if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME,
tc)) {
send_queued_packet();
if(tc->rtmetric != SINK) {
n = neighbor_best();
rimeaddr_copy(&recent_packets[recent_packet_ptr].sent_to,
&n->addr);
} else {
PRINTF("%d.%d: packet dropped: no queue buffer available\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
rimeaddr_copy(&recent_packets[recent_packet_ptr].sent_to,
&rimeaddr_null);
}
} else if(packetbuf_attr(PACKETBUF_ATTR_TTL) <= 1) {
recent_packet_ptr = (recent_packet_ptr + 1) % NUM_RECENT_PACKETS;
if(tc->rtmetric == SINK) {
/* If we are the sink, we call the receive function. */
send_ack(tc, &ack_to, 0, 0, 0);
PRINTF("%d.%d: sink received packet %d from %d.%d via %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID),
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1],
from->u8[0], from->u8[1]);
if(tc->cb->recv != NULL) {
tc->cb->recv(packetbuf_addr(PACKETBUF_ADDR_ESENDER),
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID),
packetbuf_attr(PACKETBUF_ATTR_HOPS));
}
return;
} else if(packetbuf_attr(PACKETBUF_ATTR_TTL) > 1 &&
tc->rtmetric != RTMETRIC_MAX) {
/* If we are not the sink, we forward the packet to the best
neighbor. */
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, packetbuf_attr(PACKETBUF_ATTR_HOPS) + 1);
packetbuf_set_attr(PACKETBUF_ATTR_TTL, packetbuf_attr(PACKETBUF_ATTR_TTL) - 1);
PRINTF("%d.%d: packet received from %d.%d via %d.%d, sending %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0],
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1],
from->u8[0], from->u8[1], tc->sending);
if(packetqueue_enqueue_packetbuf(&sending_queue, FORWARD_PACKET_LIFETIME,
tc)) {
send_ack(tc, &ack_to, 0, 0, 0);
send_queued_packet();
} else {
send_ack(tc, &ack_to, 0, 1, 0);
PRINTF("%d.%d: packet dropped: no queue buffer available\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
} else if(packetbuf_attr(PACKETBUF_ATTR_TTL) <= 1) {
PRINTF("%d.%d: packet dropped: ttl %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_attr(PACKETBUF_ATTR_TTL));
send_ack(tc, &ack_to, 0, 1, 1);
}
} else if(packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE) ==
PACKETBUF_ATTR_PACKET_TYPE_ACK) {
PRINTF("Collect: incoming ack %d from %d.%d (%d.%d) seqno %d (%d)\n",
packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE),
packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[0],
packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[1],
tc->current_receiver.u8[0],
tc->current_receiver.u8[1],
packetbuf_attr(PACKETBUF_ATTR_PACKET_ID),
tc->seqno);
handle_ack(tc);
}
return;
}
/*---------------------------------------------------------------------------*/
static void
node_packet_sent(struct runicast_conn *c, const rimeaddr_t *to,
uint8_t transmissions)
node_packet_sent(struct unicast_conn *c, int status, int transmissions)
{
struct collect_conn *tc = (struct collect_conn *)
((char *)c - offsetof(struct collect_conn, runicast_conn));
((char *)c - offsetof(struct collect_conn, unicast_conn));
PRINTF("%d.%d: sent to %d.%d after %d retransmissions\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
to->u8[0], to->u8[1],
transmissions);
/* For data packets, we record the number of transmissions */
if(packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE) ==
PACKETBUF_ATTR_PACKET_TYPE_DATA) {
PRINTF("%d.%d: sent to %d.%d after %d transmissions\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
tc->current_receiver.u8[0], tc->current_receiver.u8[1],
transmissions);
/* neighbor_update_etx(neighbor_find(to), transmissions);
update_rtmetric(tc);*/
tc->transmissions += transmissions;
tc->forwarding = 0;
neighbor_update_etx(neighbor_find(to), transmissions);
update_rtmetric(tc);
/* Remove the first packet on the queue, the packet that was just sent. */
packetqueue_dequeue(&forwarding_queue);
/* Send the next packet in the queue, if any. */
send_queued_packet();
/* Update ETX with the number of transmissions. */
PRINTF("Updating ETX with %d transmissions\n", tc->transmissions);
neighbor_update_etx(neighbor_find(&tc->current_receiver), tc->transmissions);
update_rtmetric(tc);
}
}
/*---------------------------------------------------------------------------*/
static void
node_packet_timedout(struct runicast_conn *c, const rimeaddr_t *to,
uint8_t transmissions)
timedout(struct collect_conn *tc)
{
struct collect_conn *tc = (struct collect_conn *)
((char *)c - offsetof(struct collect_conn, runicast_conn));
PRINTF("%d.%d: timedout after %d retransmissions: packet dropped\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], transmissions);
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], tc->transmissions);
tc->forwarding = 0;
neighbor_timedout_etx(neighbor_find(to), transmissions);
tc->sending = 0;
neighbor_timedout_etx(neighbor_find(&tc->current_receiver), tc->transmissions);
update_rtmetric(tc);
/* Remove the first packet on the queue, the packet that just timed out. */
packetqueue_dequeue(&forwarding_queue);
send_next_packet(tc);
}
/*---------------------------------------------------------------------------*/
static void
retransmit_callback(void *ptr)
{
struct collect_conn *c = ptr;
/* Send the next packet in the queue, if any. */
send_queued_packet();
PRINTF("retransmit\n");
if(c->transmissions >= c->max_rexmits) {
timedout(c);
} else {
c->sending = 0;
send_queued_packet();
}
}
/*---------------------------------------------------------------------------*/
#if !COLLECT_ANNOUNCEMENTS
@ -421,9 +607,8 @@ received_announcement(struct announcement *a, const rimeaddr_t *from,
}
#endif /* !COLLECT_ANNOUNCEMENTS */
/*---------------------------------------------------------------------------*/
static const struct runicast_callbacks runicast_callbacks = {node_packet_received,
node_packet_sent,
node_packet_timedout};
static const struct unicast_callbacks unicast_callbacks = {node_packet_received,
node_packet_sent};
#if !COLLECT_ANNOUNCEMENTS
static const struct neighbor_discovery_callbacks neighbor_discovery_callbacks =
{ adv_received, NULL};
@ -439,19 +624,19 @@ collect_open(struct collect_conn *tc, uint16_t channels,
CLOCK_SECOND * 32,
CLOCK_SECOND * 600,
&neighbor_discovery_callbacks);
#endif /* !COLLECT_ANNOUNCEMENTS */
runicast_open(&tc->runicast_conn, channels + 1, &runicast_callbacks);
channel_set_attributes(channels + 1, attributes);
tc->rtmetric = RTMETRIC_MAX;
tc->cb = cb;
#if COLLECT_ANNOUNCEMENTS
neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric);
#else /* !COLLECT_ANNOUNCEMENTS */
announcement_register(&tc->announcement, channels, tc->rtmetric,
received_announcement);
announcement_listen(2);
#else
neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric);
#endif /* COLLECT_ANNOUNCEMENTS */
#endif /* !COLLECT_ANNOUNCEMENTS */
unicast_open(&tc->unicast_conn, channels + 1, &unicast_callbacks);
channel_set_attributes(channels + 1, attributes);
tc->rtmetric = RTMETRIC_MAX;
tc->cb = cb;
neighbor_init();
packetqueue_init(&sending_queue);
}
/*---------------------------------------------------------------------------*/
void
@ -462,7 +647,7 @@ collect_close(struct collect_conn *tc)
#else
neighbor_discovery_close(&tc->neighbor_discovery_conn);
#endif /* COLLECT_ANNOUNCEMENTS */
runicast_close(&tc->runicast_conn);
unicast_close(&tc->unicast_conn);
}
/*---------------------------------------------------------------------------*/
void
@ -487,11 +672,18 @@ collect_send(struct collect_conn *tc, int rexmits)
{
struct neighbor *n;
packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->seqno++);
packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno++);
packetbuf_set_addr(PACKETBUF_ADDR_ESENDER, &rimeaddr_node_addr);
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 1);
packetbuf_set_attr(PACKETBUF_ATTR_TTL, MAX_HOPLIM);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, rexmits);
PRINTF("%d.%d: originating packet %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID));
PRINTF("rexmit %d\n", rexmits);
if(tc->rtmetric == SINK) {
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 0);
@ -502,6 +694,7 @@ collect_send(struct collect_conn *tc, int rexmits)
}
return 1;
} else {
update_rtmetric(tc);
n = neighbor_best();
if(n != NULL) {
#if CONTIKI_TARGET_NETSIM
@ -510,7 +703,7 @@ collect_send(struct collect_conn *tc, int rexmits)
PRINTF("%d.%d: sending to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME,
if(packetqueue_enqueue_packetbuf(&sending_queue, FORWARD_PACKET_LIFETIME,
tc)) {
send_queued_packet();
return 1;
@ -519,13 +712,12 @@ collect_send(struct collect_conn *tc, int rexmits)
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
} else {
/* printf("Didn't find any neighbor\n");*/
PRINTF("%d.%d: did not find any neighbor to send to\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
#if COLLECT_ANNOUNCEMENTS
announcement_listen(1);
#endif /* COLLECT_ANNOUNCEMENTS */
if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME,
if(packetqueue_enqueue_packetbuf(&sending_queue, FORWARD_PACKET_LIFETIME,
tc)) {
return 1;
} else {

View file

@ -47,7 +47,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.h,v 1.12 2010/02/23 18:35:23 adamdunkels Exp $
* $Id: collect.h,v 1.13 2010/02/28 09:18:01 adamdunkels Exp $
*/
/**
@ -64,12 +64,16 @@
#include "net/rime/runicast.h"
#include "net/rime/neighbor-discovery.h"
#define COLLECT_ATTRIBUTES { PACKETBUF_ADDR_ESENDER, PACKETBUF_ADDRSIZE }, \
{ PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_TTL, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_HOPS, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_MAX_REXMIT, PACKETBUF_ATTR_BIT * 3 }, \
RUNICAST_ATTRIBUTES
#define COLLECT_PACKET_ID_BITS 4
#define COLLECT_ATTRIBUTES { PACKETBUF_ADDR_ESENDER, PACKETBUF_ADDRSIZE }, \
{ PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_PACKET_ID, PACKETBUF_ATTR_BIT * COLLECT_PACKET_ID_BITS }, \
{ PACKETBUF_ATTR_TTL, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_HOPS, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_MAX_REXMIT, PACKETBUF_ATTR_BIT * 3 }, \
{ PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_BIT }, \
UNICAST_ATTRIBUTES
struct collect_callbacks {
void (* recv)(const rimeaddr_t *originator, uint8_t seqno,
@ -77,7 +81,7 @@ struct collect_callbacks {
};
struct collect_conn {
struct runicast_conn runicast_conn;
struct unicast_conn unicast_conn;
#if ! COLLECT_CONF_ANNOUNCEMENTS
struct neighbor_discovery_conn neighbor_discovery_conn;
#else /* ! COLLECT_CONF_ANNOUNCEMENTS */
@ -86,8 +90,12 @@ struct collect_conn {
const struct collect_callbacks *cb;
struct ctimer t;
uint16_t rtmetric;
uint8_t forwarding;
uint8_t seqno;
uint8_t sending, transmissions, max_rexmits;
uint8_t seqno, last_received_seqno;
rimeaddr_t last_received_addr;
uint8_t eseqno;
struct ctimer retransmission_timer;
rimeaddr_t current_receiver;
};
void collect_open(struct collect_conn *c, uint16_t channels,