fix memory allocation error in associate. was causing random DB_SECONDARY_BAD errors if garbage collection reclaimed the space for the secondary key returned by the associate callback
This commit is contained in:
parent
6e2dec6ff9
commit
9809bb805d
3 changed files with 162 additions and 65 deletions
33
ext/bdb.c
33
ext/bdb.c
|
@ -1091,15 +1091,18 @@ VALUE db_del(VALUE obj, VALUE vtxn, VALUE vkey, VALUE vflags)
|
|||
}
|
||||
|
||||
void assoc_key(DBT* result, VALUE obj) {
|
||||
VALUE str=StringValue(obj);
|
||||
VALUE key = StringValue(obj);
|
||||
int len = RSTRING_LEN(key);
|
||||
char *str = malloc(len);
|
||||
memcpy(str, RSTRING_PTR(key), len);
|
||||
|
||||
#ifdef DEBUG_DB
|
||||
fprintf(stderr,"assoc_key %*s", RSTRING_LEN(str),RSTRING_PTR(str));
|
||||
fprintf(stderr,"assoc_key %*s", len, str);
|
||||
#endif
|
||||
|
||||
result->data=RSTRING_PTR(str);
|
||||
result->size=RSTRING_LEN(str);
|
||||
result->flags=LMEMFLAG;
|
||||
result->size = len;
|
||||
result->flags = LMEMFLAG | DB_DBT_APPMALLOC;
|
||||
result->data = str;
|
||||
}
|
||||
|
||||
VALUE assoc_call(VALUE *args)
|
||||
|
@ -1114,7 +1117,7 @@ VALUE assoc_rescue(VALUE *error, VALUE e)
|
|||
*error = e;
|
||||
}
|
||||
|
||||
int assoc_callback(DB *secdb, const DBT* key, const DBT* data, DBT* result)
|
||||
int assoc_callback(DB *secdb, const DBT* pkey, const DBT* data, DBT* skey)
|
||||
{
|
||||
t_dbh *dbh;
|
||||
VALUE proc;
|
||||
|
@ -1124,12 +1127,12 @@ int assoc_callback(DB *secdb, const DBT* key, const DBT* data, DBT* result)
|
|||
VALUE keys;
|
||||
int i;
|
||||
|
||||
memset(result,0,sizeof(DBT));
|
||||
memset(skey,0,sizeof(DBT));
|
||||
dbh=secdb->app_private;
|
||||
|
||||
args[0]=dbh->aproc;
|
||||
args[1]=dbh->self;
|
||||
args[2]=rb_str_new(key->data,key->size);
|
||||
args[2]=rb_str_new(pkey->data,pkey->size);
|
||||
args[3]=rb_str_new(data->data,data->size);
|
||||
|
||||
#ifdef DEBUG_DB
|
||||
|
@ -1152,19 +1155,19 @@ int assoc_callback(DB *secdb, const DBT* key, const DBT* data, DBT* result)
|
|||
retv=RARRAY(keys)->ptr[0];
|
||||
break;
|
||||
default:
|
||||
result->size=RARRAY(keys)->len;
|
||||
result->flags=DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
|
||||
result->data=malloc(result->size * sizeof(DBT));
|
||||
memset(result->data,0,result->size * sizeof(DBT));
|
||||
skey->size = RARRAY(keys)->len;
|
||||
skey->flags = LMEMFLAG | DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
|
||||
skey->data = malloc(skey->size * sizeof(DBT));
|
||||
memset(skey->data, 0, skey->size * sizeof(DBT));
|
||||
|
||||
for (i=0; i<result->size; i++) {
|
||||
assoc_key(result->data + i*sizeof(DBT), (VALUE)RARRAY(keys)->ptr[i]);
|
||||
for (i=0; i<skey->size; i++) {
|
||||
assoc_key(skey->data + i * sizeof(DBT), (VALUE)RARRAY(keys)->ptr[i]);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
assoc_key(result, retv);
|
||||
assoc_key(skey, retv);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -3,69 +3,102 @@ require 'bdb' if not defined?(Bdb)
|
|||
class Bdb::Simple
|
||||
include Enumerable
|
||||
|
||||
def initialize(file, opts = {})
|
||||
@dup = opts[:dup]
|
||||
@file = file
|
||||
def initialize(path, opts = {})
|
||||
@dup = opts[:dup] ? true : false
|
||||
|
||||
if opts[:raw]
|
||||
@raw = true
|
||||
@sort = false
|
||||
else
|
||||
@raw = false
|
||||
@sort = opts[:sort] == false ? false : true
|
||||
end
|
||||
|
||||
@name = opts[:name] || 'default'
|
||||
@path = path
|
||||
end
|
||||
|
||||
def dup?
|
||||
@dup
|
||||
end
|
||||
def dup?; @dup; end
|
||||
def sort?; @sort; end
|
||||
def raw?; @raw; end
|
||||
|
||||
attr_reader :file
|
||||
attr_reader :path, :name
|
||||
|
||||
def env
|
||||
if @env.nil?
|
||||
@env = Bdb::Env.new(0)
|
||||
env_flags = Bdb::DB_CREATE | # Create the environment if it does not already exist.
|
||||
Bdb::DB_INIT_TXN | # Initialize transactions.
|
||||
Bdb::DB_INIT_LOCK | # Initialize locking.
|
||||
Bdb::DB_INIT_LOG | # Initialize logging.
|
||||
Bdb::DB_INIT_MPOOL # Initialize the in-memory cache.
|
||||
# @env.set_lk_detect(Bdb::DB_LOCK_DEFAULT)
|
||||
@env.open(path, env_flags, 0);
|
||||
end
|
||||
@env
|
||||
end
|
||||
|
||||
def db
|
||||
if @db.nil?
|
||||
@db = Bdb::Db.new
|
||||
@db = env.db
|
||||
@db.flags = Bdb::DB_DUPSORT if dup?
|
||||
@db.btree_compare = lambda do |db, key1, key2|
|
||||
self.class.compare_absolute(Marshal.load(key1), Marshal.load(key2))
|
||||
if sort?
|
||||
@db.btree_compare = lambda do |db, key1, key2|
|
||||
self.class.compare_absolute(Marshal.load(key1), Marshal.load(key2))
|
||||
end
|
||||
end
|
||||
@db.open(nil, file, nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
|
||||
@db.open(nil, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE | Bdb::DB_AUTO_COMMIT, 0)
|
||||
end
|
||||
@db
|
||||
end
|
||||
|
||||
def []=(key, value)
|
||||
db[Marshal.dump(key)] = Marshal.dump(value)
|
||||
db.put(nil, dump(key), dump(value), 0)
|
||||
end
|
||||
|
||||
def delete(key)
|
||||
db.del(nil, Marshal.dump(key), 0)
|
||||
db.del(nil, dump(key), 0)
|
||||
end
|
||||
|
||||
def update(key)
|
||||
k = dump(key)
|
||||
txn = env.txn_begin(nil, 0)
|
||||
begin
|
||||
v = db.get(txn, k, nil, Bdb::DB_RMW)
|
||||
value = yield(load(v))
|
||||
db.put(txn, k, dump(value), 0)
|
||||
txn.commit(0)
|
||||
rescue Exception => e
|
||||
txn.abort
|
||||
raise e
|
||||
end
|
||||
value
|
||||
end
|
||||
|
||||
def [](key)
|
||||
if key.kind_of?(Range) or dup?
|
||||
Bdb::SimpleSet.new(db, key)
|
||||
Bdb::SimpleSet.new(self, key)
|
||||
else
|
||||
v = db.get(nil, Marshal.dump(key), nil, 0)
|
||||
Marshal.load(v) if v
|
||||
v = db.get(nil, dump(key), nil, 0)
|
||||
load(v)
|
||||
end
|
||||
end
|
||||
|
||||
def each
|
||||
cursor = db.cursor(nil, 0)
|
||||
while data = cursor.get(nil, nil, Bdb::DB_NEXT)
|
||||
key = Marshal.load(data[0])
|
||||
value = Marshal.load(data[1])
|
||||
key = load(data[0])
|
||||
value = load(data[1])
|
||||
yield(key, value)
|
||||
end
|
||||
cursor.close
|
||||
end
|
||||
|
||||
def sync
|
||||
db.sync
|
||||
end
|
||||
|
||||
def clear
|
||||
close
|
||||
File.delete(file) if File.exists?(file)
|
||||
nil
|
||||
end
|
||||
|
||||
def close
|
||||
db.close(0)
|
||||
@db = nil
|
||||
env.close
|
||||
@db = nil
|
||||
@env = nil
|
||||
end
|
||||
|
||||
CLASS_ORDER = {}
|
||||
|
@ -113,34 +146,55 @@ class Bdb::Simple
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def load(value)
|
||||
return unless value
|
||||
raw? ? value : Marshal.load(value)
|
||||
end
|
||||
|
||||
def dump(value)
|
||||
raw? ? value.to_s : Marshal.dump(value)
|
||||
end
|
||||
end
|
||||
|
||||
class Bdb::SimpleSet
|
||||
include Enumerable
|
||||
|
||||
def initialize(db, key)
|
||||
@db = db
|
||||
@key = key
|
||||
def initialize(source, key)
|
||||
@source = source
|
||||
@key = key
|
||||
end
|
||||
attr_reader :db, :key
|
||||
attr_reader :source, :key
|
||||
|
||||
def each
|
||||
if key.kind_of?(Range)
|
||||
cursor = db.cursor(nil, 0)
|
||||
k,v = cursor.get(Marshal.dump(key.first), nil, Bdb::DB_SET_RANGE)
|
||||
while k and key.include?(Marshal.load(k))
|
||||
yield Marshal.load(v)
|
||||
cursor = source.db.cursor(nil, 0)
|
||||
k,v = cursor.get(dump(key.first), nil, Bdb::DB_SET_RANGE)
|
||||
while k and key.include?(load(k))
|
||||
yield load(v)
|
||||
k, v = cursor.get(nil, nil, Bdb::DB_NEXT)
|
||||
end
|
||||
cursor.close
|
||||
else
|
||||
cursor = db.cursor(nil, 0)
|
||||
k,v = cursor.get(Marshal.dump(key), nil, Bdb::DB_SET)
|
||||
cursor = source.db.cursor(nil, 0)
|
||||
k,v = cursor.get(dump(key), nil, Bdb::DB_SET)
|
||||
while k
|
||||
yield Marshal.load(v)
|
||||
yield load(v)
|
||||
k,v = cursor.get(nil, nil, Bdb::DB_NEXT_DUP)
|
||||
end
|
||||
cursor.close
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def load(value)
|
||||
source.send(:load, value)
|
||||
end
|
||||
|
||||
def dump(value)
|
||||
source.send(:dump, value)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,21 +3,27 @@ require File.dirname(__FILE__) + '/../lib/bdb/simple'
|
|||
|
||||
class SimpleTest < Test::Unit::TestCase
|
||||
def setup
|
||||
mkdir File.join(File.dirname(__FILE__), 'tmp')
|
||||
|
||||
file = File.join(File.dirname(__FILE__), 'tmp', 'simple.db')
|
||||
@db = Bdb::Simple.new(file)
|
||||
|
||||
file = File.join(File.dirname(__FILE__), 'tmp', 'simple-dup.db')
|
||||
@dbd = Bdb::Simple.new(file, :dup => true)
|
||||
@path = File.join(File.dirname(__FILE__), 'tmp')
|
||||
rm_rf @path
|
||||
mkdir @path
|
||||
open
|
||||
end
|
||||
|
||||
def teardown
|
||||
@db.close
|
||||
@dbd.close
|
||||
rm_rf File.join(File.dirname(__FILE__), 'tmp')
|
||||
close
|
||||
rm_rf @path
|
||||
end
|
||||
|
||||
def open
|
||||
@db = Bdb::Simple.new(@path)
|
||||
@dbd = Bdb::Simple.new(@path, :name => 'dup', :dup => true)
|
||||
end
|
||||
|
||||
def close
|
||||
@db.close
|
||||
@dbd.close
|
||||
end
|
||||
|
||||
def test_put_and_get
|
||||
@db['key'] = 'data'
|
||||
assert_equal 'data', @db['key']
|
||||
|
@ -27,6 +33,30 @@ class SimpleTest < Test::Unit::TestCase
|
|||
assert_equal ['data1', 'data2'], @dbd['key'].to_a
|
||||
end
|
||||
|
||||
def test_update
|
||||
@db[:key] = 0
|
||||
close
|
||||
|
||||
pids = []
|
||||
5.times do
|
||||
pids << Process.fork do
|
||||
db = Bdb::Simple.new(@path)
|
||||
10.times do
|
||||
db.update(:key) do |v|
|
||||
sleep(0.1)
|
||||
v + 1
|
||||
end
|
||||
end
|
||||
db.close
|
||||
end
|
||||
end
|
||||
pids.each {|pid| Process.wait(pid)}
|
||||
|
||||
open
|
||||
assert_equal 50, @db[:key]
|
||||
end
|
||||
|
||||
|
||||
def test_delete
|
||||
@db['key'] = 'data'
|
||||
assert_equal 'data', @db['key']
|
||||
|
@ -50,4 +80,14 @@ class SimpleTest < Test::Unit::TestCase
|
|||
assert_equal expected, list.shuffle.sort {|a,b| Bdb::Simple.compare_absolute(a,b)}
|
||||
end
|
||||
end
|
||||
|
||||
def parallel(n)
|
||||
threads = []
|
||||
n.times do |i|
|
||||
threads << Thread.new do
|
||||
yield(i)
|
||||
end
|
||||
end
|
||||
threads.each { |thread| thread.join }
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue