From bef69b90eb4ff8bcb552674dc36544b7cabe9798 Mon Sep 17 00:00:00 2001 From: Justin Balthrop Date: Mon, 23 Nov 2009 23:29:26 -0800 Subject: [PATCH] add support for replication --- ext/bdb.c | 237 ++++++++++++++++++++++++++++++++++++++--- lib/bdb/base.rb | 14 ++- lib/bdb/database.rb | 27 ++++- lib/bdb/environment.rb | 26 ++++- test/test_helper.rb | 9 +- 5 files changed, 288 insertions(+), 25 deletions(-) diff --git a/ext/bdb.c b/ext/bdb.c index 07dbf32..24e5181 100644 --- a/ext/bdb.c +++ b/ext/bdb.c @@ -267,9 +267,9 @@ VALUE db_open(VALUE obj, VALUE vtxn, VALUE vdisk_file, dbh->db->app_private=dbh; rv = dbh->db->open(dbh->db,txn?txn->txn:NULL, - StringValueCStr(vdisk_file), - logical_db, - dbtype,flags,mode); + StringValueCStr(vdisk_file), + logical_db, + dbtype,flags,mode); if (rv != 0) { raise_error(rv,"db_open failure: %s(%d)",db_strerror(rv),rv); } @@ -1637,6 +1637,8 @@ VALUE env_open(VALUE obj, VALUE vhome, VALUE vflags, VALUE vmode) raise_error(0,"env handle already used and closed"); rv = eh->env->open(eh->env,StringValueCStr(vhome),flags,mode); + eh->env->app_private=eh; + if (rv != 0) { raise_error(rv, "env_open failure: %s",db_strerror(rv)); } @@ -2749,6 +2751,205 @@ VALUE env_get_home(VALUE obj) return rb_str_new2(home); } +/* + * call-seq: + * env.set_verbose(which, onoff) + * + * set verbose messages on or off + */ +VALUE env_set_verbose(VALUE obj, VALUE which, VALUE onoff) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->set_verbose(eh->env, NUM2UINT(which), RTEST(onoff)); + + if ( rv != 0 ) raise_error(rv, "env_set_verbose: %s",db_strerror(rv)); + + return Qtrue; +} + +/* + * call-seq: + * env.rep_priority = int + * + * specify how the replication manager will handle acknowledgement of replication messages + */ +VALUE env_rep_set_priority(VALUE obj, VALUE priority) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->rep_set_priority(eh->env, NUM2UINT(priority)); + + if ( rv != 0 ) raise_error(rv, "env_rep_set_priority: %s", db_strerror(rv)); + return priority; +} + +/* + * call-seq: + * env.rep_priority -> int + * + * returns the replication manager's acknowledgement policy + */ +VALUE env_rep_get_priority(VALUE obj) +{ + t_envh *eh; + u_int32_t priority; + + Data_Get_Struct(obj,t_envh,eh); + eh->env->rep_get_priority(eh->env, &priority); + + return INT2NUM(priority); +} + +/* + * call-seq: + * env.rep_nsites = int + * + * specify how the replication manager will handle acknowledgement of replication messages + */ +VALUE env_rep_set_nsites(VALUE obj, VALUE nsites) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->rep_set_nsites(eh->env, NUM2UINT(nsites)); + + if ( rv != 0 ) raise_error(rv, "env_rep_set_nsites: %s", db_strerror(rv)); + return nsites; +} + +/* + * call-seq: + * env.rep_nsites -> int + * + * returns the replication manager's acknowledgement policy + */ +VALUE env_rep_get_nsites(VALUE obj) +{ + t_envh *eh; + u_int32_t nsites; + + Data_Get_Struct(obj,t_envh,eh); + eh->env->rep_get_nsites(eh->env, &nsites); + + return INT2NUM(nsites); +} + + +/* + * call-seq: + * env.repmgr_set_local_site(host, port) + * + * specify the local site for the replication manager + */ +VALUE env_repmgr_set_local_site(VALUE obj, VALUE host, VALUE port) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->repmgr_set_local_site(eh->env, StringValuePtr(host), NUM2UINT(port), 0); + + if ( rv != 0 ) raise_error(rv, "env_repmgr_set_local_site: %s", db_strerror(rv)); + return Qtrue; +} + +/* + * call-seq: + * env.repmgr_add_remote_site(host, port) + * + * add a remote for the replication manager + */ +VALUE env_repmgr_add_remote_site(VALUE obj, VALUE host, VALUE port) +{ + t_envh *eh; + int rv; + int eidp; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->repmgr_add_remote_site(eh->env, StringValuePtr(host), NUM2UINT(port), &eidp, 0); + + if ( rv != 0 ) raise_error(rv, "env_repmgr_add_remote_site: %s", db_strerror(rv)); + return INT2NUM(eidp); +} + +/* + * call-seq: + * env.repmgr_ack_policy = int + * + * specify how the replication manager will handle acknowledgement of replication messages + */ +VALUE env_repmgr_set_ack_policy(VALUE obj, VALUE policy) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->repmgr_set_ack_policy(eh->env, NUM2INT(policy)); + + if ( rv != 0 ) raise_error(rv, "env_repmgr_set_ack_policy: %s", db_strerror(rv)); + return policy; +} + +/* + * call-seq: + * env.repmgr_ack_policy -> int + * + * returns the replication manager's acknowledgement policy + */ +VALUE env_repmgr_get_ack_policy(VALUE obj) +{ + t_envh *eh; + int policy; + + Data_Get_Struct(obj,t_envh,eh); + eh->env->repmgr_get_ack_policy(eh->env, &policy); + + return INT2NUM(policy); +} + +/* + * call-seq: + * env.repmgr_start(num_threads, flags) + * + * start the replication manager + */ +VALUE env_repmgr_start(VALUE obj, VALUE num_threads, VALUE flags) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->repmgr_start(eh->env, NUM2INT(num_threads), NUM2UINT(flags)); + + if ( rv != 0 ) raise_error(rv, "env_repmgr_start: %s", db_strerror(rv)); + return Qtrue; +} + +/* + * call-seq: + * env.repmgr_stat_print + * + * prints replication manager stats + */ +VALUE env_repmgr_stat_print(VALUE obj, VALUE flags) +{ + t_envh *eh; + int rv; + + Data_Get_Struct(obj,t_envh,eh); + rv = eh->env->repmgr_stat_print(eh->env, NUM2UINT(flags)); + + if ( rv != 0 ) raise_error(rv, "env_repmgr_stat_print: %s", db_strerror(rv)); + return Qtrue; +} + + static void txn_finish(t_txnh *txn) { if ( RTEST(ruby_debug) ) @@ -2935,7 +3136,7 @@ void Init_bdb() { rb_define_method(cDb,"associate",db_associate,4); rb_define_method(cDb,"btree_compare=",db_btree_compare_set,1); rb_define_method(cDb,"flags=",db_flags_set,1); - rb_define_method(cDb,"flags",db_flags_get,0); + rb_define_method(cDb,"flags",db_flags_get,0); rb_define_method(cDb,"open",db_open,6); rb_define_method(cDb,"close",db_close,1); rb_define_method(cDb,"[]",db_aget,1); @@ -2977,8 +3178,8 @@ void Init_bdb() { rb_define_method(cEnv,"close",env_close,0); rb_define_method(cEnv,"db",env_db,0); rb_define_method(cEnv,"cachesize=",env_set_cachesize,1); - rb_define_method(cEnv,"cachesize",env_get_cachesize,0); - rb_define_method(cEnv,"flags",env_get_flags,0); + rb_define_method(cEnv,"cachesize",env_get_cachesize,0); + rb_define_method(cEnv,"flags",env_get_flags,0); rb_define_method(cEnv,"flags_on=",env_set_flags_on,1); rb_define_method(cEnv,"flags_off=",env_set_flags_off,1); rb_define_method(cEnv,"list_dbs",env_list_dbs,0); @@ -2995,19 +3196,31 @@ void Init_bdb() { rb_define_method(cEnv,"set_lk_detect",env_set_lk_detect,1); rb_define_method(cEnv,"get_lk_detect",env_get_lk_detect,0); rb_define_method(cEnv,"set_lk_max_locks",env_set_lk_max_locks,1); - rb_define_method(cEnv,"get_lk_max_locks",env_get_lk_max_locks,0); + rb_define_method(cEnv,"get_lk_max_locks",env_get_lk_max_locks,0); rb_define_method(cEnv,"set_lk_max_objects",env_set_lk_max_objects,1); - rb_define_method(cEnv,"get_lk_max_objects",env_get_lk_max_objects,0); + rb_define_method(cEnv,"get_lk_max_objects",env_get_lk_max_objects,0); rb_define_method(cEnv,"set_shm_key",env_set_shm_key,1); rb_define_method(cEnv,"get_shm_key",env_get_shm_key,0); rb_define_method(cEnv,"set_data_dir",env_set_data_dir,1); - rb_define_method(cEnv,"get_data_dirs",env_get_data_dirs,0); + rb_define_method(cEnv,"get_data_dirs",env_get_data_dirs,0); rb_define_method(cEnv,"set_lg_dir",env_set_lg_dir,1); - rb_define_method(cEnv,"get_lg_dir",env_get_lg_dir,0); + rb_define_method(cEnv,"get_lg_dir",env_get_lg_dir,0); rb_define_method(cEnv,"set_tmp_dir",env_set_tmp_dir,1); - rb_define_method(cEnv,"get_tmp_dir",env_get_tmp_dir,0); - rb_define_method(cEnv,"get_home",env_get_home,0); + rb_define_method(cEnv,"get_tmp_dir",env_get_tmp_dir,0); + rb_define_method(cEnv,"get_home",env_get_home,0); + rb_define_method(cEnv,"set_verbose",env_set_verbose,2); + + rb_define_method(cEnv,"rep_priority=", env_rep_set_priority, 1); + rb_define_method(cEnv,"rep_priority", env_rep_get_priority, 0); + rb_define_method(cEnv,"rep_nsites=", env_rep_set_nsites, 1); + rb_define_method(cEnv,"rep_nsites", env_rep_get_nsites, 0); + rb_define_method(cEnv,"repmgr_set_local_site", env_repmgr_set_local_site, 2); + rb_define_method(cEnv,"repmgr_add_remote_site", env_repmgr_add_remote_site, 2); + rb_define_method(cEnv,"repmgr_ack_policy=", env_repmgr_set_ack_policy, 1); + rb_define_method(cEnv,"repmgr_ack_policy", env_repmgr_get_ack_policy, 0); + rb_define_method(cEnv,"repmgr_start", env_repmgr_start, 2); + rb_define_method(cEnv,"repmgr_stat_print", env_repmgr_stat_print, 1); cTxnStat = rb_define_class_under(mBdb,"TxnStat",rb_cObject); rb_define_method(cTxnStat,"[]",stat_aref,1); diff --git a/lib/bdb/base.rb b/lib/bdb/base.rb index b56f538..9e7941f 100644 --- a/lib/bdb/base.rb +++ b/lib/bdb/base.rb @@ -19,8 +19,12 @@ class Bdb::Base indexes[field] = opts end + def path + config[:path] || Dir.pwd + end + def environment - @environment ||= Bdb::Environment.new(config[:path], self) + @environment ||= Bdb::Environment.new(path, self) end def transaction(nested = true, &block) @@ -32,9 +36,13 @@ class Bdb::Base end def checkpoint(opts = {}) - environment.synchronize(opts) + environment.checkpoint(opts) end - + + def master? + environment.master? + end + private def get_field(field, value) diff --git a/lib/bdb/database.rb b/lib/bdb/database.rb index 16c5955..5038cc0 100644 --- a/lib/bdb/database.rb +++ b/lib/bdb/database.rb @@ -10,10 +10,11 @@ class Bdb::Database < Bdb::Base def db(index = nil) if @db.nil? @db = {} + open_flags = master? ? Bdb::DB_CREATE : Bdb::DB_RDONLY transaction(false) do primary_db = environment.env.db primary_db.pagesize = config[:page_size] if config[:page_size] - primary_db.open(transaction, name, nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0) + primary_db.open(transaction, name, nil, Bdb::Db::BTREE, open_flags, 0) @db[:primary_key] = primary_db indexes.each do |field, opts| @@ -31,13 +32,22 @@ class Bdb::Database < Bdb::Base index_db = environment.env.db index_db.flags = Bdb::DB_DUPSORT unless opts[:unique] index_db.pagesize = config[:page_size] if config[:page_size] - index_db.open(transaction, "#{name}_by_#{field}", nil, Bdb::Db::BTREE, Bdb::DB_CREATE, 0) - primary_db.associate(transaction, index_db, Bdb::DB_CREATE, index_callback) + index_db.open(transaction, "#{name}_by_#{field}", nil, Bdb::Db::BTREE, open_flags, 0) + primary_db.associate(transaction, index_db, open_flags, index_callback) @db[field] = index_db end end end @db[index || :primary_key] + rescue Bdb::DbError => e + # Retry if the database doesn't exist and we are a replication client. + if not master? and e.code == Errno::ENOENT::Errno + close + sleep 1 + retry + else + raise(e) + end end def close @@ -131,13 +141,18 @@ class Bdb::Database < Bdb::Base set.results end + def [](key) + get(key).first + end + def set(key, value, opts = {}) synchronize do key = Tuple.dump(key) value = Marshal.dump(value) flags = opts[:create] ? Bdb::DB_NOOVERWRITE : 0 db.put(transaction, key, value, flags) - end + value + end end def delete(key) @@ -154,6 +169,10 @@ class Bdb::Database < Bdb::Base end end + def sync + db.sync + end + private def get_key(key, opts) diff --git a/lib/bdb/environment.rb b/lib/bdb/environment.rb index 8999602..a09a97f 100644 --- a/lib/bdb/environment.rb +++ b/lib/bdb/environment.rb @@ -1,12 +1,25 @@ +require 'thread' +require 'bdb/replication' + class Bdb::Environment @@env = {} def self.new(path, database = nil) + # Only allow one environment per path. path = File.expand_path(path) @@env[path] ||= super(path) @@env[path].databases << database if database @@env[path] end + def initialize(path) + @path = path + end + attr_reader :path + + def self.[](path) + new(path) + end + def self.config(config = {}) @config ||= { :max_locks => 5000, @@ -22,10 +35,10 @@ class Bdb::Environment @config.merge!(config) end - def initialize(path) - @path = path + include Replication + def self.replicate(path, opts) + self[path].replicate(opts) end - attr_reader :path def databases @databases ||= [] @@ -40,16 +53,19 @@ class Bdb::Environment else env_flags = Bdb::DB_CREATE | Bdb::DB_INIT_TXN | Bdb::DB_INIT_LOCK | Bdb::DB_REGISTER | Bdb::DB_RECOVER | Bdb::DB_INIT_MPOOL | Bdb::DB_THREAD - end + env_flags |= Bdb::DB_INIT_REP if replicate? + end @env.cachesize = config[:cache_size] if config[:cache_size] @env.set_timeout(config[:txn_timeout], Bdb::DB_SET_TXN_TIMEOUT) if config[:txn_timeout] @env.set_timeout(config[:lock_timeout], Bdb::DB_SET_LOCK_TIMEOUT) if config[:lock_timeout] @env.set_lk_max_locks(config[:max_locks]) if config[:max_locks] @env.set_lk_detect(Bdb::DB_LOCK_RANDOM) @env.flags_on = Bdb::DB_TXN_WRITE_NOSYNC | Bdb::DB_TIME_NOTGRANTED - @env.open(path, env_flags, 0) + init_replication(@env) if replicate? + @env.open(path, env_flags, 0) + start_replication(@env) if replicate? @exit_handler ||= at_exit { close } end end diff --git a/test/test_helper.rb b/test/test_helper.rb index 52767e5..37c6520 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -1,6 +1,13 @@ require 'test/unit' require 'fileutils' -require File.dirname(__FILE__) + '/../ext/bdb' +require 'pp' + +$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../ext') +$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../lib') +$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../../tuple/ext') + +require 'bdb' +require 'bdb/database' class Test::Unit::TestCase include FileUtils