#!/usr/bin/perl
# FREP_Server - file replication/synchronization daemon.
# File monitoring done with inotify/epoll, file differences calculated using gdiff
# algorithm (see http://www.w3.org/TR/NOTE-gdiff-19970901), Zlib compression used
# to minimize file transfers.
#
# This product uses software developed by Spread Concepts LLC for use in the Spread toolkit.
# For more information about Spread see http://www.spread.org
#
# Copyright (C) 2007 Mark Steele http://www.control-alt-del.org/code
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#

use strict;
use warnings;

use Linux::Inotify2;
use File::Find;
use POSIX qw(nice SIGCHLD SIGTERM SIGHUP WNOHANG);
use File::Path;
use File::Basename;
use Event::Lib;
use Spread qw(:ERROR :MESS);
use Digest::MD5 qw(md5_hex);
use Algorithm::GDiffDelta qw(gdiff_delta gdiff_apply);
use File::Temp qw(:seekable :POSIX);
use Sys::Syslog;
use Compress::Zlib;
use File::Copy;
use XML::Simple;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);

## NOTE(review): retained from the original script. No bareword filehandles
## remain, so this can probably be dropped once it is confirmed that every
## imported constant (Spread, Event::Lib, Linux::Inotify2) resolves under
## strict subs.
no strict 'subs';

use constant CONFIG_FILE      => '/etc/FREP.conf';
use constant MAX_WATCHES_PATH => '/proc/sys/fs/inotify/max_user_watches';
use constant DEFAULT_TEMP_DIR => '/www/synctemp';  ## used when Sync/TempDir is not configured
use constant CHUNK_SIZE       => 90000;            ## bytes of file data per Spread message

$| = 1;
$0 = 'FREP_Server => starting up';
openlog($0, 'cons,pid', 'user');

my $CONFIG = eval { XMLin(CONFIG_FILE, ForceArray => ['Dir', 'Pool']) };
die "Couldn't parse config: $@" if $@ || !$CONFIG;

## Configuration sanity checks
for my $param (qw(ShadowRoot HeartbeatTimer MaxWatches QueueTimer)) {
    die("Missing configuration parameter: $param") if !$CONFIG->{Sync}->{$param};
}
die "No pools defined"
    if ref($CONFIG->{Pool}) ne 'HASH' || !keys %{ $CONFIG->{Pool} };

## Go through each pool and make sure it contains an array
for my $pool (keys %{ $CONFIG->{Pool} }) {
    die "Configuration error, pools seem misconfigured"
        if ref($CONFIG->{Pool}->{$pool}) ne 'HASH'
        || ref($CONFIG->{Pool}->{$pool}->{Dir}) ne 'ARRAY';
}
## End sanity checking

my $SHADOW_ROOT = $CONFIG->{Sync}->{ShadowRoot};

## Raise the kernel's per-user inotify watch limit to the configured value.
open(my $watches_fh, '>', MAX_WATCHES_PATH)
    or die "couldn't open kernel max_user_watches: $!";
print {$watches_fh} $CONFIG->{Sync}->{MaxWatches};
## A rejected write to /proc only surfaces when the buffer is flushed at close.
close($watches_fh)
    or crit_log("couldn't set kernel max_user_watches: $!");

my $EVENT_PID = 0;   ## pid of the child currently draining the queue (0 = none)
nice(-19);

my $inotify = Linux::Inotify2->new
    or die "couldn't create inotify object: $!";

## Start from an empty shadow root: create it, or wipe whatever is in it.
if (!-d $SHADOW_ROOT) {
    ## Legacy mkpath() signature is (path, verbose, mode).
    eval { mkpath($SHADOW_ROOT, 0, 0755) };
    die "error creating path: $@" if $@;
}
else {
    opendir(my $shadow_dh, $SHADOW_ROOT)
        or die "couldn't open shadow root $SHADOW_ROOT: $!";
    while (defined(my $entry = readdir($shadow_dh))) {
        next if $entry =~ /^\.{1,2}$/;
        my $path = "$SHADOW_ROOT/$entry";
        if (-d $path) {
            eval { rmtree($path) };
            die "couldn't delete tree $path: $@" if $@;
        }
        else {
            unlink($path) or die "couldn't unlink file $path: $!";
        }
    }
    closedir($shadow_dh);
}

