#!/usr/bin/perl
# FREP_ResyncClient - File replication resynchronization client
#
# This product uses software developed by Spread Concepts LLC for use in the Spread toolkit.
# For more information about Spread see http://www.spread.org
#
# Copyright (C) 2007 Mark Steele
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
#

use strict;
use warnings;

use Spread qw(:ERROR :MESS);
use Event::Lib;
use POSIX qw(nice setuid setgid);
use Digest::MD5 qw(md5_hex);
use Sys::Syslog;
use File::Path;
use File::Basename;
use File::Temp qw(:POSIX);
use IO::Socket;
use Compress::Zlib;
use File::Find;
use Getopt::Long;

$| = 1;

# Dotted-quad IPv4 address, anchored at both ends so trailing junk is rejected.
my $IPV4_RE = qr/^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$/;

my ($DEBUG, $Pool, $USERID, $GROUPID, $SPREAD_PORT, $SPREAD_ADDRESS,
    $FILESERVER_PORT, $FILESERVER_ADDRESS);

GetOptions(
    "debug"            => \$DEBUG,
    "pool=s"           => \$Pool,
    "userid|u=i"       => \$USERID,
    "groupid|g=i"      => \$GROUPID,
    "spread-port=i"    => \$SPREAD_PORT,
    "spread-address=s" => \$SPREAD_ADDRESS,
    "fs-port=i"        => \$FILESERVER_PORT,
    "fs-address=s"     => \$FILESERVER_ADDRESS,
) || show_help();

# Every option except --debug is mandatory (uid/gid 0 are rejected on purpose).
if (!$Pool || !$USERID || !$GROUPID || !$SPREAD_PORT || !$SPREAD_ADDRESS
    || !$FILESERVER_PORT || !$FILESERVER_ADDRESS) {
    show_help();
}
if ($SPREAD_ADDRESS !~ $IPV4_RE || $FILESERVER_ADDRESS !~ $IPV4_RE) {
    print "Invalid IP address\n";
    show_help();
}

# Drop privileges: group first, while we are still allowed to change it.
setgid($GROUPID) || die "setgid($GROUPID) failed: $!";
setuid($USERID)  || die "setuid($USERID) failed: $!";
# umask() returns the *previous* mask, which may legitimately be 0;
# only undef signals failure.
defined(umask(0022)) || die "umask failed: $!";    ## Just to be on safe side
nice(19);

$0 = "FREP_ResyncClient($Pool)";
openlog($0, 'cons,pid', 'user');

my ($mbox, $private_group) = Spread::connect({
    spread_name  => $SPREAD_PORT . '@' . $SPREAD_ADDRESS,
    private_name => "$$-" . int(rand(100)),
});
if (!$mbox) {
    crit_log("couldn't connect to spread");
    exit;
}
debug_log("Connected to spread");

# Spread::join returns true on success and false on failure.
if (!Spread::join($mbox, "Resync-$Pool")) {
    crit_log("couldn't join Resync-$Pool");
}
else {
    debug_log("Joined Resync-$Pool");
}

# Wrap the Spread mailbox descriptor in a Perl handle so Event::Lib can watch it.
open(SPREAD_FH, '<&=', $mbox) || die "couldn't open fd: $!";
my $event = event_new(*SPREAD_FH, EV_READ | EV_PERSIST, \&get_spread_messages);
$event->add;

# Reassembly state for the chunked transfer in progress (empty when idle).
# Holds the first chunk's headers plus FILENAME, the temp file being appended to.
my %MSGDATA;

event_mainloop();

# Event::Lib callback: drain every pending Spread message and dispatch it.
sub get_spread_messages {
    while (my ($st, $s, $g, $mt, $e, $mess) = Spread::receive($mbox, 0)) {
        return if (!$mess);
        my ($headers, $body) = parse_message($mess);
        process_message($headers, $body) if ($headers);
    }
}

# Return the hex MD5 of $file's contents, or undef if it cannot be opened.
sub md5_file {
    my ($file) = @_;
    my $fh;
    if (!open($fh, '<', $file)) {
        debug_log("couldn't open file for MD5: $!");
        return;
    }
    binmode($fh);
    my $digest = Digest::MD5->new->addfile($fh)->hexdigest;
    close($fh);
    return $digest;
}

# Log to syslog at 'debug' priority, only when --debug was given.
sub debug_log {
    return if (!$DEBUG);
    syslog('debug', '%s', shift);
}

