/*
 * Copyright (c) 2010, Swedish Institute of Computer Science
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the name of the Institute nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE INSTITUTE AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE INSTITUTE OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

/**
 * \file
 *	Logic for relational databases.
 * \author
 * 	Nicolas Tsiftes <nvt@sics.se>
 */

#include <limits.h>
#include <string.h>

#include "lib/crc16.h"
#include "lib/list.h"
#include "lib/memb.h"

#define DEBUG DEBUG_NONE
#include "net/ip/uip-debug.h"

#include "db-options.h"
#include "index.h"
#include "lvm.h"
#include "relation.h"
#include "result.h"
#include "storage.h"
#include "aql.h"

/*
 * The source_dest_map structure maps the location of an attribute's
 * data in a source row to its location in the corresponding destination
 * row. The map is calculated just before processing a relational
 * selection, and is then used to improve the performance when
 * processing each row.
 */
struct source_dest_map {
  attribute_t *from_attr; /* Attribute in the source relation. */
  attribute_t *to_attr; /* Attribute with the same name in the result relation. */
  unsigned from_offset; /* Byte offset of the value within the source row. */
  unsigned to_offset; /* Byte offset of the value within the result row. */
};

/* The map used by selections; one entry per attribute of the result relation. */
static struct source_dest_map attr_map[AQL_ATTRIBUTE_LIMIT];

#if DB_FEATURE_JOIN
/*
 * The source_map structure associates an attribute of a join result
 * with a pointer to the place, inside one of the source row buffers,
 * from which the value of that attribute is copied.
 */
struct source_map {
  attribute_t *attr; /* Source attribute; supplies the element size. */
  unsigned char *from_ptr; /* Location of the value in the left or right row buffer. */
};

/* Filled in by generate_join_result(); one entry per join result attribute. */
static struct source_map source_map[AQL_ATTRIBUTE_LIMIT];
#endif /* DB_FEATURE_JOIN */

/* Buffer holding the source row that is currently being processed. */
static unsigned char row[DB_MAX_ATTRIBUTES_PER_RELATION * DB_MAX_ELEMENT_SIZE];
/* Second source row buffer; used as the right-hand row of a join. */
static unsigned char extra_row[DB_MAX_ATTRIBUTES_PER_RELATION * DB_MAX_ELEMENT_SIZE];
/* Buffer in which the result tuple of a selection or join is assembled. */
static unsigned char result_row[AQL_ATTRIBUTE_LIMIT * DB_MAX_ELEMENT_SIZE];
/* Aliases that name the role each of the buffers above plays in a join. */
static unsigned char * const left_row = row;
static unsigned char * const right_row = extra_row;
static unsigned char * const join_row = result_row;

/* The list of relations managed by this module, and the pools from
   which relation and attribute descriptors are allocated. */
LIST(relations);
MEMB(relations_memb, relation_t, DB_RELATION_POOL_SIZE);
MEMB(attributes_memb, attribute_t, DB_ATTRIBUTE_POOL_SIZE);

/* Forward declarations of the file-local helper functions. */
static relation_t *relation_find(char *);
static attribute_t *attribute_find(relation_t *, char *);
static int get_attribute_value_offset(relation_t *, attribute_t *);
static void attribute_free(relation_t *, attribute_t *);
static void purge_relations(void);
static void relation_clear(relation_t *);
static relation_t *relation_allocate(void);
static void relation_free(relation_t *);

/*
 * Search the relation list for a relation with the given name.
 * Returns the matching relation, or NULL if no relation has that name.
 */
static relation_t *
relation_find(char *name)
{
  relation_t *candidate;

  candidate = list_head(relations);
  while(candidate != NULL && strcmp(candidate->name, name) != 0) {
    candidate = candidate->next;
  }

  /* Either the first match, or NULL once the list has been exhausted. */
  return candidate;
}

/*
 * Search the attribute list of a relation for an attribute with the
 * given name. Returns the matching attribute regardless of its flags,
 * or NULL if the relation has no attribute with that name.
 */
static attribute_t *
attribute_find(relation_t *rel, char *name)
{
  attribute_t *candidate;

  candidate = list_head(rel->attributes);
  while(candidate != NULL && strcmp(candidate->name, name) != 0) {
    candidate = candidate->next;
  }

  return candidate;
}

/*
 * Calculate the byte offset of an attribute's value within a row of
 * the relation, by summing the element sizes of all attributes that
 * precede it in the attribute list.
 *
 * Returns the offset, or -1 if the attribute is not part of the relation.
 */
static int
get_attribute_value_offset(relation_t *rel, attribute_t *attr)
{
  attribute_t *cursor;
  int bytes_before;

  bytes_before = 0;
  cursor = list_head(rel->attributes);
  while(cursor != NULL) {
    if(cursor == attr) {
      return bytes_before;
    }
    bytes_before += cursor->element_size;
    cursor = cursor->next;
  }

  return -1;
}

/*
 * Release an attribute descriptor: release its index, if it has one,
 * return the descriptor to the attribute pool, and decrement the
 * attribute count of the relation. The attribute is not unlinked from
 * the attribute list here; the caller is responsible for that.
 */
static void
attribute_free(relation_t *rel, attribute_t *attr)
{
  index_t *attached_index;

  attached_index = attr->index;
  if(attached_index != NULL) {
    index_release(attached_index);
  }

  memb_free(&attributes_memb, attr);
  rel->attribute_count--;
}

/*
 * Free every relation in the relation list that has no references.
 */
static void
purge_relations(void)
{
  relation_t *rel;
  relation_t *following;

  rel = list_head(relations);
  while(rel != NULL) {
    /* Fetch the successor first, since relation_free() gives the
       memory of the current relation back to the pool. */
    following = rel->next;
    if(rel->references == 0) {
      relation_free(rel);
    }
    rel = following;
  }
}

/*
 * Reset a relation descriptor to its initial state: all fields zeroed,
 * no tuple storage, an unknown cardinality, storage direction, and an
 * initialized (empty) attribute list.
 */
static void
relation_clear(relation_t *rel)
{
  memset(rel, 0, sizeof(relation_t));

  LIST_STRUCT_INIT(rel, attributes);
  rel->dir = DB_STORAGE;
  rel->cardinality = INVALID_TUPLE;
  rel->tuple_storage = -1;
}