## Build fast lookup closures to determine if an item is part of
## a specific watch group or ignore list.
##   $WatchRegex{'Admin'}->($itm) to test membership of pool 'Admin'
##   $ignore_regex->($itm)        to test the ignore list
my (%WatchRegex, $ignore_regex);
{
    my @ignored_dirs;
    if (ref($CONFIG->{IgnoreList}) eq 'HASH'
        && ref($CONFIG->{IgnoreList}->{Dir}) eq 'ARRAY') {
        @ignored_dirs = @{ $CONFIG->{IgnoreList}->{Dir} };
    }
    ## With no ignore paths defined the matcher simply always returns 0.
    $ignore_regex = eval { build_prefix_matcher(@ignored_dirs) };
    die "couldn't build regexes: $@" if $@;
}
for my $pool (keys %{ $CONFIG->{Pool} }) {
    $WatchRegex{$pool} = eval { build_prefix_matcher(@{ $CONFIG->{Pool}->{$pool}->{Dir} }) };
    die "couldn't build regexes: $@" if $@;
}

## Unique folder list
my %dirs;
for my $pool (keys %{ $CONFIG->{Pool} }) {
    for my $dir (@{ $CONFIG->{Pool}->{$pool}->{Dir} }) {
        if (!-d $dir) {
            eval { mkpath($dir, 0, 0755) };
            die "error creating path: $@" if $@;
        }
        $dirs{$dir} = 1;
    }
}

## Shadow copy of all files/folders will allow us to create differences between files
debug_log("Creating shadow copy");

## There may be race conditions here, however this shouldn't be too much of a problem as
## the copy will converge to the live filesystem over time since files are copied on change/modify
for my $dir (keys %dirs) {
    find({ wanted => \&init_process, follow => 1, no_chdir => 1 }, $dir);
    ## On start-up, we want to sync the watchgroups and shadowroot folders.
    ## Eventually, should replace this with a pure perl copy just to keep this clean.
    ## List-form system() keeps configured paths away from the shell.
    system('cp', '-a', '--parents', $dir, $SHADOW_ROOT) == 0
        or crit_log("initial shadow copy of $dir failed (cp status $?)");
}
undef %dirs;

my @DataQueue;   ## pending operations, in arrival order
my %OpsQueue;    ## file path => index in @DataQueue of the last coalescable op on that file

my $timer_clear_data_queue = timer_new(\&clear_data_queue);
$timer_clear_data_queue->add($CONFIG->{Sync}->{QueueTimer});

my $timer_heartbeat = timer_new(\&send_heartbeat);
$timer_heartbeat->add($CONFIG->{Sync}->{HeartbeatTimer});

## Manage signals for process management
my $sig_chld = signal_new(SIGCHLD, \&reap);
$sig_chld->add;
my $sig_term = signal_new(SIGTERM, \&ASSASSIN);
$sig_term->add;

## Open a filehandle to inotify's filedescriptor
open(my $inotify_fh, '<&=', $inotify->fileno)
    or die "couldn't open fd: $!";
my $flags = fcntl($inotify_fh, F_GETFL, 0)
    or die "Can't get flags for the inotify FH: $!\n";
fcntl($inotify_fh, F_SETFL, $flags | O_NONBLOCK)
    or die "Can't set O_NONBLOCK flag for the inotify FH: $!\n";

my $ev_inotify_read = event_new($inotify_fh, EV_READ|EV_PERSIST, \&process_inotify_events);
$ev_inotify_read->add();

debug_log("Starting main loop");
$0 = "FREP_Server => Watching files";
event_mainloop();

## ---------------------------------------------------------------------------
## Helpers
## ---------------------------------------------------------------------------

