#!/usr/bin/perl
# FREP_Client - File replication client
#
# This product uses software developed by Spread Concepts LLC for use in the Spread toolkit.
# For more information about Spread see http://www.spread.org
#
# Copyright (C) 2007 Mark Steele http://www.control-alt-del.org
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#
# Overview: the client joins the Spread group named by --pool and receives
# replication messages (HEARTBEAT, CREATE, MODIFY, DELETE, MOVE, MOVEDIR,
# FETCH).  Parsed operations are queued in @DataQueue.  Every --queue-timer
# seconds, a forked worker applies them to the local filesystem.  When
# anything looks inconsistent, the worker falls back to downloading the file
# directly from the file server (FETCH).

use Spread qw(:ERROR :MESS);
use Event::Lib;
use strict;
use warnings;
use POSIX qw(nice SIGCHLD SIGTERM setuid setgid :sys_wait_h);
use Digest::MD5 qw(md5_hex);
use Sys::Syslog;
use File::Path;
use File::Basename;
use File::Temp qw(:POSIX);
use Algorithm::GDiffDelta qw(gdiff_apply);
use IO::Socket;
use Digest::MD5;
use Compress::Zlib;
use Getopt::Long;

$| = 1;

my ($DEBUG, $Pool, $USERID, $GROUPID, $SPREAD_PORT, $SPREAD_ADDRESS,
    $FILESERVER_PORT, $FILESERVER_ADDRESS, $HEARTBEAT_TIMEOUT, $QUEUE_TIMEOUT);

GetOptions(
    "debug"            => \$DEBUG,
    "pool=s"           => \$Pool,
    "userid|u=i"       => \$USERID,
    "groupid|g=i"      => \$GROUPID,
    "spread-port=i"    => \$SPREAD_PORT,
    "spread-address=s" => \$SPREAD_ADDRESS,
    "fs-port=i"        => \$FILESERVER_PORT,
    "fs-address=s"     => \$FILESERVER_ADDRESS,
    "heartbeat=i"      => \$HEARTBEAT_TIMEOUT,
    "queue-timer=i"    => \$QUEUE_TIMEOUT,
) || show_help();

$HEARTBEAT_TIMEOUT ||= 20;
$QUEUE_TIMEOUT     ||= 5;

if (!$Pool || !$USERID || !$GROUPID || !$SPREAD_PORT || !$SPREAD_ADDRESS
    || !$FILESERVER_PORT || !$FILESERVER_ADDRESS) {
    show_help();
}

# Dotted-quad IPv4 address.  The pattern is anchored at both ends so that
# inputs such as "10.0.0.256" or "10.0.0.1junk" are rejected.  Without the
# end anchor, only a matching prefix was required.
my $IPV4_OCTET = qr/(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/;
my $IPV4_ADDR  = qr/\A(?:$IPV4_OCTET\.){3}$IPV4_OCTET\z/;
if ($SPREAD_ADDRESS !~ $IPV4_ADDR || $FILESERVER_ADDRESS !~ $IPV4_ADDR) {
    print "Invalid IP address\n";
    show_help();
}

# PID of the forked queue worker, or 0 when no worker is running.
my $CHILD = 0;

nice(-19);
$0 = "FREP_Client($Pool)";
openlog($0, 'cons,pid', 'user');

# Drop privileges.  Assigning $) also replaces the supplementary group list
# inherited from root, which POSIX::setgid() alone leaves in place.
$) = "$GROUPID $GROUPID";
setgid($GROUPID) || die "couldn't setgid($GROUPID): $!";
setuid($USERID)  || die "couldn't setuid($USERID): $!";
# umask() returns the previous mask, which may legitimately be 0.
# Only undef signals failure.
defined(umask(0022)) || die "couldn't set umask: $!";

# Parsed operations waiting for the next queue run (see clear_data_queue).
my @DataQueue;

my ($mbox, $private_group) = Spread::connect({
    spread_name  => $SPREAD_PORT . '@' . $SPREAD_ADDRESS,
    private_name => "$$-" . int(rand(100)),
});
if (!$mbox) {
    crit_log("couldn't connect to spread $!");
    die $!;
}
# Spread::join() signals success with a true value and failure with a false
# one, so a "< 0" test never catches a failure.
if (!Spread::join($mbox, $Pool)) {
    crit_log("couldn't join group");
    die;
}
# SPREAD_FH is a *dup* of the mailbox descriptor, used only so the event loop
# can watch it.  Closing the dup never closes the socket the Spread library owns.
open(SPREAD_FH, '<&', $mbox) || die "couldn't open fd: $!";

