A number of changes to the collect code:

* Added an optional "keep alive" mechanism whereby an idle network is
periodically probed by dummy packets to maintain a recent quality
metric when there is no traffic.
* Bugfix in when new routing metrics should be advertised
* Rewrote the ACK logic so that a queuebuf is not allocated for each
ack, only for those acks generated by the sink node.
* Updated the wrap-around logic for sequence numbers: when a sequence
number wraps, it won't go back to 0. Instead, it wraps to 128. This
allows us to understand when a node has rebooted: when its seqno is <
128, it has recently rebooted.
This commit is contained in:
adamdunkels 2010-09-22 22:08:08 +00:00
parent 2cae71e89b
commit eed22a6f3c
2 changed files with 170 additions and 56 deletions

View file

@ -33,7 +33,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.c,v 1.52 2010/09/14 06:48:36 adamdunkels Exp $
* $Id: collect.c,v 1.53 2010/09/22 22:08:08 adamdunkels Exp $
*/
/**
@ -129,6 +129,8 @@ struct ack_msg {
#define REXMIT_TIME CLOCK_SECOND * 1
#define FORWARD_PACKET_LIFETIME (6 * (REXMIT_TIME) << 3)
#define MAX_SENDING_QUEUE 16
#define KEEPALIVE_REXMITS 4
MEMB(send_queue_memb, struct packetqueue_item, MAX_SENDING_QUEUE);
/* These specifiy the sink's routing metric (0) and the maximum
@ -204,6 +206,8 @@ struct {
/* Forward declarations. */
static void send_queued_packet(struct collect_conn *c);
static void retransmit_callback(void *ptr);
static void set_keepalive_timer(struct collect_conn *c);
/*---------------------------------------------------------------------------*/
/**
* This function computes the current rtmetric by adding the last
@ -233,7 +237,7 @@ rtmetric_compute(struct collect_conn *tc)
} else {
/* Our rtmetric is the rtmetric of our parent neighbor plus
the expected transmissions to reach that neighbor. */
rtmetric = collect_neighbor_rtmetric(n);
rtmetric = collect_neighbor_rtmetric_link_estimate(n);
}
return rtmetric;
@ -287,8 +291,8 @@ update_parent(struct collect_conn *tc)
#if DRAW_TREE
printf("#L %d 0\n", tc->parent.u8[0]);
#endif /* DRAW_TREE */
if(collect_neighbor_rtmetric(best) + SIGNIFICANT_RTMETRIC_CHANGE <
collect_neighbor_rtmetric(current)) {
if(collect_neighbor_rtmetric_link_estimate(best) + SIGNIFICANT_RTMETRIC_CHANGE <
collect_neighbor_rtmetric_link_estimate(current)) {
/* We switch parent. */
PRINTF("update_parent: new parent %d.%d (%d) old parent %d.%d (%d)\n",
best->addr.u8[0], best->addr.u8[1],
@ -381,7 +385,7 @@ update_rtmetric(struct collect_conn *tc)
/* If we now have a significantly better or worse rtmetric than
we had before, what we need to make sure that our neighbors
find out about this quickly. */
if(new_rtmetric + SIGNIFICANT_RTMETRIC_CHANGE < old_rtmetric &&
if(new_rtmetric < old_rtmetric - SIGNIFICANT_RTMETRIC_CHANGE ||
new_rtmetric > old_rtmetric + SIGNIFICANT_RTMETRIC_CHANGE) {
PRINTF("update_rtmetric: new_rtmetric %d + %d < old_rtmetric %d\n",
new_rtmetric, SIGNIFICANT_RTMETRIC_CHANGE, old_rtmetric);
@ -502,7 +506,7 @@ send_queued_packet(struct collect_conn *c)
PRINTF("listen\n");
announcement_listen(1);
ctimer_set(&c->transmit_after_scan_timer, ANNOUNCEMENT_SCAN_TIME,
send_queued_packet, tc);
send_queued_packet, c);
#else /* COLLECT_CONF_WITH_LISTEN */
announcement_set_value(&c->announcement, RTMETRIC_MAX);
announcement_bump(&c->announcement);
@ -631,9 +635,9 @@ handle_ack(struct collect_conn *tc)
tc->transmissions);
n = collect_neighbor_list_find(&tc->neighbor_list,
packetbuf_addr(PACKETBUF_ADDR_SENDER));
collect_neighbor_tx(n, tc->transmissions);
if(n != NULL) {
collect_neighbor_tx(n, tc->transmissions);
collect_neighbor_update_rtmetric(n, rtmetric);
update_rtmetric(tc);
}
@ -651,6 +655,7 @@ handle_ack(struct collect_conn *tc)
if(msg->flags & ACK_FLAGS_RTMETRIC_NEEDS_UPDATE) {
bump_advertisement(tc);
}
set_keepalive_timer(tc);
} else {
stats.badack++;
}
@ -660,48 +665,32 @@ static void
send_ack(struct collect_conn *tc, const rimeaddr_t *to, int flags)
{
struct ack_msg *ack;
struct queuebuf *q;
uint16_t packet_seqno, packet_eseqno;
uint16_t packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID);
uint16_t packet_eseqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID);
packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID);
packet_eseqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID);
packetbuf_clear();
packetbuf_set_datalen(sizeof(struct ack_msg));
ack = packetbuf_dataptr();
memset(ack, 0, sizeof(struct ack_msg));
ack->rtmetric = tc->rtmetric;
ack->flags = flags;
q = queuebuf_new_from_packetbuf();
if(q != NULL) {
packetbuf_set_addr(PACKETBUF_ADDR_RECEIVER, to);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_PACKET_TYPE_ACK);
packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 0);
packetbuf_set_attr(PACKETBUF_ATTR_ERELIABLE, 0);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, packet_seqno);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_MAC_TRANSMISSIONS, MAX_ACK_MAC_REXMITS);
unicast_send(&tc->unicast_conn, to);
packetbuf_clear();
packetbuf_set_datalen(sizeof(struct ack_msg));
ack = packetbuf_dataptr();
memset(ack, 0, sizeof(struct ack_msg));
ack->rtmetric = tc->rtmetric;
ack->flags = flags;
PRINTF("%d.%d: collect: Sending ACK to %d.%d for %d (epacket_id %d)\n",
rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1],
to->u8[0],
to->u8[1],
packet_seqno, packet_eseqno);
packetbuf_set_addr(PACKETBUF_ADDR_RECEIVER, to);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_PACKET_TYPE_ACK);
packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 0);
packetbuf_set_attr(PACKETBUF_ATTR_ERELIABLE, 0);
packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, packet_seqno);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_MAC_TRANSMISSIONS, MAX_ACK_MAC_REXMITS);
unicast_send(&tc->unicast_conn, to);
PRINTF("%d.%d: collect: Sending ACK to %d.%d for %d (epacket_id %d)\n",
rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1],
to->u8[0],
to->u8[1],
packet_seqno, packet_eseqno);
RIMESTATS_ADD(acktx);
queuebuf_to_packetbuf(q);
queuebuf_free(q);
stats.acksent++;
} else {
PRINTF("%d.%d: collect: could not send ACK to %d.%d for %d: no queued buffers\n",
rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1],
to->u8[0], to->u8[1],
packet_seqno);
stats.ackdrop++;
}
RIMESTATS_ADD(acktx);
stats.acksent++;
}
/*---------------------------------------------------------------------------*/
static void
@ -774,9 +763,23 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
/* If we are the sink, the packet has reached its final
destination and we call the receive function. */
if(tc->rtmetric == RTMETRIC_SINK) {
struct queuebuf *q;
/* We first send the ACK. We copy the data packet to a queuebuf
first. */
q = queuebuf_new_from_packetbuf();
if(q != NULL) {
send_ack(tc, &ack_to, 0);
queuebuf_to_packetbuf(q);
queuebuf_free(q);
} else {
PRINTF("%d.%d: collect: could not send ACK to %d.%d for %d: no queued buffers\n",
rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1],
ack_to.u8[0], ack_to.u8[1],
packet_seqno);
stats.ackdrop++;
}
/* We first send the ACK. */
send_ack(tc, &ack_to, 0);
PRINTF("%d.%d: sink received packet %d from %d.%d via %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
@ -831,7 +834,8 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
send_ack(tc, &ack_to, ackflags);
send_queued_packet(tc);
} else {
send_ack(tc, &ack_to, ackflags | ACK_FLAGS_DROPPED | ACK_FLAGS_CONGESTED);
send_ack(tc, &ack_to,
ackflags | ACK_FLAGS_DROPPED | ACK_FLAGS_CONGESTED);
PRINTF("%d.%d: packet dropped: no queue buffer available\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
stats.qdrop++;
@ -906,8 +910,8 @@ timedout(struct collect_conn *tc)
&tc->current_parent),
tc->max_rexmits);
update_rtmetric(tc);
send_next_packet(tc);
set_keepalive_timer(tc);
}
/*---------------------------------------------------------------------------*/
static void
@ -938,7 +942,22 @@ adv_received(struct neighbor_discovery_conn *c, const rimeaddr_t *from,
if(n == NULL) {
collect_neighbor_list_add(&tc->neighbor_list, from, rtmetric);
if(rtmetric == RTMETRIC_MAX) {
bump_advertisement(tc);
}
} else {
/* Check if the advertised rtmetric has changed to
RTMETRIC_MAX. This may indicate that the neighbor has lost its
routes or that it has rebooted. In either case, we bump our
advertisement rate to allow our neighbor to receive a new
rtmetric from us. If our neighbor already happens to have an
rtmetric of RTMETRIC_MAX recorded, it may mean that our
neighbor does not hear our advertisements. If this is the case,
we should not bump our advertisement rate. */
if(rtmetric == RTMETRIC_MAX &&
collect_neighbor_rtmetric(n) != RTMETRIC_MAX) {
bump_advertisement(tc);
}
collect_neighbor_update_rtmetric(n, rtmetric);
PRINTF("%d.%d: updating neighbor %d.%d, etx %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
@ -963,7 +982,22 @@ received_announcement(struct announcement *a, const rimeaddr_t *from,
PRINTF("%d.%d: new neighbor %d.%d, etx %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
from->u8[0], from->u8[1], value);
if(value == RTMETRIC_MAX) {
bump_advertisement(tc);
}
} else {
/* Check if the advertised rtmetric has changed to
RTMETRIC_MAX. This may indicate that the neighbor has lost its
routes or that it has rebooted. In either case, we bump our
advertisement rate to allow our neighbor to receive a new
rtmetric from us. If our neighbor already happens to have an
rtmetric of RTMETRIC_MAX recorded, it may mean that our
neighbor does not hear our advertisements. If this is the case,
we should not bump our advertisement rate. */
if(value == RTMETRIC_MAX &&
collect_neighbor_rtmetric(n) != RTMETRIC_MAX) {
bump_advertisement(tc);
}
collect_neighbor_update_rtmetric(n, value);
PRINTF("%d.%d: updating neighbor %d.%d, etx %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
@ -1021,6 +1055,64 @@ collect_open(struct collect_conn *tc, uint16_t channels,
#endif /* !COLLECT_ANNOUNCEMENTS */
}
/*---------------------------------------------------------------------------*/
static void
send_keepalive(void *ptr)
{
struct collect_conn *c = ptr;
struct collect_neighbor *n;
set_keepalive_timer(c);
/* Send keepalive message only if there are no pending transmissions. */
if(packetqueue_len(&c->send_queue) == 0) {
packetbuf_clear();
packetbuf_set_addr(PACKETBUF_ADDR_ESENDER, &rimeaddr_node_addr);
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 1);
packetbuf_set_attr(PACKETBUF_ATTR_TTL, 1);
packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, KEEPALIVE_REXMITS);
PRINTF("%d.%d: saending keepalive packet %d, max_rexmits %d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID),
packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT));
/* Allocate space for the header. */
packetbuf_hdralloc(sizeof(struct data_msg_hdr));
n = collect_neighbor_list_find(&c->neighbor_list, &c->parent);
if(n != NULL) {
PRINTF("%d.%d: sending keepalive to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
if(packetqueue_enqueue_packetbuf(&c->send_queue,
FORWARD_PACKET_LIFETIME,
c)) {
send_queued_packet(c);
}
}
}
}
/*---------------------------------------------------------------------------*/
static void
set_keepalive_timer(struct collect_conn *c)
{
if(c->keepalive_period != 0) {
ctimer_set(&c->keepalive_timer, (c->keepalive_period / 2) +
(random_rand() % (c->keepalive_period / 2)),
send_keepalive, c);
} else {
ctimer_stop(&c->keepalive_timer);
}
}
/*---------------------------------------------------------------------------*/
void
collect_set_keepalive(struct collect_conn *c, clock_time_t period)
{
c->keepalive_period = period;
set_keepalive_timer(c);
}
/*---------------------------------------------------------------------------*/
void
collect_close(struct collect_conn *tc)
{
@ -1050,6 +1142,8 @@ collect_set_sink(struct collect_conn *tc, int should_be_sink)
announcement_set_value(&tc->announcement, tc->rtmetric);
#endif /* COLLECT_ANNOUNCEMENTS */
update_rtmetric(tc);
bump_advertisement(tc);
}
/*---------------------------------------------------------------------------*/
int
@ -1057,7 +1151,21 @@ collect_send(struct collect_conn *tc, int rexmits)
{
struct collect_neighbor *n;
packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno++);
packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno);
/* Increase the sequence number for the packet we send out. We
employ a trick that allows us to see that a node has been
rebooted: if the sequence number wraps to 0, we set it to half of
the sequence number space. This allows us to detect reboots,
since if a sequence number is less than half of the sequence
number space, the data comes from a node that was recently
rebooted. */
tc->eseqno = (tc->eseqno + 1) % (1 << COLLECT_PACKET_ID_BITS);
if(tc->eseqno == 0) {
tc->eseqno = ((int)(1 << COLLECT_PACKET_ID_BITS)) / 2;
}
packetbuf_set_addr(PACKETBUF_ADDR_ESENDER, &rimeaddr_node_addr);
packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 1);
packetbuf_set_attr(PACKETBUF_ATTR_TTL, MAX_HOPLIM);
@ -1135,6 +1243,7 @@ void
collect_purge(struct collect_conn *tc)
{
collect_neighbor_list_purge(&tc->neighbor_list);
rimeaddr_copy(&tc->parent, &rimeaddr_null);
update_rtmetric(tc);
#if DRAW_TREE
printf("#L %d 0\n", tc->parent.u8[0]);

View file

@ -47,7 +47,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.h,v 1.20 2010/09/13 13:28:14 adamdunkels Exp $
* $Id: collect.h,v 1.21 2010/09/22 22:08:08 adamdunkels Exp $
*/
/**
@ -93,11 +93,14 @@ struct collect_conn {
struct ctimer transmit_after_scan_timer;
#endif /* COLLECT_CONF_ANNOUNCEMENTS */
const struct collect_callbacks *cb;
struct ctimer t;
struct ctimer retransmission_timer;
LIST_STRUCT(send_queue_list);
struct packetqueue send_queue;
struct collect_neighbor_list neighbor_list;
struct ctimer keepalive_timer;
clock_time_t keepalive_period;
rimeaddr_t parent, current_parent;
uint16_t rtmetric;
uint8_t seqno;
@ -122,6 +125,8 @@ void collect_set_sink(struct collect_conn *c, int should_be_sink);
int collect_depth(struct collect_conn *c);
void collect_set_keepalive(struct collect_conn *c, clock_time_t period);
void collect_print_stats(void);
#define COLLECT_MAX_DEPTH 255