duperemoverb/lib/deduperemoverb.rb

360 lines
9.3 KiB
Ruby
Executable File

#!/usr/bin/env ruby
# vim: set noet sw=2 ts=2 sts=2:
require 'sqlite3'
require 'pathname'
require 'ostruct'
require 'deduperemoverb/file_dedupe_range'
class SQLite3::ResultSet
def each_enum
loop do
row = @stmt.step
return nil if @stmt.done?
row = @db.translate_from_db @stmt.types, row
yield ArrayWithTypesAndFields.new( row)
end
end
end
def _recursive dir, &exe
Dir.each_child dir do |e|
next if '.' == e or '..' == e
e = File.join dir, e
if File.directory? e
_recursive e
elsif File.file? e
yield e
end
end
end
def recursive dir, &exe
return to_enum( __method__, dir) unless block_given?
dir = dir.to_s
if File.directory? dir
_recursive dir, &exe
elsif File.file? dir
yield dir
else
raise ArgumentError, "Directory [#{dir}] does not exist." if File.exist? dir
end
end
def hash_file path, chunksize, &exe
File.open path do |f|
b = f.pos
s = f.read chunksize
e = f.pos
unless chunksize == s.bytesize
if f.eof?
yield s, b
return
else
raise "Read lesser than chunksize, but did not reach end of file. [#{path}]"
end
end
yield s, b
end
return
end
def hash_recursive dir, &exe
recursive dir do |path|
ino = File::Stat.new( path).ino
hash_file path do |dgs, pos|
yield path, ino, dgs, pos
end
end
end
class Duperemove
class DFH
class OpenFailed < Exception
attr_reader :file, :error
def initialize file, error
@file, @error = file, error
super "Open file `#{file}` failed: #{error}"
end
end
attr_reader :digest, :filename, :ino, :subvol, :size, :offset, :file, :dedupe_seq, :last_dedupe_seq
def initialize digest, filename, ino, subvol, size, offset, dedupe_seq, last_dedupe_seq
@digest, @filename, @ino, @subvol, @size, @offset, @dedupe_seq, @last_dedupe_seq =
digest, filename, ino, subvol, size, offset, dedupe_seq, last_dedupe_seq
@file = nil
end
def open mode = nil
f = File.new @filename, mode || 'r+'
s = f.stat
unless s.ino == @ino
f.close
return OpenFailed.new( @filename, "Not same inode: actual file #{s.ino} <=> hashed #{@ino}")
end
unless s.size == @size # and s.subvol == @subvol
f.close
return OpenFailed.new( @filename, "Size differs: actual file #{s.size} <=> hashed #{@size}")
end
@file = f
self
rescue Exception
f.close
return OpenFailed.new( @filename, "#{$!.to_s} (#{$!.class.name})")
end
def close
@file.close if @file and not @file.closed?
end
end
def self.duperemove fd, offset, length, dupes
rr = FileDedupeRange[dupes.size].new offset, length
fds = []
dupes.each_with_index do |(fd,os),i|
fn = fd.is_a?(File) ? fd.fileno : fd
fds[fn] = fd
rr[:destinations][i][:fd] = fn
rr[:destinations][i][:offset] = os
end
fn = fd.is_a?(File) ? fd.fileno : fd
rv = FileDedupeRange.dedup fn, rr
raise FileDedupeRange.errno, "Deduplication failed for file" if -1 == rv
r = {}
rr[:destinations].each {|d| r[fds[d[:fd]]] = [d[:bytes_deduped], d[:status]] }
r
end
def per_digest &exe
return to_enum( __method__, db) unless block_given?
sql = <<-EOSQL
SELECT h.digest, count(1)
FROM files NATURAL JOIN selected h
GROUP BY h.digest
HAVING count(1) > 1
EOSQL
@db.query "select count(1) from (#{sql})" do |rs|
rs.each {|row| $logger.info "have Selected #digests: #{row.first}" }
end
@db.prepare sql do |stmt|
stmt.execute
$logger.info 'am iterating...'
stmt.each &exe
end
end
def create_selected dirs
dirs.map! {|dir| Pathname.new( dir).expand_path.to_s }
sep, pattern = @dedupe_sequence, "^(?:#{dirs.map{|dir|Regexp.quote dir}.join '|'}).*"
$logger.info sprintf( "query for: pattern: %p", pattern)
@db.execute <<-EOSQL, dirs.empty? ? {} : {pattern: pattern}
CREATE TABLE ext.candidates AS
SELECT hashes.digest
FROM hashes NATURAL JOIN files
WHERE dedupe_seq > last_dedupe_seq
#{'AND filename REGEXP :pattern' unless dirs.empty?}
EOSQL
$logger.info 'select digests'
@db.execute <<-EOSQL
CREATE TABLE ext.selected AS
SELECT hashes.digest, hashes.ino, hashes.subvol, loff, last_dedupe_seq
FROM (
SELECT distinct digest
FROM candidates
) h NATURAL JOIN hashes
EOSQL
@db.execute 'DROP TABLE ext.candidates'
$logger.info 'index digests'
@db.execute 'CREATE INDEX ext.sel_digest_idx ON selected (digest)'
end
class Stat
attr_accessor :step, :done, :ok, :skip, :error
def initialize() @step = @done = @ok = @skip = @error = 0 end
def to_s() sprintf "%010d (o%010d|s%010d|e%010d)", @step, @ok, @skip, @error end
def step!() @step += 1 end
def ok!() @done += 1; @ok += 1 end
def skip!() @done += 1; @skip += 1 end
def error! err
STDERR.puts err
@done += 1
@error += 1
end
end
def dedup_digest_files digest, (fst, *es)
stat, block_size = @stat, @config.block_size
#STDERR.printf "%p: %p | %p\n", digest, fst, es
errors, rt = [], 0.0
stat.step!
@db.transaction do
until fst.nil?
x = fst.open 'r'
case x
when DFH
stat.ok!
break
when Exception
stat.error! x
fst, *es = es
end
end
@dedupe_ok_stmt.execute digest: fst.digest, ino: fst.ino, subvol: fst.subvol, seq: fst.dedupe_seq
es.lazy.
reject do |f|
if f.dedupe_seq <= f.last_dedupe_seq
stat.skip!
nil
end
end.
select do |f|
case r = f.open
when DFH
true
else
stat.error! r
nil
end
end.
each_slice 16 do |fs|
begin
t1 = Time.now
rs =
Duperemove.duperemove fst.file,
fst.offset,
[ fst.size - fst.offset, block_size ].min,
fs.map {|f| [f.file, f.offset] }.to_h
t2 = Time.now
rt += t2-t1
#STDERR.printf "%p: results: %p\n", digest, es.map {|e| [ e.filename, rs[e.file] ] }.to_h
fs.each do |e|
err = rs[e.file][1]
if 0 == err
@dedupe_ok_stmt.execute digest: e.digest, ino: e.ino, subvol: e.subvol, seq: e.dedupe_seq
stat.ok!
else
stat.error! DedupError.new( e, err)
end
end
ensure
fs.each &:close
end
end
end
rt
ensure
fst&.close
end
def prepared_statements &exe
@files_stmt = @db.prepare <<-EOSQL
SELECT digest, filename, h.ino, h.subvol, size, loff, dedupe_seq, last_dedupe_seq
FROM selected h NATURAL JOIN files
WHERE digest = :digest
ORDER BY dedupe_seq - last_dedupe_seq
EOSQL
@setseq_stmt = @db.prepare <<-EOSQL if false
UPDATE files
SET dedupe_seq = :value
WHERE filename = :filename AND 0 < dedupe_seq
EOSQL
@dedupe_ok_stmt = @db.prepare <<-EOSQL
UPDATE hashes
SET last_dedupe_seq = :seq
WHERE digest = :digest AND ino = :ino AND subvol = :subvol
EOSQL
@dedupe_ok_stmt = @db.prepare <<-EOSQL if false
INSERT INTO log (ino, subvol, ok)
VALUES (:ino, :subvol, 1)
ON CONFLICT (ino, subvol)
DO UPDATE SET ok = ok + 1
EOSQL
@dedupe_error_stmt = @db.prepare <<-EOSQL if false
INSERT INTO log (ino, subvol, failed)
VALUES (:ino, :subvol, 1)
ON CONFLICT (ino, subvol)
DO UPDATE SET failed = failed + 1
EOSQL
yield
ensure
@files_stmt.close if @files_stmt&.closed?
@setseq_stmt.close if @setseq_stmt&.closed?
@dedupe_ok_stmt.close if @dedupe_ok_stmt&.closed?
end
def run dirs = nil
$logger.info "am starting..."
dirs ||= []
digest_rs = nil
block_size, seq = @config.block_size, @config.dedupe_sequence
create_selected dirs
stat = @stat = Stat.new
prepared_statements do
per_digest do |digest, count|
dgs = Hash.new {|h,k| h[k] = [] }
#STDERR.printf "digest (%d): %p\n", count.to_i, digest
STDERR.print "\r#{stat}"
#@files_stmt.execute rs.flatten do |digest, filename, ino, subvol, size, offset|
t1 = Time.now
@files_stmt.execute digest: digest do |rs|
rs.each do |digest, filename, ino, subvol, size, offset, dedupe_seq, last_dedupe_seq|
dgs[digest].push DFH.new( digest, filename, ino, subvol, size, offset, dedupe_seq, last_dedupe_seq)
end
end
t2 = Time.now
STDERR.printf "\r%s %0.6f", stat.to_s, t2-t1
rt = 0
dgs.each do |digest, fs|
rt += dedup_digest_files( digest, fs)
end
STDERR.printf "\r%s %0.6f %0.6f %0.6f", stat, t2-t1, rt, Time.now-t1
end
end
ensure
STDERR.puts
end
def initialize dbfile, tempfile
@db = SQLite3::Database.new dbfile.to_s
@db.enable_load_extension true
@db.load_extension '/usr/lib/sqlite3/pcre.so'
@config = OpenStruct.new
@db.execute( "SELECT * FROM config") {|k, v| @config[k.to_sym] = v }
@config.version = "#{@config.version_major}.#{@config.version_minor}"
@config.dedupe_sequence = ENV['dedupe_sequence'].to_i if ENV['dedupe_sequence']
@dedupe_sequence = @config.dedupe_sequence
raise "Database created by newer duperemove than supported (expected: 2.0, got: #{@config.version})" unless '2.0' == @config.version
tempfile.unlink if tempfile.exist?
$logger.info "use ext-DB: #{tempfile}"
@db.execute "ATTACH ? AS ext", tempfile.to_s
migrate_last_dedupe_seq
end
def migrate_last_dedupe_seq
found = false
@db.query 'PRAGMA table_info(hashes)' do |rs|
found = rs.any? {|(_, name, _,_,_,_)| 'last_dedupe_seq' == name }
end
unless found
$logger.info "add missing column last_dedupe_seq to table hashes"
@db.execute 'ALTER TABLE hashes ADD COLUMN last_dedupe_seq INTEGER DEFAULT ?',
@config.dedupe_sequence
end
end
def finish
$logger.info "finish (sequence: #{@dedupe_sequence})"
end
end