/*
 * Allocate a cleared relation descriptor from the relation pool.
 * If the pool is exhausted, unreferenced relations are purged and the
 * allocation is retried once. Returns NULL if no descriptor could be
 * obtained.
 */
static relation_t *
relation_allocate(void)
{
  relation_t *rel;

  rel = memb_alloc(&relations_memb);
  if(rel == NULL) {
    /* Try to make room by dropping relations that nobody refers to. */
    purge_relations();
    rel = memb_alloc(&relations_memb);
  }

  if(rel == NULL) {
    PRINTF("DB: Failed to allocate a relation\n");
    return NULL;
  }

  relation_clear(rel);
  return rel;
}

/*
 * Free a relation descriptor: free all of its attributes, unlink it
 * from the relation list, and return it to the relation pool.
 */
static void
relation_free(relation_t *rel)
{
  attribute_t *attr;

  for(attr = list_pop(rel->attributes);
      attr != NULL;
      attr = list_pop(rel->attributes)) {
    attribute_free(rel, attr);
  }

  list_remove(relations, rel);
  memb_free(&relations_memb, rel);
}

/*
 * Initialize the relation module: the descriptor pools for relations
 * and attributes, and the list of relations. Always returns DB_OK.
 */
db_result_t
relation_init(void)
{
  memb_init(&attributes_memb);
  memb_init(&relations_memb);
  list_init(relations);

  return DB_OK;
}

/*
 * Load a relation by name and take a reference to it.
 *
 * If the relation is already in the relation list, its reference count
 * is incremented. Otherwise a descriptor is allocated, filled in by
 * storage_get_relation(), and added to the list with one reference.
 * For relations with the DB_STORAGE direction, storage_load() is then
 * called; if that fails, the reference is released again.
 *
 * Returns the relation, or NULL on failure.
 */
relation_t *
relation_load(char *name)
{
  relation_t *rel;

  rel = relation_find(name);
  if(rel != NULL) {
    rel->references++;
    goto end;
  }

  rel = relation_allocate();
  if(rel == NULL) {
    return NULL;
  }

  if(DB_ERROR(storage_get_relation(rel, name))) {
    memb_free(&relations_memb, rel);
    return NULL;
  }

  /* Copy at most sizeof(rel->name) - 1 characters and stop at the end
     of the string, so that a name shorter than the destination buffer
     is never read beyond its terminator. */
  strncpy(rel->name, name, sizeof(rel->name) - 1);
  rel->name[sizeof(rel->name) - 1] = '\0';
  rel->references = 1;
  list_add(relations, rel);

end:
  if(rel->dir == DB_STORAGE && DB_ERROR(storage_load(rel))) {
    relation_release(rel);
    return NULL;
  }

  return rel;
}

/*
 * Drop one reference to a relation. When no references remain, the
 * relation is unloaded from storage; the descriptor itself stays in
 * the relation list. Always returns DB_OK.
 */
db_result_t
relation_release(relation_t *rel)
{
  if(rel->references > 0) {
    rel->references--;
  }

  if(rel->references != 0) {
    /* Still in use by someone else. */
    return DB_OK;
  }

  storage_unload(rel);
  return DB_OK;
}

/*
 * Create a new, empty relation with the given name and direction.
 *
 * The request is rejected if the name is empty or if
 * storage_get_relation() reports that a relation with this name
 * already exists. For the DB_STORAGE direction, the relation is also
 * written to storage. The new relation is added to the relation list
 * without any reference being taken.
 *
 * Returns the new relation, or NULL on failure.
 */
relation_t *
relation_create(char *name, db_direction_t dir)
{
  relation_t probe;
  relation_t *rel;

  if(*name == '\0') {
    return NULL;
  }

  relation_clear(&probe);
  if(storage_get_relation(&probe, name) == DB_OK) {
    /* Reject a creation request if the relation already exists. */
    PRINTF("DB: Attempted to create a relation that already exists (%s)\n",
           name);
    return NULL;
  }

  rel = relation_allocate();
  if(rel == NULL) {
    return NULL;
  }

  rel->cardinality = 0;
  rel->dir = dir;
  strncpy(rel->name, name, sizeof(rel->name) - 1);
  rel->name[sizeof(rel->name) - 1] = '\0';

  if(dir == DB_STORAGE) {
    storage_drop_relation(rel, 1);

    if(storage_put_relation(rel) != DB_OK) {
      memb_free(&relations_memb, rel);
      return NULL;
    }
  }

  list_add(relations, rel);
  return rel;
}

#if DB_FEATURE_REMOVE
/*
 * Rename a relation. Any relation that already carries the new name is
 * removed first (its tuples are not removed), and then the old
 * relation is renamed in storage.
 *
 * Returns DB_OK on success, or DB_STORAGE_ERROR if either step fails.
 */
db_result_t
relation_rename(char *old_name, char *new_name)
{
  if(DB_ERROR(relation_remove(new_name, 0))) {
    return DB_STORAGE_ERROR;
  }

  if(DB_ERROR(storage_rename_relation(old_name, new_name))) {
    return DB_STORAGE_ERROR;
  }

  return DB_OK;
}
#endif /* DB_FEATURE_REMOVE */

/*
 * Add an attribute to a relation.
 *
 * rel          - the relation to extend; it must not contain tuples.
 * dir          - DB_STORAGE to write the attribute to storage; for any
 *                other direction index_load() is called instead.
 * name         - the attribute name; truncated if it does not fit.
 * domain       - the domain of the attribute values.
 * element_size - the size in bytes of one value; must be in the range
 *                1..DB_MAX_ELEMENT_SIZE.
 *
 * Returns the new attribute, or NULL on failure. On failure the
 * relation is left as it was before the call.
 */
