Rewrite of the collect module. The new version uses a packet queue for outgoing and forwarded packets so that more than one packet can be forwarded at the same time. The new version uses the packetqueue module to maintain the packet queue.

This commit is contained in:
adamdunkels 2009-04-29 20:48:57 +00:00
parent c489b1118d
commit e70cf0c7c6
2 changed files with 126 additions and 36 deletions

View file

@ -36,7 +36,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.c,v 1.24 2009/04/07 13:06:03 adamdunkels Exp $
* $Id: collect.c,v 1.25 2009/04/29 20:48:57 adamdunkels Exp $
*/
/**
@ -52,6 +52,8 @@
#include "net/rime/neighbor.h"
#include "net/rime/collect.h"
#include "net/rime/packetqueue.h"
#include "dev/radio-sensor.h"
#if CONTIKI_TARGET_NETSIM
@ -78,6 +80,10 @@ struct recent_packet {
static struct recent_packet recent_packets[NUM_RECENT_PACKETS];
static uint8_t recent_packet_ptr;
#define FORWARD_PACKET_LIFETIME (CLOCK_SECOND * 16)
#define MAX_FORWARDING_QUEUE 4
PACKETQUEUE(forwarding_queue, MAX_FORWARDING_QUEUE);
#define SINK 0
#define RTMETRIC_MAX COLLECT_MAX_DEPTH
@ -86,11 +92,69 @@ static uint8_t recent_packet_ptr;
#define DEBUG 0
#if DEBUG
#include <stdio.h>
#define PRINTF(...) printf(__VA_ARGS__)
#define PRINTF(...) printf(__VA_ARGS__); fflush(NULL)
#else
#define PRINTF(...)
#endif
/*---------------------------------------------------------------------------*/
static void
send_queued_packet(void)
{
struct queuebuf *q;
struct neighbor *n;
struct packetqueue_item *i;
struct collect_conn *c;
i = packetqueue_first(&forwarding_queue);
if(i == NULL) {
PRINTF("%d.%d: nothing on queue\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
/* No packet on the queue, so there is nothing for us to send. */
return;
}
c = packetqueue_ptr(i);
if(c == NULL) {
/* c should not be NULL, but we check it just to be sure. */
PRINTF("%d.%d: queue, c == NULL!\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
}
if(c->forwarding) {
/* If we are currently forwarding a packet, we wait until the
packet is forwarded and try again then. */
PRINTF("%d.%d: queue, c is forwarding\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
}
q = packetqueue_queuebuf(i);
if(q != NULL) {
PRINTF("%d.%d: queue, q is on queue\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
queuebuf_to_packetbuf(q);
n = neighbor_best();
/* Don't send to the neighbor if it is the same neighbor that sent
us the packet. */
if(n != NULL && !rimeaddr_cmp(&n->addr, packetbuf_addr(PACKETBUF_ADDR_SENDER))) {
#if CONTIKI_TARGET_NETSIM
ether_set_line(n->addr.u8[0], n->addr.u8[1]);
#endif /* CONTIKI_TARGET_NETSIM */
PRINTF("%d.%d: sending packet to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
c->forwarding = 1;
runicast_send(&c->runicast_conn, &n->addr, packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT));
} else {
PRINTF("%d.%d: did not find any neighbor to forward to\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
}
}
/*---------------------------------------------------------------------------*/
static void
update_rtmetric(struct collect_conn *tc)
@ -114,18 +178,25 @@ update_rtmetric(struct collect_conn *tc)
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
tc->rtmetric = RTMETRIC_MAX;
announcement_set_value(&tc->announcement, tc->rtmetric);
announcement_set_value(&tc->announcement, tc->rtmetric);
} else {
/* We set our rtmetric to the rtmetric of our best neighbor plus
the expected transmissions to reach that neighbor. */
if(n->rtmetric + neighbor_etx(n) != tc->rtmetric) {
uint16_t old_rtmetric = tc->rtmetric;
tc->rtmetric = n->rtmetric + neighbor_etx(n);
/* neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric);*/
announcement_set_value(&tc->announcement, tc->rtmetric);
PRINTF("%d.%d: new rtmetric %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
tc->rtmetric);
/* We got a new, working, route we send any queued packets we may have. */
if(old_rtmetric == RTMETRIC_MAX) {
send_queued_packet();
}
}
}
}
@ -149,7 +220,6 @@ node_packet_received(struct runicast_conn *c, rimeaddr_t *from, uint8_t seqno)
{
struct collect_conn *tc = (struct collect_conn *)
((char *)c - offsetof(struct collect_conn, runicast_conn));
struct neighbor *n;
int i;
/* To protect against forwarding duplicate packets, we keep a list
@ -158,9 +228,11 @@ node_packet_received(struct runicast_conn *c, rimeaddr_t *from, uint8_t seqno)
for(i = 0; i < NUM_RECENT_PACKETS; i++) {
if(recent_packets[i].seqno == packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID) &&
rimeaddr_cmp(&recent_packets[i].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER))) {
PRINTF("%d.%d: dropping duplicate packet with seqno %d\n",
rimeaddr_cmp(&recent_packets[i].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER))) {
PRINTF("%d.%d: dropping duplicate packet from %d.%d with seqno %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
recent_packets[i].originator.u8[0], recent_packets[i].originator.u8[1],
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID));
/* Drop the packet. */
return;
@ -202,49 +274,56 @@ node_packet_received(struct runicast_conn *c, rimeaddr_t *from, uint8_t seqno)
packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1],
from->u8[0], from->u8[1], tc->forwarding);
if(!tc->forwarding) {
n = neighbor_best();
if(n != NULL && !rimeaddr_cmp(&n->addr, from)) {
#if CONTIKI_TARGET_NETSIM
ether_set_line(n->addr.u8[0], n->addr.u8[1]);
#endif /* CONTIKI_TARGET_NETSIM */
tc->forwarding = 1;
runicast_send(c, &n->addr, packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT));
} else {
PRINTF("%d.%d: did not find any neighbor to forward to\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
return;
} else {
PRINTF("%d.%d: still forwarding another packet\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
return;
if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME,
tc)) {
send_queued_packet();
}
}
return;
}
/*---------------------------------------------------------------------------*/
static void
node_packet_sent(struct runicast_conn *c, rimeaddr_t *to, uint8_t retransmissions)
node_packet_sent(struct runicast_conn *c, rimeaddr_t *to, uint8_t transmissions)
{
struct collect_conn *tc = (struct collect_conn *)
((char *)c - offsetof(struct collect_conn, runicast_conn));
PRINTF("%d.%d: sent to %d.%d after %d retransmissions\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
to->u8[0], to->u8[1],
transmissions);
tc->forwarding = 0;
neighbor_update_etx(neighbor_find(to), retransmissions);
neighbor_update_etx(neighbor_find(to), transmissions);
update_rtmetric(tc);
/* Remove the first packet on the queue, the packet that was just sent. */
packetqueue_dequeue(&forwarding_queue);
/* Send the next packet in the queue, if any. */
send_queued_packet();
}
/*---------------------------------------------------------------------------*/
static void
node_packet_timedout(struct runicast_conn *c, rimeaddr_t *to, uint8_t retransmissions)
node_packet_timedout(struct runicast_conn *c, rimeaddr_t *to, uint8_t transmissions)
{
struct collect_conn *tc = (struct collect_conn *)
((char *)c - offsetof(struct collect_conn, runicast_conn));
PRINTF("%d.%d: timedout after %d retransmissions\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], transmissions);
tc->forwarding = 0;
neighbor_timedout_etx(neighbor_find(to), retransmissions);
neighbor_timedout_etx(neighbor_find(to), transmissions);
update_rtmetric(tc);
/* Remove the first packet on the queue, the packet that just timed out. */
packetqueue_dequeue(&forwarding_queue);
/* Send the next packet in the queue, if any. */
send_queued_packet();
}
/*---------------------------------------------------------------------------*/
/*static void
@ -279,6 +358,9 @@ received_announcement(struct announcement *a, rimeaddr_t *from,
if(n == NULL) {
neighbor_add(from, value, 1);
PRINTF("%d.%d: new neighbor %d.%d, etx %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
from->u8[0], from->u8[1], value);
} else {
neighbor_update(n, value);
PRINTF("%d.%d: updating neighbor %d.%d, etx %d\n",
@ -290,8 +372,8 @@ received_announcement(struct announcement *a, rimeaddr_t *from,
}
/*---------------------------------------------------------------------------*/
static const struct runicast_callbacks runicast_callbacks = {node_packet_received,
node_packet_sent,
node_packet_timedout};
node_packet_sent,
node_packet_timedout};
/*static const struct neighbor_discovery_callbacks neighbor_discovery_callbacks =
{ adv_received, NULL};*/
/*---------------------------------------------------------------------------*/
@ -328,6 +410,7 @@ collect_set_sink(struct collect_conn *tc, int should_be_sink)
if(should_be_sink) {
tc->rtmetric = SINK;
/* neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric);*/
announcement_set_value(&tc->announcement, tc->rtmetric);
} else {
tc->rtmetric = RTMETRIC_MAX;
}
@ -346,7 +429,7 @@ collect_send(struct collect_conn *tc, int rexmits)
packetbuf_set_attr(PACKETBUF_ATTR_TTL, MAX_HOPLIM);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, rexmits);
if(tc->rtmetric == SINK) {
if(tc->rtmetric == 0) {
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 0);
if(tc->cb->recv != NULL) {
tc->cb->recv(packetbuf_addr(PACKETBUF_ADDR_ESENDER),
@ -363,13 +446,20 @@ collect_send(struct collect_conn *tc, int rexmits)
PRINTF("%d.%d: sending to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
tc->forwarding = 1;
return runicast_send(&tc->runicast_conn, &n->addr, rexmits);
if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME,
tc)) {
send_queued_packet();
return 1;
}
} else {
/* printf("Didn't find any neighbor\n");*/
PRINTF("%d.%d: did not find any neighbor to send to\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
announcement_listen(1);
if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME,
tc)) {
return 1;
}
}
}
return 0;

View file

@ -47,7 +47,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.h,v 1.10 2009/03/12 21:58:21 adamdunkels Exp $
* $Id: collect.h,v 1.11 2009/04/29 20:48:57 adamdunkels Exp $
*/
/**
@ -64,7 +64,7 @@
#include "net/rime/runicast.h"
#define COLLECT_ATTRIBUTES { PACKETBUF_ADDR_ESENDER, PACKETBUF_ADDRSIZE }, \
{ PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 2 }, \
{ PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_TTL, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_HOPS, PACKETBUF_ATTR_BIT * 4 }, \
{ PACKETBUF_ATTR_MAX_REXMIT, PACKETBUF_ATTR_BIT * 3 }, \