From 147c4696c8c5ccea90db20e487e3fc88229ddc06 Mon Sep 17 00:00:00 2001
From: Justin Balthrop
Date: Thu, 19 Nov 2009 11:13:38 -0800
Subject: [PATCH] replace Bdb::Simple with Bdb::Database

---
 lib/bdb/database.rb    | 263 +++++++++++++++++++++++++++++++++++++++++
 lib/bdb/environment.rb | 118 ++++++++++++++++++
 lib/bdb/simple.rb      | 200 -------------------------------
 3 files changed, 381 insertions(+), 200 deletions(-)
 create mode 100644 lib/bdb/database.rb
 create mode 100644 lib/bdb/environment.rb
 delete mode 100644 lib/bdb/simple.rb

diff --git a/lib/bdb/database.rb b/lib/bdb/database.rb
new file mode 100644
index 0000000..51afa6c
--- /dev/null
+++ b/lib/bdb/database.rb
@@ -0,0 +1,263 @@
+require 'bdb'
+require 'tuple'
+require File.dirname(__FILE__) + '/environment'
+
+class Bdb::Database
+  def initialize(name, opts = {})
+    @name = name
+    @config = Bdb::Environment.config.merge(opts)
+    @indexes = {}
+  end
+  attr_reader :name, :indexes
+
+  def config(config = {})
+    @config.merge!(config)
+  end
+
+  def index_by(field, opts = {})
+    raise "index on #{field} already exists" if indexes[field]
+    indexes[field] = opts
+  end
+
+  def db(index = nil)
+    if @db.nil?
+      @db = {}
+      transaction(false) do
+        primary_db = environment.env.db
+        primary_db.pagesize = config[:page_size] if config[:page_size]
+        primary_db.open(transaction, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
+        @db[:primary] = primary_db
+
+        indexes.each do |field, opts|
+          index_callback = lambda do |db, key, data|
+            value = Marshal.load(data)
+            index_key = value.kind_of?(Hash) ? value[field] : value.send(field)
+            if opts[:multi_key] and index_key.kind_of?(Array)
+              # Index multiple keys. If an individual key is itself an array, it must be wrapped in an outer array.
+              index_key.collect {|k| Tuple.dump(k)}
+            elsif index_key
+              # Index a single key.
+              Tuple.dump(index_key)
+            end
+          end
+          index_db = environment.env.db
+          index_db.flags = Bdb::DB_DUPSORT unless opts[:unique]
+          index_db.pagesize = config[:page_size] if config[:page_size]
+          index_db.open(transaction, "#{name}_by_#{field}", nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
+          primary_db.associate(transaction, index_db, Bdb::DB_CREATE, index_callback)
+          @db[field] = index_db
+        end
+      end
+    end
+    @db[index || :primary]
+  end
+
+  def close
+    return unless @db
+    synchronize do
+      @db.each {|field, db| db.close(0)}
+      @db = nil
+    end
+  end
+
+  def count(field, key)
+    with_cursor(db(field)) do |cursor|
+      k, v = cursor.get(Tuple.dump(key), nil, Bdb::DB_SET)
+      k ? cursor.count : 0
+    end
+  end
+
+  def get(*keys, &block)
+    opts = keys.last.kind_of?(Hash) ? keys.pop : {}
+    db = db(opts[:field])
+    set = ResultSet.new(opts, &block)
+    flags = opts[:modify] ? Bdb::DB_RMW : 0
+    flags = 0 if environment.disable_transactions?
+
+    keys.each do |key|
+      if opts[:partial] and not key.kind_of?(Range) and not key == :all
+        first = [*key]
+        last = first + [true]
+        key = first..last
+      end
+
+      if key == :all
+        with_cursor(db) do |cursor|
+          if opts[:reverse]
+            k,v = cursor.get(nil, nil, Bdb::DB_LAST | flags)            # Start at the last item.
+            iter = lambda {cursor.get(nil, nil, Bdb::DB_PREV | flags)}  # Move backward.
+          else
+            k,v = cursor.get(nil, nil, Bdb::DB_FIRST | flags)           # Start at the first item.
+            iter = lambda {cursor.get(nil, nil, Bdb::DB_NEXT | flags)}  # Move forward.
+          end
+
+          while k
+            set << unmarshal(v, :tuple => k)
+            k,v = iter.call
+          end
+        end
+      elsif key.kind_of?(Range)
+        # Fetch a range of keys.
+        with_cursor(db) do |cursor|
+          first = Tuple.dump(key.first)
+          last = Tuple.dump(key.last)
+
+          # Return false once we pass the end of the range.
+          cond = key.exclude_end? ? lambda {|k| k < last} : lambda {|k| k <= last}
+          if opts[:reverse]
+            iter = lambda {cursor.get(nil, nil, Bdb::DB_PREV | flags)}  # Move backward.
+
+            # Position the cursor at the end of the range.
+            k,v = cursor.get(last, nil, Bdb::DB_SET_RANGE | flags) || cursor.get(nil, nil, Bdb::DB_LAST | flags)
+            while k and not cond.call(k)
+              k,v = iter.call
+            end
+
+            cond = lambda {|k| k >= first}  # Change the condition to stop when we move past the start.
+          else
+            k,v = cursor.get(first, nil, Bdb::DB_SET_RANGE | flags)     # Start at the beginning of the range.
+            iter = lambda {cursor.get(nil, nil, Bdb::DB_NEXT | flags)}  # Move forward.
+          end
+
+          while k and cond.call(k)
+            set << unmarshal(v, :tuple => k)
+            k,v = iter.call
+          end
+        end
+      else
+        if (db.flags & Bdb::DB_DUPSORT) == 0
+          synchronize do
+            # There can only be one item for each key.
+            data = db.get(transaction, Tuple.dump(key), nil, flags)
+            set << unmarshal(data, :key => key) if data
+          end
+        else
+          # Have to use a cursor because there may be multiple items with each key.
+          with_cursor(db) do |cursor|
+            k,v = cursor.get(Tuple.dump(key), nil, Bdb::DB_SET | flags)
+            while k
+              set << unmarshal(v, :tuple => k)
+              k,v = cursor.get(nil, nil, Bdb::DB_NEXT_DUP | flags)
+            end
+          end
+        end
+      end
+    end
+    set.results
+  rescue ResultSet::LimitReached
+    set.results
+  end
+
+  def set(key, value, opts = {})
+    synchronize do
+      key = Tuple.dump(key)
+      value = Marshal.dump(value)
+      flags = opts[:create] ? Bdb::DB_NOOVERWRITE : 0
+      db.put(transaction, key, value, flags)
+    end
+  end
+
+  def delete(key)
+    synchronize do
+      key = Tuple.dump(key)
+      db.del(transaction, key, 0)
+    end
+  end
+
+  # Deletes all records in the database. Beware!
+  def truncate!
+    synchronize do
+      db.truncate(transaction)
+    end
+  end
+
+  def environment
+    @environment ||= Bdb::Environment.new(config[:path], self)
+  end
+
+  def transaction(nested = true, &block)
+    environment.transaction(nested, &block)
+  end
+
+  def synchronize(&block)
+    environment.synchronize(&block)
+  end
+
+  def checkpoint(opts = {})
+    environment.checkpoint(opts)
+  end
+
+private
+
+  def unmarshal(value, opts = {})
+    value = Marshal.load(value)
+    value.bdb_locator_key = opts[:tuple] ? Tuple.load(opts[:tuple]) : [*opts[:key]]
+    value
+  end
+
+  def with_cursor(db)
+    synchronize do
+      begin
+        cursor = db.cursor(transaction, 0)
+        yield(cursor)
+      ensure
+        cursor.close if cursor
+      end
+    end
+  end
+
+  class ResultSet
+    class LimitReached < Exception; end
+
+    def initialize(opts, &block)
+      @block = block
+      @count = 0
+      @limit = opts[:limit] || opts[:per_page]
+      @limit = @limit.to_i if @limit
+      @offset = opts[:offset] || (opts[:page] ? @limit * (opts[:page] - 1) : 0)
+      @offset = @offset.to_i if @offset
+
+      if @group = opts[:group]
+        raise 'block not supported with group' if @block
+        @results = {}
+      else
+        @results = []
+      end
+    end
+    attr_reader :count, :group, :limit, :offset, :results
+
+    def <<(item)
+      @count += 1
+      return if count <= offset
+
+      raise LimitReached if limit and count > limit + offset
+
+      if group
+        key = item.bdb_locator_key
+        group_key = group.is_a?(Fixnum) ? key[0,group] : key
+        (results[group_key] ||= []) << item
+      elsif @block
+        @block.call(item)
+      else
+        results << item
+      end
+    end
+  end
+end
+
+class Object
+  attr_accessor :bdb_locator_key
+end
+
+# Array comparison should try Tuple comparison first.
+class Array
+  cmp = instance_method(:<=>)
+
+  define_method(:<=>) do |other|
+    begin
+      Tuple.dump(self) <=> Tuple.dump(other)
+    rescue TypeError => e
+      cmp.bind(self).call(other)
+    end
+  end
+end
diff --git a/lib/bdb/environment.rb b/lib/bdb/environment.rb
new file mode 100644
index 0000000..1a907eb
--- /dev/null
+++ b/lib/bdb/environment.rb
@@ -0,0 +1,118 @@
+class Bdb::Environment
+  @@env = {}
+  def self.new(path, database)
+    @@env[path] ||= super(path)
+    @@env[path].databases << database
+    @@env[path]
+  end
+
+  def self.config(config = {})
+    @config ||= {
+      :max_locks => 5000,
+      :lock_timeout => 30 * 1000 * 1000,
+      :txn_timeout => 30 * 1000 * 1000,
+      :cache_size => 1 * 1024 * 1024,
+    }
+    @config.merge!(config)
+  end
+
+  def config(config = {})
+    @config ||= self.class.config
+    @config.merge!(config)
+  end
+
+  def initialize(path)
+    @path = path
+  end
+  attr_reader :path
+
+  def databases
+    @databases ||= []
+  end
+
+  def env
+    if @env.nil?
+      synchronize do
+        @env = Bdb::Env.new(0)
+        if disable_transactions?
+          env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_MPOOL
+        else
+          env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_INIT_LOCK |
+                      Bdb::DB_REGISTER | Bdb::DB_RECOVER | Bdb::DB_INIT_MPOOL
+        end
+
+        @env.cachesize = config[:cache_size] if config[:cache_size]
+        @env.set_timeout(config[:txn_timeout], Bdb::DB_SET_TXN_TIMEOUT) if config[:txn_timeout]
+        @env.set_timeout(config[:lock_timeout], Bdb::DB_SET_LOCK_TIMEOUT) if config[:lock_timeout]
+        @env.set_lk_max_locks(config[:max_locks]) if config[:max_locks]
+        @env.set_lk_detect(Bdb::DB_LOCK_RANDOM)
+        @env.flags_on = Bdb::DB_TXN_WRITE_NOSYNC | Bdb::DB_TIME_NOTGRANTED
+        @env.open(path, env_flags, 0)
+
+        @exit_handler ||= at_exit { close }
+      end
+    end
+    @env
+  end
+
+  def close
+    return unless @env
+    synchronize do
+      databases.each {|database| database.close}
+      @env.close
+      @env = nil
+    end
+  end
+
+  def transaction(nested = true)
+    return @transaction unless block_given?
+    return yield if disable_transactions?
+
+    synchronize do
+      parent = @transaction
+      begin
+        @transaction = env.txn_begin(nested ? parent : nil, 0)
+        value = yield
+        @transaction.commit(0)
+        @transaction = nil
+        value
+      ensure
+        @transaction.abort if @transaction
+        @transaction = parent
+      end
+    end
+  end
+
+  def checkpoint(opts = {})
+    return if disable_transactions?
+    env.txn_checkpoint(opts[:kbyte] || 0, opts[:min] || 0, opts[:force] ? Bdb::DB_FORCE : 0)
+  end
+
+  def disable_transactions?
+    config[:disable_transactions]
+  end
+
+  def synchronize
+    @mutex ||= Mutex.new
+    if @thread_id == thread_id
+      yield
+    else
+      @mutex.synchronize do
+        begin
+          @thread_id = thread_id
+          Thread.exclusive { yield }
+        ensure
+          @thread_id = nil
+        end
+      end
+    end
+  rescue Bdb::DbError => e
+    exit!(9) if e.code == Bdb::DB_RUNRECOVERY
+    retry if transaction.nil? and e.code == Bdb::DB_LOCK_DEADLOCK
+    raise e
+  end
+
+  def thread_id
+    Thread.current.object_id
+  end
+end
diff --git a/lib/bdb/simple.rb b/lib/bdb/simple.rb
deleted file mode 100644
index 4cfdbd2..0000000
--- a/lib/bdb/simple.rb
+++ /dev/null
@@ -1,200 +0,0 @@
-require 'bdb' if not defined?(Bdb)
-
-class Bdb::Simple
-  include Enumerable
-
-  def initialize(path, opts = {})
-    @dup = opts[:dup] ? true : false
-
-    if opts[:raw]
-      @raw = true
-      @sort = false
-    else
-      @raw = false
-      @sort = opts[:sort] == false ? false : true
-    end
-
-    @name = opts[:name] || 'default'
-    @path = path
-  end
-
-  def dup?; @dup; end
-  def sort?; @sort; end
-  def raw?; @raw; end
-
-  attr_reader :path, :name
-
-  def env
-    if @env.nil?
-      @env = Bdb::Env.new(0)
-      env_flags = Bdb::DB_CREATE |    # Create the environment if it does not already exist.
-                  Bdb::DB_INIT_TXN |  # Initialize transactions.
-                  Bdb::DB_INIT_LOCK | # Initialize locking.
-                  Bdb::DB_INIT_LOG |  # Initialize logging.
-                  Bdb::DB_INIT_MPOOL  # Initialize the in-memory cache.
-      # @env.set_lk_detect(Bdb::DB_LOCK_DEFAULT)
-      @env.open(path, env_flags, 0);
-    end
-    @env
-  end
-
-  def db
-    if @db.nil?
-      @db = env.db
-      @db.flags = Bdb::DB_DUPSORT if dup?
-      if sort?
-        @db.btree_compare = lambda do |db, key1, key2|
-          self.class.compare_absolute(Marshal.load(key1), Marshal.load(key2))
-        end
-      end
-      @db.open(nil, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE | Bdb::DB_AUTO_COMMIT, 0)
-    end
-    @db
-  end
-
-  def []=(key, value)
-    db.put(nil, dump(key), dump(value), 0)
-  end
-
-  def delete(key)
-    db.del(nil, dump(key), 0)
-  end
-
-  def update(key)
-    k = dump(key)
-    txn = env.txn_begin(nil, 0)
-    begin
-      v = db.get(txn, k, nil, Bdb::DB_RMW)
-      value = yield(load(v))
-      db.put(txn, k, dump(value), 0)
-      txn.commit(0)
-    rescue Exception => e
-      txn.abort
-      raise e
-    end
-    value
-  end
-
-  def [](key)
-    if key.kind_of?(Range) or dup?
-      Bdb::SimpleSet.new(self, key)
-    else
-      v = db.get(nil, dump(key), nil, 0)
-      load(v)
-    end
-  end
-
-  def each
-    cursor = db.cursor(nil, 0)
-    while data = cursor.get(nil, nil, Bdb::DB_NEXT)
-      key = load(data[0])
-      value = load(data[1])
-      yield(key, value)
-    end
-    cursor.close
-  end
-
-  def close
-    db.close(0)
-    env.close
-    @db = nil
-    @env = nil
-  end
-
-  CLASS_ORDER = {}
-  [FalseClass, TrueClass, Fixnum, Numeric, Float, Symbol, String, Array].each_with_index {|c, i| CLASS_ORDER[c] = i}
-
-  def self.compare_absolute(left, right)
-    if left.is_a?(right.class)
-      case left
-      when Array
-        # Arrays: compare one element at a time.
-        n = [left.size, right.size].min
-        n.times do |i|
-          comp = compare_absolute(left[i], right[i])
-          return comp if comp != 0
-        end
-        left.size <=> right.size
-      when Hash
-        # Hashes: sort the keys and compare as an array of arrays. This may be slow.
-        left = left.to_a.sort {|a,b| compare_absolute(a[0],b[0])}
-        right = right.to_a.sort {|a,b| compare_absolute(a[0],b[0])}
-        compare_absolute(left, right)
-      when NilClass, TrueClass, FalseClass
-        0
-      when Symbol
-        left.to_s <=> right.to_s
-      else
-        # Use the spaceship operator.
-        left <=> right
-      end
-    elsif left.kind_of?(Numeric) and right.kind_of?(Numeric)
-      # Numerics are always comparable.
-      left <=> right
-    else
-      # Nil is the smallest. Hash is the largest.
-      return -1 if left.is_a?(NilClass) or right.is_a?(Hash)
-      return 1 if left.is_a?(Hash) or right.is_a?(NilClass)
-
-      # Try to use the class sort order so we don't have to do a string comparison.
-      left_order = CLASS_ORDER[left.class]
-      right_order = CLASS_ORDER[right.class]
-      if left_order.nil? and right_order.nil?
-        left.class.name <=> right.class.name
-      else
-        (left_order || 9999) <=> (right_order || 9999)
-      end
-    end
-  end
-
-private
-
-  def load(value)
-    return unless value
-    raw? ? value : Marshal.load(value)
-  end
-
-  def dump(value)
-    raw? ? value.to_s : Marshal.dump(value)
-  end
-end
-
-class Bdb::SimpleSet
-  include Enumerable
-
-  def initialize(source, key)
-    @source = source
-    @key = key
-  end
-  attr_reader :source, :key
-
-  def each
-    if key.kind_of?(Range)
-      cursor = source.db.cursor(nil, 0)
-      k,v = cursor.get(dump(key.first), nil, Bdb::DB_SET_RANGE)
-      while k and key.include?(load(k))
-        yield load(v)
-        k, v = cursor.get(nil, nil, Bdb::DB_NEXT)
-      end
-      cursor.close
-    else
-      cursor = source.db.cursor(nil, 0)
-      k,v = cursor.get(dump(key), nil, Bdb::DB_SET)
-      while k
-        yield load(v)
-        k,v = cursor.get(nil, nil, Bdb::DB_NEXT_DUP)
-      end
-      cursor.close
-    end
-  end
-
-private
-
-  def load(value)
-    source.send(:load, value)
-  end
-
-  def dump(value)
-    source.send(:dump, value)
-  end
-end
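
For reviewers, a rough usage sketch of the new Bdb::Database API (not part of the patch). The 'users' database name, the :email field, the records, and the path below are hypothetical, chosen only to exercise the methods added above; it assumes lib/ is on the load path and that the environment directory already exists.

  require 'bdb/database'

  db = Bdb::Database.new('users', :path => '/tmp/bdb_example')  # hypothetical path; the directory must exist
  db.index_by(:email, :unique => true)                          # opens a secondary database named users_by_email

  db.transaction do
    db.set(1, {:id => 1, :email => 'alice@example.com'}, :create => true)  # :create maps to DB_NOOVERWRITE
    db.set(2, {:id => 2, :email => 'bob@example.com'},   :create => true)
  end

  db.get(1)                                        # primary-key lookup; get always returns an array of results
  db.get('alice@example.com', :field => :email)    # lookup through the secondary index
  db.get(1..2, :reverse => true)                   # range scan over primary keys, highest key first
  db.get(:all, :limit => 10)                       # full scan, cut short via ResultSet::LimitReached
  db.count(:email, 'bob@example.com')              # number of records with this index key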