attribute_t *
relation_attribute_add(relation_t *rel, db_direction_t dir, char *name,
		       domain_t domain, size_t element_size)
{
  attribute_t *attribute;
  tuple_id_t cardinality;

  cardinality = relation_cardinality(rel);
  if(cardinality != INVALID_TUPLE && cardinality > 0) {
    PRINTF("DB: Attempt to create an attribute in a non-empty relation\n");
    return NULL;
  }

  if(element_size == 0 || element_size > DB_MAX_ELEMENT_SIZE) {
    /* The cast makes the argument match the %u conversion. */
    PRINTF("DB: Unacceptable element size: %u\n", (unsigned)element_size);
    return NULL;
  }

  attribute = memb_alloc(&attributes_memb);
  if(attribute == NULL) {
    PRINTF("DB: Failed to allocate attribute \"%s\"!\n", name);
    return NULL;
  }

  strncpy(attribute->name, name, sizeof(attribute->name) - 1);
  attribute->name[sizeof(attribute->name) - 1] = '\0';
  attribute->domain = domain;
  attribute->element_size = element_size;
  attribute->aggregator = 0;
  attribute->index = NULL;
  attribute->flags = 0 /*ATTRIBUTE_FLAG_UNIQUE*/;

  rel->row_length += element_size;

  list_add(rel->attributes, attribute);
  rel->attribute_count++;

  if(dir == DB_STORAGE) {
    if(DB_ERROR(storage_put_attribute(rel, attribute))) {
      PRINTF("DB: Failed to store attribute %s\n", attribute->name);
      /* Undo the changes made to the relation above, so that the
         attribute list does not keep a pointer to a freed attribute
         and the row length and attribute count stay accurate. */
      list_remove(rel->attributes, attribute);
      rel->attribute_count--;
      rel->row_length -= element_size;
      memb_free(&attributes_memb, attribute);
      return NULL;
    }
  } else {
    index_load(rel, attribute);
  }

  return attribute;
}

/*
 * Look up a valid attribute of a relation by name. Attributes that
 * carry ATTRIBUTE_FLAG_INVALID are treated as nonexistent.
 *
 * Returns the attribute, or NULL if there is no valid attribute with
 * the given name.
 */
attribute_t *
relation_attribute_get(relation_t *rel, char *name)
{
  attribute_t *attr;

  attr = attribute_find(rel, name);
  if(attr == NULL || (attr->flags & ATTRIBUTE_FLAG_INVALID)) {
    return NULL;
  }

  return attr;
}

/*
 * Remove an attribute from a relation.
 *
 * This operation is not supported at present: the function always
 * returns DB_IMPLEMENTATION_ERROR and leaves the relation untouched.
 * The draft implementation below is excluded from compilation.
 */
db_result_t
relation_attribute_remove(relation_t *rel, char *name)
{
  /* Not implemented completely. */
  return DB_IMPLEMENTATION_ERROR;
#if 0
  attribute_t *attr;

  if(rel->references > 1) {
    return DB_BUSY_ERROR;
  }

  attr = relation_attribute_get(rel, name);
  if(attr == NULL) {
    return DB_NAME_ERROR;
  }

  list_remove(rel->attributes, attr);
  attribute_free(rel, attr);
  return DB_OK;
#endif
}

/*
 * Extract the value of an attribute from the physical representation
 * of a row.
 *
 * rel     - the relation that the row belongs to.
 * attr    - the attribute whose value is wanted.
 * row_ptr - the start of the row data.
 * value   - receives the converted value.
 *
 * Returns DB_IMPLEMENTATION_ERROR if the attribute is not part of the
 * relation; otherwise the result of db_phy_to_value().
 */
db_result_t
relation_get_value(relation_t *rel, attribute_t *attr,
                  unsigned char *row_ptr, attribute_value_t *value)
{
  int offset;

  offset = get_attribute_value_offset(rel, attr);
  if(offset < 0) {
    return DB_IMPLEMENTATION_ERROR;
  }

  return db_phy_to_value(value, attr, row_ptr + offset);
}

/*
 * Mark the named attribute as the primary key of the relation.
 * The flags of the attribute are replaced by
 * ATTRIBUTE_FLAG_PRIMARY_KEY; flags set earlier are not kept.
 *
 * Returns DB_OK, or DB_NAME_ERROR if the relation has no valid
 * attribute with the given name.
 */
db_result_t
relation_set_primary_key(relation_t *rel, char *name)
{
  attribute_t *key_attr;

  key_attr = relation_attribute_get(rel, name);
  if(key_attr != NULL) {
    key_attr->flags = ATTRIBUTE_FLAG_PRIMARY_KEY;
    PRINTF("DB: New primary key: %s\n", key_attr->name);
    return DB_OK;
  }

  return DB_NAME_ERROR;
}

/*
 * Remove a relation.
 *
 * name          - the name of the relation to remove.
 * remove_tuples - passed on to storage_drop_relation().
 *
 * Returns DB_OK if the relation could not be loaded (removing a
 * nonexistent relation is not treated as an error), DB_BUSY_ERROR if
 * the relation is referenced by someone else, or otherwise the result
 * of storage_drop_relation().
 */
db_result_t
relation_remove(char *name, int remove_tuples)
{
  relation_t *rel;
  db_result_t result;

  rel = relation_load(name);
  if(rel == NULL) {
    /*
     * Attempt to remove a nonexistent relation. To allow for this
     * operation to be used for setting up repeatable tests and
     * experiments, we do not signal an error.
     */
    return DB_OK;
  }

  if(rel->references > 1) {
    /* Give back the reference taken by relation_load() above;
       otherwise each failed attempt would leave the reference count
       one step higher, and the relation could never become
       unreferenced again. */
    relation_release(rel);
    return DB_BUSY_ERROR;
  }

  result = storage_drop_relation(rel, remove_tuples);
  relation_free(rel);
  return result;
}

/*
 * Insert a tuple into a relation.
 *
 * rel    - the relation to insert into.
 * values - an array with one value for each attribute in the attribute
 *          list of the relation, in list order. Attributes flagged as
 *          invalid also consume an array entry; their data area in the
 *          row is filled with zeros.
 *
 * Each value is converted to its physical representation and, for
 * indexed attributes, inserted into the index under the next row
 * number. The assembled row is finally handed to storage_put_row().
 *
 * Returns DB_RELATIONAL_ERROR on a domain mismatch, DB_INDEX_ERROR if
 * an index insertion fails, the error of db_value_to_phy() if a
 * conversion fails, or otherwise the result of storage_put_row().
 */
