Inc: files moved and splitted

This commit is contained in:
Denis Knauf 2010-03-25 10:39:30 +01:00
parent 64a715a1b6
commit 9f9b6b63a2
4 changed files with 149 additions and 139 deletions

View file

@ -1,106 +1,9 @@
require 'select'
require 'logan/inc/server'
require 'logan/inc/fileparser'
require 'logan/inc/command'
module Logan
module Inc
class Select <::Select
attr_reader :entries
def initialize *p
super *p
@entries=[]
end
def run
until @exit || (@exit_on_empty && self.empty?)
cron
run_once 1
end
end
def cron
@entries.each do |e|
return if e > Time.now
e.call
@entries.shift
end
end
class Entry < Time
attr_reader :do
def do &e
@do = e
end
def call *p
@do.call *p
end
def self.new *a, &e
x = self.at *a
x.do &e
x
end
end
def at a, &e
a = Entry.new a, &e if e
@entries << a
@entries.sort!
end
end
class Socket <::Select::Socket
def event_read sock = @sock, event = :read
begin
@linebuf += sock.readpartial( @bufsize)
rescue EOFError
self.event_eof sock
rescue Errno::EPIPE => e
self.event_errno e, sock, event
rescue IOError
self.event_ioerror sock, event
rescue Errno::ECONNRESET => e
self.event_errno e, sock, event
end
loop do
return if @linebuf.size < 4
l = @linebuf.unpack( 'N')[0]
return if l > @linebuf.length
@linebuf.remove 4
event_cmd @linebuf.remove( l)
end
end
end
class Server < ::Select::Server
attr_reader :config
def init opts
super opts
@config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!")
end
def event_new_client sock
{ :clientclass => LogAn::Inc::Server::Socket, :config => @config }
end
class Socket < LogAn::Inc::Server::Socket
attr_reader :config
def init opts
super opts
@config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!")
end
def event_cmd cmd
sid, line = cmd.unpack 'Na*'
begin
@config[ :fileparser, sid].event_line line, self
rescue Didi::Config::NoSIDFound, Didi::Config::SIDLoadingError
$stderr.puts( {:sid => sid, :exception => $!, :backtrace => $!.backtrace}.inspect)
end
end
end
end
end
end

40
lib/logan/inc/command.rb Normal file
View file

@ -0,0 +1,40 @@
module Logan
module Inc
class Command < ::Array
attr_reader :sid
def initialize sid = 0
@sid = sid
end
def event_read line, sock
cmd, l = line.unpack 'na*'
self[cmd].call( l, sock) if self[cmd]
end
end
class SID0 < Command
def initialize store, config, sid = 0
@store, @config = store, config
self[9] = method :event_hostname
self[10] = method :event_filerotated
end
def event_filerotated line, sock
sid, d = line.unpack 'Na8'
@store[ :seeks, sid] = d
end
def event_hostname line, sock
@config[ :hosts].each do |sid,host|
next unless line == host
file = @config[ :files, sid]
next unless file
# command, SID, (inode, seek), file
pc = [1, sid, @store[ :seeks, sid] || "\x00\x00\x00\x00\x00\x00\x00\x00", file].pack 'nNa8a*'
sock.write [pc.length, pc].pack( 'Na*')
end
end
end
end
end

View file

@ -1,6 +1,4 @@
require 'select'
module Logan
module Inc
module FileParser
@ -57,43 +55,6 @@ module Logan
end
end
end
end
class Command < ::Array
attr_reader :sid
def initialize sid = 0
@sid = sid
end
def event_read line, sock
cmd, l = line.unpack 'na*'
self[cmd].call( l, sock) if self[cmd]
end
end
class SID0 < Command
def initialize store, config, sid = 0
@store, @config = store, config
self[9] = method :event_hostname
self[10] = method :event_filerotated
end
def event_filerotated line, sock
sid, d = line.unpack 'Na8'
@store[ :seeks, sid] = d
end
def event_hostname line, sock
@config[ :hosts].each do |sid,host|
next unless line == host
file = @config[ :files, sid]
next unless file
# command, SID, (inode, seek), file
pc = [1, sid, @store[ :seeks, sid] || "\x00\x00\x00\x00\x00\x00\x00\x00", file].pack 'nNa8a*'
sock.write [pc.length, pc].pack( 'Na*')
end
end
end
end
end

106
lib/logan/inc/server.rb Normal file
View file

@ -0,0 +1,106 @@
require 'select'
module Logan
module Inc
class Select <::Select
attr_reader :entries
def initialize *p
super *p
@entries=[]
end
def run
until @exit || (@exit_on_empty && self.empty?)
cron
run_once 1
end
end
def cron
@entries.each do |e|
return if e > Time.now
e.call
@entries.shift
end
end
class Entry < Time
attr_reader :do
def do &e
@do = e
end
def call *p
@do.call *p
end
def self.new *a, &e
x = self.at *a
x.do &e
x
end
end
def at a, &e
a = Entry.new a, &e if e
@entries << a
@entries.sort!
end
end
class Socket <::Select::Socket
def event_read sock = @sock, event = :read
begin
@linebuf += sock.readpartial( @bufsize)
rescue EOFError
self.event_eof sock
rescue Errno::EPIPE => e
self.event_errno e, sock, event
rescue IOError
self.event_ioerror sock, event
rescue Errno::ECONNRESET => e
self.event_errno e, sock, event
end
loop do
return if @linebuf.size < 4
l = @linebuf.unpack( 'N')[0]
return if l > @linebuf.length
@linebuf.remove 4
event_cmd @linebuf.remove( l)
end
end
end
class Server < ::Select::Server
attr_reader :config
def init opts
super opts
@config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!")
end
def event_new_client sock
{ :clientclass => LogAn::Inc::Server::Socket, :config => @config }
end
class Socket < LogAn::Inc::Server::Socket
attr_reader :config
def init opts
super opts
@config = opts[:config] or raise( ArgumentError, "#{self.class} needs a Config!")
end
def event_cmd cmd
sid, line = cmd.unpack 'Na*'
begin
@config[ :fileparser, sid].event_line line, self
rescue Didi::Config::NoSIDFound, Didi::Config::SIDLoadingError
$stderr.puts( {:sid => sid, :exception => $!, :backtrace => $!.backtrace}.inspect)
end
end
end
end
end
end