## build_prefix_matcher(@patterns)
## Returns a closure taking a path and returning 1 if the path matches any of
## the patterns anchored at its start, 0 otherwise. Each entry is used as a
## regular expression (not quoted), as the original string-eval'd code did.
## Dies if an entry is not a valid pattern.
sub build_prefix_matcher {
    my @compiled = map { qr/^$_/ } @_;
    return sub {
        my $path = shift;
        return 0 if !defined $path;
        for my $re (@compiled) {
            return 1 if $path =~ $re;
        }
        return 0;
    };
}

## shadow_path($file)
## Maps a live path to its location under the shadow root.
sub shadow_path {
    my $file = shift;
    return $SHADOW_ROOT . $file;
}

## ensure_parent_dir($path, $context)
## Makes sure the directory that will contain $path exists. $context is an
## optional suffix for the log message. Returns 1 on success; logs and
## returns 0 on failure.
sub ensure_parent_dir {
    my ($path, $context) = @_;
    $context = '' if !defined $context;
    my $dir = dirname($path);
    return 1 if -d $dir;
    eval { mkpath($dir, 0, 0755) };
    if ($@) {
        crit_log("error creating path$context: $@");
        return 0;
    }
    return 1;
}

## copy_to_shadow($source, $shadow, $modtime)
## Copies $source to $shadow and sets mode 0644; when $modtime is defined the
## copy's access/modification times are set to it. Returns 1 on success, 0 on
## the first failing step (leaving $! set by that step for the caller to log).
sub copy_to_shadow {
    my ($source, $shadow, $modtime) = @_;
    return 0 if !copy($source, $shadow);
    return 0 if !chmod(0644, $shadow);
    return 0 if defined $modtime && !utime($modtime, $modtime, $shadow);
    return 1;
}

## file_signature($path)
## Returns the list ("<md5hex>|<mtime>", <mtime>) for $path. Missing pieces
## become empty strings in the signature; <mtime> is undef if stat fails.
sub file_signature {
    my $path   = shift;
    my $mtime  = (stat($path))[9];
    my $digest = md5_file($path);
    my $sig    = (defined $digest ? $digest : '') . '|' . (defined $mtime ? $mtime : '');
    return ($sig, $mtime);
}

## spread_connect()
## Connects to the Spread daemon named in the configuration (port@address).
## Returns the mailbox, or a false value when the connection failed.
sub spread_connect {
    my ($mbox, $private_group) = Spread::connect({
        spread_name  => $CONFIG->{Spread}->{Port} . '@' . $CONFIG->{Spread}->{Address},
        private_name => "$$-" . int(rand(100)),
    });
    return $mbox;
}

## multicast_to_groups($mbox, \@groups, $message)
## Sends $message to each group and logs (debug level) any failure. Failures
## never abort the loop: delivery is best effort, as in the original code.
sub multicast_to_groups {
    my ($mbox, $groups, $message) = @_;
    for my $group (@$groups) {
        my $r = Spread::multicast($mbox, SAFE_MESS|SELF_DISCARD, $group, 0, $message);
        next if defined $r && $r >= 0;

        ## NOTE(review): the original only tested $r < 0. The Spread binding
        ## is believed to return undef and set $sperrno on failure - confirm
        ## against the installed module. Both conventions are handled here.
        my $code = $r;
        if (!defined $code) {
            no warnings 'once';
            $code = $Spread::sperrno;
        }
        $code = 0 if !defined $code;

        if ($code == ILLEGAL_SESSION) {
            debug_log("Ack: mailbox no good! (maybe disconnected?)");
        }
        elsif ($code == ILLEGAL_MESSAGE) {
            debug_log("Ack: the message had an illegal structure! (oops)");
        }
        elsif ($code == CONNECTION_CLOSED) {
            debug_log("Ack: spread closed!");
        }
        else {
            debug_log("Ack: dunno whats wrong ($code)");
        }
    }
    return;
}