# Epoch time of the most recent HEARTBEAT message.
my $HEARTBEAT = time;
# State of the multi-chunk transfer in progress: the first chunk's headers
# plus FILENAME, the temp file being assembled.  Empty when idle.
my %MSGDATA;

my $timer_clear_data_queue = timer_new(\&clear_data_queue);
$timer_clear_data_queue->add($QUEUE_TIMEOUT);
my $timer_check_heartbeat = timer_new(\&check_heartbeat);
$timer_check_heartbeat->add($HEARTBEAT_TIMEOUT);
my $event = event_new(*SPREAD_FH, EV_READ|EV_PERSIST, \&get_spread_messages);
$event->add;
my $sig_chld = signal_new(SIGCHLD, \&REAPER);
$sig_chld->add;
my $sig_term = signal_new(SIGTERM, \&ASSASSIN);
$sig_term->add;

event_mainloop();

# SIGCHLD handler: reap every exited child without blocking the event loop,
# and clear $CHILD when the queue worker is among them.
sub REAPER {
    while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
        if ($CHILD && $pid == $CHILD) {
            $CHILD = 0;
        } else {
            debug_log("Ack! Child waited pid not the same!");
        }
    }
}

# SIGTERM handler: terminate the queue worker (if any), then exit.
sub ASSASSIN {
    kill('TERM', $CHILD) if ($CHILD);
    exit;
}

# Timer callback: hand the queued operations to a forked worker.  Only one
# worker runs at a time; while it is busy, the queue is kept for a later tick.
# The timer is always re-armed for --queue-timer seconds.
sub clear_data_queue {
    my $e = shift;
    if (@DataQueue) {
        if ($CHILD) {
            my $reaped = waitpid($CHILD, WNOHANG);
            if ($reaped == 0) {
                # Worker still running; wait for it before spawning another.
                $e->add($QUEUE_TIMEOUT);
                return;
            }
            # The worker was reaped just now ($reaped == $CHILD) or is already gone (-1).
            $CHILD = 0;
        }
        my $pid = fork();
        if (!defined $pid) {
            # Keep the queue and retry on the next tick.  An undef from fork()
            # must never fall into the child branch inside the daemon itself.
            crit_log("couldn't fork queue worker: $!");
        } elsif ($pid) {
            ## Parent
            $CHILD = $pid;
            @DataQueue = ();
        } else {
            ## Child
            # Event::Lib delivers signals through its event loop, which this
            # child never enters.  Restore the default SIGTERM action so that
            # ASSASSIN's kill() actually stops the worker.
            $SIG{TERM} = 'DEFAULT';
            nice(19);
            process_queue();
            # _exit skips END blocks and global destruction of the Event::Lib
            # objects inherited from the parent.
            POSIX::_exit(0);
        }
    }
    $e->add($QUEUE_TIMEOUT);
}

# Read-event callback: drain the pending Spread messages (non-blocking
# receive) and pass each parsed one to process_message().
sub get_spread_messages {
    while (my ($service_type, $sender, $groups, $mess_type, $endian, $mess) = Spread::receive($mbox, 0)) {
        return if (!$mess);
        my ($headers, $body) = parse_message($mess);
        process_message($headers, $body) if ($headers);
    }
}

# Worker entry point: apply every queued operation in order.  Each entry is
# { Headers => \%headers, Body => $tempfile_or_undef }.
sub process_queue {
    my %handler_for = (
        DELETE  => sub { delete_file($_[0]{FilePath}) },
        FETCH   => sub { fetch_file($_[0]{FilePath}, 0) },
        CREATE  => sub { create_file($_[0]{FilePath}, $_[0]{FileSig}, $_[1]) },
        MODIFY  => sub { modify_file($_[0]{FilePath}, $_[0]{FileSig}, $_[0]{OriginFileSig}, $_[1]) },
        MOVE    => sub { move_file($_[0]{FilePath}, $_[0]{MovedTo}) },
        MOVEDIR => sub { move_dir($_[0]{FilePath}, $_[0]{MovedTo}) },
    );
    for my $item (@DataQueue) {
        my $headers = $item->{Headers};
        my $command = defined $headers->{Command} ? $headers->{Command} : '';
        my $handler = $handler_for{$command};
        if ($handler) {
            $handler->($headers, $item->{Body});
        } else {
            ## Invalid
            debug_log("Invalid command");
        }
    }
}