db_result_t
relation_insert(relation_t *rel, attribute_value_t *values)
{
  attribute_t *attr;
  unsigned char record[rel->row_length];
  unsigned char *ptr;
  attribute_value_t *value;
  db_result_t result;

  value = values;

  PRINTF("DB: Relation %s has a record size of %u bytes\n",
	 rel->name, (unsigned)rel->row_length);
  ptr = record;

  PRINTF("DB: Insert (");

  for(attr = list_head(rel->attributes); attr != NULL; attr = attr->next, value++) {
    /* Verify that the value is in the expected domain. An exception
       to this rule is that INT may be promoted to LONG. */
    if(attr->domain != value->domain &&
       !(attr->domain == DOMAIN_LONG && value->domain == DOMAIN_INT)) {
      PRINTF("DB: The value domain %d does not match the domain %d of attribute %s\n",
             value->domain, attr->domain, attr->name);
      return DB_RELATIONAL_ERROR;
    }

    /* Set the data area for removed attributes to 0. */
    if(attr->flags & ATTRIBUTE_FLAG_INVALID) {
      memset(ptr, 0, attr->element_size);
      ptr += attr->element_size;
      continue;
    }

    result = db_value_to_phy((unsigned char *)ptr, attr, value);
    if(DB_ERROR(result)) {
      return result;
    }

#if DEBUG
    switch(attr->domain) {
    case DOMAIN_INT:
      PRINTF("%s=%d", attr->name, VALUE_INT(value));
      break;
    case DOMAIN_LONG:
      PRINTF("%s=%ld", attr->name, VALUE_LONG(value));
      break;
    case DOMAIN_STRING:
      /* The string value is enclosed in a pair of quotes. */
      PRINTF("%s='%s'", attr->name, VALUE_STRING(value));
      break;
    default:
      PRINTF(")\nDB: Unhandled attribute domain: %d\n", attr->domain);
      return DB_TYPE_ERROR;
    }

    if(attr->next != NULL) {
      PRINTF(", ");
    }
#endif /* DEBUG */

    ptr += attr->element_size;
    if(attr->index != NULL) {
      if(DB_ERROR(index_insert(attr->index, value, rel->next_row))) {
        return DB_INDEX_ERROR;
      }
    }
  }

  PRINTF(")\n");

  /* NOTE(review): the counters are advanced before the row is stored,
     so they are advanced even if storage_put_row() fails -- confirm
     that this is intended. */
  rel->cardinality++;
  rel->next_row++;
  return storage_put_row(rel, record);
}

/*
 * Fold one value into the running aggregation of a result attribute,
 * according to the aggregator of the attribute. Only values in the
 * INT and LONG domains take part; values in other domains are ignored
 * by every aggregator. AQL_MEAN and AQL_MEDIAN do not update the
 * aggregation value.
 */
static void
aggregate(attribute_t *attr, attribute_value_t *value)
{
  long sample;

  if(value->domain == DOMAIN_INT) {
    sample = VALUE_INT(value);
  } else if(value->domain == DOMAIN_LONG) {
    sample = VALUE_LONG(value);
  } else {
    return;
  }

  switch(attr->aggregator) {
  case AQL_COUNT:
    attr->aggregation_value++;
    break;
  case AQL_SUM:
    attr->aggregation_value += sample;
    break;
  case AQL_MIN:
    if(sample < attr->aggregation_value) {
      attr->aggregation_value = sample;
    }
    break;
  case AQL_MAX:
    if(sample > attr->aggregation_value) {
      attr->aggregation_value = sample;
    }
    break;
  case AQL_MEAN:
  case AQL_MEDIAN:
  default:
    /* Nothing is accumulated for these aggregators. */
    break;
  }
}

/*
 * Fill in the source-to-destination map for a selection: one entry for
 * each attribute of the result relation (to_rel), pairing it with the
 * attribute of the same name in the source relation (from_rel) and
 * recording the byte offsets of the value in the two rows.
 *
 * The attribute_count, from_row and to_row parameters are not used by
 * this function.
 *
 * Returns DB_OK on success, DB_NAME_ERROR if a result attribute has no
 * counterpart in the source relation, or DB_IMPLEMENTATION_ERROR if
 * the source offset cannot be determined.
 */
static db_result_t
generate_attribute_map(struct source_dest_map *attr_map, unsigned attribute_count,
                       relation_t *from_rel, relation_t *to_rel, 
                       unsigned char *from_row, unsigned char *to_row)
{
  attribute_t *src_attr;
  attribute_t *dst_attr;
  struct source_dest_map *entry;
  unsigned dest_offset;
  int src_offset;

  entry = attr_map;
  dest_offset = 0;
  dst_attr = list_head(to_rel->attributes);

  while(dst_attr != NULL) {
    src_attr = attribute_find(from_rel, dst_attr->name);
    if(src_attr == NULL) {
      PRINTF("DB: Invalid attribute in the result relation: %s\n",
             dst_attr->name);
      return DB_NAME_ERROR;
    }

    entry->from_attr = src_attr;
    entry->to_attr = dst_attr;

    src_offset = get_attribute_value_offset(from_rel, src_attr);
    if(src_offset < 0) {
      return DB_IMPLEMENTATION_ERROR;
    }
    entry->from_offset = src_offset;
    entry->to_offset = dest_offset;

    /* Values are packed back to back in the result row. */
    dest_offset += dst_attr->element_size;
    entry++;
    dst_attr = dst_attr->next;
  }

  return DB_OK;
}

/*
 * Choose an index to drive a selection.
 *
 * Every indexed attribute for which the LVM instance can derive a
 * value range is considered, and the index of the attribute with the
 * smallest range is selected. If an iterator can be obtained for that
 * index and range, DB_HANDLE_FLAG_SEARCH_INDEX is set in the handle;
 * otherwise the handle is left unchanged.
 */
static void
select_index(db_handle_t *handle, lvm_instance_t *lvm_instance)
{
  index_t *index;
  attribute_t *attr;
  operand_value_t min;
  operand_value_t max;
  attribute_value_t av_min;
  attribute_value_t av_max;
  unsigned long range;
  unsigned long min_range;

  index = NULL;
  min_range = ULONG_MAX;

  /* Find all indexed and derived attributes, and select the index of 
     the attribute with the smallest range. */
  for(attr = list_head(handle->rel->attributes);
      attr != NULL;
      attr = attr->next) {
    if(attr->index != NULL &&
       !LVM_ERROR(lvm_get_derived_range(lvm_instance, attr->name, &min, &max))) {
      /* The width of the range is calculated and compared in unsigned
         arithmetic, which is well defined for any pair of bounds. */
      range = (unsigned long)max.l - (unsigned long)min.l;
      PRINTF("DB: The search range for attribute \"%s\" comprises %lu values\n",
             attr->name, range + 1);

      if(range <= min_range) {
        /* Remember the smallest range seen so far, so that a later
           attribute with a wider range does not replace this one. */
        min_range = range;
        index = attr->index;
        /* NOTE(review): the domain is set to DOMAIN_INT although the
           bounds are stored through VALUE_LONG -- confirm against the
           index implementation. */
        av_min.domain = av_max.domain = DOMAIN_INT;
        VALUE_LONG(&av_min) = min.l;
        VALUE_LONG(&av_max) = max.l;
      }
    }
  }

  if(index != NULL) {
    /* We found a suitable index; get an iterator for it. */
    if(index_get_iterator(&handle->index_iterator, index, 
                          &av_min, &av_max) == DB_OK) {
      handle->flags |= DB_HANDLE_FLAG_SEARCH_INDEX;
    }
  }
}