## multicast_chunks($mbox, \@groups, $fh, $make_header)
## Reads $fh in CHUNK_SIZE pieces, compresses each piece and multicasts it to
## every group. $make_header->($chunk_number, $lastchunk_line) must return the
## message header; the data section is appended here. The final chunk carries
## "LastChunk:1". An empty stream still produces one (empty) final chunk so
## that zero-length files are announced. Returns 1 on success, 0 on a read or
## compression error.
sub multicast_chunks {
    my ($mbox, $groups, $fh, $make_header) = @_;
    my $chunk = 0;

    while (1) {
        my $got = read($fh, my $buf, CHUNK_SIZE);
        if (!defined $got) {
            next if $!{EINTR};
            debug_log("ACK: system read error: $!");
            return 0;
        }
        last if $got == 0 && $chunk > 0;   ## safeguard: EOF not announced by eof()

        my $is_last   = ($got == 0 || eof($fh)) ? 1 : 0;
        my $lastchunk = '';
        if ($is_last) {
            debug_log("Setting lastchunk bit");
            $lastchunk = "LastChunk:1\n";
        }

        my $sendbuf = compress($buf);
        if (!defined $sendbuf) {
            debug_log("Ack error compressing buffer");
            return 0;
        }

        $chunk++;
        debug_log("Sending $chunk length:" . length($sendbuf));
        multicast_to_groups($mbox, $groups,
            $make_header->($chunk, $lastchunk) . "__DATA__\n$sendbuf" . "__DATAEND__");

        last if $is_last;
    }
    return 1;
}

## queue_file_op($file, $command)
## Records $command for $file in @DataQueue. If the file already has a
## coalescable entry, that entry's command is overwritten (last event wins).
sub queue_file_op {
    my ($file, $command) = @_;
    if (defined $OpsQueue{$file}) {
        $DataQueue[ $OpsQueue{$file} ]->{Command} = $command;
        return;
    }
    push(@DataQueue, { 'Command' => $command, 'FilePath' => $file });
    $OpsQueue{$file} = $#DataQueue;
    return;
}

## ---------------------------------------------------------------------------
## Event handlers
## ---------------------------------------------------------------------------

## process_inotify_events()
## Event::Lib read callback for the inotify descriptor. Translates the batch of
## inotify events into entries in @DataQueue and adds watches for new items.
sub process_inotify_events {
    ## NOTE: The last event received for a file in the same batch of events will be the one that 'wins'.
    my @events = $inotify->read;   ## Shouldn't block as we've been triggered by epoll
    my %move_queue;                ## move cookie => source path, for this batch only

    for my $e (@events) {
        my $file = $e->fullname;

        if ($e->IN_CLOSE_WRITE) {
            debug_log("Detected CLOSE_WRITE on $file");
            my $shadow = shadow_path($file);
            if (-e $shadow) {
                queue_file_op($file, 'MODIFY');   ## It's not a new file
            }
            else {
                debug_log("Origin file $shadow doesn't exist");
                queue_file_op($file, 'CREATE');
            }
        }

        if ($e->IN_CREATE) {
            ## We'll catch this when data is written and the file is closed with close_write events
            debug_log("Detected CREATE on $file");
            init_process($file);
            find({ wanted => \&init_process, follow => 1, no_chdir => 1 }, $file)
                if $e->IN_ISDIR;
        }

        if ($e->IN_DELETE) {
            debug_log("Detected DELETE on $file");
            queue_file_op($file, 'DELETE');
        }

        ### THIS CODE PROBABLY NEEDS SOME REVIEW... Moves are more complicated... Not dealing with possibility of having the file
        ### already in the data queue. Need to fix this.
        if ($e->IN_MOVED_FROM) {
            ## Use the cookie to store the name of the file being moved (from)
            debug_log("Received MOVE from, setting cookie");
            $move_queue{ $e->cookie } = $file if defined $file && $file ne '';
        }

        if ($e->IN_MOVED_TO) {
            my $moved_from = delete $move_queue{ $e->cookie };   ## cookie is single use
            if ($moved_from) {
                ## Use the cookie to figure out what the original file name was
                if ($e->IN_ISDIR) {
                    ## Ack, folder move.
                    debug_log("Received MOVETO with cookie for DIR");
                    find({ wanted => \&init_process, follow => 1, no_chdir => 1 }, $file);
                    push(@DataQueue, { 'Command' => 'MOVEDIR', 'FilePath' => $moved_from, 'FileTo' => $file });
                }
                else {
                    debug_log("Received MOVETO with cookie");
                    push(@DataQueue, { 'Command' => 'MOVE', 'FilePath' => $moved_from, 'FileTo' => $file });
                }
            }
            else {
                ## No cookie??
                debug_log("Move to with no cookie :(");
                if ($e->IN_ISDIR) {
                    ## Maybe we weren't watching this folder before the move. Add it, and resync will take care of files in that folder.
                    find({ wanted => \&init_process, follow => 1, no_chdir => 1 }, $file);
                }
                else {
                    ## This is a file that's been moved. Treat it as a new file.
                    push(@DataQueue, { 'Command' => 'CREATE', 'FilePath' => $file });
                    $OpsQueue{$file} = $#DataQueue;
                }
            }
            init_process($file);
        }

        if ($e->IN_Q_OVERFLOW) {
            $0 = "FREP_Server => Watching files (got some overflowed events)";
            debug_log("Lost events!");
        }
    }
    return;
}