# Return the hex MD5 digest of $file's contents, or undef if it can't be opened.
sub md5_file {
    my $file = shift;
    my $fh;
    # Use a 3-arg open: paths arrive over the network and must not be able to
    # inject an open mode.
    if (!open($fh, '<', $file)) {
        debug_log("couldn't open file for MD5: $!");
        return;
    }
    binmode($fh);
    my $digest = Digest::MD5->new->addfile($fh)->hexdigest;
    close($fh);
    return $digest;
}

# Log at syslog 'debug' priority, only when --debug was given.
sub debug_log {
    return if (!$DEBUG);
    syslog('debug', '%s', shift);
}

# Log unconditionally at syslog 'crit' priority.
sub crit_log {
    syslog('crit', '%s', shift);
}

# Split a raw Spread message into (\%headers, $body).
# The message must start with "Command:".  Headers are "Key:Value" lines.
# An optional payload sits between "__DATA__\n" and a trailing "__DATAEND__".
# Returns an empty list for anything that is not a command message.
sub parse_message {
    my $mess = shift;
    return if ($mess !~ /^Command:/);
    my ($header_text, $body);
    if ($mess =~ /^(.*?)__DATA__\n(.+?)__DATAEND__$/s) {
        ## This is a message that contains a data segment
        ($header_text, $body) = ($1, $2);
    } else {
        $header_text = $mess;
    }
    my %headers;
    for my $line (split(/\n/, $header_text)) {
        next if ($line eq '');
        # Limit to two fields so that values containing ':' are kept intact.
        my ($key, $val) = split(/:/, $line, 2);
        $headers{$key} = $val;
    }
    return (\%headers, $body);
}

# Route a parsed message.  Heartbeats refresh $HEARTBEAT.  Everything else
# either continues the multi-chunk transfer in progress, is a self-contained
# single message, starts a new multi-chunk transfer, or is an out-of-order
# chunk.  An out-of-order chunk is discarded in favour of FETCH requests.
sub process_message {
    my ($headers, $body) = @_;

    ## If this is a heartbeat, update time stamp.
    if ($headers->{Command} eq 'HEARTBEAT') {
        $HEARTBEAT = time;
        return;
    }

    if (defined($MSGDATA{'Cookie'})) {
        continue_multichunk($headers, $body);
    } elsif (!$headers->{Chunk}) {
        process_single_chunk($headers, $body);
    } elsif ($headers->{Chunk} == 1) {
        start_multichunk($headers, $body);
    } else {
        ## Bad chunk? Scrap whatever chunked file we might have, request files directly.
        debug_log("Chunks out of order or missing.");
        queue_fetch($headers->{'FilePath'});
        queue_fetch($MSGDATA{'FilePath'})
            if ($MSGDATA{'FilePath'}
                && (!defined $headers->{'FilePath'} || $MSGDATA{'FilePath'} ne $headers->{'FilePath'}));
        unlink($MSGDATA{'FILENAME'}) if (defined $MSGDATA{'FILENAME'});
        %MSGDATA = ();
        # A first chunk without a Cookie stopped the queue timer; re-arm it.
        $timer_clear_data_queue->add(1);
    }
}

# Handle a message that is not part of a chunked transfer.  It is queued as-is,
# after decompressing its body (if any) into a temp file passed as Body.
sub process_single_chunk {
    my ($headers, $body) = @_;
    if (!$body) {
        debug_log("single chunk with no body");
        push(@DataQueue, { Headers => $headers });
        return;
    }
    debug_log("Single chunk message with body");
    my $file = tmpnam();
    if (!write_chunk($file, '>', $body)) {
        unlink($file);
        queue_fetch($headers->{'FilePath'});
        return;
    }
    push(@DataQueue, { Headers => $headers, Body => $file });
}

