file-queue
This commit is contained in:
parent
1efdf76ffc
commit
3bdd2e6ceb
50
bin/s2l.rb
50
bin/s2l.rb
|
@ -1,48 +1,24 @@
|
||||||
#!/usr/bin/ruby
|
#!/usr/bin/ruby
|
||||||
|
|
||||||
|
$:.push File.join( File.dirname( $0), '..', 'lib')
|
||||||
require 'logger'
|
require 'logger'
|
||||||
|
require 'json'
|
||||||
require 'rubygems'
|
require 'rubygems'
|
||||||
require 'sbdb'
|
|
||||||
require 'uuidtools'
|
require 'uuidtools'
|
||||||
require 'socket'
|
require 'socket'
|
||||||
require 'select'
|
require 'select'
|
||||||
require 'robustserver'
|
require 'robustserver'
|
||||||
require 'active_support'
|
require 'active_support'
|
||||||
require 'syslog2logan/rotate'
|
require 'syslog2logan/rotate'
|
||||||
|
require 'syslog2logan/server'
|
||||||
|
|
||||||
$logger = Logger.new $stderr
|
$logger = Logger.new $stderr
|
||||||
|
$logger.formatter = proc { |severity, datetime, progname, msg| [severity, datetime, progname, msg.inspect].to_json+"\n" }
|
||||||
class S2L < Select::Server
|
|
||||||
attr_accessor :dbs
|
|
||||||
|
|
||||||
def init p
|
|
||||||
super p
|
|
||||||
@dbs = p[:dbs]
|
|
||||||
end
|
|
||||||
|
|
||||||
def event_new_client a
|
|
||||||
logger.debug :connection => {:new => a}
|
|
||||||
{ :clientclass => S2L::Socket, :dbs => @dbs }
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
module Kernel
|
module Kernel
|
||||||
def logger() $logger end
|
def logger() $logger end
|
||||||
end
|
end
|
||||||
|
|
||||||
class S2L::Socket < Select::Socket
|
|
||||||
def init opts
|
|
||||||
@dbs = opts[ :dbs]
|
|
||||||
super opts
|
|
||||||
end
|
|
||||||
|
|
||||||
def event_line v
|
|
||||||
logger.debug :line => v
|
|
||||||
@dbs.emit v
|
|
||||||
end
|
|
||||||
alias emit event_line
|
|
||||||
end
|
|
||||||
|
|
||||||
class Main < RobustServer
|
class Main < RobustServer
|
||||||
def initialize conf
|
def initialize conf
|
||||||
super
|
super
|
||||||
|
@ -50,14 +26,12 @@ class Main < RobustServer
|
||||||
@conf = conf
|
@conf = conf
|
||||||
logger.info :open => S2L
|
logger.info :open => S2L
|
||||||
@serv = S2L.new :sock => TCPServer.new( *@conf[:server])
|
@serv = S2L.new :sock => TCPServer.new( *@conf[:server])
|
||||||
logger.info :create => {:home => @conf[:home]}
|
|
||||||
Dir.mkdir @conf[:home] rescue Errno::EEXIST
|
|
||||||
@sigs[:INT] = @sigs[:TERM] = method(:shutdown)
|
@sigs[:INT] = @sigs[:TERM] = method(:shutdown)
|
||||||
@sigs[:USR1] = method(:state)
|
@sigs[:USR1] = method(:state)
|
||||||
end
|
end
|
||||||
|
|
||||||
def state s = nil
|
def state s = nil
|
||||||
logger.debug :server => @serv
|
logger.debug :server => @serv.class
|
||||||
end
|
end
|
||||||
|
|
||||||
def shutdown s = nil
|
def shutdown s = nil
|
||||||
|
@ -67,18 +41,18 @@ class Main < RobustServer
|
||||||
end
|
end
|
||||||
|
|
||||||
def run
|
def run
|
||||||
logger.info :open => SBDB::Env
|
logger.info :open => @conf[:backend]
|
||||||
SBDB::Env.new( @conf[:home],
|
@conf[:backend][0].new( @conf[:backend][1]) do |backend|
|
||||||
log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE,
|
|
||||||
flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL) do |dbenv|
|
|
||||||
logger.info :open => Rotate
|
logger.info :open => Rotate
|
||||||
@serv.dbs = Rotate.new dbenv[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | Bdb::DB_AUTO_COMMIT]
|
@serv.dbs = Rotate.new &backend.to_proc
|
||||||
logger.info :run => @serv
|
logger.info :run => @serv.class
|
||||||
@serv.run
|
@serv.run
|
||||||
|
logger.info :close => @conf[:backend]
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
Main.main :home => 'logs', :server => [ '', 1514], :retries => [1,1] # [10, 10]
|
require 'syslog2logan/backend/tch'
|
||||||
|
Main.main :backend => [ Backend::TCH, {:dir => 'logs'}], :server => [ '', 1514], :retries => [1,1] # [10, 10]
|
||||||
|
|
||||||
logger.info :halted
|
logger.info :halted
|
||||||
|
|
48
lib/file_queue.rb
Normal file
48
lib/file_queue.rb
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
class File
|
||||||
|
def exclusive_lock
|
||||||
|
flock File::LOCK_EX
|
||||||
|
end
|
||||||
|
|
||||||
|
def shared_lock
|
||||||
|
flock File::LOCK_SH
|
||||||
|
end
|
||||||
|
|
||||||
|
def unblock
|
||||||
|
flock File::LOCK_UN
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class FileQueue
|
||||||
|
attr_reader :file, :size
|
||||||
|
alias to_io file
|
||||||
|
|
||||||
|
def initialize file, size = 16
|
||||||
|
@file = case file
|
||||||
|
when File then file
|
||||||
|
else File.open file, 'a+'
|
||||||
|
end
|
||||||
|
@size, @pack = size, "A#{size}"
|
||||||
|
end
|
||||||
|
|
||||||
|
def push *a
|
||||||
|
f = @file
|
||||||
|
f.seek 0, IO::SEEK_END
|
||||||
|
f.exclusive_lock
|
||||||
|
f.write a.pack( @pack*a.length)
|
||||||
|
f.unblock
|
||||||
|
end
|
||||||
|
|
||||||
|
def pop
|
||||||
|
f = @file
|
||||||
|
f.rewind
|
||||||
|
f.exclusive_lock
|
||||||
|
s = f.read( @size).unpack( 'L')[0]
|
||||||
|
f.rewind
|
||||||
|
f.write [s.succ].pack( 'L')
|
||||||
|
f.sync
|
||||||
|
f.shared_lock
|
||||||
|
f.pos = s
|
||||||
|
f.read( @size).unpack( 'L')[0]
|
||||||
|
f.unblock
|
||||||
|
end
|
||||||
|
end
|
83
lib/in_tch.rb
Normal file
83
lib/in_tch.rb
Normal file
|
@ -0,0 +1,83 @@
|
||||||
|
#!/usr/bin/ruby
|
||||||
|
|
||||||
|
require 'logger'
|
||||||
|
require 'rubygems'
|
||||||
|
require 'uuidtools'
|
||||||
|
require 'socket'
|
||||||
|
require 'select'
|
||||||
|
require 'robustserver'
|
||||||
|
require 'active_support'
|
||||||
|
require 'syslog2logan/rotate'
|
||||||
|
|
||||||
|
$logger = Logger.new $stderr
|
||||||
|
|
||||||
|
class S2L < Select::Server
|
||||||
|
attr_accessor :dbs
|
||||||
|
|
||||||
|
def init p
|
||||||
|
super p
|
||||||
|
@dbs = p[:dbs]
|
||||||
|
end
|
||||||
|
|
||||||
|
def event_new_client a
|
||||||
|
logger.debug :connection => {:new => a}
|
||||||
|
{ :clientclass => S2L::Socket, :dbs => @dbs }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
module Kernel
|
||||||
|
def logger() $logger end
|
||||||
|
end
|
||||||
|
|
||||||
|
class S2L::Socket < Select::Socket
|
||||||
|
def init opts
|
||||||
|
@dbs = opts[ :dbs]
|
||||||
|
super opts
|
||||||
|
end
|
||||||
|
|
||||||
|
def event_line v
|
||||||
|
logger.debug :line => v
|
||||||
|
@dbs.emit v
|
||||||
|
end
|
||||||
|
alias emit event_line
|
||||||
|
end
|
||||||
|
|
||||||
|
class Main < RobustServer
|
||||||
|
def initialize conf
|
||||||
|
super
|
||||||
|
@logger = $logger
|
||||||
|
@conf = conf
|
||||||
|
logger.info :open => S2L
|
||||||
|
@serv = S2L.new :sock => TCPServer.new( *@conf[:server])
|
||||||
|
logger.info :create => {:home => @conf[:home]}
|
||||||
|
Dir.mkdir @conf[:home] rescue Errno::EEXIST
|
||||||
|
@sigs[:INT] = @sigs[:TERM] = method(:shutdown)
|
||||||
|
@sigs[:USR1] = method(:state)
|
||||||
|
end
|
||||||
|
|
||||||
|
def state s = nil
|
||||||
|
logger.debug :server => @serv
|
||||||
|
end
|
||||||
|
|
||||||
|
def shutdown s = nil
|
||||||
|
logger.info :shutdown => [s, Signal[s]]
|
||||||
|
@serv.close
|
||||||
|
exit 0
|
||||||
|
end
|
||||||
|
|
||||||
|
def run
|
||||||
|
logger.info :open => SBDB::Env
|
||||||
|
SBDB::Env.new( @conf[:home],
|
||||||
|
log_config: SBDB::Env::LOG_IN_MEMORY | SBDB::Env::LOG_AUTO_REMOVE,
|
||||||
|
flags: SBDB::CREATE | SBDB::Env::INIT_TXN | Bdb::DB_INIT_MPOOL) do |dbenv|
|
||||||
|
logger.info :open => Rotate
|
||||||
|
@serv.dbs = Rotate.new dbenv[ 'rotates.db', :type => SBDB::Btree, :flags => SBDB::CREATE | Bdb::DB_AUTO_COMMIT]
|
||||||
|
logger.info :run => @serv
|
||||||
|
@serv.run
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
Main.main :home => 'logs', :server => [ '', 1514], :retries => [1,1] # [10, 10]
|
||||||
|
|
||||||
|
logger.info :halted
|
16
lib/syslog2logan/backend/base.rb
Normal file
16
lib/syslog2logan/backend/base.rb
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
module Backend
|
||||||
|
end
|
||||||
|
|
||||||
|
class Backend::Base
|
||||||
|
def initialize opts = {}
|
||||||
|
if block_given?
|
||||||
|
yield self
|
||||||
|
else
|
||||||
|
self
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def to_proc
|
||||||
|
method :open
|
||||||
|
end
|
||||||
|
end
|
31
lib/syslog2logan/backend/tch.rb
Normal file
31
lib/syslog2logan/backend/tch.rb
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
require 'rufus/tokyo'
|
||||||
|
require 'syslog2logan/backend/base'
|
||||||
|
|
||||||
|
class Backend::TCH < Backend::Base
|
||||||
|
attr_reader :dir
|
||||||
|
def initialize opts = {}, &e
|
||||||
|
@dir = opts[:dir]
|
||||||
|
Dir.mkdir @dir rescue Errno::EEXIST
|
||||||
|
@dbs = []
|
||||||
|
if block_given?
|
||||||
|
begin
|
||||||
|
super opts, &e
|
||||||
|
ensure
|
||||||
|
close
|
||||||
|
end
|
||||||
|
else
|
||||||
|
super opts
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def close
|
||||||
|
@dbs.each &:close
|
||||||
|
end
|
||||||
|
|
||||||
|
def open name
|
||||||
|
logger.info :open => name, :backend => self.class
|
||||||
|
db = Rufus::Tokyo::Cabinet.new File.join( @dir, name)+".tch"
|
||||||
|
@dbs.push db
|
||||||
|
db
|
||||||
|
end
|
||||||
|
end
|
8
lib/syslog2logan/rotate/base.rb
Executable file → Normal file
8
lib/syslog2logan/rotate/base.rb
Executable file → Normal file
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
require 'logger'
|
require 'logger'
|
||||||
require 'uuidtools'
|
require 'uuidtools'
|
||||||
require 'active_support'
|
require 'active_support/core_ext'
|
||||||
|
|
||||||
class Rotate
|
class Rotate
|
||||||
# open_db_func: must returns a db-object with #[] and #[]=.
|
# open_db_func: must returns a db-object with #[] and #[]=.
|
||||||
|
@ -10,19 +10,19 @@ class Rotate
|
||||||
def initialize hash_func = nil, &open_db_func
|
def initialize hash_func = nil, &open_db_func
|
||||||
@dbs = Hash.new {|h,k| h[k] = open_db_func.call(k) }
|
@dbs = Hash.new {|h,k| h[k] = open_db_func.call(k) }
|
||||||
hash_func ||= lambda {|k| [k.timestamp.to_i/1.day].pack 'N' }
|
hash_func ||= lambda {|k| [k.timestamp.to_i/1.day].pack 'N' }
|
||||||
define_method :hashing, &hash_func
|
define_singleton_method :hashing, &hash_func
|
||||||
@rotate = @dbs['rotate']
|
@rotate = @dbs['rotate']
|
||||||
@queue = @dbs['queue']
|
@queue = @dbs['queue']
|
||||||
end
|
end
|
||||||
|
|
||||||
def db_name id
|
def db_name id
|
||||||
h = hashing id
|
h = hashing id
|
||||||
n = @rdb[ h]
|
n = @rotate[ h]
|
||||||
if n
|
if n
|
||||||
n = UUIDTools::UUID.parse_raw n
|
n = UUIDTools::UUID.parse_raw n
|
||||||
else
|
else
|
||||||
n = UUIDTools::UUID.timestamp_create
|
n = UUIDTools::UUID.timestamp_create
|
||||||
@rdb[ h] = n.raw
|
@rotate[ h] = n.raw
|
||||||
logger.info :create => n.to_s
|
logger.info :create => n.to_s
|
||||||
end
|
end
|
||||||
n
|
n
|
||||||
|
|
0
lib/syslog2logan/rotate/bdb.rb
Executable file → Normal file
0
lib/syslog2logan/rotate/bdb.rb
Executable file → Normal file
29
lib/syslog2logan/server.rb
Normal file
29
lib/syslog2logan/server.rb
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
require 'socket'
|
||||||
|
require 'select'
|
||||||
|
|
||||||
|
class S2L < Select::Server
|
||||||
|
attr_accessor :dbs
|
||||||
|
|
||||||
|
def init p
|
||||||
|
super p
|
||||||
|
@dbs = p[:dbs]
|
||||||
|
end
|
||||||
|
|
||||||
|
def event_new_client a
|
||||||
|
logger.debug :connection => {:new => a}
|
||||||
|
{ :clientclass => S2L::Socket, :dbs => @dbs }
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class S2L::Socket < Select::Socket
|
||||||
|
def init opts
|
||||||
|
@dbs = opts[ :dbs]
|
||||||
|
super opts
|
||||||
|
end
|
||||||
|
|
||||||
|
def event_line v
|
||||||
|
logger.debug :line => v
|
||||||
|
@dbs.emit v
|
||||||
|
end
|
||||||
|
alias emit event_line
|
||||||
|
end
|
Loading…
Reference in a new issue