diff --git a/bin/box.rb b/bin/box.rb deleted file mode 100755 index 7515402..0000000 --- a/bin/box.rb +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/ruby - -require 'thread' - -class Box - attr_reader :_, :emited - alias db emited - alias persistent emited - attr_accessor :emited - - def initialize db, _ - @_, @emited = _, db - end - - def emit k, v - @emited[k] = v - end - - def do code - instance_eval code, self.class.to_s, 0 - end -end - -require 'sbdb' - -class Persistent - include Enumerable - def initialize( db) @db, @cursor = db, db.cursor end - def emit( k, v) @db[k] = v end - alias push emit - alias put emit - alias []= emit - def get( k) @db[k] end - alias [] get - alias fetch get - def inspect() "#<%s:0x%016x>" % [ self.class, self.object_id ] end - def each &e - e ? @cursor.each( &e) : Enumerator.new( self, :each) - end - def to_hash - h = {} - each {|k, v| h[ k] = v } - h - end -end - -#Persistent.freeze - -r = nil -Dir.mkdir 'logs' rescue Errno::EEXIST -SBDB::Env.new 'logs', SBDB::CREATE | SBDB::Env::INIT_TRANSACTION do |logs| - db = Persistent.new logs['test', :type => SBDB::Btree, :flags => SBDB::CREATE] - $stdout.print "(0)$ " - STDIN.each_with_index do |l, i| - r = Thread.new do - l.untaint - $SAFE = 4 - b = Box.new db, r - begin - b.do( l) - rescue Object - $! - end - end.value - $stdout.print "=> #{r.inspect}\n(#{i+1})$ " - end -end diff --git a/bin/box2.rb b/bin/box2.rb deleted file mode 100755 index dd02b06..0000000 --- a/bin/box2.rb +++ /dev/null @@ -1,87 +0,0 @@ -#!/usr/bin/ruby - -require 'sbdb' - -module Sandbox - def self.run *paras, &exe - exe = paras.shift unless exe - box = paras.shift || Class - Thread.new do - $SAFE = 4 - this = box.new *paras - begin - [:value, this.instance_eval( exe, "Sandbox")] - rescue Object - [:exception, $!] - end - end.value - end - - def self.create_class *paras, &exe - exe = paras.shift unless exe - run Class, *paras do - eval exe - self - end - end - alias new_class create_class -end - -class Box - attr_reader :_, :db - - def initialize db, _ - @_, @db = _, db - end - - def put( key, val) @db[key] = val end - def get( key) @db[key] end -end - -class ClassBuilder -end - -class Emit - def initialize( db) @db = db end - def emit( key, val) @db[key] = val end - def inspect() "#<%s:0x%016x>" % [ self.class, self.object_id ] end -end - -class Persistent < Emit - include Enumerable - def initialize db, cursor - super db - @cursor = cursor - end - alias put emit - alias []= emit - def get( key) @db[key] end - alias [] get - alias fetch get - def each &exe - exe ? @cursor.each( &exe) : Enumerator.new( self, :each) - end - def to_hash - rh = {} - each {|key, val| rh[ key] = val } - rh - end -end - -_ = nil -Dir.mkdir 'logs' rescue Errno::EEXIST -SBDB::Env.new 'logs', SBDB::CREATE | SBDB::Env::INIT_TRANSACTION do |logs| - db = logs['test', :type => SBDB::Btree, :flags => SBDB::CREATE] - db = Persistent.new db, db.cursor - $stdout.print "(0)$ " - STDIN.each_with_index do |line, i| - ret = Sandbox.run line, Box, db, _ - if :value == ret.first - _ = ret.last - $stdout.puts "=> #{ret.last.inspect}" - else - $stdout.puts ret.last.inspect - end - $stdout.print "(#{i+1})$ " - end -end diff --git a/bin/box3.rb b/bin/box3.rb deleted file mode 100755 index beae5a7..0000000 --- a/bin/box3.rb +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/ruby - -require 'sbdb' -require 'safebox' - -_ = nil -Dir.mkdir 'logs' rescue Errno::EEXIST -SBDB::Env.new 'logs', SBDB::CREATE | SBDB::Env::INIT_TRANSACTION do |logs| - db = logs['test', :type => SBDB::Btree, :flags => SBDB::CREATE] - db = Safebox::Persistent.new db, db.cursor - $stdout.print "(0)$ " - STDIN.each_with_index do |line, i| - ret = Safebox.run line, Safebox::Box, db, _ - if :value == ret.first - _ = ret.last - $stdout.puts "=> #{ret.last.inspect}" - else - $stdout.puts ret.last.inspect - end - $stdout.print "(#{i+1})$ " - end -end diff --git a/bin/loganinc b/bin/loganinc index 94edc3d..18563dd 100755 --- a/bin/loganinc +++ b/bin/loganinc @@ -4,6 +4,7 @@ require 'logan' require 'logan/inc' opts = {} +opts[:inspector] = ARGV[0] == '--inspector' ? ARGV.shift : false opts[:server] = if ARGV[1] ARGV elsif ARGV[0] @@ -14,6 +15,17 @@ opts[:server][1] = opts[:server][1].to_i logan = LogAn::Inc::Main.new opts begin + logan.instance_eval do + @inspector_server = UNIXServer.new 'loganinc.inspector.sock' + Thread.new do + loop do + sock = @inspector_server.accept + sock.each_line do |line| + sock.puts eval( line).inspect + end + end + end + end if opts[:inspector] logan.main rescue Object logan.at_exit diff --git a/bin/logansh b/bin/logansh new file mode 100755 index 0000000..7c8cd72 --- /dev/null +++ b/bin/logansh @@ -0,0 +1,12 @@ +#!/usr/bin/env ruby + +require 'logan/analyse' +require 'active_support' +require 'irb' + +$logan = LogAn::Analyse.new 'logs' +begin + IRB.start __FILE__ +ensure + $logan.close +end diff --git a/lib/logan/analyse.rb b/lib/logan/analyse.rb new file mode 100644 index 0000000..cbf366b --- /dev/null +++ b/lib/logan/analyse.rb @@ -0,0 +1,51 @@ + +require 'logan/loglines' +require 'time' + +class LogAn::Analyse + attr_reader :lines + + def close + @lines.close + end + + def initialize lines + @lines = String === lines ? LogAn::Loglines.new( lines) : lines + end + + def extremum val + val = case val + when String then Time.parse val + when Integer then Time.at val + when Time then val + else raise ArgumentError, "Unknwon type: #{val}", caller[ 1..-1] + end + end + + def timerange min, max = nil + exend = false + min, max, exend = min.min, min.max, min.exclude_end? if Range === min + Range.new extremum( min), extremum( max), exend + end + + def dbs min, max = nil, &exe + return Enumerator.new( self, :dbs, min, max) unless exe + range = timerange min, max + @lines.rdb.each do |time, db| + exe.call db + end + end + + def search min, max = nil, &exe + dbs = @lines.dbs + range = timerange min, max + @lines.rdb.each do |time, db| + dbs[ UUIDTools::UUID.parse_raw( db)].each &exe if range === Time.at( *time.unpack( 'N')) + end + end + alias [] search + + def each min, max = nil, &exe + exe ? search( min, max, &exe) : Enumerator.new( self, :search, min, max) + end +end diff --git a/lib/logan/cache.rb b/lib/logan/cache.rb index 8c6087b..bfd13b8 100644 --- a/lib/logan/cache.rb +++ b/lib/logan/cache.rb @@ -9,17 +9,21 @@ class LogAn::Cache @source, @data, self.type = source, data || {}, type end + def close + @source.close + end + def flush! - @data.each {|k,v| @obj[k] = v } + @data.each {|k,v| @source[k] = v } @data = {} end def dget k - @data[k] ||= @obj[k] + @data[k] ||= @source[k] end def oget k - @data[k] || @obj[k] + @data[k] || @source[k] end def dset k, v @@ -27,7 +31,7 @@ class LogAn::Cache end def oset k, v - @obj[k] = v + @source[k] = v end def type= type @@ -42,38 +46,89 @@ class LogAn::Cache def write_cache= type @type &= ~ (type ? 0 : 2) - define_singleton_method :[], method( type ? :oset : :dset) + define_singleton_method :[]=, method( type ? :oset : :dset) end - #include Enumerable - #def each &e - #return Enumerator.new self, :each unless e - #flush! - #@obj.each &e - #self - #end + include Enumerable + def each *paras + return Enumerator.new self, :each unless block_given? + flush! if @type&2 == 2 + @source.each_keys( *paras) do |key| + yield key, self[key] + end + self + end end class LogAn::AutoValueConvertHash include Enumerable + attr_reader :source - def initialize obj, encode = nil, each = nil, &decode - @object, @encoder = obj, decode.nil? ? encode || Marshal.method( :dump) : nil, - @each = each || obj.method( :each) rescue NameError - @decode = decode || Marshal.method( :restore) + def initialize source, encode = nil, each = nil, &decode + @source, @encode = source, encode || ( decode.nil? && Marshal.method( :dump) ) + @each, @decode = each, decode || Marshal.method( :restore) + @each ||= source.method( :each) rescue NameError + define_singleton_method :encode, &@encode if @encode + define_singleton_method :decode, &@decode if @decode + LogAn::Logging.debug encode: @encode, decode: @decode, each: @each + end + + def close + @source.close end def [] k - decode.call @object[k] + decode @source[k] end def []= k, v - @object[k] = encode.call v + @source[k] = encode v end def each *paras + return Enumerator.new self, :each unless block_given? @each.call *paras do |k, v| yield k, decode( v) end end + + def each_keys *paras + return Enumerator.new self, :each_keys unless block_given? + @each.call *paras do |k, v| + yield k + end + end +end + +class LogAn::AutoKeyConvertHash + include Enumerable + attr_reader :source + + def initialize source, encode = nil, each = nil, &decode + @source, @encode = source, encode || ( decode.nil? && Marshal.method( :dump) ) + @each, @decode = each, decode || Marshal.method( :restore) + @each ||= source.method( :each) rescue NameError + define_singleton_method :encode, &@encode if @encode + define_singleton_method :decode, &@decode if @decode + LogAn::Logging.debug encode: @encode, decode: @decode, each: @each + end + + def close + @source.close + end + + def [] k + @source[ encode( k)] + end + + def []= k, v + @source[ encode( k)] = v + end + + def each *paras + return Enumerator.new self, :each unless block_given? + @each.call *paras do |k, v| + yield decode( k), v + end + end end diff --git a/lib/logan/inc/command.rb b/lib/logan/inc/command.rb index 29aab03..2525ce2 100644 --- a/lib/logan/inc/command.rb +++ b/lib/logan/inc/command.rb @@ -14,24 +14,31 @@ module LogAn end class SID0 < Command - def initialize store, config, sid = 0 - @store, @config = store, config + class < flags || SBDB::RDONLY] - ret = LogAn::AutoValueConvertHash.new ret if type&4 > 0 + ret = LogAn::AutoValueConvertHash.new ret, &e if type&4 > 0 or e ret = LogAn::Cache.new ret, type&3 if type&3 > 0 ret end + # Open Store. + def store env, db, type = nil, flags = nil, &e + LogAn::Logging.info :store, :open, "sids.cnf", db, type + cache env[ 'sids.cnf', db, :flags => flags || SBDB::CREATE | SBDB::AUTO_COMMIT], type, &e + end + + # Open Config. + def config env, db, type = nil, flags = nil, &e + LogAn::Logging.info :config, :open, "sids.cnf", db, type + cache env[ 'sids.cnf', db, :flags => flags || SBDB::RDONLY], type, &e + end + # Prepare Server. # # * conf: @@ -30,44 +51,79 @@ module LogAn::Inc # : Server-Configuration. default { port: 1087 } def initialize conf super - @conf = {} + # Copy config - changes possible + @conf = {} conf.each {|key, val| @conf[key]= val } + # Default directories %w[logs etc].each {|key| @conf[key.to_sym] = key } + # Open Loglines-databases @logs = LogAn::Loglines.new @conf[:logs] + LogAn::Inc::FileParser::Base.logdb = @logs + # Open config-databases Dir.mkdir @conf[:etc] rescue Errno::EEXIST @etc = SBDB::Env.new( @conf[:etc], log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE, flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL) - # Set inc-config - stored in etc/inc.cnf - @conf[:inc] = {} - %w[hosts files fileparser].each {|key| @conf[:inc][key.to_sym] = config( @etc, key) } - @store = LogAn::Cache.new LogAn::AutoValueConvertHash.new( @etc[ 'sids.store', 'seeks', SBDB::Recno, SBDB::CREATE | SBDB::AUTO_COMMIT]), 3 + + # Open configs + begin + configs = @conf[:configs] = {} + %w[hosts files].each {|key| configs[key.to_sym] = config( @etc, key) {|l| l } } + configs[:fileparser] = config @etc, 'fileparser', &Safebox.method( :eval) + LogAn::Inc::SID0.config = configs + end + + # Open seeks-database + begin + stores = @conf[:stores] = {} + db = @etc[ 'sids.store', 'seeks', SBDB::Recno, SBDB::CREATE | SBDB::AUTO_COMMIT] + db = LogAn::AutoValueConvertHash.new( db, lambda {|val| val.pack( 'NN') }) {|val| + (val||0.chr*8).unpack( 'NN') + } + stores[:seeks] = LogAn::Cache.new db + LogAn::Inc::FileParser::Base.store = LogAn::Inc::SID0.store = stores + end + + # Select-framework + @select = LogAn::Inc::Select.new + @status = method :status + status # Init Status + # Prepare Inc-server - create server - LogAn::Inc::FileParser::Base.logdb = @logs - LogAn::Inc::FileParser::Base.store = @store - @serv = LogAn::Inc::Server.new :sock => TCPServer.new( *@conf[:server]), :config => @conf[:inc] + @serv = LogAn::Inc::Server.new :sock => TCPServer.new( *@conf[:server]), + :config => @conf[:configs], :select => @select + LogAn::Logging.debug @serv + # Shutdown on signals @sigs[:INT] = @sigs[:TERM] = method( :shutdown) + rescue Object # It's better to close everything, because BDB doesn't like unexpected exits self.at_exit raise $! end + def status + @select.at Time.now+5, &@status + LogAn::Logging.info :recv_lines => @logs.counter, + :connections => @serv && @serv.clients.map {|cl| cl.sock.peeraddr } + @conf[ :stores].each {|key, db| db.flush!} + end + # Will be called at exit. Will close all opened BDB::Env def at_exit - $stderr.puts :at_exit + LogAn::Logging.info :at_exit @logs and @logs.close @etc and @etc.close end # Shutdown Server cleanly. First shutdown TCPServer. def shutdown signal = nil - $stderr.puts [:signal, signal, Signal[signal]].inspect if signal + LogAn::Logging.info :signal, signal, Signal[ signal] if signal @serv.close exit 0 end diff --git a/lib/logan/inc/server.rb b/lib/logan/inc/server.rb index cacc9d8..bb5b42e 100644 --- a/lib/logan/inc/server.rb +++ b/lib/logan/inc/server.rb @@ -53,13 +53,18 @@ class LogAn::Inc::Select <::Select end class LogAn::Inc::Socket <::Select::Socket + def initialize *p + super( *p) + LogAn::Logging.info :connected, self + end + def event_read sock = @sock, event = :read begin @linebuf += sock.readpartial( @bufsize) rescue EOFError self.event_eof sock - rescue Errno::EPIPE => e - self.event_errno e, sock, event + rescue Errno::EPIPE + self.event_errno $!, sock, event rescue IOError self.event_ioerror sock, event rescue Errno::ECONNRESET => e @@ -73,10 +78,15 @@ class LogAn::Inc::Socket <::Select::Socket event_cmd @linebuf.remove( l) end end + + def close + LogAn::Logging.info :disconnect, self + super + end end class LogAn::Inc::Server < ::Select::Server - attr_reader :config + attr_reader :config, :clients def init opts super opts @@ -92,13 +102,14 @@ class LogAn::Inc::Server < ::Select::Server def init opts super opts + @sid0 = LogAn::Inc::SID0.new @config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!") end def event_cmd cmd sid, line = cmd.unpack 'Na*' - fileparser = @config[:fileparser][sid] - fileparser.event_line line, self if fileparser + fp = sid == 0 ? @sid0 : @config[:fileparser][sid] + fp.event_read line, self if fp end end end diff --git a/lib/logan/loglines.rb b/lib/logan/loglines.rb index cdfcdc2..2f85d78 100644 --- a/lib/logan/loglines.rb +++ b/lib/logan/loglines.rb @@ -6,7 +6,7 @@ require 'logan' module LogAn class Loglines - attr_reader :env, :rdb, :dbs + attr_reader :env, :rdb, :dbs, :counter def self.new *paras ret = obj = super( *paras) @@ -26,11 +26,17 @@ module LogAn flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL else env end - @rdb = @env[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] - @queue = @env[ "newids.queue", :type => SBDB::Queue, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT, :re_len => 16] - @dbs = {} + @rdb = AutoValueConvertHash.new( + AutoKeyConvertHash.new( + @env[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT], + lambda {|key| [key.to_i].pack 'N' }) {|key| Time.at key.unpack( 'N') }, + lambda {|val| String === val ? val : val.raw }) {|val| val && UUIDTools::UUID.parse_raw( val) } + @queue = @env[ "newids.queue", :type => SBDB::Queue, + :flags => SBDB::CREATE | SBDB::AUTO_COMMIT, :re_len => 16] + @dbs, @counter = {}, 0 self.hash_func = lambda {|k| - [k.timestamp.to_i/60/60].pack 'N' # Hour-based rotation + n = k.timestamp.to_i + n -= n % 3600 } end @@ -64,7 +70,8 @@ module LogAn end def db name - @env[ name.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] + @dbs[name] ||= @env[ name.to_s, :type => SBDB::Btree, + :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] end def sync @@ -77,6 +84,8 @@ module LogAn dat = [sid || 0x10, val].pack 'Na*' name = db_name id db( name)[ id.raw] = dat + @counter += 1 + @queue.push id.raw end alias emit put