From 64a715a1b6f2aea4b9df22ff6df3caeaf28975e2 Mon Sep 17 00:00:00 2001 From: Denis Knauf Date: Tue, 23 Mar 2010 11:06:05 +0100 Subject: [PATCH] Rotation and Listener (LogAn::Inc) importet --- Rakefile | 1 + bin/loganinc | 42 +++++++----- lib/logan/inc.rb | 106 +++++++++++++++++++++++++++++ lib/logan/inc/fileparser.rb | 99 +++++++++++++++++++++++++++ lib/rotates.rb | 131 ++++++++++++++++++++++-------------- 5 files changed, 312 insertions(+), 67 deletions(-) create mode 100644 lib/logan/inc.rb create mode 100644 lib/logan/inc/fileparser.rb diff --git a/Rakefile b/Rakefile index e1ade84..b87f34e 100644 --- a/Rakefile +++ b/Rakefile @@ -12,6 +12,7 @@ begin gem.authors = ["Denis Knauf"] gem.files = %w[AUTHORS README.md VERSION lib/**/*.rb test/**/*.rb] gem.require_paths = %w[lib] + gem.add_dependency 'sbdb' end Jeweler::GemcutterTasks.new rescue LoadError diff --git a/bin/loganinc b/bin/loganinc index beae5a7..25117d7 100755 --- a/bin/loganinc +++ b/bin/loganinc @@ -2,21 +2,33 @@ require 'sbdb' require 'safebox' +require 'robustserver' -_ = nil -Dir.mkdir 'logs' rescue Errno::EEXIST -SBDB::Env.new 'logs', SBDB::CREATE | SBDB::Env::INIT_TRANSACTION do |logs| - db = logs['test', :type => SBDB::Btree, :flags => SBDB::CREATE] - db = Safebox::Persistent.new db, db.cursor - $stdout.print "(0)$ " - STDIN.each_with_index do |line, i| - ret = Safebox.run line, Safebox::Box, db, _ - if :value == ret.first - _ = ret.last - $stdout.puts "=> #{ret.last.inspect}" - else - $stdout.puts ret.last.inspect - end - $stdout.print "(#{i+1})$ " +class LogAn::Main < RobustServer + def initialize conf + super + @conf = conf + @logs = LogAn::Loglines.new 'logs' + Dir.mkdir 'etc' rescue Errno::EEXIST + @etc = SBDB::Env.new( 'etc', + log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE, + flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL) + @serv = LogAn::Inc.new :sock => TCPServer.new( *@conf[:server]) + @sigs[:INT] = @sigs[:TERM] = method(:shutdown) + end + + def at_exit + @logs and @logs.close + @etc and @etc.close + end + + def shutdown s = nil + $stderr.puts [:signal, s, Signal[s]].inspect + @serv.close + exit 0 + end + + def run + @serv.run end end diff --git a/lib/logan/inc.rb b/lib/logan/inc.rb new file mode 100644 index 0000000..c5f17d0 --- /dev/null +++ b/lib/logan/inc.rb @@ -0,0 +1,106 @@ + +require 'select' + +module Logan + module Inc + class Select <::Select + attr_reader :entries + def initialize *p + super *p + @entries=[] + end + + def run + until @exit || (@exit_on_empty && self.empty?) + cron + run_once 1 + end + end + + def cron + @entries.each do |e| + return if e > Time.now + e.call + @entries.shift + end + end + + class Entry < Time + attr_reader :do + def do &e + @do = e + end + + def call *p + @do.call *p + end + + def self.new *a, &e + x = self.at *a + x.do &e + x + end + end + + def at a, &e + a = Entry.new a, &e if e + @entries << a + @entries.sort! + end + end + + class Socket <::Select::Socket + def event_read sock = @sock, event = :read + begin + @linebuf += sock.readpartial( @bufsize) + rescue EOFError + self.event_eof sock + rescue Errno::EPIPE => e + self.event_errno e, sock, event + rescue IOError + self.event_ioerror sock, event + rescue Errno::ECONNRESET => e + self.event_errno e, sock, event + end + loop do + return if @linebuf.size < 4 + l = @linebuf.unpack( 'N')[0] + return if l > @linebuf.length + @linebuf.remove 4 + event_cmd @linebuf.remove( l) + end + end + end + + class Server < ::Select::Server + attr_reader :config + + def init opts + super opts + @config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!") + end + + def event_new_client sock + { :clientclass => LogAn::Inc::Server::Socket, :config => @config } + end + + class Socket < LogAn::Inc::Server::Socket + attr_reader :config + + def init opts + super opts + @config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!") + end + + def event_cmd cmd + sid, line = cmd.unpack 'Na*' + begin + @config[ :fileparser, sid].event_line line, self + rescue Didi::Config::NoSIDFound, Didi::Config::SIDLoadingError + $stderr.puts( {:sid => sid, :exception => $!, :backtrace => $!.backtrace}.inspect) + end + end + end + end + end +end diff --git a/lib/logan/inc/fileparser.rb b/lib/logan/inc/fileparser.rb new file mode 100644 index 0000000..88616ca --- /dev/null +++ b/lib/logan/inc/fileparser.rb @@ -0,0 +1,99 @@ + +require 'select' + +module Logan + module Inc + module FileParser + module Base + def emit v + @logdb.push @sid, line + end + + def seeks read + inode, seek = (@store[ :seeks, @sid] || "\0\0\0\0\0\0\0\0").unpack 'a4N' + @store[ :seeks, @sid] = [inode, read + seek].pack( 'a4N') + end + end + + class Line + extend Base + attr_reader :sid, :delimiter, :buffer, :linebuffer + @@fileparser = [] + + def self.[] sid + @@fileparser[sid] ||= self.new sid + end + + def initialize logdb, store, sid, delimiter = nil + @logdb, @store, @sid, @delimiter = logdb, store, sid, delimiter || "\n" + @delimiter = Regexp.new "^.*?#{@delimiter}" + @buffer, @linebuffer = Select::Buffer.new( ''), Select::Buffer.new( '') + end + + def event_read str, sock + @buffer += str + @buffer.each! @delimiter do |line| + emit line + seeks line.length + end + end + end + + class Multiline < Line + def initialize sid, delimiter = nil, multiline = nil + super sid, delimiter + @multiline = multiline || /^\d\d-\d\d-\d\d:/ + end + + def event_read str, sock + @buffer += str + @buffer.each! @delimiter do |line| + if line =~ @multiline + emit @linebuffer.to_s + seeks @linebuffer.length + @linebuffer.replace line + else @linebuffer += line + end + end + end + end + + end + + class Command < ::Array + attr_reader :sid + def initialize sid = 0 + @sid = sid + end + + def event_read line, sock + cmd, l = line.unpack 'na*' + self[cmd].call( l, sock) if self[cmd] + end + end + + class SID0 < Command + def initialize store, config, sid = 0 + @store, @config = store, config + self[9] = method :event_hostname + self[10] = method :event_filerotated + end + + def event_filerotated line, sock + sid, d = line.unpack 'Na8' + @store[ :seeks, sid] = d + end + + def event_hostname line, sock + @config[ :hosts].each do |sid,host| + next unless line == host + file = @config[ :files, sid] + next unless file + # command, SID, (inode, seek), file + pc = [1, sid, @store[ :seeks, sid] || "\x00\x00\x00\x00\x00\x00\x00\x00", file].pack 'nNa8a*' + sock.write [pc.length, pc].pack( 'Na*') + end + end + end + end +end diff --git a/lib/rotates.rb b/lib/rotates.rb index cea9fa4..86cdfa4 100644 --- a/lib/rotates.rb +++ b/lib/rotates.rb @@ -2,65 +2,92 @@ require 'sbdb' require 'uuidtools' +require 'logan' -class Rotate - def initialize db, env = db.home, &e - @rdb, @env, @dbs = db, env, {} - self.hash = e || lambda {|k| - [k.timestamp.to_i/60/60/24].pack 'N' - } - end +module LogAn + class Loglines + attr_reader :env, :rdb, :dbs - def hash= e - self.hash &e - end - - def hash &e - @hash_func = e if e - @hash_func - end - - def hashing k - @hash_func.call k - end - - def db_name id - h = hashing id - n = @rdb[ h] - if n - n = UUIDTools::UUID.parse_raw n - else - n = UUIDTools::UUID.timestamp_create - @rdb[ h] = n.raw - info :create => n.to_s + def self.new *paras + ret = obj = super( *paras) + begin ret = yield obj + ensure + SBDB.raise_barrier &obj.method( :close) + end if block_given? + ret end - n - end - def db n - @env[ n.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] - end + def initialize env = nil + env ||= 'logs' + @env = if String === env + Dir.mkdir env rescue Errno::EEXIST + SBDB::Env.new env, + log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE, + flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL + else env + end + @rdb = @env[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] + @queue = @env[ "newids.queue", :type => SBDB::Queue, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT, :re_len => 16] + @dbs = {} + self.hash = lambda {|k| + [k.timestamp.to_i/60/60].pack 'N' # Hour-based rotation + } + end - def sync - @dbs.each{|n,db|db.sync} - @rdb.sync - end + def close + @env.close + end - def close - @dbs.each{|n,db|db.close 0} - @rdb.close 0 - end + def hash_func= exe + hash_func &exe + end - def put k, v, f = k - id = UUIDTools::UUID.timestamp_create - s = [0x10, v].pack 'Na*' - n = db_name id - db( n)[ id.raw] = s - end - alias emit put + def hash_func &exe + @hash_func = exe if exe + @hash_func + end - def get k - n = db_name id - db( n)[ id.raw] = s + def hashing key + @hash_func.call key + end + + def db_name id + hash = hashing id + name = @rdb[ hash] + if name + name = UUIDTools::UUID.parse_raw name + else + name = UUIDTools::UUID.timestamp_create + @rdb[ hash] = name.raw + end + name + end + + def db name + @env[ name.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] + end + + def sync + @dbs.each {|name, db| db.sync } + @rdb.sync + end + + def close + @dbs.each {|name, db| db.close 0 } + @rdb.close 0 + end + + def put val, sid = nil + id = UUIDTools::UUID.timestamp_create + dat = [sid || 0x10, val].pack 'Na*' + name = db_name id + db( name)[ id.raw] = dat + end + alias emit put + + def get key + name = db_name id + db( name)[ id.raw] + end end end