/*
 * Prepare a handle for the row-by-row processing of a selection.
 *
 * The row counters of the handle are reset, the number of result
 * columns is counted (attributes flagged ATTRIBUTE_FLAG_NO_STORE are
 * left out), the attribute map is generated, and, if the query has a
 * predicate for which ranges can be derived, an index is selected.
 *
 * Returns DB_OK, or DB_IMPLEMENTATION_ERROR if the attribute map could
 * not be generated.
 */
static db_result_t
generate_selection_result(db_handle_t *handle, relation_t *rel, aql_adt_t *adt)
{
  relation_t *result_rel;
  attribute_t *attr;

  result_rel = handle->result_rel;

  handle->current_row = 0;
  handle->tuple_id = 0;
  handle->ncolumns = 0;
  for(attr = list_head(result_rel->attributes); attr != NULL; attr = attr->next) {
    if(!(attr->flags & ATTRIBUTE_FLAG_NO_STORE)) {
      handle->ncolumns++;
    }
  }
  handle->tuple = (tuple_t)result_row;

  if(DB_ERROR(generate_attribute_map(attr_map, result_rel->attribute_count,
                                     rel, result_rel, row, result_row))) {
    return DB_IMPLEMENTATION_ERROR;
  }

  /* Try to establish acceptable ranges for the attribute values, and
     use them to pick an index if that succeeds. */
  if(adt->lvm_instance != NULL && !LVM_ERROR(lvm_derive(adt->lvm_instance))) {
    select_index(handle, adt->lvm_instance);
  }

  handle->flags |= DB_HANDLE_FLAG_PROCESSING;

  return DB_OK;
}

#if DB_FEATURE_REMOVE
/*
 * Process one step of a tuple removal.
 *
 * The removal is carried out as a selection into a result relation.
 * When the selection has finished, the processed relation is released
 * and the relation named adt->relations[0] is renamed to
 * adt->relations[1], overwriting the latter.
 *
 * Returns the result of relation_process_select(), except when the
 * final rename fails, in which case the error of the rename is
 * returned instead of DB_FINISHED.
 */
db_result_t
relation_process_remove(void *handle_ptr)
{
  db_handle_t *handle;
  aql_adt_t *adt;
  db_result_t result;
  db_result_t rename_result;

  handle = (db_handle_t *)handle_ptr;
  adt = handle->adt;

  result = relation_process_select(handle_ptr);
  if(result == DB_FINISHED) {
    PRINTF("DB: Finished removing tuples. Overwriting relation %s with the result\n", 
	adt->relations[1]);
    relation_release(handle->rel);
    rename_result = relation_rename(adt->relations[0], adt->relations[1]);
    if(DB_ERROR(rename_result)) {
      /* Do not report success if the result could not replace the
         original relation. */
      PRINTF("DB: Failed to overwrite relation %s\n", adt->relations[1]);
      return rename_result;
    }
  }

  return result;
}
#endif

/*
 * Process the next row of a selection.
 *
 * The next row is fetched, either through the index iterator (when
 * DB_HANDLE_FLAG_SEARCH_INDEX is set) or sequentially, and the values
 * of its INT and LONG attributes are made available to the LVM. If
 * the row fulfills the predicate of the query, it is either folded
 * into the aggregation values or projected into the result tuple.
 *
 * Returns:
 *   DB_GOT_ROW   - a result tuple is available in the result row
 *                  buffer (either a projected row, or the final row
 *                  of an aggregation);
 *   DB_OK        - the row was processed without producing a tuple;
 *   DB_FINISHED  - there are no more rows;
 *   an error code otherwise.
 */