# Handle chunk 1 of a multi-chunk transfer.  Record its headers in %MSGDATA,
# start the temp file, and pause queue runs until the last chunk arrives or
# the transfer is aborted.
sub start_multichunk {
    my ($headers, $body) = @_;
    # Discard a leftover transfer that never carried a Cookie, so it was never
    # recognised as "in progress".
    abort_multichunk() if (%MSGDATA);
    ## Stop processing queue while we are getting the contents of a multi-chunk message.
    $timer_clear_data_queue->remove;
    %MSGDATA = %{$headers};
    $MSGDATA{'FILENAME'} = tmpnam();
    if (!write_chunk($MSGDATA{'FILENAME'}, '>', $body)) {
        abort_multichunk();
        return;
    }
    if ($headers->{LastChunk}) {
        push(@DataQueue, { Headers => $headers, Body => $MSGDATA{'FILENAME'} });
        %MSGDATA = ();
        $timer_clear_data_queue->add(1);
    }
}

# Handle a message that arrives while a multi-chunk transfer is in progress.
# A matching, consecutive chunk is appended to the temp file.  Anything else
# aborts the transfer (queueing a FETCH instead).  A message with a different
# Cookie is then re-processed as new.
sub continue_multichunk {
    my ($headers, $body) = @_;
    debug_log("Continuation of multi-chunk");

    # Compare cookies as strings: a numeric compare treats any two non-numeric cookies as equal.
    if (!defined $headers->{'Cookie'} || $headers->{'Cookie'} ne $MSGDATA{'Cookie'}) {
        abort_multichunk("Cookie doesn't match. Retrieve file from fileserver, process this new message");
        process_message($headers, $body);
        return;
    }
    if (!defined $MSGDATA{'Chunk'} || $MSGDATA{'Chunk'} !~ /^(\d+)$/) {
        ## Odd, chunks doesn't contain what we expect. Abort.
        abort_multichunk("Chunks seem messed up, couldn't parse current chunk data");
        return;
    }
    my $curchunk = $1;
    if (!defined $headers->{Chunk} || $headers->{Chunk} !~ /^(\d+)$/) {
        abort_multichunk("Chunks weird. Last chunk processed: $curchunk. This chunk couldn't be parsed");
        return;
    }
    my $newchunk = $1;
    if ($newchunk != ($curchunk + 1)) {
        ## Out of order.
        abort_multichunk("Chunks out of order or missing. Retrieving file from fileserver, scrapping this message");
        return;
    }

    ## Append data to tmpfile.
    if (!write_chunk($MSGDATA{'FILENAME'}, '>>', $body)) {
        abort_multichunk();
        return;
    }

    ## Last chunk?
    if ($headers->{LastChunk}) {
        debug_log("Got last chunk");
        push(@DataQueue, { Headers => $headers, Body => $MSGDATA{'FILENAME'} });
        $timer_clear_data_queue->add(1);
        %MSGDATA = ();
        return;
    }
    ## Update chunk count
    debug_log("Not last chunk");
    $MSGDATA{'Chunk'} = $headers->{'Chunk'};
}

# Queue a FETCH of $path so that it is downloaded directly from the file
# server.  Does nothing when no path is known.
sub queue_fetch {
    my $path = shift;
    return if (!defined $path || $path eq '');
    push(@DataQueue, { Headers => { Command => 'FETCH', FilePath => $path } });
}

# Abandon the multi-chunk transfer in progress.  Logs $reason (if given),
# queues a FETCH of the file, deletes the partial temp file, clears %MSGDATA,
# and re-arms the queue timer that start_multichunk() removed.
sub abort_multichunk {
    my $reason = shift;
    debug_log($reason) if (defined $reason);
    queue_fetch($MSGDATA{'FilePath'});
    unlink($MSGDATA{'FILENAME'}) if (defined $MSGDATA{'FILENAME'});
    %MSGDATA = ();
    $timer_clear_data_queue->add(1);
}

# Decompress $body and write it to $path.  $mode is '>' to create/truncate
# or '>>' to append.  Returns true on success.  On failure it logs the cause
# and returns false; the caller cleans up.
sub write_chunk {
    my ($path, $mode, $body) = @_;
    my $fh;
    if (!open($fh, $mode, $path)) {
        debug_log("Error opening temp file $path: $!");
        return 0;
    }
    binmode($fh);
    my $output = uncompress($body);
    # Test definedness: data that decompresses to "" or "0" is still valid.
    if (!defined $output) {
        debug_log("Couldn't uncompress data");
        close($fh);
        return 0;
    }
    my $written = syswrite($fh, $output);
    if (!defined $written || $written != length($output)) {
        debug_log("Couldn't write all data to file for some reason");
        close($fh);
        return 0;
    }
    if (!close($fh)) {
        debug_log("Error closing temp file $path: $!");
        return 0;
    }
    return 1;
}