# Log to syslog unconditionally (still at 'debug' priority).
sub crit_log {
    syslog('debug', '%s', shift);
}

# Split a raw Spread message into (\%headers, $body).
# Messages must start with "Command:"; anything else yields an empty list.
# Headers are "Key:Value" lines; an optional compressed body sits between
# "__DATA__\n" and "__DATAEND__".
sub parse_message {
    my ($mess) = @_;
    return if ($mess !~ /^Command:/);

    my ($header_text, $body);
    if ($mess =~ /^(.*?)__DATA__\n(.+?)__DATAEND__$/s) {
        ## This is a message that contains a data segment
        ($header_text, $body) = ($1, $2);
    }
    else {
        $header_text = $mess;
    }

    my %headers;
    for my $line (split(/\n/, $header_text)) {
        next if ($line eq '');
        # Split on the first colon only so values may themselves contain colons.
        my ($key, $val) = split(/:/, $line, 2);
        $headers{$key} = $val;
    }
    return (\%headers, $body);
}

# Handle one parsed resync message.  A message either carries the whole
# compressed listing itself (no Chunk header), or is one piece of a numbered
# sequence (Chunk 1, 2, ...) tied together by a Cookie header and terminated
# by LastChunk.  A complete listing is handed to process_resync() together
# with its FileSig (expected MD5) header.
sub process_message {
    my ($headers, $body) = @_;

    if (defined($MSGDATA{'Cookie'})) {
        ## A chunked transfer is in progress; this must be its next chunk.
        if (!defined($headers->{'Cookie'}) || $headers->{'Cookie'} ne $MSGDATA{'Cookie'}) {
            debug_log("Cookie doesn't match.");
            _abort_transfer();
            # The old transfer is dead; treat this message as a fresh one
            # (it may be the first chunk of a new transfer).
            return process_message($headers, $body);
        }
        if (!defined($MSGDATA{'Chunk'}) || $MSGDATA{'Chunk'} !~ /^(\d+)$/) {
            ## Odd, chunks doesn't contain what we expect. Abort.
            debug_log("Chunks seem messed up, couldn't parse current chunk data");
            _abort_transfer();
            return;
        }
        my $curchunk = $1;
        if (!defined($headers->{'Chunk'}) || $headers->{'Chunk'} !~ /^(\d+)$/) {
            ## Odd, chunks doesn't contain what we expect. Abort.
            debug_log("Chunks weird. Last chunk processed: $curchunk. This chunk couldn't be parsed");
            _abort_transfer();
            return;
        }
        my $newchunk = $1;
        if ($newchunk != $curchunk + 1) {
            ## Out of order.
            debug_log("Chunks out of order or missing.");
            _abort_transfer();
            return;
        }

        ## Step 3. Append data to tmpfile.
        if (!_append_chunk($MSGDATA{'FILENAME'}, $body)) {
            _abort_transfer();
            return;
        }

        ## Step 4. Last chunk?
        if ($headers->{'LastChunk'}) {
            process_resync($MSGDATA{'FILENAME'}, $headers->{'FileSig'});
            %MSGDATA = ();
            return;
        }
        $MSGDATA{'Chunk'} = $newchunk;
        return;
    }

    if (!$headers->{'Chunk'}) {
        ## Single chunk message: the whole listing is in this body.
        return if (!$body);
        my $file = _new_tempfile();
        return if (!defined($file));
        if (!_append_chunk($file, $body)) {
            unlink($file);
            return;
        }
        process_resync($file, $headers->{'FileSig'});
        return;
    }

    if ($headers->{'Chunk'} =~ /^\d+$/ && $headers->{'Chunk'} == 1) {
        ## First chunk of a multi chunk message.
        _abort_transfer();    # discard any half-built transfer that lacked a Cookie
        %MSGDATA = %{$headers};
        my $tmp = _new_tempfile();
        if (!defined($tmp)) {
            %MSGDATA = ();
            return;
        }
        $MSGDATA{'FILENAME'} = $tmp;
        if (!_append_chunk($tmp, $body)) {
            _abort_transfer();
            return;
        }
        if ($headers->{'LastChunk'}) {
            process_resync($tmp, $headers->{'FileSig'});
            %MSGDATA = ();
        }
        return;
    }

    ## A mid-sequence chunk with no transfer in progress (we missed its start):
    ## scrap whatever chunked file we might have.
    _abort_transfer();
    return;
}