db_result_t
relation_process_select(void *handle_ptr)
{
  db_handle_t *handle;
  aql_adt_t *adt;
  db_result_t result;
  unsigned attribute_count;
  struct source_dest_map *attr_map_ptr, *attr_map_end;
  attribute_t *result_attr;
  unsigned char *from_ptr;
  unsigned char *to_ptr;
  operand_value_t operand_value;
  uint8_t intbuf[2];
  size_t copy_size;
  attribute_value_t value;
  lvm_status_t wanted_result;

  handle = (db_handle_t *)handle_ptr;
  adt = (aql_adt_t *)handle->adt;

  attribute_count = handle->result_rel->attribute_count;
  attr_map_end = attr_map + attribute_count;

  if(handle->flags & DB_HANDLE_FLAG_SEARCH_INDEX) {
    handle->tuple_id = index_get_next(&handle->index_iterator);
    if(handle->tuple_id == INVALID_TUPLE) {
      PRINTF("DB: An attribute value could not be found in the index\n");
      if(handle->index_iterator.next_item_no == 0) {
        return DB_INDEX_ERROR;
      }

      if(adt->flags & AQL_FLAG_AGGREGATE) {
        goto end_aggregation;
      }

      return DB_FINISHED;
    }
  }

  /* Put the tuples fulfilling the given condition into a new relation.
     The tuples may be projected. */
  result = storage_get_row(handle->rel, &handle->tuple_id, row);
  handle->tuple_id++;
  if(DB_ERROR(result)) {
    PRINTF("DB: Failed to get a row in relation %s!\n", handle->rel->name);
    return result;
  } else if(result == DB_FINISHED) {
    if(AQL_GET_FLAGS(adt) & AQL_FLAG_AGGREGATE) {
      goto end_aggregation;
    }
    return DB_FINISHED;
  }

  /* Process the attributes in the result relation. */
  for(attr_map_ptr = attr_map; attr_map_ptr < attr_map_end; attr_map_ptr++) {
    from_ptr = row + attr_map_ptr->from_offset;
    result_attr = attr_map_ptr->to_attr;

    /* Update the internal state of the PLE. The values are decoded
       from the row with the most significant byte first. */
    if(result_attr->domain == DOMAIN_INT) {
      operand_value.l = from_ptr[0] << 8 | from_ptr[1];
      lvm_set_variable_value(result_attr->name, operand_value);
    } else if(result_attr->domain == DOMAIN_LONG) {
      operand_value.l = (uint32_t)from_ptr[0] << 24 |
                        (uint32_t)from_ptr[1] << 16 |
                        (uint32_t)from_ptr[2] << 8 |
                        from_ptr[3];
      lvm_set_variable_value(result_attr->name, operand_value);
    }

    if(result_attr->flags & ATTRIBUTE_FLAG_NO_STORE) {
      /* The attribute is used just for the predicate,
         so do not copy the current value into the result. */
      continue;
    }

    if(!(AQL_GET_FLAGS(adt) & AQL_FLAG_AGGREGATE)) {
      /* No aggregators. Copy the original value into the resulting tuple. */
      memcpy(result_row + attr_map_ptr->to_offset, from_ptr,
             result_attr->element_size);
    }
  }

  wanted_result = TRUE;
  if(AQL_GET_FLAGS(adt) & AQL_FLAG_INVERSE_LOGIC) {
    wanted_result = FALSE;
  }

  /* Check whether the given predicate is true for this tuple. */
  if(adt->lvm_instance == NULL ||
     lvm_execute(adt->lvm_instance) == wanted_result) {
    if(AQL_GET_FLAGS(adt) & AQL_FLAG_AGGREGATE) {
      for(attr_map_ptr = attr_map; attr_map_ptr < attr_map_end; attr_map_ptr++) {
        from_ptr = row + attr_map_ptr->from_offset;
        result = db_phy_to_value(&value, attr_map_ptr->to_attr, from_ptr);
        if(DB_ERROR(result)) {
          return result;
        }
        aggregate(attr_map_ptr->to_attr, &value);
      }
    } else {
      if(AQL_GET_FLAGS(adt) & AQL_FLAG_ASSIGN) {
        if(DB_ERROR(storage_put_row(handle->result_rel, result_row))) {
          PRINTF("DB: Failed to store a row in the result relation!\n");
          return DB_STORAGE_ERROR;
        }
      }
      handle->current_row++;
      return DB_GOT_ROW;
    }
  }

  return DB_OK;

end_aggregation:
  /* Generate aggregated result if requested. */
  for(attr_map_ptr = attr_map; attr_map_ptr < attr_map_end; attr_map_ptr++) {
    result_attr = attr_map_ptr->to_attr;
    to_ptr = result_row + attr_map_ptr->to_offset;

    /* The aggregation value is emitted as two bytes, with the most
       significant byte first. */
    intbuf[0] = result_attr->aggregation_value >> 8;
    intbuf[1] = result_attr->aggregation_value & 0xff;

    /* Never read more than intbuf holds. If the attribute occupies
       more space than that in the result row, the remainder of its
       data area is set to 0. */
    copy_size = result_attr->element_size;
    if(copy_size > sizeof(intbuf)) {
      memset(to_ptr + sizeof(intbuf), 0, copy_size - sizeof(intbuf));
      copy_size = sizeof(intbuf);
    }
    memcpy(to_ptr, intbuf, copy_size);
  }

  if(AQL_GET_FLAGS(adt) & AQL_FLAG_ASSIGN) {
    if(DB_ERROR(storage_put_row(handle->result_rel, result_row))) {
      PRINTF("DB: Failed to store a row in the result relation!\n");
      return DB_STORAGE_ERROR;
    }
  }

  handle->current_row = 1;
  AQL_GET_FLAGS(adt) &= ~AQL_FLAG_AGGREGATE; /* Stop the aggregation. */

  return DB_GOT_ROW;
}

/*
 * Set up a selection on a relation.
 *
 * handle_ptr - the query handle (a db_handle_t) to initialize.
 * rel        - the relation to select from.
 * adt_ptr    - the parsed query (an aql_adt_t).
 *
 * A fresh result relation is created -- in storage, under the name
 * adt->relations[0], if the query assigns its result; otherwise in
 * memory under the name RESULT_RELATION -- and one result attribute is
 * added for each attribute named in the query. Aggregated attributes
 * get the INT domain and an initial aggregation value that suits
 * their aggregator.
 *
 * Returns the result of generate_selection_result() on success,
 * DB_ALLOCATION_ERROR if the result relation or one of its attributes
 * cannot be set up, DB_NAME_ERROR if a queried attribute does not
 * exist in rel, or DB_RELATIONAL_ERROR if the query mixes aggregated
 * attributes with ordinary projected ones.
 *
 * NOTE(review): the result relation is released only on the
 * attribute allocation failure path, and not on the other error
 * paths -- confirm how callers clean up the handle.
 */
db_result_t
relation_select(void *handle_ptr, relation_t *rel, void *adt_ptr)
{
  aql_adt_t *adt;
  db_handle_t *handle;
  relation_t *result_rel;
  attribute_t *source_attr;
  attribute_t *result_attr;
  char *result_name;
  char *attribute_name;
  db_direction_t dir;
  int projected_count;
  int i;

  adt = (aql_adt_t *)adt_ptr;
  handle = (db_handle_t *)handle_ptr;
  handle->rel = rel;
  handle->adt = adt;

  if(AQL_GET_FLAGS(adt) & AQL_FLAG_ASSIGN) {
    result_name = adt->relations[0];
    dir = DB_STORAGE;
  } else {
    result_name = RESULT_RELATION;
    dir = DB_MEMORY;
  }

  /* Start from a fresh result relation. */
  relation_remove(result_name, 1);
  relation_create(result_name, dir);
  result_rel = relation_load(result_name);
  handle->result_rel = result_rel;
  if(result_rel == NULL) {
    PRINTF("DB: Failed to load a relation for the query result\n");
    return DB_ALLOCATION_ERROR;
  }

  projected_count = 0;
  for(i = 0; i < AQL_ATTRIBUTE_COUNT(adt); i++) {
    attribute_name = adt->attributes[i].name;

    source_attr = relation_attribute_get(rel, attribute_name);
    if(source_attr == NULL) {
      PRINTF("DB: Select for invalid attribute %s in relation %s!\n",
             attribute_name, rel->name);
      return DB_NAME_ERROR;
    }

    PRINTF("DB: Found attribute %s in relation %s\n",
           attribute_name, rel->name);

    result_attr = relation_attribute_add(result_rel, dir, attribute_name,
                                         adt->aggregators[i] ?
                                           DOMAIN_INT : source_attr->domain,
                                         source_attr->element_size);
    if(result_attr == NULL) {
      PRINTF("DB: Failed to add a result attribute\n");
      relation_release(result_rel);
      return DB_ALLOCATION_ERROR;
    }

    result_attr->aggregator = adt->aggregators[i];
    if(result_attr->aggregator == AQL_NONE) {
      /* Only count attributes projected into the result set. */
      if(!(adt->attributes[i].flags & ATTRIBUTE_FLAG_NO_STORE)) {
        projected_count++;
      }
    } else if(result_attr->aggregator == AQL_MAX) {
      result_attr->aggregation_value = LONG_MIN;
    } else if(result_attr->aggregator == AQL_MIN) {
      result_attr->aggregation_value = LONG_MAX;
    } else {
      result_attr->aggregation_value = 0;
    }

    result_attr->flags = adt->attributes[i].flags;
  }

  /* Preclude mixes of normal attributes and aggregated ones in
     selection results. */
  if(projected_count > 0 &&
     handle->result_rel->attribute_count > projected_count) {
    return DB_RELATIONAL_ERROR;
  }

  return generate_selection_result(handle, rel, adt);
}