## reap()
## SIGCHLD handler: collects every exited child without blocking and clears
## $EVENT_PID once the queue-draining child is gone.
sub reap {
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        $EVENT_PID = 0 if $pid == $EVENT_PID;
    }
    return;
}

## ASSASSIN()
## SIGTERM handler: kills the queue-draining child, if any, and exits.
sub ASSASSIN {
    kill('KILL', $EVENT_PID) if $EVENT_PID;
    exit;
}

## clear_data_queue($timer_event)
## Timer callback. Re-arms itself, then - if there is queued work and no child
## is already running - forks a child that connects to Spread and replays the
## queue. The parent empties the queue as soon as the fork succeeds.
sub clear_data_queue {
    my $e = shift;
    $e->add($CONFIG->{Sync}->{QueueTimer});

    ## Nothing in queue or already have spawned a child.
    return if !@DataQueue || $EVENT_PID;

    my $pid = fork();
    if (!defined $pid) {
        ## Keep the queue; the next timer tick will try again.
        crit_log("fork failed, queue kept for next run: $!");
        return;
    }
    if ($pid) {
        $EVENT_PID = $pid;
        @DataQueue = ();
        %OpsQueue  = ();
        return;
    }

    ## Child
    nice(19);
    $0 = "FREP_Server => child clearing queue";

    ## Connect to spread
    my $mbox = spread_connect();
    if (!$mbox) {
        debug_log("Ack no mailbox, bailing (losing messages that were queued)");
        exit(1);
    }

    my %handler_for = (
        'CREATE'  => sub { send_new_file($mbox, $_[0]->{FilePath}) },
        'MODIFY'  => sub { send_modified_file($mbox, $_[0]->{FilePath}) },
        'DELETE'  => sub { send_delete_file($mbox, $_[0]->{FilePath}) },
        'MOVE'    => sub { send_move_file($mbox, $_[0]->{FilePath}, $_[0]->{FileTo}) },
        'MOVEDIR' => sub { send_move_dir($mbox, $_[0]->{FilePath}, $_[0]->{FileTo}) },
    );

    for my $op (@DataQueue) {
        my $handler = $handler_for{ $op->{Command} };
        if (!$handler) {
            debug_log("ACK: Unknown command");
            next;
        }
        $handler->($op);
    }

    Spread::disconnect($mbox);
    exit(0);
}

## init_process([$path])
## Adds an inotify watch on $path unless it is on the ignore list. Used both
## directly (path passed as argument) and as a File::Find 'wanted' callback
## (no argument; the path is in $_ because no_chdir is set).
sub init_process {
    my $path = defined $_[0] ? $_[0] : $_;
    return if !defined $path;
    return if $ignore_regex && $ignore_regex->($path);

    $inotify->watch($path, IN_CLOSE_WRITE|IN_CREATE|IN_DELETE|IN_IGNORED|IN_MOVED_TO|IN_MOVED_FROM)
        or debug_log("Couldn't add watch for $path: $!");
    return;
}

