From 9809bb805db74bf3c253b72d917a57c03c948785 Mon Sep 17 00:00:00 2001 From: Justin Balthrop Date: Fri, 16 Oct 2009 17:57:12 -0700 Subject: [PATCH] fix memory allocation error in associate. was causing random DB_SECONDARY_BAD errors if garbage collection reclaimed the space for the secondary key returned by the associate callback --- ext/bdb.c | 33 ++++++----- lib/bdb/simple.rb | 134 +++++++++++++++++++++++++++++++------------- test/simple_test.rb | 60 ++++++++++++++++---- 3 files changed, 162 insertions(+), 65 deletions(-) diff --git a/ext/bdb.c b/ext/bdb.c index b7c3c8e..7ed27ea 100644 --- a/ext/bdb.c +++ b/ext/bdb.c @@ -1091,15 +1091,18 @@ VALUE db_del(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vflags) } void assoc_key(DBT* result, VALUE obj) { - VALUE str=StringValue(obj); + VALUE key = StringValue(obj); + int len = RSTRING_LEN(key); + char *str = malloc(len); + memcpy(str, RSTRING_PTR(key), len); #ifdef DEBUG_DB - fprintf(stderr,"assoc_key %*s", RSTRING_LEN(str),RSTRING_PTR(str)); + fprintf(stderr,"assoc_key %*s", len, str); #endif - result->data=RSTRING_PTR(str); - result->size=RSTRING_LEN(str); - result->flags=LMEMFLAG; + result->size = len; + result->flags = LMEMFLAG | DB_DBT_APPMALLOC; + result->data = str; } VALUE assoc_call(VALUE *args) @@ -1114,7 +1117,7 @@ VALUE assoc_rescue(VALUE *error, VALUE e) *error = e; } -int assoc_callback(DB *secdb, const DBT* key, const DBT* data, DBT* result) +int assoc_callback(DB *secdb, const DBT* pkey, const DBT* data, DBT* skey) { t_dbh *dbh; VALUE proc; @@ -1124,12 +1127,12 @@ int assoc_callback(DB *secdb, const DBT* key, const DBT* data, DBT* result) VALUE keys; int i; - memset(result,0,sizeof(DBT)); + memset(skey,0,sizeof(DBT)); dbh=secdb->app_private; args[0]=dbh->aproc; args[1]=dbh->self; - args[2]=rb_str_new(key->data,key->size); + args[2]=rb_str_new(pkey->data,pkey->size); args[3]=rb_str_new(data->data,data->size); #ifdef DEBUG_DB @@ -1152,19 +1155,19 @@ int assoc_callback(DB *secdb, const DBT* key, const DBT* data, DBT* result) retv=RARRAY(keys)->ptr[0]; break; default: - result->size=RARRAY(keys)->len; - result->flags=DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; - result->data=malloc(result->size * sizeof(DBT)); - memset(result->data,0,result->size * sizeof(DBT)); + skey->size = RARRAY(keys)->len; + skey->flags = LMEMFLAG | DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; + skey->data = malloc(skey->size * sizeof(DBT)); + memset(skey->data, 0, skey->size * sizeof(DBT)); - for (i=0; isize; i++) { - assoc_key(result->data + i*sizeof(DBT), (VALUE)RARRAY(keys)->ptr[i]); + for (i=0; isize; i++) { + assoc_key(skey->data + i * sizeof(DBT), (VALUE)RARRAY(keys)->ptr[i]); } return 0; } } - assoc_key(result, retv); + assoc_key(skey, retv); return 0; } diff --git a/lib/bdb/simple.rb b/lib/bdb/simple.rb index ba75ae8..4cfdbd2 100644 --- a/lib/bdb/simple.rb +++ b/lib/bdb/simple.rb @@ -3,69 +3,102 @@ require 'bdb' if not defined?(Bdb) class Bdb::Simple include Enumerable - def initialize(file, opts = {}) - @dup = opts[:dup] - @file = file + def initialize(path, opts = {}) + @dup = opts[:dup] ? true : false + + if opts[:raw] + @raw = true + @sort = false + else + @raw = false + @sort = opts[:sort] == false ? false : true + end + + @name = opts[:name] || 'default' + @path = path end - def dup? - @dup - end + def dup?; @dup; end + def sort?; @sort; end + def raw?; @raw; end - attr_reader :file + attr_reader :path, :name + + def env + if @env.nil? + @env = Bdb::Env.new(0) + env_flags = Bdb::DB_CREATE | # Create the environment if it does not already exist. + Bdb::DB_INIT_TXN | # Initialize transactions. + Bdb::DB_INIT_LOCK | # Initialize locking. + Bdb::DB_INIT_LOG | # Initialize logging. + Bdb::DB_INIT_MPOOL # Initialize the in-memory cache. + # @env.set_lk_detect(Bdb::DB_LOCK_DEFAULT) + @env.open(path, env_flags, 0); + end + @env + end def db if @db.nil? - @db = Bdb::Db.new + @db = env.db @db.flags = Bdb::DB_DUPSORT if dup? - @db.btree_compare = lambda do |db, key1, key2| - self.class.compare_absolute(Marshal.load(key1), Marshal.load(key2)) + if sort? + @db.btree_compare = lambda do |db, key1, key2| + self.class.compare_absolute(Marshal.load(key1), Marshal.load(key2)) + end end - @db.open(nil, file, nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0) + @db.open(nil, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE | Bdb::DB_AUTO_COMMIT, 0) end @db end def []=(key, value) - db[Marshal.dump(key)] = Marshal.dump(value) + db.put(nil, dump(key), dump(value), 0) end def delete(key) - db.del(nil, Marshal.dump(key), 0) + db.del(nil, dump(key), 0) + end + + def update(key) + k = dump(key) + txn = env.txn_begin(nil, 0) + begin + v = db.get(txn, k, nil, Bdb::DB_RMW) + value = yield(load(v)) + db.put(txn, k, dump(value), 0) + txn.commit(0) + rescue Exception => e + txn.abort + raise e + end + value end def [](key) if key.kind_of?(Range) or dup? - Bdb::SimpleSet.new(db, key) + Bdb::SimpleSet.new(self, key) else - v = db.get(nil, Marshal.dump(key), nil, 0) - Marshal.load(v) if v + v = db.get(nil, dump(key), nil, 0) + load(v) end end def each cursor = db.cursor(nil, 0) while data = cursor.get(nil, nil, Bdb::DB_NEXT) - key = Marshal.load(data[0]) - value = Marshal.load(data[1]) + key = load(data[0]) + value = load(data[1]) yield(key, value) end cursor.close end - def sync - db.sync - end - - def clear - close - File.delete(file) if File.exists?(file) - nil - end - def close db.close(0) - @db = nil + env.close + @db = nil + @env = nil end CLASS_ORDER = {} @@ -113,34 +146,55 @@ class Bdb::Simple end end end + +private + + def load(value) + return unless value + raw? ? value : Marshal.load(value) + end + + def dump(value) + raw? ? value.to_s : Marshal.dump(value) + end end class Bdb::SimpleSet include Enumerable - def initialize(db, key) - @db = db - @key = key + def initialize(source, key) + @source = source + @key = key end - attr_reader :db, :key + attr_reader :source, :key def each if key.kind_of?(Range) - cursor = db.cursor(nil, 0) - k,v = cursor.get(Marshal.dump(key.first), nil, Bdb::DB_SET_RANGE) - while k and key.include?(Marshal.load(k)) - yield Marshal.load(v) + cursor = source.db.cursor(nil, 0) + k,v = cursor.get(dump(key.first), nil, Bdb::DB_SET_RANGE) + while k and key.include?(load(k)) + yield load(v) k, v = cursor.get(nil, nil, Bdb::DB_NEXT) end cursor.close else - cursor = db.cursor(nil, 0) - k,v = cursor.get(Marshal.dump(key), nil, Bdb::DB_SET) + cursor = source.db.cursor(nil, 0) + k,v = cursor.get(dump(key), nil, Bdb::DB_SET) while k - yield Marshal.load(v) + yield load(v) k,v = cursor.get(nil, nil, Bdb::DB_NEXT_DUP) end cursor.close end end + +private + + def load(value) + source.send(:load, value) + end + + def dump(value) + source.send(:dump, value) + end end diff --git a/test/simple_test.rb b/test/simple_test.rb index 809283d..c6209d6 100644 --- a/test/simple_test.rb +++ b/test/simple_test.rb @@ -3,21 +3,27 @@ require File.dirname(__FILE__) + '/../lib/bdb/simple' class SimpleTest < Test::Unit::TestCase def setup - mkdir File.join(File.dirname(__FILE__), 'tmp') - - file = File.join(File.dirname(__FILE__), 'tmp', 'simple.db') - @db = Bdb::Simple.new(file) - - file = File.join(File.dirname(__FILE__), 'tmp', 'simple-dup.db') - @dbd = Bdb::Simple.new(file, :dup => true) + @path = File.join(File.dirname(__FILE__), 'tmp') + rm_rf @path + mkdir @path + open end def teardown - @db.close - @dbd.close - rm_rf File.join(File.dirname(__FILE__), 'tmp') + close + rm_rf @path end + def open + @db = Bdb::Simple.new(@path) + @dbd = Bdb::Simple.new(@path, :name => 'dup', :dup => true) + end + + def close + @db.close + @dbd.close + end + def test_put_and_get @db['key'] = 'data' assert_equal 'data', @db['key'] @@ -27,6 +33,30 @@ class SimpleTest < Test::Unit::TestCase assert_equal ['data1', 'data2'], @dbd['key'].to_a end + def test_update + @db[:key] = 0 + close + + pids = [] + 5.times do + pids << Process.fork do + db = Bdb::Simple.new(@path) + 10.times do + db.update(:key) do |v| + sleep(0.1) + v + 1 + end + end + db.close + end + end + pids.each {|pid| Process.wait(pid)} + + open + assert_equal 50, @db[:key] + end + + def test_delete @db['key'] = 'data' assert_equal 'data', @db['key'] @@ -50,4 +80,14 @@ class SimpleTest < Test::Unit::TestCase assert_equal expected, list.shuffle.sort {|a,b| Bdb::Simple.compare_absolute(a,b)} end end + + def parallel(n) + threads = [] + n.times do |i| + threads << Thread.new do + yield(i) + end + end + threads.each { |thread| thread.join } + end end