# Ensure the directory that will hold $path exists, creating it with mode
# 0755 when missing.  Returns true if the directory exists afterwards.
sub ensure_parent_dir {
    my $path = shift;
    my $dir  = File::Basename::dirname($path);
    return 1 if (-d $dir);
    # mkpath's legacy signature is (path, verbose, mode).
    eval { File::Path::mkpath($dir, 0, 0755); };
    if ($@ || !-d $dir) {
        debug_log("couldn't create path $dir: " . ($@ || $!));
        return 0;
    }
    return 1;
}

# Delete a file, or recursively delete a directory tree.
sub delete_file {
    my $file = shift;
    if (!defined $file || $file eq '') {
        debug_log("No path given to delete");
        return;
    }
    if (!-d $file && -f $file) {
        ## File
        debug_log("attempting to delete $file");
        if (!unlink($file)) {
            debug_log("Couldn't unlink file $file: $!");
        } else {
            debug_log("Deleted $file");
        }
    } else {
        ## Folder
        eval { File::Path::rmtree($file); };
        if ($@) {
            debug_log("Couldn't remove file/folder $file: $@");
        } else {
            debug_log("Deleted $file");
        }
    }
}

# Rename file $file to $moveto.  On any failure, including a missing source,
# the destination is fetched from the file server instead.
sub move_file {
    my ($file, $moveto) = @_;
    if (!-f $file) {
        # Nothing local to move, so the destination would never appear otherwise.
        debug_log("$file not found locally, fetching $moveto instead");
        fetch_file($moveto, 0);
        return;
    }
    if (!ensure_parent_dir($moveto)) {
        fetch_file($moveto, 0);
        return;
    }
    if (!rename($file, $moveto)) {
        debug_log("error renaming file path for $moveto: $!");
        fetch_file($moveto, 0);
        return;
    }
    debug_log("$file moved to $moveto");
}

# Rename directory $file to $moveto.  Skipped when the source is not a
# directory or the destination already is one.
sub move_dir {
    my ($file, $moveto) = @_;
    if (!-d $file) {
        debug_log("folder $file not found, can't move it");
        return;
    }
    if (-d $moveto) {
        ## this _should not_ happen
        debug_log("folder $moveto already exists, not moving $file");
        return;
    }
    return if (!ensure_parent_dir($moveto));
    if (!rename($file, $moveto)) {
        debug_log("error moving folder: $!");
        return;
    }
    debug_log("folder $file moved to $moveto");
}

# Install a newly created file that was received as the temp file $tmp.
# $filesig is "md5|mtime" from the origin.  Falls back to fetch_file() when
# the temp file is missing, its checksum differs, or it can't be moved into place.
sub create_file {
    my ($file, $filesig, $tmp) = @_;
    my ($sig, $mtime) = split(/\|/, defined $filesig ? $filesig : '');

    ## Does our temp file's signature match the origin's signature?
    my $tmp_md5 = (defined $tmp && -e $tmp) ? md5_file($tmp) : undef;
    if (!defined $tmp_md5 || !defined $sig || $tmp_md5 ne $sig) {
        debug_log("something weird happened to our tmp file " . (defined $tmp ? $tmp : '(none)'));
        unlink($tmp) if (defined $tmp);
        fetch_file($file, 0);
        return;
    }
    if (!ensure_parent_dir($file)) {
        unlink($tmp);
        fetch_file($file, 0);
        return;
    }
    if (!rename($tmp, $file) || !utime($mtime, $mtime, $file)) {
        debug_log("error renaming or setting mtime: $!");
        unlink($tmp);    # no-op if the rename succeeded
        fetch_file($file, 0);
        return;
    }
    debug_log("Successfully received new $file");
}