# Discard the partially reassembled transfer (temp file and state), if any.
sub _abort_transfer {
    unlink($MSGDATA{'FILENAME'}) if (defined($MSGDATA{'FILENAME'}));
    %MSGDATA = ();
}

# Exclusively create an empty *.resyncclient temp file; return its name or undef.
sub _new_tempfile {
    my ($fh, $name) = eval { File::Temp::tempfile(SUFFIX => '.resyncclient') };
    if (!$fh) {
        debug_log("Error creating temp file: $@");
        return;
    }
    close($fh);
    return $name;
}

# Decompress $body and append it to $filename.  Returns true on success;
# logs and returns false on failure (the caller removes the file).
sub _append_chunk {
    my ($filename, $body) = @_;
    my $output = defined($body) ? uncompress($body) : undef;
    if (!defined($output)) {
        debug_log("Couldn't uncompress data");
        return 0;
    }
    my $fh;
    if (!open($fh, '>>', $filename)) {
        debug_log("Error opening temp file: $!");
        return 0;
    }
    binmode($fh);
    my $written = syswrite($fh, $output);
    close($fh);
    if (!defined($written) || $written != length($output)) {
        debug_log("Couldn't write all data to file for some reason");
        return 0;
    }
    return 1;
}

# Download $file from the file server and move it into place, verifying the
# MD5 the server reports and restoring its mtime.  $retries is the number of
# attempts already used; attempts continue up to and including number 10.
sub fetch_file {
    my ($file, $retries) = @_;
    $retries = 0 if (!defined($retries));
    my $dir = File::Basename::dirname($file);

    for my $attempt ($retries .. 10) {
        my ($md5, $length, $mtime) = fetch_file_sig($file);
        # A length or mtime of 0 is valid (e.g. empty files); only undef is missing.
        if (!defined($md5) || !defined($length) || !defined($mtime)) {
            debug_log("Missing info while attempting to retrieve $file");
            next;
        }

        # Create the destination directory up front: the temp file lives beside
        # the target so the final rename() never crosses filesystems.
        if (!-d $dir) {
            eval { File::Path::mkpath($dir, 0, 0755); };
            if ($@) {
                crit_log("couldn't create path for $file: $@");
                return;
            }
        }

        my $sock = IO::Socket::INET->new(PeerAddr => $FILESERVER_ADDRESS,
                                         PeerPort => $FILESERVER_PORT,
                                         Proto    => "tcp");
        if (!$sock) {
            crit_log("Failed to connect to fileserver: $!");
            next;
        }

        # Removed automatically when $tmp goes out of scope unless renamed away.
        my $tmp = eval { File::Temp->new(SUFFIX => '.resyncclient', DIR => $dir) };
        if (!$tmp) {
            crit_log("couldn't create temp file for $file: $@");
            return;
        }
        binmode($tmp);

        my $request = "GET $file\n";
        syswrite($sock, $request, length($request));
        my $buf;
        while (sysread($sock, $buf, 32768)) {
            syswrite($tmp, $buf, length($buf));
        }
        close($tmp);
        close($sock);

        my $got = md5_file($tmp->filename);
        if (!defined($got) || $got ne $md5) {
            debug_log("FAIL: signatures didn't match, trying again ($attempt)");
            next;
        }

        if (!rename($tmp->filename, $file)) {
            crit_log("couldn't move $file into place: $!");
            return;
        }
        chmod(0644, $file);
        utime($mtime, $mtime, $file) || warn "couldn't set utime on $file: $!";
        debug_log("Fetched $file");
        return;
    }

    crit_log("Too many retries retrieving $file, aborting.");
    return;
}

# Ask the file server for "SIG <file>" and return (md5, size, mtime),
# or an empty list if the server is unreachable or the reply is malformed.
sub fetch_file_sig {
    my ($file) = @_;

    ## First get the signature/filesize/mtime
    my $sock;
    for (1 .. 5) {
        $sock = IO::Socket::INET->new(PeerAddr => $FILESERVER_ADDRESS,
                                      PeerPort => $FILESERVER_PORT,
                                      Proto    => "tcp");
        last if ($sock);
    }
    if (!$sock) {
        crit_log("Too many connection attempts to retrieve file signature");
        return;
    }

    my $request = "SIG $file\n";
    my $r = syswrite($sock, $request, length($request));
    if (!defined($r) || $r != length($request)) {
        ## shouldn't happen, this socket is blocking
        debug_log("Partial write for some reason to retrieve file signature");
        return;
    }

    my $sig = '';
    my $buf;
    while (sysread($sock, $buf, 32768)) {
        $sig .= $buf;
    }
    close($sock);

    if ($sig !~ /^([a-zA-Z0-9]{32})\|(\d+)\|(\d+)/) {
        debug_log("Invalid signature received: $sig");
        return;
    }
    return ($1, $2, $3);
}

