bin/logan: Proof of Concept, but it looks nice. CStruct: I do not know. It is not what i want. bin/box.rb: $SAFE-tests.

This commit is contained in:
Denis Knauf 2010-02-26 01:57:26 +01:00
parent 2c0a76f604
commit c6ff411ae1
7 changed files with 247 additions and 15 deletions

33
README.md Normal file
View file

@ -0,0 +1,33 @@
Status
======
Proof of Concept!
Derzeit lote ich nur aus, was alles benoetigt wird.
Sicherheit
==========
Verarbeitung wird jeweils abgeschottet und darf nicht auf andere Daten zugreifen.
$SAFE = 4
Map auf Logdaten
================
Liest aus der Queue, verarbeitet und schreibt in eine andere Datenbank.
Parallelisierung
----------------
Eine DB, die speichert, wer an was arbeitet. Koennte langsam werden.
MapReduce allgemein
====================
Woher kommt die Information, dass gearbeitet werden kann? Queue/Stream/im Prozess.
Piping
======
MapReduce hintereinander gepipet. Queue/Stream simpel,
wenn jeweils ein Prozess/Thread zustaendig ist. Ein Prozess komplexer.

40
bin/box.rb Executable file
View file

@ -0,0 +1,40 @@
#!/usr/bin/ruby
require 'thread'
class Queue
attr_reader :que, :waiting
end
Thread.abort_on_exception = true
q, o = Queue.new, Queue.new
puts q.inspect
t = Thread.new( q, o) do |q, o|
begin
o << 3
o.que.taint
q.que.taint
o.waiting.taint
q.waiting.taint
$SAFE = 3
loop do
i = q.pop
begin
o.push eval(i)
rescue Object
o.push [$!.class, $!, $!.backtrace].inspect
end
end
rescue Object
o.push [$!.class, $!, $!.backtrace].inspect
end
end
Thread.new( o) {|o| loop{$stdout.puts "=> #{o.pop.inspect}"} }
STDIN.each_with_index do |l,i|
l.untaint
q.push l
$stdout.print "(#{i})> "
end

60
bin/logan Normal file
View file

@ -0,0 +1,60 @@
#!/usr/bin/ruby
require 'sbdb'
class Emit
def initialize env
@env = env
end
def emit f, k, v
env[ "#{f}/"][ k] = v
end
end
class Worker
class Box
def initialize e
@emit = e
end
def emit f, k, v
@emit.emit f, k, v
end
end
def emit f, k, v
@out.push [f, k, v]
end
def initalize i, o
@in, @out = i, o
Thread.new do
$SAFE = 3
@in.each do |o|
o.data
end
end
end
end
SBDB::Env.new 'conf' do |conf|
SBDB::Env.new 'logs' do |logs|
SBDB::Env.new 'cache' do |cache|
begin
wn = conf['worker','conf',flags: SBDB::READONLY]['worker']
inq, outq = SizedQueue.new( 1), SizedQueue.new( 1)
@worker = wn.times.map{ Worker.new inq, outq }
Thread.new( oq) do |oq|
cache[ "#{oq[0]}/#{}"][ oq[]]
end
emit = Emit.new cache
box = Box.new emit
while line = logs['newids'].get nil, "\0\0\0\0", nil, SBDB::CONSUME_WAIT
box.map line
end
ensure
end
end
end
end

View file

@ -1,5 +1,6 @@
class CStruct
def new post, *p
def self.new *p
pre = p.pop if p.last.kind_of? Hash
Class.new do
include CStruct::Mix
eval <<-EOF
@ -7,8 +8,8 @@ class CStruct
def self._directives
["#{p.map{|n,v|"#{v}"}.join}", #{post}]
end
def initialize data
#{p.map{|n,v|"@#{n}"}.join', '} = super data
def initialize data = nil
#{p.map{|n,v|"@#{n}"}.join', '} = _parse data
end
def _create
super #{p.map{|n,v|"@#{n}"}.join', '}
@ -19,7 +20,8 @@ class CStruct
end
module CStruct::Mix
def initialize data
def _parse data = nil
return unless data
pre, pos = self.class._directives
if pos.nil? or pos == 0 then data.unpack pre
elsif pos == 1 then data.unpack pre+'a*'
@ -41,7 +43,7 @@ module CStruct::Mix
else
r = data[ 0...-pos ].pack pre
r += data[ -pos..-1 ].inject( [r.length]) {|i,j| j.push i+j.last}[1...-1].pack 'N*'
r + data[ -pos..-1 ].pack 'a*'*(pos-1)
r + data[ -pos..-1 ].pack( 'a*'*(pos-1))
end
end
end

View file

@ -1,12 +1,3 @@
require 'logan'
class Logan::Type::Syslog
include CStruct
def self._directives
['a*', 0, :]
end
def parse
end
end
Logan.add CStruct.new( :line)

66
lib/rotates.rb Normal file
View file

@ -0,0 +1,66 @@
#!/usr/bin/ruby
require 'sbdb'
require 'uuidtools'
class Rotate
def initialize db, env = db.home, &e
@rdb, @env, @dbs = db, env, {}
self.hash = e || lambda {|k|
[k.timestamp.to_i/60/60/24].pack 'N'
}
end
def hash= e
self.hash &e
end
def hash &e
@hash_func = e if e
@hash_func
end
def hashing k
@hash_func.call k
end
def db_name id
h = hashing id
n = @rdb[ h]
if n
n = UUIDTools::UUID.parse_raw n
else
n = UUIDTools::UUID.timestamp_create
@rdb[ h] = n.raw
info :create => n.to_s
end
n
end
def db n
@env[ n.to_s, :type => SBDB::Btree, :flags => SBDB::CREATE | SBDB::AUTO_COMMIT]
end
def sync
@dbs.each{|n,db|db.sync}
@rdb.sync
end
def close
@dbs.each{|n,db|db.close 0}
@rdb.close 0
end
def put k, v, f = k
id = UUIDTools::UUID.timestamp_create
s = [0x10, v].pack 'Na*'
n = db_name id
db( n)[ id.raw] = s
end
alias emit put
def get k
n = db_name id
db( n)[ id.raw] = s
end
end

40
logan.gemspec Normal file
View file

@ -0,0 +1,40 @@
# Generated by jeweler
# DO NOT EDIT THIS FILE DIRECTLY
# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command
# -*- encoding: utf-8 -*-
Gem::Specification.new do |s|
s.name = %q{logan}
s.version = "0.0.0"
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Denis Knauf"]
s.date = %q{2010-02-24}
s.description = %q{}
s.email = %q{Denis.Knauf@gmail.com}
s.extra_rdoc_files = [
"LICENSE"
]
s.files = [
"VERSION",
"lib/cstruct.rb",
"lib/logan.rb",
"lib/logan/types/syslog.rb"
]
s.homepage = %q{http://github.com/DenisKnauf/logan}
s.rdoc_options = ["--charset=UTF-8"]
s.require_paths = ["lib"]
s.rubygems_version = %q{1.3.5}
s.summary = %q{Logdata analysing database}
if s.respond_to? :specification_version then
current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
s.specification_version = 3
if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then
else
end
else
end
end