Minor bugfixes pertaining to queue handling: if a packet was explicitly dropped, the packet would later be acknowledged despite it never being received; added a limit on how many queue entries that can be used for packets that are being forwarded, to allow for originated packets even if the forwarding queue is filled.

This commit is contained in:
adamdunkels 2011-01-10 15:10:43 +00:00
parent 70f9f075a0
commit 81d23a3a36

View file

@ -33,7 +33,7 @@
*
* This file is part of the Contiki operating system.
*
* $Id: collect.c,v 1.71 2011/01/09 23:49:27 adamdunkels Exp $
* $Id: collect.c,v 1.72 2011/01/10 15:10:43 adamdunkels Exp $
*/
/**
@ -71,7 +71,7 @@ static const struct packetbuf_attrlist attributes[] =
and the connection for packets that have been recently
forwarded. This list is maintained to avoid forwarding duplicate
packets. */
#define NUM_RECENT_PACKETS 16
#define NUM_RECENT_PACKETS 8
struct recent_packet {
struct collect_conn *conn;
@ -127,8 +127,9 @@ struct ack_msg {
#define MAX_MAC_REXMITS 2
#define MAX_ACK_MAC_REXMITS 5
#define REXMIT_TIME CLOCK_SECOND * 4
#define FORWARD_PACKET_LIFETIME_BASE REXMIT_TIME
#define FORWARD_PACKET_LIFETIME_BASE REXMIT_TIME * 2
#define MAX_SENDING_QUEUE 3 * QUEUEBUF_NUM / 4
#define MIN_AVAILABLE_QUEUE_ENTRIES 3
#define KEEPALIVE_REXMITS 8
#define MAX_REXMITS 31
@ -739,12 +740,12 @@ retransmit_current_packet(struct collect_conn *c)
static void
send_next_packet(struct collect_conn *tc)
{
/* Cancel retransmission timer. */
ctimer_stop(&tc->retransmission_timer);
/* Remove the first packet on the queue, the packet that was just sent. */
packetqueue_dequeue(&tc->send_queue);
tc->seqno = (tc->seqno + 1) % (1 << COLLECT_PACKET_ID_BITS);
/* Cancel retransmission timer. */
ctimer_stop(&tc->retransmission_timer);
tc->sending = 0;
tc->transmissions = 0;
@ -815,6 +816,7 @@ handle_ack(struct collect_conn *tc)
transmission number to its routing metric, which increases the
chance that another parent will be chosen. */
if(msg->flags & ACK_FLAGS_CONGESTED) {
PRINTF("ACK flag indicated parent was congested.\n");
collect_neighbor_set_congested(n);
collect_neighbor_tx(n, tc->max_rexmits * 2);
update_rtmetric(tc);
@ -832,9 +834,13 @@ handle_ack(struct collect_conn *tc)
/* If the packet was dropped, but without the node being
congested or the packets lifetime being exceeded, we
penalize the parent and try sending the packet again. */
PRINTF("ACK flag indicated packet was dropped by parent.\n");
collect_neighbor_tx(n, tc->max_rexmits);
update_rtmetric(tc);
send_queued_packet(tc);
ctimer_set(&tc->retransmission_timer,
REXMIT_TIME + (random_rand() % (REXMIT_TIME)),
retransmit_callback, tc);
}
}
@ -880,6 +886,23 @@ send_ack(struct collect_conn *tc, const rimeaddr_t *to, int flags)
}
/*---------------------------------------------------------------------------*/
static void
add_packet_to_recent_packets(struct collect_conn *tc)
{
/* Remember that we have seen this packet for later, but only if
it has a length that is larger than zero. Packets with size
zero are keepalive or proactive link estimate probes, so we do
not record them in our history. */
if(packetbuf_datalen() > sizeof(struct data_msg_hdr)) {
recent_packets[recent_packet_ptr].eseqno =
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID);
rimeaddr_copy(&recent_packets[recent_packet_ptr].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER));
recent_packets[recent_packet_ptr].conn = tc;
recent_packet_ptr = (recent_packet_ptr + 1) % NUM_RECENT_PACKETS;
}
}
/*---------------------------------------------------------------------------*/
static void
node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
{
struct collect_conn *tc = (struct collect_conn *)
@ -912,7 +935,7 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
uint8_t packet_seqno;
stats.datarecv++;
/* Remember to whom we should send the ACK, since we reuse the
packet buffer and its attributes when sending the ACK. */
rimeaddr_copy(&ack_to, packetbuf_addr(PACKETBUF_ADDR_SENDER));
@ -946,23 +969,13 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
}
}
/* Remember that we have seen this packet for later, but only if
it has a length that is larger than zero. Packets with size
zero are keepalive or proactive link estimate probes, so we do
not record them in our history. */
if(packetbuf_datalen() > sizeof(struct data_msg_hdr)) {
recent_packets[recent_packet_ptr].eseqno =
packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID);
rimeaddr_copy(&recent_packets[recent_packet_ptr].originator,
packetbuf_addr(PACKETBUF_ADDR_ESENDER));
recent_packets[recent_packet_ptr].conn = tc;
recent_packet_ptr = (recent_packet_ptr + 1) % NUM_RECENT_PACKETS;
}
/* If we are the sink, the packet has reached its final
destination and we call the receive function. */
if(tc->rtmetric == RTMETRIC_SINK) {
struct queuebuf *q;
add_packet_to_recent_packets(tc);
/* We first send the ACK. We copy the data packet to a queuebuf
first. */
q = queuebuf_new_from_packetbuf();
@ -1025,13 +1038,15 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from)
we are able to enqueue the packet, we send a positive ACK. If
we are unable to enqueue the packet, we send a negative ACK
to inform the sender that the packet was dropped due to
memory problems. We first check the size of our sending
queue. */
if(packetqueue_enqueue_packetbuf(&tc->send_queue,
memory problems. We first check the size of our sending queue
to ensure that we always have entries for packets that
are originated by this node. */
if(packetqueue_len(&tc->send_queue) <= MAX_SENDING_QUEUE - MIN_AVAILABLE_QUEUE_ENTRIES &&
packetqueue_enqueue_packetbuf(&tc->send_queue,
FORWARD_PACKET_LIFETIME_BASE *
packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT),
tc)) {
add_packet_to_recent_packets(tc);
send_ack(tc, &ack_to, ackflags);
send_queued_packet(tc);
} else {
@ -1073,6 +1088,10 @@ timedout(struct collect_conn *tc)
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], tc->transmissions,
tc->current_parent.u8[0], tc->current_parent.u8[1],
tc->max_rexmits);
printf("%d.%d: timedout after %d retransmissions to %d.%d (max retransmissions %d): packet dropped\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], tc->transmissions,
tc->current_parent.u8[0], tc->current_parent.u8[1],
tc->max_rexmits);
tc->sending = 0;
n = collect_neighbor_list_find(&tc->neighbor_list,
@ -1290,6 +1309,7 @@ collect_open(struct collect_conn *tc, uint16_t channels,
ctimer_set(&tc->proactive_probing_timer, PROACTIVE_PROBING_INTERVAL,
proactive_probing_callback, tc);
}
/*---------------------------------------------------------------------------*/
static void
@ -1377,7 +1397,8 @@ int
collect_send(struct collect_conn *tc, int rexmits)
{
struct collect_neighbor *n;
int ret;
packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno);
/* Increase the sequence number for the packet we send out. We
@ -1420,23 +1441,25 @@ collect_send(struct collect_conn *tc, int rexmits)
/* Allocate space for the header. */
packetbuf_hdralloc(sizeof(struct data_msg_hdr));
if(packetqueue_enqueue_packetbuf(&tc->send_queue,
FORWARD_PACKET_LIFETIME_BASE *
packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT),
tc)) {
send_queued_packet(tc);
ret = 1;
} else {
PRINTF("%d.%d: drop originated packet: no queuebuf\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
printf("%d.%d: drop originated packet: no queuebuf\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
n = collect_neighbor_list_find(&tc->neighbor_list, &tc->parent);
if(n != NULL) {
PRINTF("%d.%d: sending to %d.%d\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1],
n->addr.u8[0], n->addr.u8[1]);
if(packetqueue_enqueue_packetbuf(&tc->send_queue,
FORWARD_PACKET_LIFETIME_BASE *
packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT),
tc)) {
send_queued_packet(tc);
return 1;
} else {
PRINTF("%d.%d: drop originated packet: no queuebuf\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
} else {
PRINTF("%d.%d: did not find any neighbor to send to\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
@ -1452,7 +1475,7 @@ collect_send(struct collect_conn *tc, int rexmits)
#endif /* COLLECT_CONF_WITH_LISTEN */
#endif /* COLLECT_ANNOUNCEMENTS */
if(packetqueue_enqueue_packetbuf(&tc->send_queue,
/* if(packetqueue_enqueue_packetbuf(&tc->send_queue,
FORWARD_PACKET_LIFETIME_BASE *
packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT),
tc)) {
@ -1460,10 +1483,12 @@ collect_send(struct collect_conn *tc, int rexmits)
} else {
PRINTF("%d.%d: drop originated packet: no queuebuf\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}
printf("%d.%d: drop originated packet: no queuebuf\n",
rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]);
}*/
}
}
return 0;
return ret;
}
/*---------------------------------------------------------------------------*/
int