## md5_file($file)
## Returns the hex MD5 digest of the file's contents, or undef (logged at
## debug level) if the file cannot be opened.
sub md5_file {
    my $file = shift;
    my $fh;
    if (!open($fh, '<', $file)) {
        debug_log("couldn't open file for MD5 checksum");
        return;
    }
    binmode($fh);
    my $digest = Digest::MD5->new->addfile($fh)->hexdigest;
    close($fh);
    return $digest;
}

## find_watchgroup($itm)
## Returns the names of all pools whose directory patterns match $itm.
sub find_watchgroup {
    my $itm = shift;
    return grep { $WatchRegex{$_}->($itm) } keys %{ $CONFIG->{Pool} };
}

## debug_log($message)
## Logs at syslog 'debug' priority, only when Sync/Debug is enabled.
sub debug_log {
    return if !$CONFIG->{Sync}->{Debug};
    syslog('debug', '%s', shift);
    return;
}

## crit_log($message)
## Logs unconditionally at syslog 'crit' priority. The log is re-opened first
## so the ident reflects the current process title.
sub crit_log {
    openlog($0, 'cons,pid', 'user');
    syslog('crit', '%s', shift);
    return;
}

## ---------------------------------------------------------------------------
## Senders (run in the forked child, except send_heartbeat)
## ---------------------------------------------------------------------------

## send_new_file($mbox, $file)
## Multicasts the complete contents of $file as CREATE chunks to every pool
## watching it, then refreshes the shadow copy.
sub send_new_file {
    my ($mbox, $file) = @_;
    debug_log("Sending new file");

    my $fh;
    if (!open($fh, '<', $file)) {
        crit_log("Unable to open file in send_new_file: $!");
        return;
    }
    binmode($fh);

    my ($sig, $modtime) = file_signature($file);
    my @groups = find_watchgroup($file);
    my $cookie = time . int(rand(1000));

    my $sent = multicast_chunks($mbox, \@groups, $fh, sub {
        my ($chunk, $lastchunk) = @_;
        return "Command:CREATE\nChunk:$chunk\n" . $lastchunk
             . "Cookie:$cookie\nFilePath:$file\nFileSig:$sig\n";
    });
    close($fh);
    return if !$sent;

    ## We want to move the file to it's new location.
    my $shadow = shadow_path($file);
    return if !ensure_parent_dir($shadow);
    copy_to_shadow($file, $shadow, $modtime)
        or crit_log("error copying file to shadowcopy: $!");
    return;
}

## send_modified_file($mbox, $file)
## Builds a gdiff delta between the shadow copy and the live $file, multicasts
## it as MODIFY chunks to every pool watching the file, then refreshes the
## shadow copy.
sub send_modified_file {
    my ($mbox, $file) = @_;
    my $shadow = shadow_path($file);

    my ($oldsig)        = file_signature($shadow);
    my ($sig, $modtime) = file_signature($file);
    my @groups = find_watchgroup($file);
    my $cookie = time . int(rand(1000));
    debug_log("Sending diff file");

    ## Sync/TempDir is optional; fall back to the historical location.
    my $temp_dir = $CONFIG->{Sync}->{TempDir};
    $temp_dir = DEFAULT_TEMP_DIR
        if !defined $temp_dir || ref $temp_dir || $temp_dir eq '';

    ## File::Temp croaks rather than returning false, hence the eval.
    my $fh_diff = eval { File::Temp->new(DIR => $temp_dir) };
    if (!$fh_diff) {
        debug_log("Couldn't create tempfile: " . ($@ || $!));
        return;
    }
    binmode($fh_diff);

    my ($fh_orig, $fh_new);
    if (!open($fh_orig, '<', $shadow)) {
        debug_log("couldn't open original: $!");
        return;
    }
    binmode($fh_orig);
    if (!open($fh_new, '<', $file)) {
        debug_log("couldn't open new: $!");
        close($fh_orig);
        return;
    }
    binmode($fh_new);

    eval { gdiff_delta($fh_orig, $fh_new, $fh_diff) };
    my $diff_failed = $@;
    close($fh_new);
    close($fh_orig);
    if ($diff_failed) {
        debug_log("Error creating patch file");
        return;
    }

    $fh_diff->seek(0, 0);
    my $sent = multicast_chunks($mbox, \@groups, $fh_diff, sub {
        my ($chunk, $lastchunk) = @_;
        return "Command:MODIFY\nChunk:$chunk\n" . $lastchunk
             . "Cookie:$cookie\nFilePath:$file\nFileSig:$sig\nOriginFileSig:$oldsig\n";
    });
    close($fh_diff);
    return if !$sent;

    ## We want to move the file to it's new location.
    return if !ensure_parent_dir($shadow);
    copy_to_shadow($file, $shadow, $modtime)
        or crit_log("error copying file to shadowcopy: $!");
    return;
}

