Rotation and Listener (LogAn::Inc) importet

This commit is contained in:
Denis Knauf 2010-03-23 11:06:05 +01:00
parent fc04254b8e
commit 64a715a1b6
5 changed files with 312 additions and 67 deletions

View file

@ -12,6 +12,7 @@ begin
gem.authors = ["Denis Knauf"] gem.authors = ["Denis Knauf"]
gem.files = %w[AUTHORS README.md VERSION lib/**/*.rb test/**/*.rb] gem.files = %w[AUTHORS README.md VERSION lib/**/*.rb test/**/*.rb]
gem.require_paths = %w[lib] gem.require_paths = %w[lib]
gem.add_dependency 'sbdb'
end end
Jeweler::GemcutterTasks.new Jeweler::GemcutterTasks.new
rescue LoadError rescue LoadError

View file

@ -2,21 +2,33 @@
require 'sbdb' require 'sbdb'
require 'safebox' require 'safebox'
require 'robustserver'
_ = nil class LogAn::Main < RobustServer
Dir.mkdir 'logs' rescue Errno::EEXIST def initialize conf
SBDB::Env.new 'logs', SBDB::CREATE | SBDB::Env::INIT_TRANSACTION do |logs| super
db = logs['test', :type => SBDB::Btree, :flags => SBDB::CREATE] @conf = conf
db = Safebox::Persistent.new db, db.cursor @logs = LogAn::Loglines.new 'logs'
$stdout.print "(0)$ " Dir.mkdir 'etc' rescue Errno::EEXIST
STDIN.each_with_index do |line, i| @etc = SBDB::Env.new( 'etc',
ret = Safebox.run line, Safebox::Box, db, _ log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE,
if :value == ret.first flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL)
_ = ret.last @serv = LogAn::Inc.new :sock => TCPServer.new( *@conf[:server])
$stdout.puts "=> #{ret.last.inspect}" @sigs[:INT] = @sigs[:TERM] = method(:shutdown)
else end
$stdout.puts ret.last.inspect
end def at_exit
$stdout.print "(#{i+1})$ " @logs and @logs.close
@etc and @etc.close
end
def shutdown s = nil
$stderr.puts [:signal, s, Signal[s]].inspect
@serv.close
exit 0
end
def run
@serv.run
end end
end end

106
lib/logan/inc.rb Normal file
View file

@ -0,0 +1,106 @@
require 'select'
module Logan
module Inc
class Select <::Select
attr_reader :entries
def initialize *p
super *p
@entries=[]
end
def run
until @exit || (@exit_on_empty && self.empty?)
cron
run_once 1
end
end
def cron
@entries.each do |e|
return if e > Time.now
e.call
@entries.shift
end
end
class Entry < Time
attr_reader :do
def do &e
@do = e
end
def call *p
@do.call *p
end
def self.new *a, &e
x = self.at *a
x.do &e
x
end
end
def at a, &e
a = Entry.new a, &e if e
@entries << a
@entries.sort!
end
end
class Socket <::Select::Socket
def event_read sock = @sock, event = :read
begin
@linebuf += sock.readpartial( @bufsize)
rescue EOFError
self.event_eof sock
rescue Errno::EPIPE => e
self.event_errno e, sock, event
rescue IOError
self.event_ioerror sock, event
rescue Errno::ECONNRESET => e
self.event_errno e, sock, event
end
loop do
return if @linebuf.size < 4
l = @linebuf.unpack( 'N')[0]
return if l > @linebuf.length
@linebuf.remove 4
event_cmd @linebuf.remove( l)
end
end
end
class Server < ::Select::Server
attr_reader :config
def init opts
super opts
@config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!")
end
def event_new_client sock
{ :clientclass => LogAn::Inc::Server::Socket, :config => @config }
end
class Socket < LogAn::Inc::Server::Socket
attr_reader :config
def init opts
super opts
@config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!")
end
def event_cmd cmd
sid, line = cmd.unpack 'Na*'
begin
@config[ :fileparser, sid].event_line line, self
rescue Didi::Config::NoSIDFound, Didi::Config::SIDLoadingError
$stderr.puts( {:sid => sid, :exception => $!, :backtrace => $!.backtrace}.inspect)
end
end
end
end
end
end

View file

