diff --git a/VERSION b/VERSION index 334bcc4..092cda7 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.0.2.1 +0.0.3.0 diff --git a/bin/s2l.rb b/bin/s2l.rb index fa2cf28..e5fc371 100755 --- a/bin/s2l.rb +++ b/bin/s2l.rb @@ -1,48 +1,24 @@ #!/usr/bin/ruby +$:.push File.join( File.dirname( $0), '..', 'lib') require 'logger' +require 'json' require 'rubygems' -require 'sbdb' require 'uuidtools' require 'socket' require 'select' require 'robustserver' require 'active_support' require 'syslog2logan/rotate' +require 'syslog2logan/server' $logger = Logger.new $stderr - -class S2L < Select::Server - attr_accessor :dbs - - def init p - super p - @dbs = p[:dbs] - end - - def event_new_client a - logger.debug :connection => {:new => a} - { :clientclass => S2L::Socket, :dbs => @dbs } - end -end +$logger.formatter = proc { |severity, datetime, progname, msg| [severity, datetime, progname, msg.inspect].to_json+"\n" } module Kernel def logger() $logger end end -class S2L::Socket < Select::Socket - def init opts - @dbs = opts[ :dbs] - super opts - end - - def event_line v - logger.debug :line => v - @dbs.emit v - end - alias emit event_line -end - class Main < RobustServer def initialize conf super @@ -50,14 +26,12 @@ class Main < RobustServer @conf = conf logger.info :open => S2L @serv = S2L.new :sock => TCPServer.new( *@conf[:server]) - logger.info :create => {:home => @conf[:home]} - Dir.mkdir @conf[:home] rescue Errno::EEXIST @sigs[:INT] = @sigs[:TERM] = method(:shutdown) @sigs[:USR1] = method(:state) end def state s = nil - logger.debug :server => @serv + logger.debug :server => @serv.class end def shutdown s = nil @@ -67,18 +41,18 @@ class Main < RobustServer end def run - logger.info :open => SBDB::Env - SBDB::Env.new( @conf[:home], - log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE, - flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL) do |dbenv| + logger.info :open => @conf[:backend] + @conf[:backend][0].new( @conf[:backend][1]) do |backend| logger.info :open => Rotate - @serv.dbs = Rotate.new dbenv[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | Bdb::DB_AUTO_COMMIT] - logger.info :run => @serv + @serv.dbs = Rotate.new &backend.to_proc + logger.info :run => @serv.class @serv.run + logger.info :close => @conf[:backend] end end end -Main.main :home => 'logs', :server => [ '', 1514], :retries => [1,1] # [10, 10] +require 'syslog2logan/backend/tch' +Main.main :backend => [ Backend::TCH, {:dir => 'logs'}], :server => [ '', 1514], :retries => [1,1] # [10, 10] logger.info :halted diff --git a/lib/file_queue.rb b/lib/file_queue.rb new file mode 100644 index 0000000..53a4f7a --- /dev/null +++ b/lib/file_queue.rb @@ -0,0 +1,48 @@ +class File + def exclusive_lock + flock File::LOCK_EX + end + + def shared_lock + flock File::LOCK_SH + end + + def unblock + flock File::LOCK_UN + end +end + +class FileQueue + attr_reader :file, :size + alias to_io file + + def initialize file, size = 16 + @file = case file + when File then file + else File.open file, 'a+' + end + @size, @pack = size, "A#{size}" + end + + def push *a + f = @file + f.seek 0, IO::SEEK_END + f.exclusive_lock + f.write a.pack( @pack*a.length) + f.unblock + end + + def pop + f = @file + f.rewind + f.exclusive_lock + s = f.read( @size).unpack( 'L')[0] + f.rewind + f.write [s.succ].pack( 'L') + f.sync + f.shared_lock + f.pos = s + f.read( @size).unpack( 'L')[0] + f.unblock + end +end diff --git a/lib/in_tch.rb b/lib/in_tch.rb new file mode 100644 index 0000000..b5e21c9 --- /dev/null +++ b/lib/in_tch.rb @@ -0,0 +1,83 @@ +#!/usr/bin/ruby + +require 'logger' +require 'rubygems' +require 'uuidtools' +require 'socket' +require 'select' +require 'robustserver' +require 'active_support' +require 'syslog2logan/rotate' + +$logger = Logger.new $stderr + +class S2L < Select::Server + attr_accessor :dbs + + def init p + super p + @dbs = p[:dbs] + end + + def event_new_client a + logger.debug :connection => {:new => a} + { :clientclass => S2L::Socket, :dbs => @dbs } + end +end + +module Kernel + def logger() $logger end +end + +class S2L::Socket < Select::Socket + def init opts + @dbs = opts[ :dbs] + super opts + end + + def event_line v + logger.debug :line => v + @dbs.emit v + end + alias emit event_line +end + +class Main < RobustServer + def initialize conf + super + @logger = $logger + @conf = conf + logger.info :open => S2L + @serv = S2L.new :sock => TCPServer.new( *@conf[:server]) + logger.info :create => {:home => @conf[:home]} + Dir.mkdir @conf[:home] rescue Errno::EEXIST + @sigs[:INT] = @sigs[:TERM] = method(:shutdown) + @sigs[:USR1] = method(:state) + end + + def state s = nil + logger.debug :server => @serv + end + + def shutdown s = nil + logger.info :shutdown => [s, Signal[s]] + @serv.close + exit 0 + end + + def run + logger.info :open => SBDB::Env + SBDB::Env.new( @conf[:home], + log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE, + flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL) do |dbenv| + logger.info :open => Rotate + @serv.dbs = Rotate.new dbenv[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | Bdb::DB_AUTO_COMMIT] + logger.info :run => @serv + @serv.run + end + end +end + +Main.main :home => 'logs', :server => [ '', 1514], :retries => [1,1] # [10, 10] + +logger.info :halted diff --git a/lib/syslog2logan/backend/base.rb b/lib/syslog2logan/backend/base.rb new file mode 100644 index 0000000..bf98610 --- /dev/null +++ b/lib/syslog2logan/backend/base.rb @@ -0,0 +1,16 @@ +module Backend +end + +class Backend::Base + def initialize opts = {} + if block_given? + yield self + else + self + end + end + + def to_proc + method :open + end +end diff --git a/lib/syslog2logan/backend/tch.rb b/lib/syslog2logan/backend/tch.rb new file mode 100644 index 0000000..fbf1a9e --- /dev/null +++ b/lib/syslog2logan/backend/tch.rb @@ -0,0 +1,31 @@ +require 'rufus/tokyo' +require 'syslog2logan/backend/base' + +class Backend::TCH < Backend::Base + attr_reader :dir + def initialize opts = {}, &e + @dir = opts[:dir] + Dir.mkdir @dir rescue Errno::EEXIST + @dbs = [] + if block_given? + begin + super opts, &e + ensure + close + end + else + super opts + end + end + + def close + @dbs.each &:close + end + + def open name + logger.info :open => name, :backend => self.class + db = Rufus::Tokyo::Cabinet.new File.join( @dir, name)+".tch" + @dbs.push db + db + end +end diff --git a/lib/syslog2logan/rotate/base.rb b/lib/syslog2logan/rotate/base.rb old mode 100755 new mode 100644 index 30b85cc..703abfa --- a/lib/syslog2logan/rotate/base.rb +++ b/lib/syslog2logan/rotate/base.rb @@ -2,7 +2,7 @@ require 'logger' require 'uuidtools' -require 'active_support' +require 'active_support/core_ext' class Rotate # open_db_func: must returns a db-object with #[] and #[]=. @@ -10,19 +10,19 @@ class Rotate def initialize hash_func = nil, &open_db_func @dbs = Hash.new {|h,k| h[k] = open_db_func.call(k) } hash_func ||= lambda {|k| [k.timestamp.to_i/1.day].pack 'N' } - define_method :hashing, &hash_func + define_singleton_method :hashing, &hash_func @rotate = @dbs['rotate'] @queue = @dbs['queue'] end def db_name id h = hashing id - n = @rdb[ h] + n = @rotate[ h] if n n = UUIDTools::UUID.parse_raw n else n = UUIDTools::UUID.timestamp_create - @rdb[ h] = n.raw + @rotate[ h] = n.raw logger.info :create => n.to_s end n diff --git a/lib/syslog2logan/rotate/bdb.rb b/lib/syslog2logan/rotate/bdb.rb old mode 100755 new mode 100644 diff --git a/lib/syslog2logan/server.rb b/lib/syslog2logan/server.rb new file mode 100644 index 0000000..166d863 --- /dev/null +++ b/lib/syslog2logan/server.rb @@ -0,0 +1,29 @@ +require 'socket' +require 'select' + +class S2L < Select::Server + attr_accessor :dbs + + def init p + super p + @dbs = p[:dbs] + end + + def event_new_client a + logger.debug :connection => {:new => a} + { :clientclass => S2L::Socket, :dbs => @dbs } + end +end + +class S2L::Socket < Select::Socket + def init opts + @dbs = opts[ :dbs] + super opts + end + + def event_line v + logger.debug :line => v + @dbs.emit v + end + alias emit event_line +end