# Delete regular files in $dir that are not keys of %$keep.  Does nothing when
# %$keep is empty, so an empty server listing never wipes a whole directory.
sub _prune_dir {
    my ($dir, $keep) = @_;
    return if (!%{$keep});
    my $dh;
    return if (!opendir($dh, $dir));
    while (defined(my $itm = readdir($dh))) {
        next if ($itm =~ /^\.{1,2}$/ || -d "$dir/$itm");
        next if (defined($keep->{$itm}));
        ## We've got a file locally that the server didn't. Remove local file.
        debug_log("Deleting $dir/$itm");
        unlink("$dir/$itm");
    }
    closedir($dh);
}

# Apply a server listing stored in $file (expected MD5 $sig) to the local tree.
# The listing is a sequence of "DIR:<path>" lines, each followed by
# "name:size:mtime" lines for the files in that directory.  Local files the
# server did not list are deleted, empty directories are removed, and files
# that are missing or differ in size/mtime are fetched.  $file is deleted.
sub process_resync {
    my ($file, $sig) = @_;
    debug_log("Starting resync");

    my $actual = md5_file($file);
    if (!defined($sig) || !defined($actual) || $actual ne $sig) {
        no warnings 'uninitialized';
        debug_log("File ($file) sig ($sig) doesn't match: $actual");
        unlink($file);
        return;
    }

    ## Read tempfile and start comparing local filesystem.
    my $fh;
    if (!open($fh, '<', $file)) {
        debug_log("Couldn't open tempfile $file: $!");
        unlink($file) if (-e $file);
        return;
    }

    my (@fetch, $curdir, %dirfiles);
    while (my $line = <$fh>) {
        chomp($line);
        next if ($line eq '');

        if ($line =~ /^DIR:(.+)$/) {
            my $newdir = $1;
            # Finished the previous directory: drop local files the server didn't list.
            _prune_dir($curdir, \%dirfiles) if (defined($curdir));
            $curdir = $newdir;
            File::Find::finddepth(sub { rmdir }, $curdir);    ## Remove empty folders
            %dirfiles = ();
            next;
        }

        ## This is supposed to be a file, and $curdir is supposed to be defined.
        if (!defined($curdir)) {
            debug_log("ACK NO CURDIR: >>$line<<");
            close($fh);
            unlink($file);
            return;
        }
        my ($name, $size, $mtime) = split(/:/, $line);
        $dirfiles{$name} = 1;
        my @st = stat("$curdir/$name");
        if (!@st || !defined($size) || !defined($mtime)
            || $st[7] != $size || $st[9] != $mtime) {
            ## File isn't there, or size doesn't match, or mtime doesn't match. Get the file.
            push(@fetch, "$curdir/$name");
        }
    }
    close($fh);
    unlink($file);

    if (!defined($curdir)) {
        crit_log("ACK NO CURDIR after traversal");
        return;
    }

    _prune_dir($curdir, \%dirfiles);
    File::Find::finddepth(sub { rmdir }, $curdir);    ## Will remove empty folders

    for my $path (@fetch) {
        debug_log("fetching $path for resync");
        fetch_file($path, 0);
    }
    crit_log("Resync finished (" . scalar(@fetch) . ")");
}

# Print usage and exit with a failure status.
sub show_help {
    print <<"EOF";
$0 Options:
	--debug			Turn on debugging
	--pool=<name>		Name of server pool
	--userid=<uid>		UserID to change to on start-up
	--groupid=<gid>		Group id to change to on start-up
	--spread-port=<port>	Spread port (usually 4803)
	--spread-address=<ip>	Spread server IP address
	--fs-port=<port>	Port that the file server is running on
	--fs-address=<ip>	IP address of the file server
EOF
    exit(1);
}

__END__