add support for replication

master
Justin Balthrop 2009-11-23 23:29:26 -08:00
parent e0a617bde7
commit bef69b90eb
5 changed files with 288 additions and 25 deletions

237
ext/bdb.c
View File

@ -267,9 +267,9 @@ VALUE db_open(VALUE obj, VALUE vtxn, VALUE vdisk_file,
dbh->db->app_private=dbh;
rv = dbh->db->open(dbh->db,txn?txn->txn:NULL,
StringValueCStr(vdisk_file),
logical_db,
dbtype,flags,mode);
StringValueCStr(vdisk_file),
logical_db,
dbtype,flags,mode);
if (rv != 0) {
raise_error(rv,"db_open failure: %s(%d)",db_strerror(rv),rv);
}
@ -1637,6 +1637,8 @@ VALUE env_open(VALUE obj, VALUE vhome, VALUE vflags, VALUE vmode)
raise_error(0,"env handle already used and closed");
rv = eh->env->open(eh->env,StringValueCStr(vhome),flags,mode);
eh->env->app_private=eh;
if (rv != 0) {
raise_error(rv, "env_open failure: %s",db_strerror(rv));
}
@ -2749,6 +2751,205 @@ VALUE env_get_home(VALUE obj)
return rb_str_new2(home);
}
/*
* call-seq:
* env.set_verbose(which, onoff)
*
* set verbose messages on or off
*/
VALUE env_set_verbose(VALUE obj, VALUE which, VALUE onoff)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->set_verbose(eh->env, NUM2UINT(which), RTEST(onoff));
if ( rv != 0 ) raise_error(rv, "env_set_verbose: %s",db_strerror(rv));
return Qtrue;
}
/*
* call-seq:
* env.rep_priority = int
*
* specify how the replication manager will handle acknowledgement of replication messages
*/
VALUE env_rep_set_priority(VALUE obj, VALUE priority)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->rep_set_priority(eh->env, NUM2UINT(priority));
if ( rv != 0 ) raise_error(rv, "env_rep_set_priority: %s", db_strerror(rv));
return priority;
}
/*
* call-seq:
* env.rep_priority -> int
*
* returns the replication manager's acknowledgement policy
*/
VALUE env_rep_get_priority(VALUE obj)
{
t_envh *eh;
u_int32_t priority;
Data_Get_Struct(obj,t_envh,eh);
eh->env->rep_get_priority(eh->env, &priority);
return INT2NUM(priority);
}
/*
* call-seq:
* env.rep_nsites = int
*
* specify how the replication manager will handle acknowledgement of replication messages
*/
VALUE env_rep_set_nsites(VALUE obj, VALUE nsites)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->rep_set_nsites(eh->env, NUM2UINT(nsites));
if ( rv != 0 ) raise_error(rv, "env_rep_set_nsites: %s", db_strerror(rv));
return nsites;
}
/*
* call-seq:
* env.rep_nsites -> int
*
* returns the replication manager's acknowledgement policy
*/
VALUE env_rep_get_nsites(VALUE obj)
{
t_envh *eh;
u_int32_t nsites;
Data_Get_Struct(obj,t_envh,eh);
eh->env->rep_get_nsites(eh->env, &nsites);
return INT2NUM(nsites);
}
/*
* call-seq:
* env.repmgr_set_local_site(host, port)
*
* specify the local site for the replication manager
*/
VALUE env_repmgr_set_local_site(VALUE obj, VALUE host, VALUE port)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->repmgr_set_local_site(eh->env, StringValuePtr(host), NUM2UINT(port), 0);
if ( rv != 0 ) raise_error(rv, "env_repmgr_set_local_site: %s", db_strerror(rv));
return Qtrue;
}
/*
* call-seq:
* env.repmgr_add_remote_site(host, port)
*
* add a remote for the replication manager
*/
VALUE env_repmgr_add_remote_site(VALUE obj, VALUE host, VALUE port)
{
t_envh *eh;
int rv;
int eidp;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->repmgr_add_remote_site(eh->env, StringValuePtr(host), NUM2UINT(port), &eidp, 0);
if ( rv != 0 ) raise_error(rv, "env_repmgr_add_remote_site: %s", db_strerror(rv));
return INT2NUM(eidp);
}
/*
* call-seq:
* env.repmgr_ack_policy = int
*
* specify how the replication manager will handle acknowledgement of replication messages
*/
VALUE env_repmgr_set_ack_policy(VALUE obj, VALUE policy)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->repmgr_set_ack_policy(eh->env, NUM2INT(policy));
if ( rv != 0 ) raise_error(rv, "env_repmgr_set_ack_policy: %s", db_strerror(rv));
return policy;
}
/*
* call-seq:
* env.repmgr_ack_policy -> int
*
* returns the replication manager's acknowledgement policy
*/
VALUE env_repmgr_get_ack_policy(VALUE obj)
{
t_envh *eh;
int policy;
Data_Get_Struct(obj,t_envh,eh);
eh->env->repmgr_get_ack_policy(eh->env, &policy);
return INT2NUM(policy);
}
/*
* call-seq:
* env.repmgr_start(num_threads, flags)
*
* start the replication manager
*/
VALUE env_repmgr_start(VALUE obj, VALUE num_threads, VALUE flags)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->repmgr_start(eh->env, NUM2INT(num_threads), NUM2UINT(flags));
if ( rv != 0 ) raise_error(rv, "env_repmgr_start: %s", db_strerror(rv));
return Qtrue;
}
/*
* call-seq:
* env.repmgr_stat_print
*
* prints replication manager stats
*/
VALUE env_repmgr_stat_print(VALUE obj, VALUE flags)
{
t_envh *eh;
int rv;
Data_Get_Struct(obj,t_envh,eh);
rv = eh->env->repmgr_stat_print(eh->env, NUM2UINT(flags));
if ( rv != 0 ) raise_error(rv, "env_repmgr_stat_print: %s", db_strerror(rv));
return Qtrue;
}
static void txn_finish(t_txnh *txn)
{
if ( RTEST(ruby_debug) )
@ -2935,7 +3136,7 @@ void Init_bdb() {
rb_define_method(cDb,"associate",db_associate,4);
rb_define_method(cDb,"btree_compare=",db_btree_compare_set,1);
rb_define_method(cDb,"flags=",db_flags_set,1);
rb_define_method(cDb,"flags",db_flags_get,0);
rb_define_method(cDb,"flags",db_flags_get,0);
rb_define_method(cDb,"open",db_open,6);
rb_define_method(cDb,"close",db_close,1);
rb_define_method(cDb,"[]",db_aget,1);
@ -2977,8 +3178,8 @@ void Init_bdb() {
rb_define_method(cEnv,"close",env_close,0);
rb_define_method(cEnv,"db",env_db,0);
rb_define_method(cEnv,"cachesize=",env_set_cachesize,1);
rb_define_method(cEnv,"cachesize",env_get_cachesize,0);
rb_define_method(cEnv,"flags",env_get_flags,0);
rb_define_method(cEnv,"cachesize",env_get_cachesize,0);
rb_define_method(cEnv,"flags",env_get_flags,0);
rb_define_method(cEnv,"flags_on=",env_set_flags_on,1);
rb_define_method(cEnv,"flags_off=",env_set_flags_off,1);
rb_define_method(cEnv,"list_dbs",env_list_dbs,0);
@ -2995,19 +3196,31 @@ void Init_bdb() {
rb_define_method(cEnv,"set_lk_detect",env_set_lk_detect,1);
rb_define_method(cEnv,"get_lk_detect",env_get_lk_detect,0);
rb_define_method(cEnv,"set_lk_max_locks",env_set_lk_max_locks,1);
rb_define_method(cEnv,"get_lk_max_locks",env_get_lk_max_locks,0);
rb_define_method(cEnv,"get_lk_max_locks",env_get_lk_max_locks,0);
rb_define_method(cEnv,"set_lk_max_objects",env_set_lk_max_objects,1);
rb_define_method(cEnv,"get_lk_max_objects",env_get_lk_max_objects,0);
rb_define_method(cEnv,"get_lk_max_objects",env_get_lk_max_objects,0);
rb_define_method(cEnv,"set_shm_key",env_set_shm_key,1);
rb_define_method(cEnv,"get_shm_key",env_get_shm_key,0);
rb_define_method(cEnv,"set_data_dir",env_set_data_dir,1);
rb_define_method(cEnv,"get_data_dirs",env_get_data_dirs,0);
rb_define_method(cEnv,"get_data_dirs",env_get_data_dirs,0);
rb_define_method(cEnv,"set_lg_dir",env_set_lg_dir,1);
rb_define_method(cEnv,"get_lg_dir",env_get_lg_dir,0);
rb_define_method(cEnv,"get_lg_dir",env_get_lg_dir,0);
rb_define_method(cEnv,"set_tmp_dir",env_set_tmp_dir,1);
rb_define_method(cEnv,"get_tmp_dir",env_get_tmp_dir,0);
rb_define_method(cEnv,"get_home",env_get_home,0);
rb_define_method(cEnv,"get_tmp_dir",env_get_tmp_dir,0);
rb_define_method(cEnv,"get_home",env_get_home,0);
rb_define_method(cEnv,"set_verbose",env_set_verbose,2);
rb_define_method(cEnv,"rep_priority=", env_rep_set_priority, 1);
rb_define_method(cEnv,"rep_priority", env_rep_get_priority, 0);
rb_define_method(cEnv,"rep_nsites=", env_rep_set_nsites, 1);
rb_define_method(cEnv,"rep_nsites", env_rep_get_nsites, 0);
rb_define_method(cEnv,"repmgr_set_local_site", env_repmgr_set_local_site, 2);
rb_define_method(cEnv,"repmgr_add_remote_site", env_repmgr_add_remote_site, 2);
rb_define_method(cEnv,"repmgr_ack_policy=", env_repmgr_set_ack_policy, 1);
rb_define_method(cEnv,"repmgr_ack_policy", env_repmgr_get_ack_policy, 0);
rb_define_method(cEnv,"repmgr_start", env_repmgr_start, 2);
rb_define_method(cEnv,"repmgr_stat_print", env_repmgr_stat_print, 1);
cTxnStat = rb_define_class_under(mBdb,"TxnStat",rb_cObject);
rb_define_method(cTxnStat,"[]",stat_aref,1);

View File

@ -19,8 +19,12 @@ class Bdb::Base
indexes[field] = opts
end
def path
config[:path] || Dir.pwd
end
def environment
@environment ||= Bdb::Environment.new(config[:path], self)
@environment ||= Bdb::Environment.new(path, self)
end
def transaction(nested = true, &block)
@ -32,9 +36,13 @@ class Bdb::Base
end
def checkpoint(opts = {})
environment.synchronize(opts)
environment.checkpoint(opts)
end
def master?
environment.master?
end
private
def get_field(field, value)

View File

@ -10,10 +10,11 @@ class Bdb::Database < Bdb::Base
def db(index = nil)
if @db.nil?
@db = {}
open_flags = master? ? Bdb::DB_CREATE : Bdb::DB_RDONLY
transaction(false) do
primary_db = environment.env.db
primary_db.pagesize = config[:page_size] if config[:page_size]
primary_db.open(transaction, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
primary_db.open(transaction, name, nil, Bdb::Db::BTREE, open_flags, 0)
@db[:primary_key] = primary_db
indexes.each do |field, opts|
@ -31,13 +32,22 @@ class Bdb::Database < Bdb::Base
index_db = environment.env.db
index_db.flags = Bdb::DB_DUPSORT unless opts[:unique]
index_db.pagesize = config[:page_size] if config[:page_size]
index_db.open(transaction, "#{name}_by_#{field}", nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0)
primary_db.associate(transaction, index_db, Bdb::DB_CREATE, index_callback)
index_db.open(transaction, "#{name}_by_#{field}", nil, Bdb::Db::BTREE, open_flags, 0)
primary_db.associate(transaction, index_db, open_flags, index_callback)
@db[field] = index_db
end
end
end
@db[index || :primary_key]
rescue Bdb::DbError => e
# Retry if the database doesn't exist and we are a replication client.
if not master? and e.code == Errno::ENOENT::Errno
close
sleep 1
retry
else
raise(e)
end
end
def close
@ -131,13 +141,18 @@ class Bdb::Database < Bdb::Base
set.results
end
def [](key)
get(key).first
end
def set(key, value, opts = {})
synchronize do
key = Tuple.dump(key)
value = Marshal.dump(value)
flags = opts[:create] ? Bdb::DB_NOOVERWRITE : 0
db.put(transaction, key, value, flags)
end
value
end
end
def delete(key)
@ -154,6 +169,10 @@ class Bdb::Database < Bdb::Base
end
end
def sync
db.sync
end
private
def get_key(key, opts)

View File

@ -1,12 +1,25 @@
require 'thread'
require 'bdb/replication'
class Bdb::Environment
@@env = {}
def self.new(path, database = nil)
# Only allow one environment per path.
path = File.expand_path(path)
@@env[path] ||= super(path)
@@env[path].databases << database if database
@@env[path]
end
def initialize(path)
@path = path
end
attr_reader :path
def self.[](path)
new(path)
end
def self.config(config = {})
@config ||= {
:max_locks => 5000,
@ -22,10 +35,10 @@ class Bdb::Environment
@config.merge!(config)
end
def initialize(path)
@path = path
include Replication
def self.replicate(path, opts)
self[path].replicate(opts)
end
attr_reader :path
def databases
@databases ||= []
@ -40,16 +53,19 @@ class Bdb::Environment
else
env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_INIT_LOCK |
Bdb::DB_REGISTER | Bdb::DB_RECOVER | Bdb::DB_INIT_MPOOL | Bdb::DB_THREAD
end
env_flags |= Bdb::DB_INIT_REP if replicate?
end
@env.cachesize = config[:cache_size] if config[:cache_size]
@env.set_timeout(config[:txn_timeout], Bdb::DB_SET_TXN_TIMEOUT) if config[:txn_timeout]
@env.set_timeout(config[:lock_timeout], Bdb::DB_SET_LOCK_TIMEOUT) if config[:lock_timeout]
@env.set_lk_max_locks(config[:max_locks]) if config[:max_locks]
@env.set_lk_detect(Bdb::DB_LOCK_RANDOM)
@env.flags_on = Bdb::DB_TXN_WRITE_NOSYNC | Bdb::DB_TIME_NOTGRANTED
@env.open(path, env_flags, 0)
init_replication(@env) if replicate?
@env.open(path, env_flags, 0)
start_replication(@env) if replicate?
@exit_handler ||= at_exit { close }
end
end

View File

@ -1,6 +1,13 @@
require 'test/unit'
require 'fileutils'
require File.dirname(__FILE__) + '/../ext/bdb'
require 'pp'
$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../ext')
$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../lib')
$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../../tuple/ext')
require 'bdb'
require 'bdb/database'
class Test::Unit::TestCase
include FileUtils