# Batched writes for Madeleine
#
# Copyright(c) Håkan Råberg 2003
#
# 
# This is an experimental implementation of batched log writes to mininize 
# calls to fsync. It uses a Shared/Exclusive-Lock, implemented in sync.rb, 
# which is included in Ruby 1.8.
#
# Writes are batched for a specified amount of time, before written to disk and
# then executed.
#
# For a detailed discussion about the problem, see
#  http://www.prevayler.org/wiki.jsp?topic=OvercomingTheWriteBottleneck
#
#
# Usage is identical to normal SnapshotMadeleine, and it can also be used as
# persister for AutomaticSnapshotMadeleine. (One difference: the log isn't 
# visible on disk until any commands are executed.)
#
# You can also use the execute_query method for shared synchronzied queries,
# for eaay coarse-grained locking of the system.
#
# The exclusive lock is only locked during the actual execution of commands and
# while closing.
#
# Keeping both log writes and executes of commands in the originating thread 
# is needed by AutomaticSnapshotPrevayler. Hence the strange SimplisticPipe
# class.
#
# Todo: 
#    - It seems like Sync (sync.rb) prefers shared locks. This should probably
#       be changed.
#
#
# Madeleine - Ruby Object Prevalence
#
# Copyright(c) Anders Bengtsson 2003
#

require 'madeleine'
require 'madeleine/clock'

include Madeleine::Clock

module Madeleine
  module Batch
    class BatchedSnapshotMadeleine < SnapshotMadeleine

      def initialize(directory_name, marshaller=Marshal, &new_system_block)
        super(directory_name, marshaller, &new_system_block)
        @log_actor = LogActor.launch(self)
      end

      def execute_command(command)
        verify_command_sane(command)
        queued_command = QueuedCommand.new(command)
        @lock.synchronize(:SH) do
          raise "closed" if @closed
          @logger.store(queued_command)
        end
        queued_command.wait_for
      end

      def execute_query(query)
        verify_command_sane(query)
        @lock.synchronize(:SH) do
          execute_without_storing(query)
        end
      end

      def close
        @log_actor.destroy
        @lock.synchronize do
          @logger.close
          @closed = true
        end
      end

      def flush
        @lock.synchronize do
          @logger.flush
        end
      end

      def take_snapshot
        @lock.synchronize(:SH) do
          @lock.synchronize do
            @logger.close
          end
          Snapshot.new(@directory_name, system, @marshaller).take
          @logger.reset
        end
      end

      private

      def create_lock
        Sync.new
      end

      def create_logger(directory_name, log_factory)
        BatchedLogger.new(directory_name, log_factory, self.system)
      end

      def log_factory
        BatchedLogFactory.new
      end
    end

    private

    class LogActor
      def self.launch(madeleine, delay=0.01)
        result = new(madeleine, delay)
        result
      end

      def destroy
        @is_destroyed = true
        if @thread.alive?
          @thread.wakeup
          @thread.join
        end
      end

      private

      def initialize(madeleine, delay)
        @is_destroyed = false

        madeleine.flush
        @thread = Thread.new {
          until @is_destroyed
            sleep(delay)
            madeleine.flush
          end
        }
      end
    end

    class BatchedLogFactory
      def create_log(directory_name)
        BatchedLog.new(directory_name)
      end
    end

    class BatchedLogger < Logger
      def initialize(directory_name, log_factory, system)
        super(directory_name, log_factory)
        @buffer = []
        @system = system
      end

      def store(queued_command)
        @buffer << queued_command
      end

      def close
        return if @log.nil?
        flush
        @log.close
        @log = nil
      end

      def flush
        return if @buffer.empty?

        open_new_log if @log.nil?

        if @system.kind_of?(ClockedSystem)
          @buffer.unshift(QueuedTick.new)
        end

        @buffer.each do |queued_command|
          queued_command.store(@log)
        end

        @log.flush

        @buffer.each do |queued_command|
          queued_command.execute(@system)
        end

        @buffer.clear
      end
    end

    class BatchedLog < CommandLog
      def store(command)
        Marshal.dump(command, @file)
      end

      def flush
        @file.flush
        @file.fsync
      end
    end

    class QueuedCommand
      def initialize(command)
        @command = command
        @pipe = SimplisticPipe.new
      end

      def store(log)
        @pipe.write(log)
      end

      def execute(system)
        @pipe.write(system)
      end

      def wait_for
        @pipe.read do |log|
          log.store(@command)
        end

        @pipe.read do |system|          
          return @command.execute(system)
        end
      end
    end

    class QueuedTick
      def initialize
        @tick = Tick.new(Time.now)
      end

      def store(log)
        log.store(@tick)
      end

      def execute(system)
        @tick.execute(system)
      end
    end

    class SimplisticPipe
      def initialize 
        @receive_lock = Mutex.new.lock
        @consume_lock = Mutex.new.lock
        @message = nil
      end

      def read
        begin
          wait_for_message_received

          if block_given?
            yield @message
          else
            return @message
          end

        ensure
          message_consumed
        end
      end

      def write(message)
        raise WriteBlockedException unless can_write?

        @message = message
        message_received
        wait_for_message_consumed
        @message = nil
      end

      def can_write?
        @message.nil?
      end

      private
 
      def message_received
        @receive_lock.unlock
      end

      def wait_for_message_received
        @receive_lock.lock
      end

      def message_consumed
        @consume_lock.unlock
      end

      def wait_for_message_consumed
        @consume_lock.lock
      end
    end

    class WriteBlockedException < Exception
    end
  end
end

BatchedSnapshotMadeleine = Madeleine::Batch::BatchedSnapshotMadeleine