replace Bdb::Simple with Bdb::Database

master
Justin Balthrop 2009-11-19 11:13:38 -08:00
parent 9809bb805d
commit 147c4696c8
3 changed files with 381 additions and 200 deletions

263
lib/bdb/database.rb Normal file
View File

@ -0,0 +1,263 @@
require 'bdb'
require 'tuple'
require File.dirname(__FILE__) + '/environment'
class Bdb::Database
def initialize(name, opts = {})
@name = name
@config = Bdb::Environment.config.merge(opts)
@indexes = {}
end
attr_reader :name, :indexes
def config(config = {})
@config.merge!(config)
end
def index_by(field, opts = {})
raise "index on #{field} already exists" if indexes[field]
indexes[field] = opts
end
def db(index = nil)
if @db.nil?
@db = {}
transaction(false) do
primary_db = environment.env.db
primary_db.pagesize = config[:page_size] if config[:page_size]
primary_db.open(transaction, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
@db[:primary] = primary_db
indexes.each do |field, opts|
index_callback = lambda do |db, key, data|
value = Marshal.load(data)
index_key = value.kind_of?(Hash) ? value[:field] : value.send(field)
if opts[:multi_key] and index_key.kind_of?(Array)
# Index multiple keys. If the key is an array, you must wrap it with an outer array.
index_key.collect {|k| Tuple.dump(k)}
elsif index_key
# Index a single key.
Tuple.dump(index_key)
end
end
index_db = environment.env.db
index_db.flags = Bdb::DB_DUPSORT unless opts[:unique]
index_db.pagesize = config[:page_size] if config[:page_size]
index_db.open(transaction, "#{name}_by_#{field}", nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
primary_db.associate(transaction, index_db, Bdb::DB_CREATE, index_callback)
@db[field] = index_db
end
end
end
@db[index || :primary]
end
def close
return unless @db
synchronize do
@db.each {|field, db| db.close(0)}
@db = nil
end
end
def count(field, key)
with_cursor(db(field)) do |cursor|
k, v = cursor.get(Tuple.dump(key), nil, Bdb::DB_SET)
k ? cursor.count : 0
end
end
def get(*keys, &block)
opts = keys.last.kind_of?(Hash) ? keys.pop : {}
db = db(opts[:field])
set = ResultSet.new(opts, &block)
flags = opts[:modify] ? Bdb::DB_RMW : 0
flags = 0 if environment.disable_transactions?
keys.each do |key|
if opts[:partial] and not key.kind_of?(Range) and not key == :all
first = [*key]
last = first + [true]
key = first..last
end
if key == :all
with_cursor(db) do |cursor|
if opts[:reverse]
k,v = cursor.get(nil, nil, Bdb::DB_LAST | flags) # Start at the last item.
iter = lambda {cursor.get(nil, nil, Bdb::DB_PREV | flags)} # Move backward.
else
k,v = cursor.get(nil, nil, Bdb::DB_FIRST | flags) # Start at the first item.
iter = lambda {cursor.get(nil, nil, Bdb::DB_NEXT | flags)} # Move forward.
end
while k
set << unmarshal(v, :tuple => k)
k,v = iter.call
end
end
elsif key.kind_of?(Range)
# Fetch a range of keys.
with_cursor(db) do |cursor|
first = Tuple.dump(key.first)
last = Tuple.dump(key.last)
# Return false once we pass the end of the range.
cond = key.exclude_end? ? lambda {|k| k < last} : lambda {|k| k <= last}
if opts[:reverse]
iter = lambda {cursor.get(nil, nil, Bdb::DB_PREV | flags)} # Move backward.
# Position the cursor at the end of the range.
k,v = cursor.get(last, nil, Bdb::DB_SET_RANGE | flags) || cursor.get(nil, nil, Bdb::DB_LAST | flags)
while k and not cond.call(k)
k,v = iter.call
end
cond = lambda {|k| k >= first} # Change the condition to stop when we move past the start.
else
k,v = cursor.get(first, nil, Bdb::DB_SET_RANGE | flags) # Start at the beginning of the range.
iter = lambda {cursor.get(nil, nil, Bdb::DB_NEXT | flags)} # Move forward.
end
while k and cond.call(k)
set << unmarshal(v, :tuple => k)
k,v = iter.call
end
end
else
if (db.flags & Bdb::DB_DUPSORT) == 0
synchronize do
# There can only be one item for each key.
data = db.get(transaction, Tuple.dump(key), nil, flags)
set << unmarshal(data, :key => key) if data
end
else
# Have to use a cursor because there may be multiple items with each key.
with_cursor(db) do |cursor|
k,v = cursor.get(Tuple.dump(key), nil, Bdb::DB_SET | flags)
while k
set << unmarshal(v, :tuple => k)
k,v = cursor.get(nil, nil, Bdb::DB_NEXT_DUP | flags)
end
end
end
end
end
set.results
rescue ResultSet::LimitReached
set.results
end
def set(key, value, opts = {})
synchronize do
key = Tuple.dump(key)
value = Marshal.dump(value)
flags = opts[:create] ? Bdb::DB_NOOVERWRITE : 0
db.put(transaction, key, value, flags)
end
end
def delete(key)
synchronize do
key = Tuple.dump(key)
db.del(transaction, key, 0)
end
end
# Deletes all records in the database. Beware!
def truncate!
synchronize do
db.truncate(transaction)
end
end
def environment
@environment ||= Bdb::Environment.new(config[:path], self)
end
def transaction(nested = true, &block)
environment.transaction(nested, &block)
end
def synchronize(&block)
environment.synchronize(&block)
end
def checkpoint(opts = {})
environment.synchronize(opts)
end
private
def unmarshal(value, opts = {})
value = Marshal.load(value)
value.bdb_locator_key = opts[:tuple] ? Tuple.load(opts[:tuple]) : [*opts[:key]]
value
end
def with_cursor(db)
synchronize do
begin
cursor = db.cursor(transaction, 0)
yield(cursor)
ensure
cursor.close if cursor
end
end
end
class ResultSet
class LimitReached < Exception; end
def initialize(opts, &block)
@block = block
@count = 0
@limit = opts[:limit] || opts[:per_page]
@limit = @limit.to_i if @limit
@offset = opts[:offset] || (opts[:page] ? @limit * (opts[:page] - 1) : 0)
@offset = @offset.to_i if @offset
if @group = opts[:group]
raise 'block not supported with group' if @block
@results = {}
else
@results = []
end
end
attr_reader :count, :group, :limit, :offset, :results
def <<(item)
key = item[1]
@count += 1
return if count <= offset
raise LimitReached if limit and count > limit + offset
if group
group_key = group.is_a?(Fixnum) ? key[0,group] : key
(results[group_key] ||= []) << item
elsif @block
@block.call(item)
else
results << item
end
end
end
end
class Object
attr_accessor :bdb_locator_key
end
# Array comparison should try Tuple comparison first.
class Array
cmp = instance_method(:<=>)
define_method(:<=>) do |other|
begin
Tuple.dump(self) <=> Tuple.dump(other)
rescue TypeError => e
cmp.bind(self).call(other)
end
end
end

118
lib/bdb/environment.rb Normal file
View File

@ -0,0 +1,118 @@
class Bdb::Environment
@@env = {}
def self.new(path, database)
@@env[path] ||= super(path)
@@env[path].databases << database
@@env[path]
end
def self.config(config = {})
@config ||= {
:max_locks => 5000,
:lock_timeout => 30 * 1000 * 1000,
:txn_timeout => 30 * 1000 * 1000,
:cache_size => 1 * 1024 * 1024,
}
@config.merge!(config)
end
def config(config = {})
@config ||= self.class.config
@config.merge!(config)
end
def initialize(path)
@path = path
end
attr_reader :path
def databases
@databases ||= []
end
def env
if @env.nil?
synchronize do
@env = Bdb::Env.new(0)
if disable_transactions?
env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_MPOOL
else
env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_INIT_LOCK |
Bdb::DB_REGISTER | Bdb::DB_RECOVER | Bdb::DB_INIT_MPOOL
end
@env.cachesize = config[:cache_size] if config[:cache_size]
@env.set_timeout(config[:txn_timeout], Bdb::DB_SET_TXN_TIMEOUT) if config[:txn_timeout]
@env.set_timeout(config[:lock_timeout], Bdb::DB_SET_LOCK_TIMEOUT) if config[:lock_timeout]
@env.set_lk_max_locks(config[:max_locks]) if config[:max_locks]
@env.set_lk_detect(Bdb::DB_LOCK_RANDOM)
@env.flags_on = Bdb::DB_TXN_WRITE_NOSYNC | Bdb::DB_TIME_NOTGRANTED
@env.open(path, env_flags, 0)
@exit_handler ||= at_exit { close }
end
end
@env
end
def close
return unless @env
synchronize do
databases.each {|database| database.close}
@env.close
@env = nil
end
end
def transaction(nested = true)
return @transaction unless block_given?
return yield if disable_transactions?
synchronize do
parent = @transaction
begin
@transaction = env.txn_begin(nested ? parent : nil, 0)
value = yield
@transaction.commit(0)
@transaction = nil
value
ensure
@transaction.abort if @transaction
@transaction = parent
end
end
end
def checkpoint(opts = {})
return if disable_transactions?
env.txn_checkpoint(opts[:kbyte] || 0, opts[:min] || 0, opts[:force] ? Bdb::DB_FORCE : 0)
end
def disable_transactions?
config[:disable_transactions]
end
def synchronize
@mutex ||= Mutex.new
if @thread_id == thread_id
yield
else
@mutex.synchronize do
begin
@thread_id = thread_id
Thread.exclusive { yield }
ensure
@thread_id = nil
end
end
end
rescue Bdb::DbError => e
exit!(9) if e.code == Bdb::DB_RUNRECOVERY
retry if transaction.nil? and e.code == Bdb::DB_LOCK_DEADLOCK
raise e
end
def thread_id
Thread.current.object_id
end
end

View File

@ -1,200 +0,0 @@
require 'bdb' if not defined?(Bdb)
class Bdb::Simple
include Enumerable
def initialize(path, opts = {})
@dup = opts[:dup] ? true : false
if opts[:raw]
@raw = true
@sort = false
else
@raw = false
@sort = opts[:sort] == false ? false : true
end
@name = opts[:name] || 'default'
@path = path
end
def dup?; @dup; end
def sort?; @sort; end
def raw?; @raw; end
attr_reader :path, :name
def env
if @env.nil?
@env = Bdb::Env.new(0)
env_flags = Bdb::DB_CREATE | # Create the environment if it does not already exist.
Bdb::DB_INIT_TXN | # Initialize transactions.
Bdb::DB_INIT_LOCK | # Initialize locking.
Bdb::DB_INIT_LOG | # Initialize logging.
Bdb::DB_INIT_MPOOL # Initialize the in-memory cache.
# @env.set_lk_detect(Bdb::DB_LOCK_DEFAULT)
@env.open(path, env_flags, 0);
end
@env
end
def db
if @db.nil?
@db = env.db
@db.flags = Bdb::DB_DUPSORT if dup?
if sort?
@db.btree_compare = lambda do |db, key1, key2|
self.class.compare_absolute(Marshal.load(key1), Marshal.load(key2))
end
end
@db.open(nil, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE | Bdb::DB_AUTO_COMMIT, 0)
end
@db
end
def []=(key, value)
db.put(nil, dump(key), dump(value), 0)
end
def delete(key)
db.del(nil, dump(key), 0)
end
def update(key)
k = dump(key)
txn = env.txn_begin(nil, 0)
begin
v = db.get(txn, k, nil, Bdb::DB_RMW)
value = yield(load(v))
db.put(txn, k, dump(value), 0)
txn.commit(0)
rescue Exception => e
txn.abort
raise e
end
value
end
def [](key)
if key.kind_of?(Range) or dup?
Bdb::SimpleSet.new(self, key)
else
v = db.get(nil, dump(key), nil, 0)
load(v)
end
end
def each
cursor = db.cursor(nil, 0)
while data = cursor.get(nil, nil, Bdb::DB_NEXT)
key = load(data[0])
value = load(data[1])
yield(key, value)
end
cursor.close
end
def close
db.close(0)
env.close
@db = nil
@env = nil
end
CLASS_ORDER = {}
[FalseClass, TrueClass, Fixnum, Numeric, Float, Symbol, String, Array].each_with_index {|c, i| CLASS_ORDER[c] = i}
def self.compare_absolute(left, right)
if left.is_a?(right.class)
case left
when Array
# Arrays: compare one element at a time.
n = [left.size, right.size].min
n.times do |i|
comp = compare_absolute(left[i], right[i])
return comp if comp != 0
end
left.size <=> right.size
when Hash
# Hashes: sort the keys and compare as an array of arrays. This may be slow.
left = left.to_a.sort {|a,b| compare_absolute(a[0],b[0])}
right = right.to_a.sort {|a,b| compare_absolute(a[0],b[0])}
compare_absolute(left, right)
when NilClass, TrueClass, FalseClass
0
when Symbol
left.to_s <=> right.to_s
else
# Use the spaceship operator.
left <=> right
end
elsif left.kind_of?(Numeric) and right.kind_of?(Numeric)
# Numerics are always comparable.
left <=> right
else
# Nil is the smallest. Hash is the largest.
return -1 if left.is_a?(NilClass) or right.is_a?(Hash)
return 1 if left.is_a?(Hash) or right.is_a?(NilClass)
# Try to use the class sort order so we don't have to do a string comparison.
left_order = CLASS_ORDER[left.class]
right_order = CLASS_ORDER[right.class]
if left_order.nil? and right_order.nil?
left.class.name <=> right.class.name
else
(left_order || 9999) <=> (right_order || 9999)
end
end
end
private
def load(value)
return unless value
raw? ? value : Marshal.load(value)
end
def dump(value)
raw? ? value.to_s : Marshal.dump(value)
end
end
class Bdb::SimpleSet
include Enumerable
def initialize(source, key)
@source = source
@key = key
end
attr_reader :source, :key
def each
if key.kind_of?(Range)
cursor = source.db.cursor(nil, 0)
k,v = cursor.get(dump(key.first), nil, Bdb::DB_SET_RANGE)
while k and key.include?(load(k))
yield load(v)
k, v = cursor.get(nil, nil, Bdb::DB_NEXT)
end
cursor.close
else
cursor = source.db.cursor(nil, 0)
k,v = cursor.get(dump(key), nil, Bdb::DB_SET)
while k
yield load(v)
k,v = cursor.get(nil, nil, Bdb::DB_NEXT_DUP)
end
cursor.close
end
end
private
def load(value)
source.send(:load, value)
end
def dump(value)
source.send(:dump, value)
end
end