#if DB_FEATURE_JOIN
/*
 * Produce the next tuple of an equi-join.
 *
 * The join is a nested loop: the outer loop reads the rows of the left
 * relation, and the inner loop walks the index of the join attribute
 * in the right relation to find the rows with a matching value. Since
 * the function returns after each produced tuple, its position is
 * kept in the handle: handle->tuple_id is the current left row, and
 * handle->index_iterator the position among the matching right rows.
 *
 * DB_HANDLE_FLAG_INDEX_STEP tells whether a new left row has to be
 * fetched. When the flag is clear, the call resumes directly inside
 * the inner loop, using the row buffers and the iterator that were
 * left behind by the previous call.
 *
 * Returns DB_GOT_ROW when a joined tuple is available in the join row
 * buffer, DB_FINISHED when the left relation is exhausted, or an
 * error code.
 */
db_result_t
relation_process_join(void *handle_ptr)
{
  db_handle_t *handle;
  db_result_t result;
  relation_t *left_rel;
  relation_t *right_rel;
  relation_t *join_rel;
  unsigned char *join_next_attribute_ptr;
  size_t element_size;
  tuple_id_t right_tuple_id;
  attribute_value_t value;
  int i;

  handle = (db_handle_t *)handle_ptr;
  left_rel = handle->left_rel;
  right_rel = handle->right_rel;
  join_rel = handle->join_rel;

  /* Continue with the matches of the current left row, if there may
     be any left. */
  if(!(handle->flags & DB_HANDLE_FLAG_INDEX_STEP)) {
    goto inner_loop;
  }

  /* Equi-join for indexed attributes only. In the outer loop, we iterate over
     each tuple in the left relation. */
  for(handle->tuple_id = 0;; handle->tuple_id++) {
    result = storage_get_row(left_rel, &handle->tuple_id, left_row);
    if(DB_ERROR(result)) {
      PRINTF("DB: Failed to get a row in left relation %s!\n", left_rel->name);
      return result;
    } else if(result == DB_FINISHED) {
      return DB_FINISHED;
    }

    if(DB_ERROR(relation_get_value(left_rel, handle->left_join_attr, left_row, &value))) {
      PRINTF("DB: Failed to get a value of the attribute \"%s\" to join on\n",
	handle->left_join_attr->name);
      return DB_IMPLEMENTATION_ERROR;
    }

    /* The same value is used as the lower and the upper bound, so the
       iterator covers the entries for exactly this value. */
    if(DB_ERROR(index_get_iterator(&handle->index_iterator, 
                                   handle->right_join_attr->index, 
                                   &value, &value))) { 
      PRINTF("DB: Failed to get an index iterator\n");
      return DB_INDEX_ERROR;
    }
    handle->flags &= ~DB_HANDLE_FLAG_INDEX_STEP;

    /* In the inner loop, we iterate over all rows with a matching value for the
       join attribute. The index component provides an iterator for this purpose. */
inner_loop:
    for(;;) {
      /* Get all rows matching the attribute value in the right relation. */
      right_tuple_id = index_get_next(&handle->index_iterator);
      if(right_tuple_id == INVALID_TUPLE) {
        /* Exclude this row from the left relation in the result,
           and step to the next value in the index iteration. */
        handle->flags |= DB_HANDLE_FLAG_INDEX_STEP;
        break;
      }

      result = storage_get_row(right_rel, &right_tuple_id, right_row);
      if(DB_ERROR(result)) {
        PRINTF("DB: Failed to get a row in right relation %s!\n", right_rel->name);
        return result;
      } else if(result == DB_FINISHED) {
	PRINTF("DB: The index refers to an invalid row: %lu\n",
	       (unsigned long)right_tuple_id);
        return DB_IMPLEMENTATION_ERROR;
      }

      /* Use the source attribute map to fill in the physical representation
	 of the resulting tuple. */
      join_next_attribute_ptr = join_row;

      for(i = 0; i < join_rel->attribute_count; i++) {
	element_size = source_map[i].attr->element_size;

	memcpy(join_next_attribute_ptr, source_map[i].from_ptr, element_size);
	join_next_attribute_ptr += element_size;
      }

      if(((aql_adt_t *)handle->adt)->flags & AQL_FLAG_ASSIGN) {
        if(DB_ERROR(storage_put_row(join_rel, join_row))) {
          return DB_STORAGE_ERROR;
        }
      }

      handle->current_row++;
      return DB_GOT_ROW;
    }
  }

  /* Not reached: the outer loop is only left through a return. */
  return DB_OK;
}

/*
 * Prepare a handle for the row-by-row processing of a join.
 *
 * For each attribute of the join relation, the source map records the
 * attribute of the same name in the left relation -- or, if the left
 * relation has none, in the right relation -- together with the
 * location of its value inside the corresponding row buffer.
 *
 * Returns DB_OK on success, DB_NAME_ERROR if an attribute of the join
 * relation exists in neither source relation, or
 * DB_IMPLEMENTATION_ERROR if the location of a value cannot be
 * determined.
 */