# Apply the GDiff delta in temp file $tmp to the existing $file.
# $originfilesig ("md5|mtime") must match the current local file.  The
# patched result must match $filesig ("md5|mtime").  Otherwise, or on any
# error, the file is fetched from the file server instead.  The delta temp
# file is always removed.
sub modify_file {
    my ($file, $filesig, $originfilesig, $tmp) = @_;
    my ($sig, $mtime)       = split(/\|/, defined $filesig ? $filesig : '');
    my ($oldsig, $oldmtime) = split(/\|/, defined $originfilesig ? $originfilesig : '');

    my $give_up = sub {
        unlink($tmp) if (defined $tmp);
        ## Grab the file
        fetch_file($file, 0);
        return;
    };

    my @st = stat($file);
    my $cur_mtime = @st ? $st[9] : undef;
    if (!defined $cur_mtime || !defined $oldmtime || $cur_mtime != $oldmtime) {
        ## No need to check MD5, not the same mtime
        debug_log("Current mtime " . (defined $cur_mtime ? $cur_mtime : 'none')
                  . " doesn't match on old server file mtime " . (defined $oldmtime ? $oldmtime : 'none'));
        return $give_up->();
    }
    my $cur_md5 = md5_file($file);
    if (!defined $cur_md5 || !defined $oldsig || $cur_md5 ne $oldsig) {
        debug_log("MD5 doesn't match on old file");
        return $give_up->();
    }

    ## Old signatures match, apply the patch.
    my ($orig_fh, $delta_fh);
    if (!open($orig_fh, '<', $file)) {
        debug_log("couldn't open $file to patch it: $!");
        return $give_up->();
    }
    if (!defined $tmp || !open($delta_fh, '<', $tmp)) {
        debug_log("couldn't open delta for $file: $!");
        close($orig_fh);
        return $give_up->();
    }
    binmode($orig_fh);
    binmode($delta_fh);
    my $patched = File::Temp->new();
    binmode($patched);
    eval { gdiff_apply($orig_fh, $delta_fh, $patched); };
    my $patch_error = $@;
    close($orig_fh);
    close($delta_fh);
    close($patched);
    unlink($tmp);
    if ($patch_error) {
        ## Error applying patch
        debug_log("error applying patch to $file: $patch_error");
        fetch_file($file, 0);
        return;
    }

    ## Patch seems to have worked. Check new checksum.
    my $new_md5 = md5_file($patched->filename);
    if (!defined $new_md5 || !defined $sig || $new_md5 ne $sig) {
        ## MD5 signatures don't match, fetch file
        debug_log("MD5 doesn't match on patched file");
        fetch_file($file, 0);
        return;
    }

    # File::Temp creates its files 0600.  Give the patched copy the original
    # file's permission bits before it replaces the original.
    chmod($st[2] & 07777, $patched->filename);
    if (!rename($patched->filename, $file) || !utime($mtime, $mtime, $file)) {
        debug_log("error renaming or setting mtime: $!");
        fetch_file($file, 0);
        return;
    }
    debug_log("Successfully received mod $file");
}

# Download $file from the file server: ask for its "md5|length|mtime"
# signature, GET the contents, verify the MD5, then move the file into place
# with mode 0644 and the origin's mtime.  Retries recursively, with $retries
# counting attempts so far.  Returns 1 on success and 0 on failure.
sub fetch_file {
    my ($file, $retries) = @_;
    $retries ||= 0;
    if (!defined $file || $file eq '') {
        debug_log("No file path given to fetch");
        return 0;
    }

    my ($md5, $length, $mtime) = fetch_file_sig($file);
    # Test definedness: a zero-length file legitimately has length 0.
    if (!defined $md5 || !defined $length || !defined $mtime) {
        debug_log("Missing info while attempting to retrieve $file");
        if ($retries > 10) {
            debug_log("Too many retries, info missing aborting.");
            return 0;
        }
        return fetch_file($file, $retries + 1);
    }

    my $sock = IO::Socket::INET->new(PeerAddr => $FILESERVER_ADDRESS,
                                     PeerPort => $FILESERVER_PORT,
                                     Proto    => "tcp");
    if (!$sock) {
        debug_log("Couldn't connect to file server to retrieve $file: $!");
        return 0 if ($retries > 10);
        return fetch_file($file, $retries + 1);
    }

    my $tmp = File::Temp->new(SUFFIX => '.resyncclient');
    binmode($tmp);
    my $request = "GET $file\n";
    syswrite($sock, $request, length($request));
    my $buf;
    while (my $got = sysread($sock, $buf, 32768)) {
        syswrite($tmp, $buf, $got);
    }
    close($tmp);
    close($sock);

    my $got_md5 = md5_file($tmp->filename);
    if (!defined $got_md5 || $got_md5 ne $md5) {
        my $shown = defined $got_md5 ? $got_md5 : '';
        if ($retries > 10) {
            debug_log("FAIL: signatures don't match for $file... Too many retries");
            debug_log("$shown!= ($md5)");
            return 0;
        }
        debug_log("FAIL: signatures didn't match, trying again ($retries)");
        debug_log("$shown!= ($md5)");
        return fetch_file($file, $retries + 1);
    }

    return 0 if (!ensure_parent_dir($file));
    if (!rename($tmp->filename, $file)) {
        debug_log("couldn't move fetched file into place at $file: $!");
        return 0;
    }
    chmod(0644, $file);
    utime($mtime, $mtime, $file) || debug_log("couldn't set utime on $file: $!");
    return 1;
}

