Committer: gariev
LJSUP-10124: [internal] Make user cluster DB switching dynamic

U trunk/bin/ljdb
D trunk/bin/moveucluster.pl
D trunk/bin/moveuclusterd.pl
U trunk/cgi-bin/LJ/ConfCheck/General.pm
U trunk/cgi-bin/LJ/DBUtil.pm
U trunk/cgi-bin/LJ/Entry.pm
U trunk/cgi-bin/ljdb.pl
Modified: trunk/bin/ljdb =================================================================== --- trunk/bin/ljdb 2011-10-13 22:08:12 UTC (rev 20309) +++ trunk/bin/ljdb 2011-10-14 00:22:16 UTC (rev 20310) @@ -3,6 +3,7 @@ ## post-svn-commit script test IV use lib "$ENV{LJHOME}/cgi-bin"; +require 'ljlib.pl'; require 'ljdb.pl'; use Getopt::Long; @@ -83,22 +84,8 @@ my ($userid, $cid) = $dbs->selectrow_array("SELECT userid, clusterid FROM user $whereClause", undef, $sqlArg); die "no such user as '$user'\n" unless $userid && $cid; - $role = "cluster" . $cid; - print "user: $user / userid: $userid / clusterid: $cid"; - - if (my $ab = $LJ::CLUSTER_PAIR_ACTIVE{$cid}) { - print " / active=$ab\n"; - if ($inactive) { - $role .= "b" if $ab eq 'a'; - $role .= "a" if $ab eq 'b'; - } else { - $role .= $ab; - } - } else { - # type must be master/slave - $role .= "slave" if $inactive && grep { $_->{role}{"${role}slave"} } values %LJ::DBINFO; - } - print "\n"; + my $u = LJ::load_userid($userid); + $role = ($inactive) ? LJ::get_inactive_role($u) : LJ::master_role($u); } $role ||= "master"; Deleted: trunk/bin/moveucluster.pl =================================================================== --- trunk/bin/moveucluster.pl 2011-10-13 22:08:12 UTC (rev 20309) +++ trunk/bin/moveucluster.pl 2011-10-14 00:22:16 UTC (rev 20310) @@ -1,1123 +0,0 @@ -#!/usr/bin/perl -############################################################################## - -=head1 NAME - -moveucluster.pl - Moves a LiveJournal user between database clusters - -=head1 SYNOPSIS - - $ moveucluster.pl OPTIONS <user> <dest_clusterid> - -=head1 OPTIONS - -=over 4 - -=item -h, --help - -Output a help message and exit. - -=item --verbose[=<n>] - -Verbosity level, 0, 1, or 2. 
- -=item --verify - -Verify count of copied rows to ensure accuracy (slower) - -=item --ignorebit - -Ignore the move in progress bit (force user move) - -=item --prelocked - -Do not set user readonly and sleep (somebody else did it) - -=item --delete - -Delete data from source cluster when done moving - -=item --destdelete - -Delete data from destination cluster before moving - -=item --expungedel - -The expungedel option is used to indicate that when a user is encountered -with a statusvis of D (deleted journal) and they've been deleted for at -least 31 days, instead of moving their data, mark the user as expunged. - -Further, if you specify the delete and expungedel options at the same time, -if the user is expunged, all of their data will be deleted from the source -cluster. THIS IS IRREVERSIBLE AND YOU WILL NOT BE ASKED FOR CONFIRMATION. - -=item --jobserver=host:port - -Specify a job server to get tasks from. In this mode, no other -arguments are necessary, and moveucluster.pl just runs in a loop -getting directions from the job server. - -=back - -=head1 AUTHOR - -Brad Fitzpatrick E<lt>brad@danga.comE<gt> -Copyright (c) 2002-2004 Danga Interactive. All rights reserved. - -=cut - -############################################################################## - -use strict; -use Getopt::Long; -use Pod::Usage qw{pod2usage}; -use IO::Socket::INET; -use lib "$ENV{'LJHOME'}/cgi-bin/"; -use LJ::TimeUtil; - -# NOTE: these options are used both by Getopt::Long for command-line parsing -# in single user move move, and also set by hand when in --jobserver mode, -# and the jobserver gives us directions, including whether or not users -# are prelocked, need to be source-deleted, verified, etc, etc, etc. 
-my $opt_del = 0; -my $opt_destdel = 0; -my $opt_verbose = 1; -my $opt_movemaster = 0; -my $opt_prelocked = 0; -my $opt_expungedel = 0; -my $opt_ignorebit = 0; -my $opt_verify = 0; -my $opt_help = 0; -my $opt_jobserver = ""; - -abortWithUsage() unless - GetOptions('delete' => \$opt_del, # from source - 'destdelete' => \$opt_destdel, # from dest (if exists, before moving) - 'verbose=i' => \$opt_verbose, - 'movemaster|mm' => \$opt_movemaster, # use separate dedicated source - 'prelocked' => \$opt_prelocked, # don't do own locking; master does (harness, ljumover) - 'expungedel' => \$opt_expungedel, # mark as expunged if possible (+del to delete) - 'ignorebit' => \$opt_ignorebit, # ignore move in progress bit cap (force) - 'verify' => \$opt_verify, # slow verification pass (just for debug) - 'jobserver=s' => \$opt_jobserver, - 'help' => \$opt_help, - ); -my $optv = $opt_verbose; - -my $dbo; # original cluster db handle. (may be a movemaster (a slave)) -my $dboa; # the actual master handle, which we delete from if deleting from source - -abortWithUsage() if $opt_help; - -if ($opt_jobserver) { - multiMove(); -} else { - singleMove(); -} - -sub multiMove { - # the job server can keep giving us new jobs to move (or a stop command) - # over and over, so we avoid perl exec times - require "$ENV{'LJHOME'}/cgi-bin/ljlib.pl"; - - my $sock; - ITER: - while (1) { - if ($sock && $sock->connected) { - my $pipe = 0; - local $SIG{PIPE} = sub { $pipe = 1; }; - - LJ::start_request(); - my $dbh = get_validated_role_dbh("master"); - unless ($dbh) { - print " master db unavailable\n"; - sleep 2; - next ITER; - } - - my $rv = $sock->write("get_job\r\n"); - - if ($pipe || ! 
$rv) { - $sock = undef; - sleep 1; - next ITER; - } - my $line = <$sock>; - unless ($line) { - $sock = undef; - sleep 1; - next ITER; - } - - if ($line =~ /^OK IDLE/) { - print "Idling.\n"; - sleep 5; - next ITER; - } elsif ($line =~ /^OK JOB (\d+):(\d+):(\d+)\s+([\d.]+)(?:\s+([\w= ]+))?\r?\n/) { - my ($uid, $srcid, $dstid, $locktime) = ($1, $2, $3, $4); - my $opts = parseOpts($5); - - print "Got a job: $uid:$srcid:$dstid, locked for=$locktime, opts: [", - join(", ", map { "$_=$opts->{$_}" } grep { $opts->{$_} } keys %$opts), - "]\n"; - - my $u = LJ::load_userid($uid, "force"); - - next ITER unless $u; - next ITER unless $u->{clusterid} == $srcid; - - my $verify = sub { - my $pipe = 0; - local $SIG{PIPE} = sub { $pipe = 1; }; - my $rv = $sock->write("finish $uid:$srcid:$dstid\r\n"); - return 0 unless $rv; - my $res = <$sock>; - return $res =~ /^OK/ ? 1 : 0; - }; - - # If the user is supposed to be prelocked, but the lock didn't - # happen more than 3 seconds ago, wait until it has time to - # "settle" and then move the user - if ( $opts->{prelocked} && $locktime < 3 ) { - sleep 3 - $locktime; - } - - my $rv = eval { moveUser($dbh, $u, $dstid, $verify, $opts); }; - if ($rv) { - print "moveUser($u->{user}/$u->{userid}) = 1\n"; - } else { - print "moveUser($u->{user}/$u->{userid}) = fail: $@\n"; - } - LJ::end_request(); - LJ::disconnect_dbs(); # end_request could do this, but we want to force it - } else { - die "Unknown response from server: $line\n"; - } - } else { - print "Need job server sock...\n"; - $sock = IO::Socket::INET->new(PeerAddr => $opt_jobserver, - Proto => 'tcp', ); - unless ($sock) { - print " failed.\n"; - sleep 1; - next ITER; - } - my $ready = <$sock>; - if ($ready =~ /Ready/) { - print "Connected.\n"; - } else { - print "Bogus greeting.\n"; - $sock = undef; - sleep 1; - next ITER; - } - } - - } -} - -### Parse options from job specs into a hashref -sub parseOpts { - my $raw = shift || ""; - my $opts = {}; - - while ( $raw =~ m{\s*(\w+)=(\w+)}g ) 
{ - $opts->{ $1 } = $2; - } - - foreach my $opt (qw(del destdel movemaster prelocked - expungedel ignorebit verify)) { - next if defined $opts->{$opt}; - $opts->{$opt} = eval "\$opt_$opt"; - } - - # Have the same delete behavior despite of how the input delete parameter is specified: by 'delete=1' or by 'del=1' - $opts->{del} = $opts->{'delete'} if defined $opts->{'delete'} and not $opts->{del}; - - return $opts; -} - - -sub singleMove { - my $user = shift @ARGV; - my $dclust = shift @ARGV; - $dclust = 0 if !defined $dclust && $opt_expungedel; - - # check arguments - abortWithUsage() unless defined $user && defined $dclust; - - require "$ENV{'LJHOME'}/cgi-bin/ljlib.pl"; - - $user = LJ::canonical_username($user); - abortWithUsage("Invalid username") unless length($user); - - my $dbh = get_validated_role_dbh("master"); - die "No master db available.\n" unless $dbh; - - my $u = LJ::load_user($user, "force"); - - my $opts = parseOpts(""); # gets command-line opts - my $rv = eval { moveUser($dbh, $u, $dclust, undef, $opts); }; - - if ($rv) { - print "Moved '$user' to cluster $dclust.\n"; - exit 0; - } - if ($@) { - die "Failed to move '$user' to cluster $dclust: $@\n"; - } - - print "ERROR: move failed.\n"; - exit 1; -} - -sub get_validated_cluster_dbh { - my $arg = shift; - - my $clusterid = $arg; - if (LJ::isu($arg)) { - $clusterid = $arg->clusterid; - } - - # revalidate any db that is found in cache - $LJ::DBIRole->clear_req_cache; - - # get the destination DB handle, with a long timeout - my $dbch = LJ::get_cluster_master({raw=>1}, $clusterid); - die "Undefined or down cluster \#$clusterid\n" unless $dbch; - - # make sure any error is a fatal error. no silent mistakes. 
- $dbch->{'RaiseError'} = 1; - - $dbch->do("SET wait_timeout=28800"); - - return $dbch; -} - -sub get_validated_role_dbh { - my $role = shift; - - # revalidate any db that is found in cache - $LJ::DBIRole->clear_req_cache; - - my $db = LJ::get_dbh({raw=>1}, $role); - report_error_ops() unless $db; - die "Couldn't get handle for role: $role" unless $db; - - # make sure any error is a fatal error. no silent mistakes. - $db->{'RaiseError'} = 1; - - $db->do("SET wait_timeout=28800"); - - return $db; -} - -# lets inform ops that the script has stopped at least. -sub report_error_ops { - my $subject = 'Moveucluster.pl Script Failure'; - my $to = 'sf-ops@livejournalinc.com'; - - open (MAIL, "|/usr/bin/mail -s \"$subject\" $to"); - print MAIL "moveucluster.pl failed, please login and restart the job"; - close (MAIL); -} - -# some might call this $dboa -sub get_definitive_source_dbh { - my $u = shift; - - # the actual master handle, which we delete from if deleting from source - my $db = get_validated_cluster_dbh($u); - die "Can't get source cluster handle.\n" unless $db; - - return $db; -} - -# some might call this $dbo -sub get_move_source_dbh { - my $u = shift; - - # $opt_movemaster comes from GetOpt when this script is called... - # Generally it's accessed as $opts->{movemaster}, but that's because - # it's in a hashref to pass to moveUser. In any case, the definitive - # value is in $opt_movemaster - if ($opt_movemaster) { - # if an a/b cluster, the movemaster (the source for moving) is - # the opposite side. if not a/b, then look for a special "movemaster" - # role for that clusterid - my $mm_role = "cluster$u->{clusterid}"; - my $ab = lc($LJ::CLUSTER_PAIR_ACTIVE{$u->{clusterid}}); - if ($ab eq "a") { - $mm_role .= "b"; - } elsif ($ab eq "b") { - $mm_role .= "a"; - } else { - $mm_role .= "movemaster"; - } - - my $db = get_validated_role_dbh($mm_role); - - my $ss = $db->selectrow_hashref("show slave status"); - die "Move master not a slave?" 
unless $ss; - - return $db; - } - - # otherwise use the definitive source - return get_definitive_source_dbh($u); -} - -sub moveUser { - my ($dbh, $u, $dclust, $verify_code, $opts) = @_; - die "Non-existent db.\n" unless $dbh; - die "Non-existent user.\n" unless $u && $u->{userid}; - - my $user = $u->{user}; - my $userid = $u->{userid}; - - # get lock - die "Failed to get move lock.\n" - unless $dbh->selectrow_array("SELECT GET_LOCK('moveucluster-$u->{userid}', 5)"); - - # we can't move to the same cluster - my $sclust = $u->{'clusterid'}; - if ($sclust == $dclust) { - die "User '$user' is already on cluster $dclust\n"; - } - - # we don't support "cluster 0" (the really old format) - die "This mover tool doesn't support moving from cluster 0.\n" unless $sclust; - die "Can't move back to legacy cluster 0\n" unless $dclust || $opts->{expungedel}; - - # for every DB handle we touch, make a signature of a sorted - # comma-delimited signature onto this list. likewise with the - # list of tables this mover script knows about. if ANY signature - # in this list isn't identical, we just abort. perhaps this - # script wasn't updated, or a long-running mover job wasn't - # restarted and new tables were added to the schema. - my @alltables = (@LJ::USER_TABLES, @LJ::USER_TABLES_LOCAL); - my $mover_sig = join(",", sort @alltables); - - my $get_sig = sub { - my $hnd = shift; - return join(",", sort - @{ $hnd->selectcol_arrayref("SHOW TABLES") }); - }; - - my $global_sig = $get_sig->($dbh); - - my $check_sig = sub { - my $hnd = shift; - my $name = shift; - - # no signature checks on expunges - return if ! 
$hnd && $opts->{expungedel}; - - my $sig = $get_sig->($hnd); - - # special case: signature can be that of the global - return if $sig eq $global_sig; - - if ($sig ne $mover_sig) { - my %sigt = map { $_ => 1 } split(/,/, $sig); - my @err; - foreach my $tbl (@alltables) { - unless ($sigt{$tbl}) { - # missing a table the mover knows about - push @err, "-$tbl"; - next; - } - delete $sigt{$tbl}; - } - foreach my $tbl (sort keys %sigt) { - push @err, "?$tbl"; - } - if (@err) { - die "Table signature for $name doesn't match! Stopping. [@err]\n"; - } - } - }; - - # if we want to delete the user, we don't need a destination cluster, so only get - # one if we have a real valid destination cluster - my $dbch; - if ($dclust) { - $dbch = get_validated_cluster_dbh($dclust); - } - - # this is okay to call even if ! $dclust above - $check_sig->($dbch, "dbch(database dst)"); - - # get a definitive source handle where deletes should happen in - # cases of sourcedel, etc - $dboa = get_definitive_source_dbh($u); - $check_sig->($dboa, "dboa(database src)"); - - # get a source handle to move from, which is not necessarily a - # definitive copy of the source data... it could just be a - # movemaster slave - $dbo = get_move_source_dbh($u); - $check_sig->($dbo, "dbo(movemaster)"); - - # load the info on how we'll move each table. this might die (if new tables - # with bizarre layouts are added which this thing can't auto-detect) so want - # to do it early. 
- my $tinfo; # hashref of $table -> { - # 'idx' => $index_name # which we'll be using to iterate over - # 'idxcol' => $col_name # first part of index - # 'cols' => [ $col1, $col2, ] - # 'pripos' => $idxcol_pos, # what field in 'cols' is $col_name - # 'verifykey' => $col # key used in the debug --verify pass - # } - $tinfo = fetchTableInfo(); - - # see hack below - my $prop_icon = LJ::get_prop("talk", "subjecticon"); - my %rows_skipped; # $tablename -> $skipped_rows_count - - # find readonly cap class, complain if not found - my $readonly_bit = undef; - foreach (keys %LJ::CAP) { - if ($LJ::CAP{$_}->{'_name'} eq "_moveinprogress" && - $LJ::CAP{$_}->{'readonly'} == 1) { - $readonly_bit = $_; - last; - } - } - unless (defined $readonly_bit) { - die "Won't move user without %LJ::CAP capability class named '_moveinprogress' with readonly => 1\n"; - } - - # make sure a move isn't already in progress - if ($opts->{prelocked}) { - unless (($u->{'caps'}+0) & (1 << $readonly_bit)) { - die "User '$user' should have been prelocked.\n"; - } - } else { - if (($u->{'caps'}+0) & (1 << $readonly_bit)) { - die "User '$user' is already in the process of being moved? (cap bit $readonly_bit set)\n" - unless $opts->{ignorebit}; - } - } - - if ($opts->{expungedel} && $u->{'statusvis'} eq "D" && - LJ::TimeUtil->mysqldate_to_time($u->{'statusvisdate'}) < time() - 86400*31) { - - print "Expunging user '$u->{'user'}'\n"; - $dbh->do("INSERT INTO clustermove (userid, sclust, dclust, timestart, timedone) ". 
- "VALUES (?,?,?,UNIX_TIMESTAMP(),UNIX_TIMESTAMP())", undef, - $userid, $sclust, 0); - - LJ::update_user($userid, { clusterid => 0, - statusvis => 'X', - raw => "caps=caps&~(1<<$readonly_bit), statusvisdate=NOW()" }) - or die "Couldn't update user to expunged"; - - # note that we've expunged this user in the "expunged_users" db table - $dbh->do("REPLACE INTO expunged_users SET userid=?, user=?, expunge_time=UNIX_TIMESTAMP()", - undef, $u->{userid}, $u->{user}); - - # now delete all content from user cluster for this user - if ($opts->{del}) { - print "Deleting expungeable user data...\n" if $optv; - - # figure out if they have any S1 styles - my $styleids = $dboa->selectcol_arrayref("SELECT styleid FROM s1style WHERE userid = $userid"); - - $dbh->do("DELETE FROM domains WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM email_aliases WHERE alias = ?", - undef, "$u->{user}\@$LJ::USER_DOMAIN"); - $dbh->do("DELETE FROM userinterests WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM comminterests WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM syndicated WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM supportnotify WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM reluser WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM smsusermap WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM friends WHERE userid = ?", undef, $u->id); - $dbh->do("DELETE FROM phonepostlogin WHERE userid = ?", undef, $u->id); - - # no need for other users to ban this user any more - while ($dbh->do("DELETE FROM reluser WHERE targetid = ? 
AND type = 'B' LIMIT 1000", undef, $u->id) > 0) { - print " deleted bans from reluser\n" if $optv; - } - - # now delete from the main tables - foreach my $table (keys %$tinfo) { - my $pri = $tinfo->{$table}->{idxcol}; - while ($dboa->do("DELETE FROM $table WHERE $pri=$userid LIMIT 1000") > 0) { - print " deleted from $table\n" if $optv; - } - } - - # and from the s1stylecache table - if (@$styleids) { - my $styleids_in = join(",", map { $dboa->quote($_) } @$styleids); - if ($dboa->do("DELETE FROM s1stylecache WHERE styleid IN ($styleids_in)") > 0) { - print " deleted from s1stylecache\n" if $optv; - } - } - - $dboa->do("DELETE FROM clustertrack2 WHERE userid=?", undef, $userid); - } - - # fire event noting this user was expunged - if (eval "use LJ::Event::UserExpunged; 1;") { - LJ::Event::UserExpunged->new($u)->fire; - } else { - die "Could not load module LJ::Event::UserExpunged: $@"; - } - LJ::run_hooks('purged_user', $u); - - return 1; - } - - # if we get to this point we have to enforce that there's a destination cluster, because - # apparently the user failed the expunge test - if (!defined $dclust || !defined $dbch) { - die "User is not eligible for expunging.\n" if $opts->{expungedel}; - } - - - # returns state string, with a/b, readonly, and flush states. - # string looks like: - # "src(34)=a,dst(42)=b,readonly(34)=0,readonly(42)=0,src_flushes=32 - # because if: - # src a/b changes: lose readonly lock? - # dst a/b changes: suspect. did one side crash? was other side caught up? - # read-only changes: signals maintenance - # flush counts change: causes HANDLER on src to lose state and reset - my $stateString = sub { - my $post = shift; # false for before, true for "after", which forces a config reload - - if ($post) { - LJ::Config->reload; - } - - my @s; - push @s, "src($sclust)=" . $LJ::CLUSTER_PAIR_ACTIVE{$sclust}; - push @s, "dst($dclust)=" . $LJ::CLUSTER_PAIR_ACTIVE{$dclust}; - push @s, "readonly($sclust)=" . ($LJ::READONLY_CLUSTER{$sclust} ? 
1 : 0); - push @s, "readonly($dclust)=" . ($LJ::READONLY_CLUSTER{$dclust} ? 1 : 0); - - my $flushes = 0; - my $sth = $dbo->prepare("SHOW STATUS LIKE '%flush%'"); - $sth->execute; - while (my $r = $sth->fetchrow_hashref) { - $flushes += $r->{Value} if $r->{Variable_name} =~ /^Com_flush|Flush_commands$/; - } - push @s, "src_flushes=" . $flushes; - - return join(",", @s); - }; - - print "Moving '$u->{'user'}' from cluster $sclust to $dclust\n" if $optv >= 1; - my $pre_state = $stateString->(); - - # mark that we're starting the move - $dbh->do("INSERT INTO clustermove (userid, sclust, dclust, timestart) ". - "VALUES (?,?,?,UNIX_TIMESTAMP())", undef, $userid, $sclust, $dclust); - my $cmid = $dbh->{'mysql_insertid'}; - - # set readonly cap bit on user - unless ($opts->{prelocked} || - LJ::update_user($userid, { raw => "caps=caps|(1<<$readonly_bit)" })) - { - die "Failed to set readonly bit on user: $user\n"; - } - $dbh->do("SELECT RELEASE_LOCK('moveucluster-$u->{userid}')"); - - unless ($opts->{prelocked}) { - # wait a bit for writes to stop if journal is somewhat active (last week update) - my $secidle = $dbh->selectrow_array("SELECT UNIX_TIMESTAMP()-UNIX_TIMESTAMP(timeupdate) ". 
- "FROM userusage WHERE userid=$userid"); - if ($secidle) { - sleep(2) unless $secidle > 86400*7; - sleep(1) unless $secidle > 86400; - } - } - - if ($opts->{movemaster}) { - my $diff = 999_999; - my $tolerance = 50_000; - while ($diff > $tolerance) { - my $ss = $dbo->selectrow_hashref("show slave status"); - if ($ss->{'Slave_IO_Running'} eq "Yes" && $ss->{'Slave_SQL_Running'} eq "Yes") { - if ($ss->{'Master_Log_File'} eq $ss->{'Relay_Master_Log_File'}) { - $diff = $ss->{'Read_Master_Log_Pos'} - $ss->{'Exec_master_log_pos'}; - print " diff: $diff\n" if $optv >= 1; - sleep 1 if $diff > $tolerance; - } else { - print " (Wrong log file): $ss->{'Relay_Master_Log_File'}($ss->{'Exec_master_log_pos'}) not $ss->{'Master_Log_File'}($ss->{'Read_Master_Log_Pos'})\n" if $optv >= 1; - } - } else { - die "Movemaster slave not running"; - } - } - } - - print "Moving away from cluster $sclust\n" if $optv; - - my %cmd_done; # cmd_name -> 1 - while (my $cmd = $dboa->selectrow_array("SELECT cmd FROM cmdbuffer WHERE journalid=$userid")) { - die "Already flushed cmdbuffer job '$cmd' -- it didn't take?\n" if $cmd_done{$cmd}++; - print "Flushing cmdbuffer for cmd: $cmd\n" if $optv > 1; - require "$ENV{'LJHOME'}/cgi-bin/ljcmdbuffer.pl"; - LJ::Cmdbuffer::flush($dbh, $dboa, $cmd, $userid); - } - - # setup dependencies (we can skip work by not checking a table if we know - # its dependent table was empty). then we have to order things so deps get - # processed first. - my %was_empty; # $table -> bool, table was found empty - my %dep = ( - "logtext2" => "log2", - "logprop2" => "log2", - "logsec2" => "log2", - "talkprop2" => "talk2", - "talktext2" => "talk2", - "phoneposttrans" => "phonepostentry", # FIXME: ljcom - "modblob" => "modlog", - "sessions_data" => "sessions", - "memkeyword2" => "memorable2", - "userpicmap2" => "userpic2", - "logtagsrecent" => "usertags", - "logtags" => "usertags", - "logkwsum" => "usertags", - ); - - # all tables we could be moving. 
we need to sort them in - # order so that we check dependant tables first - my @tables; - push @tables, grep { ! $dep{$_} } @alltables; - push @tables, grep { $dep{$_} } @alltables; - - # these are ephemeral or handled elsewhere - my %skip_table = ( - "cmdbuffer" => 1, # pre-flushed - "events" => 1, # handled by qbufferd (not yet used) - "s1stylecache" => 1, # will be recreated - "captcha_session" => 1, # temporary - "tempanonips" => 1, # temporary ip storage for spam reports - "recentactions" => 1, # pre-flushed by clean_caches - "pendcomments" => 1, # don't need to copy these - "active_user" => 1, # don't need to copy these - "random_user_set" => 1, # " - ); - - $skip_table{'inviterecv'} = 1 if $u->{journaltype} ne 'P'; # non-person, skip invites received - $skip_table{'invitesent'} = 1 if $u->{journaltype} ne 'C'; # not community, skip invites sent - - # we had a concern at the time of writing this dependency optization - # that we might use "log3" and "talk3" tables in the future with the - # old talktext2/etc tables. if that happens and we forget about this, - # this code will trip it up and make us remember: - if (grep { $_ eq "log3" || $_ eq "talk3" } @tables) { - die "This script needs updating.\n"; - } - - # - # NOTE: this is the start of long reads from the largest user tables! - # - # db handles used during this block are: - # $dbo -- validated source cluster handle for reading - # $dbch -- validated destination cluster handle - # - - # check if dest has existing data for this user. (but only check a few key tables) - # if anything else happens to have data, we'll just fail later. but unlikely. - print "Checking for existing data on target cluster...\n" if $optv > 1; - foreach my $table (qw(userbio talkleft log2 talk2 sessions userproplite2)) { - my $ti = $tinfo->{$table} or die "No table info for $table. Aborting."; - - eval { $dbch->do("HANDLER $table OPEN"); }; - if ($@) { - die "This mover currently only works on MySQL 4.x and above.\n" . 
- $@; - } - - my $idx = $ti->{idx}; - my $is_there = $dbch->selectrow_array("HANDLER $table READ `$idx` = ($userid) LIMIT 1"); - $dbch->do("HANDLER $table CLOSE"); - next unless $is_there; - - if ($opts->{destdel}) { - foreach my $table (@tables) { - # these are ephemeral or handled elsewhere - next if $skip_table{$table}; - my $ti = $tinfo->{$table} or die "No table info for $table. Aborting."; - my $pri = $ti->{idxcol}; - while ($dbch->do("DELETE FROM $table WHERE $pri=$userid LIMIT 500") > 0) { - print " deleted from $table\n" if $optv; - } - } - last; - } else { - die " Existing data on destination cluster\n"; - } - } - - # start copying from source to dest. - my $rows = 0; - my @to_delete; # array of [ $table, $prikey ] - my @styleids; # to delete, potentially - - foreach my $table (@tables) { - next if $skip_table{$table}; - - # people accounts don't have moderated posts - next if $u->{'journaltype'} eq "P" && ($table eq "modlog" || - $table eq "modblob"); - - # don't waste time looking at dependent tables with empty parents - next if $dep{$table} && $was_empty{$dep{$table}}; - - my $ti = $tinfo->{$table} or die "No table info for $table. 
Aborting."; - my $idx = $ti->{idx}; - my $idxcol = $ti->{idxcol}; - my $cols = $ti->{cols}; - my $pripos = $ti->{pripos}; - - # if we're going to be doing a verify operation later anyway, let's do it - # now, so we can use the knowledge of rows per table to hint our $batch_size - my $expected_rows = undef; - my $expected_remain = undef; # expected rows remaining (unread) - my $verifykey = $ti->{verifykey}; - my %pre; - - if ($opts->{verify} && $verifykey) { - $expected_rows = 0; - if ($table eq "dudata" || $table eq "ratelog") { - $expected_rows = $dbo->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid"); - } else { - my $sth; - $sth = $dbo->prepare("SELECT $verifykey FROM $table WHERE $idxcol=$userid"); - $sth->execute; - while (my @ar = $sth->fetchrow_array) { - $_ = join(",",@ar); - $pre{$_} = 1; - $expected_rows++; - } - } - - # no need to continue with tables that don't have any data - unless ($expected_rows) { - $was_empty{$table} = 1; - next; - } - - $expected_remain = $expected_rows; - } - - eval { $dbo->do("HANDLER $table OPEN"); }; - if ($@) { - die "This mover currently only works on MySQL 4.x and above.\n". - $@; - } - - my $tct = 0; # total rows read for this table so far. 
- my $hit_otheruser = 0; # bool, set to true when we encounter data from a different userid - my $batch_size; # how big of a LIMIT we'll be doing - my $ct = 0; # rows read in latest batch - my $did_start = 0; # bool, if process has started yet (used to enter loop, and control initial HANDLER commands) - my $pushed_delete = 0; # bool, if we've pushed this table on the delete list (once we find it has something) - - my $sqlins = ""; - my $sqlvals = 0; - my $flush = sub { - return unless $sqlins; - print "# Flushing $table ($sqlvals recs, ", length($sqlins), " bytes)\n" if $optv; - $dbch->do($sqlins); - $sqlins = ""; - $sqlvals = 0; - }; - - my $insert = sub { - my $r = shift; - - # there was an old bug where we'd populate in the database - # the choice of "none" for comment subject icon, instead of - # just storing nothing. this hack prevents migrating those. - if ($table eq "talkprop2" && - $r->[2] == $prop_icon->{id} && - $r->[3] eq "none") { - $rows_skipped{"talkprop2"}++; - return; - } - - # now that we know it has something to delete (many tables are empty for users) - unless ($pushed_delete++) { - push @to_delete, [ $table, $idxcol ]; - } - - if ($sqlins) { - $sqlins .= ", "; - } else { - $sqlins = "INSERT INTO $table (" . join(', ', @{$cols}) . ") VALUES "; - } - $sqlins .= "(" . join(", ", map { $dbo->quote($_) } @$r) . ")"; - - $sqlvals++; - $flush->() if $sqlvals > 5000 || length($sqlins) > 800_000; - }; - - # let tables perform extra processing on the $r before it's - # sent off for inserting. 
- my $magic; - - # we know how to compress these two tables (currently the only two) - if ($table eq "logtext2" || $table eq "talktext2") { - $magic = sub { - my $r = shift; - return unless length($r->[3]) > 200; - LJ::text_compress(\$r->[3]); - }; - } - if ($table eq "s1style") { - $magic = sub { - my $r = shift; - push @styleids, $r->[0]; - }; - } - - # calculate the biggest batch size that can reasonably fit in memory - my $max_batch = 10000; - $max_batch = 1000 if $table eq "logtext2" || $table eq "talktext2"; - - while (! $hit_otheruser && ($ct == $batch_size || ! $did_start)) { - my $qry; - if ($did_start) { - # once we've done the initial big read, we want to walk slowly, because - # a LIMIT of 1000 will read 1000 rows, regardless, which may be 995 - # seeks into somebody else's journal that we don't care about. - # on the other hand, if we did a --verify check above, we have a good - # idea what to expect still, so we'll use that instead of just 25 rows. - $batch_size = $expected_remain > 0 ? $expected_remain + 1 : 25; - if ($batch_size > $max_batch) { $batch_size = $max_batch; } - $expected_remain -= $batch_size; - - $qry = "HANDLER $table READ `$idx` NEXT LIMIT $batch_size"; - } else { - # when we're first starting out, though, let's LIMIT as high as possible, - # since MySQL (with InnoDB only?) will only return rows matching the primary key, - # so we'll try as big as possible. but not with myisam -- need to start - # small there too, unless we have a guess at the number of rows remaining. - - my $src_is_innodb = 0; # FIXME: detect this. but first verify HANDLER differences. - if ($src_is_innodb) { - $batch_size = $max_batch; - } else { - # MyISAM's HANDLER behavior seems to be different. 
- # it always returns batch_size, so we keep it - # small to avoid seeks, even on the first query - # (where InnoDB differs and stops when primary key - # doesn't match) - $batch_size = 25; - if ($table eq "clustertrack2" || $table eq "userbio" || - $table eq "s1usercache" || $table eq "s1overrides") { - # we know these only have 1 row, so 2 will be enough to show - # in one pass that we're done. - $batch_size = 2; - } elsif (defined $expected_rows) { - # if we know how many rows remain, let's try to use that (+1 to stop it) - $batch_size = $expected_rows + 1; - if ($batch_size > $max_batch) { $batch_size = $max_batch; } - $expected_remain -= $batch_size; - } - } - - $qry = "HANDLER $table READ `$idx` = ($userid) LIMIT $batch_size"; - $did_start = 1; - } - - my $sth = $dbo->prepare($qry); - $sth->execute; - - $ct = 0; - while (my $r = $sth->fetchrow_arrayref) { - if ($r->[$pripos] != $userid) { - $hit_otheruser = 1; - last; - } - $magic->($r) if $magic; - $insert->($r); - $tct++; - $ct++; - } - } - $flush->(); - - $dbo->do("HANDLER $table CLOSE"); - - # verify the important tables, even if --verify is off. - if (! 
$opts->{verify} && $table =~ /^(talk|log)(2|text2)$/) { - my $dblcheck = $dbo->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid"); - die "# Expecting: $dblcheck, but got $tct\n" unless $dblcheck == $tct; - } - - if ($opts->{verify} && $verifykey) { - if ($table eq "dudata" || $table eq "ratelog") { - print "# Verifying $table on size\n"; - my $post = $dbch->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid"); - die "Moved sized is smaller" if $post < $expected_rows; - } else { - print "# Verifying $table on key $verifykey\n"; - my %post; - my $sth; - - $sth = $dbch->prepare("SELECT $verifykey FROM $table WHERE $idxcol=$userid"); - $sth->execute; - while (my @ar = $sth->fetchrow_array) { - $_ = join(",",@ar); - unless (delete $pre{$_}) { - die "Mystery row showed up in $table: uid=$userid, $verifykey=$_"; - } - } - my $count = scalar keys %pre; - die "Rows not moved for uid=$userid, table=$table. unmoved count = $count" - if $count && $count != $rows_skipped{$table}; - } - } - - $was_empty{$table} = 1 unless $tct; - $rows += $tct; - } - - print "# Rows done for '$user': $rows\n" if $optv; - - # - # NOTE: we've just finished moving a bunch of rows form $dbo to $dbch, - # which could have potentially been a very slow process since the - # time for the copy is directly proportional to the data a user - # had to move. We'll revalidate handles now to ensure that they - # haven't died due to (insert eleventy billion circumstances here). - # - - $dbh = get_validated_role_dbh("master"); - $dboa = get_definitive_source_dbh($u); - $dbo = get_move_source_dbh($u); - - # db handles should be good to go now - - my $post_state = $stateString->("post"); - if ($post_state ne $pre_state) { - die "Move aborted due to state change during move: Before: [$pre_state], After: [$post_state]\n"; - } - $check_sig->($dbo, "dbo(aftermove)"); - - my $unlocked; - if (! 
$verify_code || $verify_code->()) { - # unset readonly and move to new cluster in one update - $unlocked = LJ::update_user($userid, { clusterid => $dclust, raw => "caps=caps&~(1<<$readonly_bit)" }); - print "Moved.\n" if $optv; - } else { - # job server went away or we don't have permission to flip the clusterid attribute - # so just unlock them - $unlocked = LJ::update_user($userid, { raw => "caps=caps&~(1<<$readonly_bit)" }); - die "Job server said no.\n"; - } - - # delete from the index of who's read-only. if this fails we don't really care - # (not all sites might have this table anyway) because it's not used by anything - # except the readonly-cleaner which can deal with all cases. - if ($unlocked) { - eval { - $dbh->do("DELETE FROM readonly_user WHERE userid=?", undef, $userid); - }; - } - - # delete from source cluster - if ($opts->{del}) { - print "Deleting from source cluster...\n" if $optv; - foreach my $td (@to_delete) { - my ($table, $pri) = @$td; - while ($dboa->do("DELETE FROM $table WHERE $pri=$userid LIMIT 1000") > 0) { - print " deleted from $table\n" if $optv; - } - } - - # s1stylecache table - if (@styleids) { - my $styleids_in = join(",", map { $dboa->quote($_) } @styleids); - if ($dboa->do("DELETE FROM s1stylecache WHERE styleid IN ($styleids_in)") > 0) { - print " deleted from s1stylecache\n" if $optv; - } - } - } else { - # at minimum, we delete the clustertrack2 row so it doesn't get - # included in a future ljumover.pl query from that cluster. - $dboa->do("DELETE FROM clustertrack2 WHERE userid=$userid"); - } - - $dbh->do("UPDATE clustermove SET sdeleted=?, timedone=UNIX_TIMESTAMP() ". - "WHERE cmid=?", undef, $opts->{del} ? 1 : 0, $cmid); - - return 1; -} - -sub fetchTableInfo -{ - my @tables = (@LJ::USER_TABLES, @LJ::USER_TABLES_LOCAL); - my $memkey = "moveucluster:" . 
Digest::MD5::md5_hex(join(",",@tables)); - my $tinfo = LJ::MemCache::get($memkey) || {}; - foreach my $table (@tables) { - next if grep { $_ eq $table } qw(events s1stylecache cmdbuffer captcha_session recentactions pendcomments active_user random_user_set); - next if $tinfo->{$table}; # no need to load this one - - # find the index we'll use - my $idx; # the index name we'll be using - my $idxcol; # "userid" or "journalid" - - my $sth = $dbo->prepare("SHOW INDEX FROM $table"); - $sth->execute; - my @pris; - - while (my $r = $sth->fetchrow_hashref) { - push @pris, $r->{'Column_name'} if $r->{'Key_name'} eq "PRIMARY"; - next unless $r->{'Seq_in_index'} == 1; - next if $idx; - if ($r->{'Column_name'} eq "journalid" || - $r->{'Column_name'} eq "userid" || - $r->{'Column_name'} eq "uid" || - $r->{'Column_name'} eq "commid") { - $idx = $r->{'Key_name'}; - $idxcol = $r->{'Column_name'}; - } - } - - shift @pris if @pris && ($pris[0] eq "journalid" || $pris[0] eq "userid"); - my $verifykey = join(",", @pris); - - die "can't find index for table $table\n" unless $idx; - - $tinfo->{$table}{idx} = $idx; - $tinfo->{$table}{idxcol} = $idxcol; - $tinfo->{$table}{verifykey} = $verifykey; - - my $cols = $tinfo->{$table}{cols} = []; - my $colnum = 0; - $sth = $dboa->prepare("DESCRIBE $table"); - $sth->execute; - while (my $r = $sth->fetchrow_hashref) { - push @$cols, $r->{'Field'}; - if ($r->{'Field'} eq $idxcol) { - $tinfo->{$table}{pripos} = $colnum; - } - $colnum++; - } - } - LJ::MemCache::set($memkey, $tinfo, 90); # not for long, but quick enough to speed a series of moves - return $tinfo; -} - -### FUNCTION: abortWithUsage( $message ) -### Abort the program showing usage message. 
-sub abortWithUsage { - my $msg = join '', @_; - - if ( $msg ) { - pod2usage( -verbose => 1, -exitval => 1, -message => "$msg" ); - } else { - pod2usage( -verbose => 1, -exitval => 1 ); - } -} - Deleted: trunk/bin/moveuclusterd.pl =================================================================== --- trunk/bin/moveuclusterd.pl 2011-10-13 22:08:12 UTC (rev 20309) +++ trunk/bin/moveuclusterd.pl 2011-10-14 00:22:16 UTC (rev 20310) @@ -1,2751 +0,0 @@ -#!/usr/bin/perl -############################################################################## - -=head1 NAME - -moveuclusterd - User-mover task coordinater daemon - -=head1 SYNOPSIS - - $ moveuclusterd OPTIONS - -=head2 OPTIONS - -=over 4 - -=item -d, --debug - -Output debugging information in addition to normal progress messages. May be -specified more than once to increase debug level. - -=item -D, --daemon - -Background the program. - -=item -h, --help - -Output a help message and exit. - -=item -H, --host=HOST - -Listen on the specified I<HOST> instead of the default '0.0.0.0'. - -=item -m, --maxlocktime=SECONDS - -Set the number of seconds that is targeted as the timespan to keep jobs locked -before assigning them. If the oldest job in a cluster's queue is older than this -value (120 by default), no users will be locked for that queue until the next -check. - -=item -p, --port=PORT - -Listen to the given I<PORT> instead of the default 2789. - -=item -r, --defaultrate=INTEGER - -Set the default rate limit for any source cluster which has not had its rate set -to I<INTEGER>. The default rate is 1. - -=item -s, --lockscale=INTEGER - -Set the lock-scaling factor to I<INTEGER>. The lock scaling factor is used to -decide how many users to lock per source cluster; a scaling factor of C<3> (the -default) would cause the jobserver to try to maintain 3 x the number of jobs as -there are allowed connections for a given cluster, modulo the C<maxlocktime>. - -=item -v, --verbose - -Output the jobserver's log to STDERR. 
- -=back - -=head1 REQUIRES - -I<Token requires line> - -=head1 DESCRIPTION - -None yet. - -=head1 AUTHOR - -Michael Granger E<lt>ged@danga.comE<gt> - -Copyright (c) 2004 Danga Interactive. All rights reserved. - -This module is free software. You may use, modify, and/or redistribute this -software under the terms of the Perl Artistic License. (See -http://language.perl.com/misc/Artistic.html) - -=cut - -############################################################################## -package moveuclusterd; -use strict; -use warnings qw{all}; - - -############################################################################### -### I N I T I A L I Z A T I O N -############################################################################### -BEGIN { - - # Turn STDOUT buffering off - $| = 1; - - # Versioning stuff and custom includes - use vars qw{$VERSION $RCSID}; - $VERSION = do { my @r = (q$Revision$ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r }; - $RCSID = q$Id$; - - # Define some constants - use constant TRUE => 1; - use constant FALSE => 0; - - use lib "$ENV{LJHOME}/cgi-bin"; - require "ljlib.pl"; - - # Modules - use Carp qw{croak confess}; - use Getopt::Long qw{GetOptions}; - use Pod::Usage qw{pod2usage}; - - Getopt::Long::Configure( 'bundling' ); -} - - -############################################################################### -### C O N F I G U R A T I O N G L O B A L S -############################################################################### - -### Main body -sub MAIN { - my ( - $debugLevel, # Debugging level to set in server - $helpFlag, # User requested help? - $daemonFlag, # Background after starting? - $defaultRate, # Default src cluster rate cmdline setting - $verboseFlag, # Output the log or no? 
- $server, # JobServer object - %config, # JobServer configuration - $port, # Port to listen on - $host, # Address to listen on - $lockScale, # Lock scaling factor - $maxLockTime, # Max time to keep users locked - ); - - # Print the program header and read in command line options - GetOptions( - 'D|daemon' => \$daemonFlag, - 'H|host=s' => \$host, - 'd|debug+' => \$debugLevel, - 'h|help' => \$helpFlag, - 'm|maxlocktime=i' => \$maxLockTime, - 'p|port=i' => \$port, - 'r|defaultrate=i' => \$defaultRate, - 's|lockscale=i' => \$lockScale, - 'v|verbose' => \$verboseFlag, - ) or abortWithUsage(); - - # If the -h flag was given, just show the usage and quit - helpMode() and exit if $helpFlag; - - # Build the configuration hash - $config{host} = $host if $host; - $config{port} = $port if $port; - $config{daemon} = $daemonFlag; - $config{debugLevel} = $debugLevel || 0; - $config{defaultRate} = $defaultRate if $defaultRate; - $config{lockScale} = $lockScale if $lockScale; - $config{maxLockTime} = $maxLockTime if defined $maxLockTime; - - # Create a new daemon object - $server = new JobServer ( %config ); - - # Add a simple log handler if they've requested verbose output - if ( $verboseFlag ) { - my $tmplogger = sub { - my ( $level, $msg ) = @_; - print STDERR "[$level] $msg\n"; - }; - $server->addHandler( 'log', 'verboselogger', $tmplogger ); - } - - # Start the server - $server->start(); -} - - -### FUNCTION: helpMode() -### Exit normally after printing the usage message -sub helpMode { - pod2usage( -verbose => 1, -exitval => 0 ); -} - - -### FUNCTION: abortWithUsage( $message ) -### Abort the program showing usage message. -sub abortWithUsage { - my $msg = @_ ? join('', @_) : ""; - - if ( $msg ) { - pod2usage( -verbose => 1, -exitval => 1, -message => "$msg" ); - } else { - pod2usage( -verbose => 1, -exitval => 1 ); - } -} - - - -### If run from the command line, run the server. 
-if ( $0 eq __FILE__ ) { MAIN() } - - -##################################################################### -### T I M E D B U F F E R C L A S S -##################################################################### -package TimedBuffer; - -BEGIN { - use Carp qw{croak confess}; -} - -our $DefaultExpiration = 120; - -### (CONSTRUCTOR) METHOD: new( $seconds ) -### Create a new timed buffer which will remove entries the specified number of -### I<seconds> after being added. -sub new { - my $proto = shift; - my $class = ref $proto || $proto; - my $seconds = shift || $DefaultExpiration; - - my $self = bless { - buffer => [], - seconds => $seconds, - }, $class; - - return $self; -} - - -### METHOD: add( @items ) -### Add the given I<items> to the buffer, shifting off older ones if they are -### expired. -sub add { - my $self = shift or confess "Cannot be used as a function"; - my @items = @_; - - my $expiration = time - $self->{seconds}; - my $buffer = $self->{buffer}; - - # Expire old entries and add the new ones - @$buffer = grep { $_->[1] > $expiration } @$buffer; - push @$buffer, map {[ $_, time ]} @items; - - return scalar @$buffer; -} - - -### METHOD: get( [@indices] ) -### Return the items in the buffer at the specified I<indices>, or all items in -### the buffer if no I<indices> are given. -sub get { - my $self = shift or confess "Cannot be used as a function"; - - my $expiration = time - $self->{seconds}; - my $buffer = $self->{buffer}; - - # Expire old entries - @$buffer = grep { $_->[1] > $expiration } @$buffer; - - # Return just the values from the buffer, either in a slice if they - # specified indexes, or the whole thing if not. 
- if ( @_ ) { - return map { $_->[0] } @{$buffer}[ @_ ]; - } else { - return map { $_->[0] } @$buffer; - } -} - - - -##################################################################### -### D A E M O N C L A S S -##################################################################### -package JobServer; - -BEGIN { - use IO::Socket qw{}; - use Data::Dumper qw{Dumper}; - use Carp qw{croak confess}; - use Time::HiRes qw{gettimeofday tv_interval}; - use POSIX qw{}; - - use fields ( - 'clients', # Connected client objects - 'config', # Configuration hash - 'listener', # The listener socket - 'handlers', # Client event handlers - 'jobs', # Mover jobs - 'totaljobs', # Count of jobs processed - 'assignments', # Jobs that have been assigned - 'users', # Users in the queue - 'ratelimits', # Cached cluster ratelimits - 'raterules', # Rules for building ratelimit table - 'jobcounts', # Counts per cluster of running jobs - 'starttime', # Server startup epoch time - 'recentmoves', # Timed buffer of recently-completed jobs - ); - - use lib "$ENV{LJHOME}/cgi-bin"; - require 'ljlib.pl'; - - use base qw{fields}; -} - - -### Class globals - -# Default configuration -our ( %DefaultConfig, %LogLevels ); - -INIT { - - # Default server configuration; this is merged with any config args the user - # specifies in the call to the constructor. Most of these correspond with - # command-line flags, so see that section of the POD header for more - # information. - %DefaultConfig = ( - port => 2789, # Port to listen on - host => '0.0.0.0', # Host to bind to - listenQueue => 5, # Listen queue depth - daemon => 0, # Daemonize or not? 
- debugLevel => 0, # Debugging log level - defaultRate => 1, # The default src cluster rate - lockScale => 3, # Scaling factor for locking users - maxLockTime => 120, # Max seconds to keep users locked - ); - - my $level = 0; - %LogLevels = map { - $_ => $level++, - } qw{debug info notice warn crit fatal}; - - - $Data::Dumper::Terse = 1; - $Data::Dumper::Indent = 1; -} - -# -# Datastructures of class members: -# -# clients: Hashref of connected clients, keyed by fdno -# -# jobs: A hash of arrays of JobServer::Job objects: -# { -# <srcclusterid> => [ $job1, $job2, ... ], -# ... -# } -# -# users: A hash index into the inner arrays of 'jobs', keyed by -# userid. -# -# assignments: A hash of arrays; when a job is assigned to a mover, the -# corresponding JobServer::Job is moved into this hash, -# keyed by the fdno of the mover responsible. -# -# handlers: Hash of hashes; this is used to register callbacks for clients that -# want to monitor the server, receiving log or debugging messages, -# new job notifications, etc. -# -# totaljobs: Count of total jobs added to the daemon. -# -# raterules: Maximum number of jobs which can be run against source clusters, -# keyed by clusterid. If a global rate limit has been set, this -# hash also contains a special key 'global' to contain it. -# -# ratelimits: Cached ratelimits for clusters -- this is rebuilt whenever a -# ratelimit rule is added, and is partially rebuilt when new jobs -# are added. -# -# jobcounts: Count of jobs running against source clusters, keyed by -# source clusterid. - -### (CONSTRUCTOR) METHOD: new( %config ) -### Create a new JobServer object with the given I<config>. 
-sub new { - my JobServer $self = shift; - my %config = @_; - - $self = fields::new( $self ) unless ref $self; - - # Client and job queues - $self->{clients} = {}; # fd => client obj - $self->{jobs} = {}; # pending jobs: srcluster => [ jobs ] - $self->{users} = {}; # by-userid hash of jobs - $self->{assignments} = {}; # fd => job object - $self->{totaljobs} = 0; # Count of total jobs added - $self->{raterules} = {}; # User-set rate-limit rules - $self->{ratelimits} = {}; # Cached rate limits by srcclusterid - $self->{jobcounts} = {}; # Count of jobs by srcclusterid - - # Create a timed buffer to contain the jobs which have completed in the last - # 6 minutes. - $self->{recentmoves} = new TimedBuffer 360; - - # Merge the user-specified configuration with the defaults, with the user's - # overriding. - $self->{config} = { - %DefaultConfig, - %config, - }; # merge - - # These two get set by start() - $self->{listener} = undef; - $self->{starttime} = undef; - - # CODE refs for handling various events. Keyed by event name, each subhash - # contains registrations for event callbacks. Each subhash is keyed by the - # fdno of the client that requested it, or an arbitrary string if the - # handler belongs to something other than a client. - $self->{handlers} = { - debug => {}, - log => {}, - }; - - return $self; -} - - - -### METHOD: start() -### Start the event loop. 
-sub start { - my JobServer $self = shift; - - # Start the listener socket - my $listener = new IO::Socket::INET - Proto => 'tcp', - LocalAddr => $self->{config}{host}, - LocalPort => $self->{config}{port}, - Listen => $self->{config}{listenQueue}, - ReuseAddr => 1, - Blocking => 0 - or die "new socket: $!"; - - # Log the server startup, then daemonize if it's called for - $self->logMsg( 'notice', "Server listening on %s:%d\n", - $listener->sockhost, $listener->sockport ); - $self->{listener} = $listener; - $self->daemonize if $self->{config}{daemon}; - - # Remember the startup time - $self->{starttime} = time; - - # I don't understand this design -- the Client class is where the event loop - # is? Weird. Thanks to SPUD, though, for the example code. - JobServer::Client->OtherFds( $listener->fileno => sub {$self->createClient} ); - JobServer::Client->EventLoop(); - - return 1; -} - - - -### METHOD: createClient( undef ) -### Listener socket readable callback. Accepts a new client socket and wraps a -### JobServer::Client around it. -sub createClient { - my JobServer $self = shift; - - my ( - $csock, # Client socket - $client, # JobServer::Client object - $fd, # File descriptor for client - ); - - # Get the client socket and set it nonblocking - $csock = $self->{listener}->accept or return; - $csock->blocking(0); - $fd = fileno( $csock ); - - $self->logMsg( 'info', 'Client %d connect: %s:%d', - $fd, $csock->peerhost, $csock->peerport ); - - # Wrap a client object around it, tell it to watch for input, and send the - # greeting. - $client = JobServer::Client->new( $self, $csock ); - $client->watch_read( 1 ); - $client->write( "Ready.\r\n" ); - - return $self->{clients}{$fd} = $client; -} - - -### METHOD: disconnectClient( $client=JobServer::Client[, $requeue] ) -### Disconnect the specified I<client> from the server. If I<requeue> is true, -### the job belonging to the client (if any) will be put back into the queue of -### pending jobs. 
-sub disconnectClient { - my JobSe... (truncated)