static db_result_t
generate_join_result(db_handle_t *handle)
{
  relation_t *left_rel;
  relation_t *right_rel;
  relation_t *source_rel;
  attribute_t *attr;
  attribute_t *result_attr;
  unsigned char *source_row;
  int offset;
  int i;

  handle->tuple = (tuple_t)join_row;
  handle->tuple_id = 0;

  left_rel = handle->left_rel;
  right_rel = handle->right_rel;

  /* Generate a map over the source attributes for each
     attribute in the join relation. */
  i = 0;
  result_attr = list_head(handle->join_rel->attributes);
  while(result_attr != NULL) {
    /* The left relation takes precedence over the right one. */
    attr = attribute_find(left_rel, result_attr->name);
    if(attr != NULL) {
      source_rel = left_rel;
      source_row = left_row;
    } else {
      attr = attribute_find(right_rel, result_attr->name);
      if(attr == NULL) {
        PRINTF("DB: The attribute %s could not be found\n", result_attr->name);
        return DB_NAME_ERROR;
      }
      source_rel = right_rel;
      source_row = right_row;
    }

    offset = get_attribute_value_offset(source_rel, attr);
    if(offset < 0) {
      PRINTF("DB: Unable to retrieve attribute values for the JOIN result\n");
      return DB_IMPLEMENTATION_ERROR;
    }

    source_map[i].attr = attr;
    source_map[i].from_ptr = source_row + offset;

    result_attr = result_attr->next;
    i++;
  }

  handle->flags |= DB_HANDLE_FLAG_PROCESSING;

  return DB_OK;
}

/*
 * Set up a join of the left and right relations of a handle.
 *
 * query_result - the query handle (a db_handle_t); its left_rel and
 *                right_rel fields are read here, so they have to be
 *                set before the call.
 * adt_ptr      - the parsed query (an aql_adt_t). The first attribute
 *                is the one to join on; the attributes that follow
 *                are projected into the result.
 *
 * A fresh join relation is created -- in storage, under the name
 * adt->relations[0], if the query assigns its result; otherwise in
 * memory under the name RESULT_RELATION. The attribute to join on
 * must exist in both relations and be indexed in the right one.
 *
 * Returns the result of generate_join_result() on success, or
 * DB_ALLOCATION_ERROR, DB_RELATIONAL_ERROR or DB_INDEX_ERROR.
 */
db_result_t
relation_join(void *query_result, void *adt_ptr)
{
  aql_adt_t *adt;
  db_handle_t *handle;
  relation_t *left_rel;
  relation_t *right_rel;
  relation_t *join_rel;
  attribute_t *source_attr;
  char *result_name;
  char *join_attribute_name;
  char *attribute_name;
  db_direction_t dir;
  int i;

  adt = (aql_adt_t *)adt_ptr;
  handle = (db_handle_t *)query_result;

  handle->current_row = 0;
  handle->ncolumns = 0;
  handle->adt = adt;
  handle->flags = DB_HANDLE_FLAG_INDEX_STEP;

  if(AQL_GET_FLAGS(adt) & AQL_FLAG_ASSIGN) {
    result_name = adt->relations[0];
    dir = DB_STORAGE;
  } else {
    result_name = RESULT_RELATION;
    dir = DB_MEMORY;
  }

  /* Start from a fresh join relation. */
  relation_remove(result_name, 1);
  relation_create(result_name, dir);
  join_rel = relation_load(result_name);
  handle->result_rel = join_rel;
  if(join_rel == NULL) {
    PRINTF("DB: Failed to create a join relation!\n");
    return DB_ALLOCATION_ERROR;
  }
  handle->join_rel = join_rel;

  left_rel = handle->left_rel;
  right_rel = handle->right_rel;

  join_attribute_name = adt->attributes[0].name;
  handle->left_join_attr = relation_attribute_get(left_rel, join_attribute_name);
  handle->right_join_attr = relation_attribute_get(right_rel, join_attribute_name);
  if(handle->left_join_attr == NULL || handle->right_join_attr == NULL) {
    PRINTF("DB: The attribute (\"%s\") to join on does not exist in both relations\n",
           adt->attributes[0].name);
    return DB_RELATIONAL_ERROR;
  }

  if(!index_exists(handle->right_join_attr)) {
    PRINTF("DB: The attribute to join on is not indexed\n");
    return DB_INDEX_ERROR;
  }

  /*
   * Define the resulting relation. We start from 1 when counting attributes
   * because the first attribute is only the one to join, and is not included
   * by default in the projected attributes.
   */
  for(i = 1; i < AQL_ATTRIBUTE_COUNT(adt); i++) {
    attribute_name = adt->attributes[i].name;

    /* Look in the left relation first, and then in the right one. */
    source_attr = relation_attribute_get(left_rel, attribute_name);
    if(source_attr == NULL) {
      source_attr = relation_attribute_get(right_rel, attribute_name);
    }
    if(source_attr == NULL) {
      PRINTF("DB: The projection attribute \"%s\" does not exist in any of the relations to join\n",
             attribute_name);
      return DB_RELATIONAL_ERROR;
    }

    if(relation_attribute_add(join_rel, dir, source_attr->name,
                              source_attr->domain,
                              source_attr->element_size) == NULL) {
      PRINTF("DB: Failed to add an attribute to the join relation\n");
      return DB_ALLOCATION_ERROR;
    }

    handle->ncolumns++;
  }

  return generate_join_result(handle);
}
#endif /* DB_FEATURE_JOIN */

/*
 * Get the number of tuples in a relation.
 *
 * A cardinality that is already known is returned directly. Otherwise
 * the relation is reported as empty if RELATION_HAS_TUPLES() says that
 * it has no tuples; failing that, the amount is obtained from the
 * storage layer and remembered in the relation for later calls.
 *
 * Returns the cardinality, or INVALID_TUPLE if it cannot be determined.
 */
tuple_id_t
relation_cardinality(relation_t *rel)
{
  tuple_id_t row_amount;

  if(rel->cardinality != INVALID_TUPLE) {
    /* Already known. */
    return rel->cardinality;
  }

  if(!RELATION_HAS_TUPLES(rel)) {
    return 0;
  }

  if(DB_ERROR(storage_get_row_amount(rel, &row_amount))) {
    return INVALID_TUPLE;
  }

  rel->cardinality = row_amount;

  PRINTF("DB: Relation %s has cardinality %lu\n", rel->name,
         (unsigned long)row_amount);

  return row_amount;
}