From c6ff411ae1c7de6c1516b98c9dc3839ab54ce33b Mon Sep 17 00:00:00 2001 From: Denis Knauf Date: Fri, 26 Feb 2010 01:57:26 +0100 Subject: [PATCH] bin/logan: Proof of Concept, but it looks nice. CStruct: I do not know. It is not what i want. bin/box.rb: $SAFE-tests. --- README.md | 33 ++++++++++++++++++++ bin/box.rb | 40 ++++++++++++++++++++++++ bin/logan | 60 +++++++++++++++++++++++++++++++++++ lib/cstruct.rb | 12 ++++--- lib/logan/types/syslog.rb | 11 +------ lib/rotates.rb | 66 +++++++++++++++++++++++++++++++++++++++ logan.gemspec | 40 ++++++++++++++++++++++++ 7 files changed, 247 insertions(+), 15 deletions(-) create mode 100644 README.md create mode 100755 bin/box.rb create mode 100644 bin/logan create mode 100644 lib/rotates.rb create mode 100644 logan.gemspec diff --git a/README.md b/README.md new file mode 100644 index 0000000..c74947b --- /dev/null +++ b/README.md @@ -0,0 +1,33 @@ +Status +====== + +Proof of Concept! + +Derzeit lote ich nur aus, was alles benoetigt wird. + +Sicherheit +========== + +Verarbeitung wird jeweils abgeschottet und darf nicht auf andere Daten zugreifen. +$SAFE = 4 + +Map auf Logdaten +================ + +Liest aus der Queue, verarbeitet und schreibt in eine andere Datenbank. + +Parallelisierung +---------------- + +Eine DB, die speichert, wer an was arbeitet. Koennte langsam werden. + +MapReduce allgemein +==================== + +Woher kommt die Information, dass gearbeitet werden kann? Queue/Stream/im Prozess. + +Piping +====== + +MapReduce hintereinander gepipet. Queue/Stream simpel, +wenn jeweils ein Prozess/Thread zustaendig ist. Ein Prozess komplexer. diff --git a/bin/box.rb b/bin/box.rb new file mode 100755 index 0000000..dc8ce62 --- /dev/null +++ b/bin/box.rb @@ -0,0 +1,40 @@ +#!/usr/bin/ruby + +require 'thread' + +class Queue + attr_reader :que, :waiting +end + +Thread.abort_on_exception = true +q, o = Queue.new, Queue.new +puts q.inspect + +t = Thread.new( q, o) do |q, o| + begin + o << 3 +o.que.taint +q.que.taint +o.waiting.taint +q.waiting.taint + $SAFE = 3 + loop do + i = q.pop + begin + o.push eval(i) + rescue Object + o.push [$!.class, $!, $!.backtrace].inspect + end + end + rescue Object + o.push [$!.class, $!, $!.backtrace].inspect + end +end + +Thread.new( o) {|o| loop{$stdout.puts "=> #{o.pop.inspect}"} } + +STDIN.each_with_index do |l,i| + l.untaint + q.push l + $stdout.print "(#{i})> " +end diff --git a/bin/logan b/bin/logan new file mode 100644 index 0000000..9c9eab9 --- /dev/null +++ b/bin/logan @@ -0,0 +1,60 @@ +#!/usr/bin/ruby + +require 'sbdb' + +class Emit + def initialize env + @env = env + end + + def emit f, k, v + env[ "#{f}/"][ k] = v + end +end + +class Worker + class Box + def initialize e + @emit = e + end + + def emit f, k, v + @emit.emit f, k, v + end + end + + def emit f, k, v + @out.push [f, k, v] + end + + def initalize i, o + @in, @out = i, o + Thread.new do + $SAFE = 3 + @in.each do |o| + o.data + end + end + end +end + +SBDB::Env.new 'conf' do |conf| + SBDB::Env.new 'logs' do |logs| + SBDB::Env.new 'cache' do |cache| + begin + wn = conf['worker','conf',flags: SBDB::READONLY]['worker'] + inq, outq = SizedQueue.new( 1), SizedQueue.new( 1) + @worker = wn.times.map{ Worker.new inq, outq } + Thread.new( oq) do |oq| + cache[ "#{oq[0]}/#{}"][ oq[]] + end + emit = Emit.new cache + box = Box.new emit + while line = logs['newids'].get nil, "\0\0\0\0", nil, SBDB::CONSUME_WAIT + box.map line + end + ensure + end + end + end +end diff --git a/lib/cstruct.rb b/lib/cstruct.rb index 5a9db4f..7d274e8 100644 --- a/lib/cstruct.rb +++ b/lib/cstruct.rb @@ -1,5 +1,6 @@ class CStruct - def new post, *p + def self.new *p + pre = p.pop if p.last.kind_of? Hash Class.new do include CStruct::Mix eval <<-EOF @@ -7,8 +8,8 @@ class CStruct def self._directives ["#{p.map{|n,v|"#{v}"}.join}", #{post}] end - def initialize data - #{p.map{|n,v|"@#{n}"}.join', '} = super data + def initialize data = nil + #{p.map{|n,v|"@#{n}"}.join', '} = _parse data end def _create super #{p.map{|n,v|"@#{n}"}.join', '} @@ -19,7 +20,8 @@ class CStruct end module CStruct::Mix - def initialize data + def _parse data = nil + return unless data pre, pos = self.class._directives if pos.nil? or pos == 0 then data.unpack pre elsif pos == 1 then data.unpack pre+'a*' @@ -41,7 +43,7 @@ module CStruct::Mix else r = data[ 0...-pos ].pack pre r += data[ -pos..-1 ].inject( [r.length]) {|i,j| j.push i+j.last}[1...-1].pack 'N*' - r + data[ -pos..-1 ].pack 'a*'*(pos-1) + r + data[ -pos..-1 ].pack( 'a*'*(pos-1)) end end end diff --git a/lib/logan/types/syslog.rb b/lib/logan/types/syslog.rb index 032e912..b172515 100644 --- a/lib/logan/types/syslog.rb +++ b/lib/logan/types/syslog.rb @@ -1,12 +1,3 @@ require 'logan' -class Logan::Type::Syslog - include CStruct - def self._directives - ['a*', 0, :] - end - - def parse - - end -end +Logan.add CStruct.new( :line) diff --git a/lib/rotates.rb b/lib/rotates.rb new file mode 100644 index 0000000..cea9fa4 --- /dev/null +++ b/lib/rotates.rb @@ -0,0 +1,66 @@ +#!/usr/bin/ruby + +require 'sbdb' +require 'uuidtools' + +class Rotate + def initialize db, env = db.home, &e + @rdb, @env, @dbs = db, env, {} + self.hash = e || lambda {|k| + [k.timestamp.to_i/60/60/24].pack 'N' + } + end + + def hash= e + self.hash &e + end + + def hash &e + @hash_func = e if e + @hash_func + end + + def hashing k + @hash_func.call k + end + + def db_name id + h = hashing id + n = @rdb[ h] + if n + n = UUIDTools::UUID.parse_raw n + else + n = UUIDTools::UUID.timestamp_create + @rdb[ h] = n.raw + info :create => n.to_s + end + n + end + + def db n + @env[ n.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT] + end + + def sync + @dbs.each{|n,db|db.sync} + @rdb.sync + end + + def close + @dbs.each{|n,db|db.close 0} + @rdb.close 0 + end + + def put k, v, f = k + id = UUIDTools::UUID.timestamp_create + s = [0x10, v].pack 'Na*' + n = db_name id + db( n)[ id.raw] = s + end + alias emit put + + def get k + n = db_name id + db( n)[ id.raw] = s + end +end diff --git a/logan.gemspec b/logan.gemspec new file mode 100644 index 0000000..e8d9c39 --- /dev/null +++ b/logan.gemspec @@ -0,0 +1,40 @@ +# Generated by jeweler +# DO NOT EDIT THIS FILE DIRECTLY +# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command +# -*- encoding: utf-8 -*- + +Gem::Specification.new do |s| + s.name = %q{logan} + s.version = "0.0.0" + + s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version= + s.authors = ["Denis Knauf"] + s.date = %q{2010-02-24} + s.description = %q{} + s.email = %q{Denis.Knauf@gmail.com} + s.extra_rdoc_files = [ + "LICENSE" + ] + s.files = [ + "VERSION", + "lib/cstruct.rb", + "lib/logan.rb", + "lib/logan/types/syslog.rb" + ] + s.homepage = %q{http://github.com/DenisKnauf/logan} + s.rdoc_options = ["--charset=UTF-8"] + s.require_paths = ["lib"] + s.rubygems_version = %q{1.3.5} + s.summary = %q{Logdata analysing database} + + if s.respond_to? :specification_version then + current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION + s.specification_version = 3 + + if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then + else + end + else + end +end +