@ -0,0 +1,99 @@
require 'select'
module Logan
module Inc
module FileParser
module Base
def emit v
@logdb.push @sid, line
end
def seeks read
inode, seek = (@store[ :seeks, @sid] || "\0\0\0\0\0\0\0\0").unpack 'a4N'
@store[ :seeks, @sid] = [inode, read + seek].pack( 'a4N')
end
end
class Line
extend Base
attr_reader :sid, :delimiter, :buffer, :linebuffer
@@fileparser = []
def self.[] sid
@@fileparser[sid] ||= self.new sid
end
def initialize logdb, store, sid, delimiter = nil
@logdb, @store, @sid, @delimiter = logdb, store, sid, delimiter || "\n"
@delimiter = Regexp.new "^.*?#{@delimiter}"
@buffer, @linebuffer = Select::Buffer.new( ''), Select::Buffer.new( '')
end
def event_read str, sock
@buffer += str
@buffer.each! @delimiter do |line|
emit line
seeks line.length
end
end
end
class Multiline < Line
def initialize sid, delimiter = nil, multiline = nil
super sid, delimiter
@multiline = multiline || /^\d\d-\d\d-\d\d:/
end
def event_read str, sock
@buffer += str
@buffer.each! @delimiter do |line|
if line =~ @multiline
emit @linebuffer.to_s
seeks @linebuffer.length
@linebuffer.replace line
else @linebuffer += line
end
end
end
end
end
class Command < ::Array
attr_reader :sid
def initialize sid = 0
@sid = sid
end
def event_read line, sock
cmd, l = line.unpack 'na*'
self[cmd].call( l, sock) if self[cmd]
end
end
class SID0 < Command
def initialize store, config, sid = 0
@store, @config = store, config
self[9] = method :event_hostname
self[10] = method :event_filerotated
end
def event_filerotated line, sock
sid, d = line.unpack 'Na8'
@store[ :seeks, sid] = d
end
def event_hostname line, sock
@config[ :hosts].each do |sid,host|
next unless line == host
file = @config[ :files, sid]
next unless file
# command, SID, (inode, seek), file
pc = [1, sid, @store[ :seeks, sid] || "\x00\x00\x00\x00\x00\x00\x00\x00", file].pack 'nNa8a*'
sock.write [pc.length, pc].pack( 'Na*')
end
end
end
end
end

View file

@ -2,65 +2,92 @@
require 'sbdb' require 'sbdb'
require 'uuidtools' require 'uuidtools'
require 'logan'
class Rotate module LogAn
def initialize db, env = db.home, &e class Loglines
@rdb, @env, @dbs = db, env, {} attr_reader :env, :rdb, :dbs
self.hash = e || lambda {|k|
[k.timestamp.to_i/60/60/24].pack 'N'
}
end
def hash= e def self.new *paras
self.hash &e ret = obj = super( *paras)
end begin ret = yield obj
ensure
def hash &e SBDB.raise_barrier &obj.method( :close)
@hash_func = e if e end if block_given?
@hash_func ret
end
def hashing k
@hash_func.call k
end
def db_name id
h = hashing id
n = @rdb[ h]
if n
n = UUIDTools::UUID.parse_raw n
else
n = UUIDTools::UUID.timestamp_create
@rdb[ h] = n.raw
info :create => n.to_s
end end
n
end
def db n def initialize env = nil
@env[ n.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] env ||= 'logs'
end @env = if String === env
Dir.mkdir env rescue Errno::EEXIST
SBDB::Env.new env,
log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE,
flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL
else env
end
@rdb = @env[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT]
@queue = @env[ "newids.queue", :type => SBDB::Queue, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT, :re_len => 16]
@dbs = {}
self.hash = lambda {|k|
[k.timestamp.to_i/60/60].pack 'N' # Hour-based rotation
}
end
def sync def close
@dbs.each{|n,db|db.sync} @env.close
@rdb.sync end
end
def close def hash_func= exe
@dbs.each{|n,db|db.close 0} hash_func &exe
@rdb.close 0 end
end
def put k, v, f = k def hash_func &exe
id = UUIDTools::UUID.timestamp_create @hash_func = exe if exe
s = [0x10, v].pack 'Na*' @hash_func
n = db_name id end
db( n)[ id.raw] = s
end
alias emit put
def get k def hashing key
n = db_name id @hash_func.call key
db( n)[ id.raw] = s end
def db_name id
hash = hashing id
name = @rdb[ hash]
if name
name = UUIDTools::UUID.parse_raw name
else
name = UUIDTools::UUID.timestamp_create
@rdb[ hash] = name.raw
end
name
end
def db name
@env[ name.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT]
end
def sync
@dbs.each {|name, db| db.sync }
@rdb.sync
end
def close
@dbs.each {|name, db| db.close 0 }
@rdb.close 0
end
def put val, sid = nil
id = UUIDTools::UUID.timestamp_create
dat = [sid || 0x10, val].pack 'Na*'
name = db_name id
db( name)[ id.raw] = dat
end
alias emit put
def get key
name = db_name id
db( name)[ id.raw]
end
end end
end end