## send_delete_file($mbox, $file)
## Multicasts a DELETE for $file and removes the matching shadow file or tree.
sub send_delete_file {
    my ($mbox, $file) = @_;
    multicast_to_groups($mbox, [ find_watchgroup($file) ],
        "Command:DELETE\nFilePath:$file\n");

    my $shadow = shadow_path($file);
    if (-f $shadow) {
        ## File
        if (!unlink($shadow)) {
            crit_log("Couldn't unlink shadow file $file: $!");
        }
        else {
            debug_log("Deleted shadow $file");
        }
        return;
    }

    ## Folder
    eval { rmtree($shadow) };
    if ($@) {
        debug_log("Couldn't remove shadow file/folder $file: $@");
    }
    else {
        debug_log("Deleted shadow $file");
    }
    return;
}

## send_move_file($mbox, $filefrom, $fileto)
## Multicasts a MOVE and mirrors it in the shadow tree. If the shadow source
## is missing, or the rename fails, the live destination file is copied
## into the shadow tree instead.
sub send_move_file {
    my ($mbox, $filefrom, $fileto) = @_;
    multicast_to_groups($mbox, [ find_watchgroup($filefrom) ],
        "Command:MOVE\nFilePath:$filefrom\nMovedTo:$fileto\n");

    my $shadow_from = shadow_path($filefrom);
    my $shadow_to   = shadow_path($fileto);
    return if !ensure_parent_dir($shadow_to, ' during file move');

    if (!-f $shadow_from) {
        crit_log("ACK origin file doesn't exist for move");
        copy_to_shadow($fileto, $shadow_to)
            or crit_log("error copying file to shadowcopy: $!");
        return;
    }

    return if rename($shadow_from, $shadow_to);

    crit_log("error renaming file in move file: $!");
    copy_to_shadow($fileto, $shadow_to)
        or crit_log("error copying file to shadowcopy after rename failutre: $!");
    unlink($shadow_from);
    return;
}

## send_move_dir($mbox, $filefrom, $fileto)
## Multicasts a MOVEDIR and renames the matching directory in the shadow tree.
sub send_move_dir {
    my ($mbox, $filefrom, $fileto) = @_;
    multicast_to_groups($mbox, [ find_watchgroup($filefrom) ],
        "Command:MOVEDIR\nFilePath:$filefrom\nMovedTo:$fileto\n");

    my $shadow_to = shadow_path($fileto);
    return if !ensure_parent_dir($shadow_to);
    rename(shadow_path($filefrom), $shadow_to)
        or debug_log("error renaming shadow folder $filefrom: $!");
    return;
}

## send_heartbeat($timer_event)
## Timer callback. Re-arms itself and multicasts a HEARTBEAT to every pool
## over a short-lived Spread connection.
sub send_heartbeat {
    my $e = shift;
    $e->add($CONFIG->{Sync}->{HeartbeatTimer});
    #debug_log("Sending heartbeat");

    ## Connect to spread
    my $mbox = spread_connect();
    if (!$mbox) {
        debug_log("Ack no mailbox, bailing");
        return;
    }

    multicast_to_groups($mbox, [ keys %{ $CONFIG->{Pool} } ], "Command:HEARTBEAT\n");
    Spread::disconnect($mbox);
    return;
}

__END__