# Ask the file server for "md5|length|mtime" of $file.  Returns
# ($md5, $length, $mtime), or an empty list if the server can't be reached
# or the reply is malformed.
sub fetch_file_sig {
    my $file = shift;
    ## First get the signature/filesize/mtime
    my $sock;
    for my $attempt (1 .. 5) {
        $sock = IO::Socket::INET->new(PeerAddr => $FILESERVER_ADDRESS,
                                      PeerPort => $FILESERVER_PORT,
                                      Proto    => "tcp");
        last if ($sock);
    }
    if (!$sock) {
        debug_log("Too many connection attempts to retrieve file signature");
        return;
    }
    my $request = "SIG $file\n";
    my $written = syswrite($sock, $request, length($request));
    if (!defined $written || $written != length($request)) {
        ## shouldn't happen, we're blocking
        debug_log("Partial write for some reason to retrieve file signature");
    }
    my ($sig, $buf) = ('', '');
    while (sysread($sock, $buf, 32768)) {
        $sig .= $buf;
    }
    close($sock);
    if ($sig !~ /^([a-zA-Z0-9]{32})\|(\d+)\|(\d+)/) {
        debug_log("Invalid signature received: $sig");
        return;
    }
    return ($1, $2, $3);
}

# Timer callback.  If no HEARTBEAT has arrived within --heartbeat seconds,
# drop the Spread connection, rebuild it, and re-register the read event.
# Dies if the reconnect or re-join fails.
sub check_heartbeat {
    my $e = shift;
    $e->add($HEARTBEAT_TIMEOUT);
    return if ((time() - $HEARTBEAT) <= $HEARTBEAT_TIMEOUT);

    debug_log("ACK HEARTBEAT TIMEOUT, RECONNECTING");
    # Stop watching, and release our dup of the old descriptor, before the
    # Spread library closes the original.
    $event->remove;
    close(SPREAD_FH);
    Spread::disconnect($mbox);

    # Assign the file-scoped $mbox (no "my"), because get_spread_messages()
    # reads it.  A shadowing "my" here left it pointing at the dead connection.
    ($mbox, $private_group) = Spread::connect({
        spread_name  => $SPREAD_PORT . '@' . $SPREAD_ADDRESS,
        private_name => "$$-" . int(rand(100)),
    });
    if (!$mbox) {
        crit_log("Couldn't reconnect after heartbeat failure $!");
        die;
    }
    if (!Spread::join($mbox, $Pool)) {
        crit_log("Couldn't re-join group after heartbeat failure $!");
        die;
    }
    open(SPREAD_FH, '<&', $mbox) || die "couldn't open fd: $!";
    $event = event_new(*SPREAD_FH, EV_READ|EV_PERSIST, \&get_spread_messages);
    $event->add;
    # Give the new connection a full timeout window to deliver a heartbeat.
    $HEARTBEAT = time;
}

# Print usage and exit non-zero (it is only reached on invalid invocation).
sub show_help {
    print <<"EOF";
$0 Options:
  --debug             Turn on debugging
  --pool=             Name of server pool
  --userid=           UserID to change to on start-up
  --groupid=          Group id to change to on start-up
  --spread-port=      Spread port (usually 4803)
  --spread-address=   Spread server IP address
  --fs-port=          Port that the file server is running on
  --fs-address=       IP address of the file server
  --heartbeat=        Number of seconds to wait for a heartbeat before reconnecting (default 20)
  --queue-timer=      Number of seconds between queue runs (default 5)
EOF
    exit 1;
}