Igor Gariev (gariev) wrote in changelog,

[livejournal] r20310: LJSUP-10124: [internal] Make user cluste...

Committer: gariev
LJSUP-10124: [internal] Make user cluster DB switching dynamically
U   trunk/bin/ljdb
D   trunk/bin/moveucluster.pl
D   trunk/bin/moveuclusterd.pl
U   trunk/cgi-bin/LJ/ConfCheck/General.pm
U   trunk/cgi-bin/LJ/DBUtil.pm
U   trunk/cgi-bin/LJ/Entry.pm
U   trunk/cgi-bin/ljdb.pl
Modified: trunk/bin/ljdb
===================================================================
--- trunk/bin/ljdb	2011-10-13 22:08:12 UTC (rev 20309)
+++ trunk/bin/ljdb	2011-10-14 00:22:16 UTC (rev 20310)
@@ -3,6 +3,7 @@
 
 ##   post-svn-commit script test IV
 use lib "$ENV{LJHOME}/cgi-bin";
+require 'ljlib.pl';
 require 'ljdb.pl';
 use Getopt::Long;
 
@@ -83,22 +84,8 @@
 
     my ($userid, $cid) = $dbs->selectrow_array("SELECT userid, clusterid FROM user $whereClause", undef, $sqlArg);
     die "no such user as '$user'\n" unless $userid && $cid;
-    $role = "cluster" . $cid;
-    print "user: $user / userid: $userid / clusterid: $cid";
-
-    if (my $ab = $LJ::CLUSTER_PAIR_ACTIVE{$cid}) {
-        print " / active=$ab\n";
-        if ($inactive) {
-            $role .= "b" if $ab eq 'a';
-            $role .= "a" if $ab eq 'b';
-        } else {
-            $role .= $ab;
-        }
-    } else {
-        # type must be master/slave
-        $role .= "slave" if $inactive && grep { $_->{role}{"${role}slave"} } values %LJ::DBINFO;
-    }
-    print "\n";
+    my $u = LJ::load_userid($userid);
+    $role = ($inactive) ? LJ::get_inactive_role($u) : LJ::master_role($u);
 }
 
 $role ||= "master";

Deleted: trunk/bin/moveucluster.pl
===================================================================
--- trunk/bin/moveucluster.pl	2011-10-13 22:08:12 UTC (rev 20309)
+++ trunk/bin/moveucluster.pl	2011-10-14 00:22:16 UTC (rev 20310)
@@ -1,1123 +0,0 @@
-#!/usr/bin/perl
-##############################################################################
-
-=head1 NAME
-
-moveucluster.pl - Moves a LiveJournal user between database clusters
-
-=head1 SYNOPSIS
-
-  $ moveucluster.pl OPTIONS <user> <dest_clusterid>
-
-=head1 OPTIONS
-
-=over 4
-
-=item -h, --help
-
-Output a help message and exit.
-
-=item --verbose[=<n>]
-
-Verbosity level, 0, 1, or 2.
-
-=item --verify
-
-Verify count of copied rows to ensure accuracy (slower)
-
-=item --ignorebit
-
-Ignore the move in progress bit (force user move)
-
-=item --prelocked
-
-Do not set user readonly and sleep (somebody else did it)
-
-=item --delete
-
-Delete data from source cluster when done moving
-
-=item --destdelete
-
-Delete data from destination cluster before moving
-
-=item --expungedel
-
-The expungedel option is used to indicate that when a user is encountered
-with a statusvis of D (deleted journal) and they've been deleted for at
-least 31 days, instead of moving their data, mark the user as expunged.
-
-Further, if you specify the delete and expungedel options at the same time,
-if the user is expunged, all of their data will be deleted from the source
-cluster.  THIS IS IRREVERSIBLE AND YOU WILL NOT BE ASKED FOR CONFIRMATION.
-
-=item --jobserver=host:port
-
-Specify a job server to get tasks from.  In this mode, no other
-arguments are necessary, and moveucluster.pl just runs in a loop
-getting directions from the job server.
-
-=back
-
-=head1 AUTHOR
-
-Brad Fitzpatrick E<lt>brad@danga.comE<gt>
-Copyright (c) 2002-2004 Danga Interactive. All rights reserved.
-
-=cut
-
-##############################################################################
-
-use strict;
-use Getopt::Long;
-use Pod::Usage qw{pod2usage};
-use IO::Socket::INET;
-use lib "$ENV{'LJHOME'}/cgi-bin/";
-use LJ::TimeUtil;
-
-# NOTE: these options are used both by Getopt::Long for command-line parsing
-# in single user move move, and also set by hand when in --jobserver mode,
-# and the jobserver gives us directions, including whether or not users
-# are prelocked, need to be source-deleted, verified, etc, etc, etc.
-my $opt_del = 0;
-my $opt_destdel = 0;
-my $opt_verbose = 1;
-my $opt_movemaster = 0;
-my $opt_prelocked = 0;
-my $opt_expungedel = 0;
-my $opt_ignorebit = 0;
-my $opt_verify = 0;
-my $opt_help = 0;
-my $opt_jobserver = "";
-
-abortWithUsage() unless
-    GetOptions('delete' => \$opt_del, # from source
-               'destdelete' => \$opt_destdel, # from dest (if exists, before moving)
-               'verbose=i' => \$opt_verbose,
-               'movemaster|mm' => \$opt_movemaster, # use separate dedicated source
-               'prelocked' => \$opt_prelocked, # don't do own locking; master does (harness, ljumover)
-               'expungedel' => \$opt_expungedel, # mark as expunged if possible (+del to delete)
-               'ignorebit' => \$opt_ignorebit, # ignore move in progress bit cap (force)
-               'verify' => \$opt_verify,  # slow verification pass (just for debug)
-               'jobserver=s' => \$opt_jobserver,
-               'help' => \$opt_help,
-               );
-my $optv = $opt_verbose;
-
-my $dbo;  # original cluster db handle.  (may be a movemaster (a slave))
-my $dboa; # the actual master handle, which we delete from if deleting from source
-
-abortWithUsage() if $opt_help;
-
-if ($opt_jobserver) {
-    multiMove();
-} else {
-    singleMove();
-}
-
-sub multiMove {
-    # the job server can keep giving us new jobs to move (or a stop command)
-    # over and over, so we avoid perl exec times
-    require "$ENV{'LJHOME'}/cgi-bin/ljlib.pl";
-
-    my $sock;
-  ITER:
-    while (1) {
-        if ($sock && $sock->connected) {
-            my $pipe = 0;
-            local $SIG{PIPE} = sub { $pipe = 1; };
-
-            LJ::start_request();
-            my $dbh = get_validated_role_dbh("master");
-            unless ($dbh) {
-                print "  master db unavailable\n";
-                sleep 2;
-                next ITER;
-            }
-
-            my $rv = $sock->write("get_job\r\n");
-
-            if ($pipe || ! $rv) {
-                $sock = undef;
-                sleep 1;
-                next ITER;
-            }
-            my $line = <$sock>;
-            unless ($line) {
-                $sock = undef;
-                sleep 1;
-                next ITER;
-            }
-
-            if ($line =~ /^OK IDLE/) {
-                print "Idling.\n";
-                sleep 5;
-                next ITER;
-            } elsif ($line =~ /^OK JOB (\d+):(\d+):(\d+)\s+([\d.]+)(?:\s+([\w= ]+))?\r?\n/) {
-                my ($uid, $srcid, $dstid, $locktime) = ($1, $2, $3, $4);
-                my $opts = parseOpts($5);
-
-                print "Got a job: $uid:$srcid:$dstid, locked for=$locktime, opts: [",
-                  join(", ", map { "$_=$opts->{$_}" } grep { $opts->{$_} } keys %$opts),
-                "]\n";
-
-                my $u = LJ::load_userid($uid, "force");
-
-                next ITER unless $u;
-                next ITER unless $u->{clusterid} == $srcid;
-
-                my $verify = sub {
-                    my $pipe = 0;
-                    local $SIG{PIPE} = sub { $pipe = 1; };
-                    my $rv = $sock->write("finish $uid:$srcid:$dstid\r\n");
-                    return 0 unless $rv;
-                    my $res = <$sock>;
-                    return $res =~ /^OK/ ? 1 : 0;
-                };
-
-                # If the user is supposed to be prelocked, but the lock didn't
-                # happen more than 3 seconds ago, wait until it has time to
-                # "settle" and then move the user
-                if ( $opts->{prelocked} && $locktime < 3 ) {
-                    sleep 3 - $locktime;
-                }
-
-                my $rv = eval { moveUser($dbh, $u, $dstid, $verify, $opts); };
-                if ($rv) {
-                    print "moveUser($u->{user}/$u->{userid}) = 1\n";
-                } else {
-                    print "moveUser($u->{user}/$u->{userid}) = fail: $@\n";
-                }
-                LJ::end_request();
-                LJ::disconnect_dbs();  # end_request could do this, but we want to force it
-            } else {
-                die "Unknown response from server: $line\n";
-            }
-        } else {
-            print "Need job server sock...\n";
-            $sock = IO::Socket::INET->new(PeerAddr => $opt_jobserver,
-                                          Proto    => 'tcp', );
-            unless ($sock) {
-                print "  failed.\n";
-                sleep 1;
-                next ITER;
-            }
-            my $ready = <$sock>;
-            if ($ready =~ /Ready/) {
-                print "Connected.\n";
-            } else {
-                print "Bogus greeting.\n";
-                $sock = undef;
-                sleep 1;
-                next ITER;
-            }
-        }
-
-    }
-}
-
-### Parse options from job specs into a hashref
-sub parseOpts {
-    my $raw = shift || "";
-    my $opts = {};
-
-    while ( $raw =~ m{\s*(\w+)=(\w+)}g ) {
-        $opts->{ $1 } = $2;
-    }
-
-    foreach my $opt (qw(del destdel movemaster prelocked
-                        expungedel ignorebit verify)) {
-        next if defined $opts->{$opt};
-        $opts->{$opt} = eval "\$opt_$opt";
-    }
-
-    # Have the same delete behavior despite of how the input delete parameter is specified: by 'delete=1' or by 'del=1'
-    $opts->{del} = $opts->{'delete'} if defined $opts->{'delete'} and not $opts->{del};
-
-    return $opts;
-}
-
-
-sub singleMove {
-    my $user = shift @ARGV;
-    my $dclust = shift @ARGV;
-    $dclust = 0 if !defined $dclust && $opt_expungedel;
-
-    # check arguments
-    abortWithUsage() unless defined $user && defined $dclust;
-
-    require "$ENV{'LJHOME'}/cgi-bin/ljlib.pl";
-
-    $user = LJ::canonical_username($user);
-    abortWithUsage("Invalid username") unless length($user);
-
-    my $dbh = get_validated_role_dbh("master");
-    die "No master db available.\n" unless $dbh;
-
-    my $u = LJ::load_user($user, "force");
-
-    my $opts = parseOpts("");  # gets command-line opts
-    my $rv = eval { moveUser($dbh, $u, $dclust, undef, $opts); };
-
-    if ($rv) {
-        print "Moved '$user' to cluster $dclust.\n";
-        exit 0;
-    }
-    if ($@) {
-        die "Failed to move '$user' to cluster $dclust: $@\n";
-    }
-
-    print "ERROR: move failed.\n";
-    exit 1;
-}
-
-sub get_validated_cluster_dbh {
-    my $arg = shift;
-
-    my $clusterid = $arg;
-    if (LJ::isu($arg)) {
-        $clusterid = $arg->clusterid;
-    }
-
-    # revalidate any db that is found in cache
-    $LJ::DBIRole->clear_req_cache;
-
-    # get the destination DB handle, with a long timeout
-    my $dbch = LJ::get_cluster_master({raw=>1}, $clusterid);
-    die "Undefined or down cluster \#$clusterid\n" unless $dbch;
-
-    # make sure any error is a fatal error.  no silent mistakes.
-    $dbch->{'RaiseError'} = 1;
-
-    $dbch->do("SET wait_timeout=28800");
-
-    return $dbch;
-}
-
-sub get_validated_role_dbh {
-    my $role = shift;
-
-    # revalidate any db that is found in cache
-    $LJ::DBIRole->clear_req_cache;
-
-    my $db = LJ::get_dbh({raw=>1}, $role);
-    report_error_ops() unless $db;
-    die "Couldn't get handle for role: $role" unless $db;
-
-    # make sure any error is a fatal error.  no silent mistakes.
-    $db->{'RaiseError'} = 1;
-
-    $db->do("SET wait_timeout=28800");
-
-    return $db;
-}
-
-# lets inform ops that the script has stopped at least.
-sub report_error_ops {
-    my $subject = 'Moveucluster.pl Script Failure';
-    my $to = 'sf-ops@livejournalinc.com';
-
-    open (MAIL, "|/usr/bin/mail -s \"$subject\" $to");
-    print MAIL "moveucluster.pl failed, please login and restart the job";
-    close (MAIL);
-}
-
-# some might call this $dboa
-sub get_definitive_source_dbh {
-    my $u = shift;
-
-    # the actual master handle, which we delete from if deleting from source
-    my $db = get_validated_cluster_dbh($u);
-    die "Can't get source cluster handle.\n" unless $db;
-
-    return $db;
-}
-
-# some might call this $dbo
-sub get_move_source_dbh {
-    my $u = shift;
-
-    # $opt_movemaster comes from GetOpt when this script is called...
-    # Generally it's accessed as $opts->{movemaster}, but that's because
-    # it's in a hashref to pass to moveUser.  In any case, the definitive
-    # value is in $opt_movemaster
-    if ($opt_movemaster) {
-        # if an a/b cluster, the movemaster (the source for moving) is
-        # the opposite side.  if not a/b, then look for a special "movemaster"
-        # role for that clusterid
-        my $mm_role = "cluster$u->{clusterid}";
-        my $ab = lc($LJ::CLUSTER_PAIR_ACTIVE{$u->{clusterid}});
-        if ($ab eq "a") {
-            $mm_role .= "b";
-        } elsif  ($ab eq "b") {
-            $mm_role .= "a";
-        } else {
-            $mm_role .= "movemaster";
-        }
-
-        my $db = get_validated_role_dbh($mm_role);
-
-        my $ss = $db->selectrow_hashref("show slave status");
-        die "Move master not a slave?" unless $ss;
-
-        return $db;
-    }
-
-    # otherwise use the definitive source
-    return get_definitive_source_dbh($u);
-}
-
-sub moveUser {
-    my ($dbh, $u, $dclust, $verify_code, $opts) = @_;
-    die "Non-existent db.\n" unless $dbh;
-    die "Non-existent user.\n" unless $u && $u->{userid};
-
-    my $user = $u->{user};
-    my $userid = $u->{userid};
-
-    # get lock
-    die "Failed to get move lock.\n"
-        unless $dbh->selectrow_array("SELECT GET_LOCK('moveucluster-$u->{userid}', 5)");
-
-    # we can't move to the same cluster
-    my $sclust = $u->{'clusterid'};
-    if ($sclust == $dclust) {
-        die "User '$user' is already on cluster $dclust\n";
-    }
-
-    # we don't support "cluster 0" (the really old format)
-    die "This mover tool doesn't support moving from cluster 0.\n" unless $sclust;
-    die "Can't move back to legacy cluster 0\n" unless $dclust || $opts->{expungedel};
-
-    # for every DB handle we touch, make a signature of a sorted
-    # comma-delimited signature onto this list.  likewise with the
-    # list of tables this mover script knows about. if ANY signature
-    # in this list isn't identical, we just abort.  perhaps this
-    # script wasn't updated, or a long-running mover job wasn't
-    # restarted and new tables were added to the schema.
-    my @alltables = (@LJ::USER_TABLES, @LJ::USER_TABLES_LOCAL);
-    my $mover_sig = join(",", sort @alltables);
-
-    my $get_sig = sub {
-        my $hnd = shift;
-        return join(",", sort
-                    @{ $hnd->selectcol_arrayref("SHOW TABLES") });
-    };
-
-    my $global_sig = $get_sig->($dbh);
-
-    my $check_sig = sub {
-        my $hnd = shift;
-        my $name = shift;
-
-        # no signature checks on expunges
-        return if ! $hnd && $opts->{expungedel};
-
-        my $sig = $get_sig->($hnd);
-
-        # special case:  signature can be that of the global
-        return if $sig eq $global_sig;
-
-        if ($sig ne $mover_sig) {
-            my %sigt = map { $_ => 1 } split(/,/, $sig);
-            my @err;
-            foreach my $tbl (@alltables) {
-                unless ($sigt{$tbl}) {
-                    # missing a table the mover knows about
-                    push @err, "-$tbl";
-                    next;
-                }
-                delete $sigt{$tbl};
-            }
-            foreach my $tbl (sort keys %sigt) {
-                push @err, "?$tbl";
-            }
-            if (@err) {
-                die "Table signature for $name doesn't match!  Stopping.  [@err]\n";
-            }
-        }
-    };
-
-    # if we want to delete the user, we don't need a destination cluster, so only get
-    # one if we have a real valid destination cluster
-    my $dbch;
-    if ($dclust) {
-        $dbch = get_validated_cluster_dbh($dclust);
-    }
-
-    # this is okay to call even if ! $dclust above
-    $check_sig->($dbch, "dbch(database dst)");
-
-    # get a definitive source handle where deletes should happen in 
-    # cases of sourcedel, etc
-    $dboa = get_definitive_source_dbh($u);
-    $check_sig->($dboa, "dboa(database src)");
-
-    # get a source handle to move from, which is not necessarily a 
-    # definitive copy of the source data... it could just be a 
-    # movemaster slave
-    $dbo = get_move_source_dbh($u);
-    $check_sig->($dbo, "dbo(movemaster)");
-
-    # load the info on how we'll move each table.  this might die (if new tables
-    # with bizarre layouts are added which this thing can't auto-detect) so want
-    # to do it early.
-    my $tinfo;   # hashref of $table -> {
-                 #   'idx' => $index_name   # which we'll be using to iterate over
-                 #   'idxcol' => $col_name  # first part of index
-                 #   'cols' => [ $col1, $col2, ]
-                 #   'pripos' => $idxcol_pos,   # what field in 'cols' is $col_name
-                 #   'verifykey' => $col        # key used in the debug --verify pass
-                 # }
-    $tinfo = fetchTableInfo();
-
-    # see hack below
-    my $prop_icon = LJ::get_prop("talk", "subjecticon");
-    my %rows_skipped;  #  $tablename -> $skipped_rows_count
-
-    # find readonly cap class, complain if not found
-    my $readonly_bit = undef;
-    foreach (keys %LJ::CAP) {
-        if ($LJ::CAP{$_}->{'_name'} eq "_moveinprogress" &&
-            $LJ::CAP{$_}->{'readonly'} == 1) {
-            $readonly_bit = $_;
-            last;
-        }
-    }
-    unless (defined $readonly_bit) {
-        die "Won't move user without %LJ::CAP capability class named '_moveinprogress' with readonly => 1\n";
-    }
-
-    # make sure a move isn't already in progress
-    if ($opts->{prelocked}) {
-        unless (($u->{'caps'}+0) & (1 << $readonly_bit)) {
-            die "User '$user' should have been prelocked.\n";
-        }
-    } else {
-        if (($u->{'caps'}+0) & (1 << $readonly_bit)) {
-            die "User '$user' is already in the process of being moved? (cap bit $readonly_bit set)\n"
-                unless $opts->{ignorebit};
-        }
-    }
-
-    if ($opts->{expungedel} && $u->{'statusvis'} eq "D" &&
-        LJ::TimeUtil->mysqldate_to_time($u->{'statusvisdate'}) < time() - 86400*31) {
-
-        print "Expunging user '$u->{'user'}'\n";
-        $dbh->do("INSERT INTO clustermove (userid, sclust, dclust, timestart, timedone) ".
-                 "VALUES (?,?,?,UNIX_TIMESTAMP(),UNIX_TIMESTAMP())", undef, 
-                 $userid, $sclust, 0);
-
-        LJ::update_user($userid, { clusterid => 0,
-                                   statusvis => 'X',
-                                   raw => "caps=caps&~(1<<$readonly_bit), statusvisdate=NOW()" })
-            or die "Couldn't update user to expunged";
-
-        # note that we've expunged this user in the "expunged_users" db table
-        $dbh->do("REPLACE INTO expunged_users SET userid=?, user=?, expunge_time=UNIX_TIMESTAMP()",
-                 undef, $u->{userid}, $u->{user});
-
-        # now delete all content from user cluster for this user
-        if ($opts->{del}) {
-            print "Deleting expungeable user data...\n" if $optv;
-
-            # figure out if they have any S1 styles
-            my $styleids = $dboa->selectcol_arrayref("SELECT styleid FROM s1style WHERE userid = $userid");
-
-            $dbh->do("DELETE FROM domains WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM email_aliases WHERE alias = ?",
-                     undef, "$u->{user}\@$LJ::USER_DOMAIN");
-            $dbh->do("DELETE FROM userinterests WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM comminterests WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM syndicated WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM supportnotify WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM reluser WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM smsusermap WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM friends WHERE userid = ?", undef, $u->id);
-            $dbh->do("DELETE FROM phonepostlogin WHERE userid = ?", undef, $u->id);
-
-            # no need for other users to ban this user any more
-            while ($dbh->do("DELETE FROM reluser WHERE targetid = ? AND type = 'B' LIMIT 1000", undef, $u->id) > 0) {
-                print "  deleted bans from reluser\n" if $optv;
-            }
-
-            # now delete from the main tables
-            foreach my $table (keys %$tinfo) {
-                my $pri = $tinfo->{$table}->{idxcol};
-                while ($dboa->do("DELETE FROM $table WHERE $pri=$userid LIMIT 1000") > 0) {
-                    print "  deleted from $table\n" if $optv;
-                }
-            }
-
-            # and from the s1stylecache table
-            if (@$styleids) {
-                my $styleids_in = join(",", map { $dboa->quote($_) } @$styleids);
-                if ($dboa->do("DELETE FROM s1stylecache WHERE styleid IN ($styleids_in)") > 0) {
-                    print "  deleted from s1stylecache\n" if $optv;
-                }
-            }
-
-            $dboa->do("DELETE FROM clustertrack2 WHERE userid=?", undef, $userid);
-        }
-
-        # fire event noting this user was expunged
-        if (eval "use LJ::Event::UserExpunged; 1;") {
-            LJ::Event::UserExpunged->new($u)->fire;
-        } else {
-            die "Could not load module LJ::Event::UserExpunged: $@";
-        }
-        LJ::run_hooks('purged_user', $u);
-
-        return 1;
-    }
-
-    # if we get to this point we have to enforce that there's a destination cluster, because
-    # apparently the user failed the expunge test
-    if (!defined $dclust || !defined $dbch) {
-        die "User is not eligible for expunging.\n" if $opts->{expungedel};
-    }
-
-
-    # returns state string, with a/b, readonly, and flush states.
-    # string looks like:
-    #   "src(34)=a,dst(42)=b,readonly(34)=0,readonly(42)=0,src_flushes=32
-    # because if:
-    #   src a/b changes:  lose readonly lock?
-    #   dst a/b changes:  suspect.  did one side crash?  was other side caught up?
-    #   read-only changes:  signals maintenance
-    #   flush counts change: causes HANDLER on src to lose state and reset
-    my $stateString = sub {
-        my $post = shift;  # false for before, true for "after", which forces a config reload
-
-        if ($post) {
-            LJ::Config->reload;
-        }
-
-        my @s;
-        push @s, "src($sclust)=" . $LJ::CLUSTER_PAIR_ACTIVE{$sclust};
-        push @s, "dst($dclust)=" . $LJ::CLUSTER_PAIR_ACTIVE{$dclust};
-        push @s, "readonly($sclust)=" . ($LJ::READONLY_CLUSTER{$sclust} ? 1 : 0);
-        push @s, "readonly($dclust)=" . ($LJ::READONLY_CLUSTER{$dclust} ? 1 : 0);
-
-        my $flushes = 0;
-        my $sth = $dbo->prepare("SHOW STATUS LIKE '%flush%'");
-        $sth->execute;
-        while (my $r = $sth->fetchrow_hashref) {
-            $flushes += $r->{Value} if $r->{Variable_name} =~ /^Com_flush|Flush_commands$/;
-        }
-        push @s, "src_flushes=" . $flushes;
-
-        return join(",", @s);
-    };
-
-    print "Moving '$u->{'user'}' from cluster $sclust to $dclust\n" if $optv >= 1;
-    my $pre_state = $stateString->();
-
-    # mark that we're starting the move
-    $dbh->do("INSERT INTO clustermove (userid, sclust, dclust, timestart) ".
-             "VALUES (?,?,?,UNIX_TIMESTAMP())", undef, $userid, $sclust, $dclust);
-    my $cmid = $dbh->{'mysql_insertid'};
-
-    # set readonly cap bit on user
-    unless ($opts->{prelocked} ||
-            LJ::update_user($userid, { raw => "caps=caps|(1<<$readonly_bit)" }))
-    {
-        die "Failed to set readonly bit on user: $user\n";
-    }
-    $dbh->do("SELECT RELEASE_LOCK('moveucluster-$u->{userid}')");
-
-    unless ($opts->{prelocked}) {
-        # wait a bit for writes to stop if journal is somewhat active (last week update)
-        my $secidle = $dbh->selectrow_array("SELECT UNIX_TIMESTAMP()-UNIX_TIMESTAMP(timeupdate) ".
-                                            "FROM userusage WHERE userid=$userid");
-        if ($secidle) {
-            sleep(2) unless $secidle > 86400*7;
-            sleep(1) unless $secidle > 86400;
-        }
-    }
-
-    if ($opts->{movemaster}) {
-        my $diff = 999_999;
-        my $tolerance = 50_000;
-        while ($diff > $tolerance) {
-            my $ss = $dbo->selectrow_hashref("show slave status");
-            if ($ss->{'Slave_IO_Running'} eq "Yes" && $ss->{'Slave_SQL_Running'} eq "Yes") {
-                if ($ss->{'Master_Log_File'} eq $ss->{'Relay_Master_Log_File'}) {
-                    $diff = $ss->{'Read_Master_Log_Pos'} - $ss->{'Exec_master_log_pos'};
-                    print "  diff: $diff\n" if $optv >= 1;
-                    sleep 1 if $diff > $tolerance;
-                } else {
-                    print "  (Wrong log file):  $ss->{'Relay_Master_Log_File'}($ss->{'Exec_master_log_pos'}) not $ss->{'Master_Log_File'}($ss->{'Read_Master_Log_Pos'})\n" if $optv >= 1;
-                }
-            } else {
-                die "Movemaster slave not running";
-            }
-        }
-    }
-
-    print "Moving away from cluster $sclust\n" if $optv;
-
-    my %cmd_done;  # cmd_name -> 1
-    while (my $cmd = $dboa->selectrow_array("SELECT cmd FROM cmdbuffer WHERE journalid=$userid")) {
-        die "Already flushed cmdbuffer job '$cmd' -- it didn't take?\n" if $cmd_done{$cmd}++;
-        print "Flushing cmdbuffer for cmd: $cmd\n" if $optv > 1;
-        require "$ENV{'LJHOME'}/cgi-bin/ljcmdbuffer.pl";
-        LJ::Cmdbuffer::flush($dbh, $dboa, $cmd, $userid);
-    }
-
-    # setup dependencies (we can skip work by not checking a table if we know
-    # its dependent table was empty).  then we have to order things so deps get
-    # processed first.
-    my %was_empty;  # $table -> bool, table was found empty
-    my %dep = (
-               "logtext2" => "log2",
-               "logprop2" => "log2",
-               "logsec2" => "log2",
-               "talkprop2" => "talk2",
-               "talktext2" => "talk2",
-               "phoneposttrans" => "phonepostentry", # FIXME: ljcom
-               "modblob" => "modlog",
-               "sessions_data" => "sessions",
-               "memkeyword2" => "memorable2",
-               "userpicmap2" => "userpic2",
-               "logtagsrecent" => "usertags",
-               "logtags" => "usertags",
-               "logkwsum" => "usertags",
-               );
-
-    # all tables we could be moving.  we need to sort them in
-    # order so that we check dependant tables first
-    my @tables;
-    push @tables, grep { ! $dep{$_} } @alltables;
-    push @tables, grep { $dep{$_} } @alltables;
-
-    # these are ephemeral or handled elsewhere
-    my %skip_table = (
-                      "cmdbuffer" => 1,       # pre-flushed
-                      "events" => 1,          # handled by qbufferd (not yet used)
-                      "s1stylecache" => 1,    # will be recreated
-                      "captcha_session" => 1, # temporary
-                      "tempanonips" => 1,     # temporary ip storage for spam reports
-                      "recentactions" => 1,   # pre-flushed by clean_caches
-                      "pendcomments" => 1,    # don't need to copy these
-                      "active_user"  => 1,    # don't need to copy these
-                      "random_user_set" => 1, # "
-                      );
-
-    $skip_table{'inviterecv'} = 1 if $u->{journaltype} ne 'P'; # non-person, skip invites received
-    $skip_table{'invitesent'} = 1 if $u->{journaltype} ne 'C'; # not community, skip invites sent
-
-    # we had a concern at the time of writing this dependency optization
-    # that we might use "log3" and "talk3" tables in the future with the
-    # old talktext2/etc tables.  if that happens and we forget about this,
-    # this code will trip it up and make us remember:
-    if (grep { $_ eq "log3" || $_ eq "talk3" } @tables) {
-        die "This script needs updating.\n";
-    }
-
-    #
-    # NOTE: this is the start of long reads from the largest user tables!
-    #
-    #    db handles used during this block are:
-    #       $dbo  -- validated source cluster handle for reading
-    #       $dbch -- validated destination cluster handle
-    #
-
-    # check if dest has existing data for this user.  (but only check a few key tables)
-    # if anything else happens to have data, we'll just fail later.  but unlikely.
-    print "Checking for existing data on target cluster...\n" if $optv > 1;
-    foreach my $table (qw(userbio talkleft log2 talk2 sessions userproplite2)) {
-        my $ti = $tinfo->{$table} or die "No table info for $table.  Aborting.";
-
-        eval { $dbch->do("HANDLER $table OPEN"); };
-        if ($@) {
-            die "This mover currently only works on MySQL 4.x and above.\n" .
-                $@;
-        }
-
-        my $idx = $ti->{idx};
-        my $is_there = $dbch->selectrow_array("HANDLER $table READ `$idx` = ($userid) LIMIT 1");
-        $dbch->do("HANDLER $table CLOSE");
-        next unless $is_there;
-
-        if ($opts->{destdel}) {
-            foreach my $table (@tables) {
-                # these are ephemeral or handled elsewhere
-                next if $skip_table{$table};
-                my $ti = $tinfo->{$table} or die "No table info for $table.  Aborting.";
-                my $pri = $ti->{idxcol};
-                while ($dbch->do("DELETE FROM $table WHERE $pri=$userid LIMIT 500") > 0) {
-                    print "  deleted from $table\n" if $optv;
-                }
-            }
-            last;
-        } else {
-            die "  Existing data on destination cluster\n";
-        }
-    }
-
-    # start copying from source to dest.
-    my $rows = 0;
-    my @to_delete;  # array of [ $table, $prikey ]
-    my @styleids;   # to delete, potentially
-
-    foreach my $table (@tables) {
-        next if $skip_table{$table};
-
-        # people accounts don't have moderated posts
-        next if $u->{'journaltype'} eq "P" && ($table eq "modlog" ||
-                                               $table eq "modblob");
-
-        # don't waste time looking at dependent tables with empty parents
-        next if $dep{$table} && $was_empty{$dep{$table}};
-
-        my $ti = $tinfo->{$table} or die "No table info for $table.  Aborting.";
-        my $idx = $ti->{idx};
-        my $idxcol = $ti->{idxcol};
-        my $cols = $ti->{cols};
-        my $pripos = $ti->{pripos};
-
-        # if we're going to be doing a verify operation later anyway, let's do it
-        # now, so we can use the knowledge of rows per table to hint our $batch_size
-        my $expected_rows = undef;
-        my $expected_remain = undef;  # expected rows remaining (unread)
-        my $verifykey = $ti->{verifykey};
-        my %pre;
-
-        if ($opts->{verify} && $verifykey) {
-            $expected_rows = 0;
-            if ($table eq "dudata" || $table eq "ratelog") {
-                $expected_rows = $dbo->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid");
-            } else {
-                my $sth;
-                $sth = $dbo->prepare("SELECT $verifykey FROM $table WHERE $idxcol=$userid");
-                $sth->execute;
-                while (my @ar = $sth->fetchrow_array) {
-                    $_ = join(",",@ar);
-                    $pre{$_} = 1;
-                    $expected_rows++;
-                }
-            }
-
-            # no need to continue with tables that don't have any data
-            unless ($expected_rows) {
-                $was_empty{$table} = 1;
-                next;
-            }
-
-            $expected_remain = $expected_rows;
-        }
-
-        eval { $dbo->do("HANDLER $table OPEN"); };
-        if ($@) {
-            die "This mover currently only works on MySQL 4.x and above.\n".
-                $@;
-        }
-
-        my $tct = 0;            # total rows read for this table so far.
-        my $hit_otheruser = 0;  # bool, set to true when we encounter data from a different userid
-        my $batch_size;         # how big of a LIMIT we'll be doing
-        my $ct = 0;             # rows read in latest batch
-        my $did_start = 0;      # bool, if process has started yet (used to enter loop, and control initial HANDLER commands)
-        my $pushed_delete = 0;  # bool, if we've pushed this table on the delete list (once we find it has something)
-
-        my $sqlins = "";
-        my $sqlvals = 0;
-        my $flush = sub {
-            return unless $sqlins;
-            print "# Flushing $table ($sqlvals recs, ", length($sqlins), " bytes)\n" if $optv;
-            $dbch->do($sqlins);
-            $sqlins = "";
-            $sqlvals = 0;
-        };
-
-        my $insert = sub {
-            my $r = shift;
-
-            # there was an old bug where we'd populate in the database
-            # the choice of "none" for comment subject icon, instead of
-            # just storing nothing.  this hack prevents migrating those.
-            if ($table  eq "talkprop2" &&
-                $r->[2] == $prop_icon->{id} &&
-                $r->[3] eq "none") {
-                $rows_skipped{"talkprop2"}++;
-                return;
-            }
-
-            # now that we know it has something to delete (many tables are empty for users)
-            unless ($pushed_delete++) {
-                push @to_delete, [ $table, $idxcol ];
-            }
-
-            if ($sqlins) {
-                $sqlins .= ", ";
-            } else {
-                $sqlins = "INSERT INTO $table (" . join(', ', @{$cols}) . ") VALUES ";
-            }
-            $sqlins .= "(" . join(", ", map { $dbo->quote($_) } @$r) . ")";
-
-            $sqlvals++;
-            $flush->() if $sqlvals > 5000 || length($sqlins) > 800_000;
-        };
-
-        # let tables perform extra processing on the $r before it's 
-        # sent off for inserting.
-        my $magic;
-
-        # we know how to compress these two tables (currently the only two)
-        if ($table eq "logtext2" || $table eq "talktext2") {
-            $magic = sub {
-                my $r = shift;
-                return unless length($r->[3]) > 200;
-                LJ::text_compress(\$r->[3]);
-            };
-        }
-        if ($table eq "s1style") {
-            $magic = sub {
-                my $r = shift;
-                push @styleids, $r->[0];
-            };
-        }
-
-        # calculate the biggest batch size that can reasonably fit in memory
-        my $max_batch = 10000;
-        $max_batch = 1000 if $table eq "logtext2" || $table eq "talktext2";
-
-        while (! $hit_otheruser && ($ct == $batch_size || ! $did_start)) {
-            my $qry;
-            if ($did_start) {
-                # once we've done the initial big read, we want to walk slowly, because
-                # a LIMIT of 1000 will read 1000 rows, regardless, which may be 995
-                # seeks into somebody else's journal that we don't care about.
-                # on the other hand, if we did a --verify check above, we have a good
-                # idea what to expect still, so we'll use that instead of just 25 rows.
-                $batch_size = $expected_remain > 0 ? $expected_remain + 1 : 25;
-                if ($batch_size > $max_batch) { $batch_size = $max_batch; }
-                $expected_remain -= $batch_size;
-
-                $qry = "HANDLER $table READ `$idx` NEXT LIMIT $batch_size";
-            } else {
-                # when we're first starting out, though, let's LIMIT as high as possible,
-                # since MySQL (with InnoDB only?) will only return rows matching the primary key,
-                # so we'll try as big as possible.  but not with myisam -- need to start
-                # small there too, unless we have a guess at the number of rows remaining.
-
-                my $src_is_innodb = 0;  # FIXME: detect this.  but first verify HANDLER differences.
-                if ($src_is_innodb) {
-                    $batch_size = $max_batch;
-                } else {
-                    # MyISAM's HANDLER behavior seems to be different.
-                    # it always returns batch_size, so we keep it
-                    # small to avoid seeks, even on the first query
-                    # (where InnoDB differs and stops when primary key
-                    # doesn't match)
-                    $batch_size = 25;
-                    if ($table eq "clustertrack2" || $table eq "userbio" ||
-                        $table eq "s1usercache" || $table eq "s1overrides") {
-                        # we know these only have 1 row, so 2 will be enough to show
-                        # in one pass that we're done.
-                        $batch_size = 2;
-                    } elsif (defined $expected_rows) {
-                        # if we know how many rows remain, let's try to use that (+1 to stop it)
-                        $batch_size = $expected_rows + 1;
-                        if ($batch_size > $max_batch) { $batch_size = $max_batch; }
-                        $expected_remain -= $batch_size;
-                    }
-                }
-
-                $qry = "HANDLER $table READ `$idx` = ($userid) LIMIT $batch_size";
-                $did_start = 1;
-            }
-
-            my $sth = $dbo->prepare($qry);
-            $sth->execute;
-
-            $ct = 0;
-            while (my $r = $sth->fetchrow_arrayref) {
-                if ($r->[$pripos] != $userid) {
-                    $hit_otheruser = 1;
-                    last;
-                }
-                $magic->($r) if $magic;
-                $insert->($r);
-                $tct++;
-                $ct++;
-            }
-        }
-        $flush->();
-
-        $dbo->do("HANDLER $table CLOSE");
-
-        # verify the important tables, even if --verify is off.
-        if (! $opts->{verify} && $table =~ /^(talk|log)(2|text2)$/) {
-            my $dblcheck = $dbo->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid");
-            die "# Expecting: $dblcheck, but got $tct\n" unless $dblcheck == $tct;
-        }
-
-        if ($opts->{verify} && $verifykey) {
-            if ($table eq "dudata" || $table eq "ratelog") {
-                print "# Verifying $table on size\n";
-                my $post = $dbch->selectrow_array("SELECT COUNT(*) FROM $table WHERE $idxcol=$userid");
-                die "Moved sized is smaller" if $post < $expected_rows;
-            } else {
-                print "# Verifying $table on key $verifykey\n";
-                my %post;
-                my $sth;
-
-                $sth = $dbch->prepare("SELECT $verifykey FROM $table WHERE $idxcol=$userid");
-                $sth->execute;
-                while (my @ar = $sth->fetchrow_array) {
-                    $_ = join(",",@ar);
-                    unless (delete $pre{$_}) {
-                        die "Mystery row showed up in $table: uid=$userid, $verifykey=$_";
-                    }
-                }
-                my $count = scalar keys %pre;
-                die "Rows not moved for uid=$userid, table=$table.  unmoved count = $count"
-                    if $count && $count != $rows_skipped{$table};
-            }
-        }
-
-        $was_empty{$table} = 1 unless $tct;
-        $rows += $tct;
-    }
-
-    print "# Rows done for '$user': $rows\n" if $optv;
-
-    #
-    # NOTE:  we've just finished moving a bunch of rows from $dbo to $dbch,
-    #        which could have potentially been a very slow process since the
-    #        time for the copy is directly proportional to the data a user
-    #        had to move.  We'll revalidate handles now to ensure that they 
-    #        haven't died due to (insert eleventy billion circumstances here).
-    #
-
-    $dbh  = get_validated_role_dbh("master");
-    $dboa = get_definitive_source_dbh($u);
-    $dbo  = get_move_source_dbh($u);
-
-    # db handles should be good to go now
-
-    my $post_state = $stateString->("post");
-    if ($post_state ne $pre_state) {
-        die "Move aborted due to state change during move: Before: [$pre_state], After: [$post_state]\n";
-    }
-    $check_sig->($dbo, "dbo(aftermove)");
-
-    my $unlocked;
-    if (! $verify_code || $verify_code->()) {
-        # unset readonly and move to new cluster in one update
-        $unlocked = LJ::update_user($userid, { clusterid => $dclust, raw => "caps=caps&~(1<<$readonly_bit)" });
-        print "Moved.\n" if $optv;
-    } else {
-        # job server went away or we don't have permission to flip the clusterid attribute
-        # so just unlock them
-        $unlocked = LJ::update_user($userid, { raw => "caps=caps&~(1<<$readonly_bit)" });
-        die "Job server said no.\n";
-    }
-
-    # delete from the index of who's read-only.  if this fails we don't really care
-    # (not all sites might have this table anyway) because it's not used by anything
-    # except the readonly-cleaner which can deal with all cases.
-    if ($unlocked) {
-        eval {
-            $dbh->do("DELETE FROM readonly_user WHERE userid=?", undef, $userid);
-        };
-    }
-
-    # delete from source cluster
-    if ($opts->{del}) {
-        print "Deleting from source cluster...\n" if $optv;
-        foreach my $td (@to_delete) {
-            my ($table, $pri) = @$td;
-            while ($dboa->do("DELETE FROM $table WHERE $pri=$userid LIMIT 1000") > 0) {
-                print "  deleted from $table\n" if $optv;
-            }
-        }
-
-        # s1stylecache table
-        if (@styleids) {
-            my $styleids_in = join(",", map { $dboa->quote($_) } @styleids);
-            if ($dboa->do("DELETE FROM s1stylecache WHERE styleid IN ($styleids_in)") > 0) {
-                print "  deleted from s1stylecache\n" if $optv;
-            }
-        }
-    } else {
-        # at minimum, we delete the clustertrack2 row so it doesn't get
-        # included in a future ljumover.pl query from that cluster.
-        $dboa->do("DELETE FROM clustertrack2 WHERE userid=$userid");
-    }
-
-    $dbh->do("UPDATE clustermove SET sdeleted=?, timedone=UNIX_TIMESTAMP() ".
-             "WHERE cmid=?", undef, $opts->{del} ? 1 : 0, $cmid);
-
-    return 1;
-}
-
-sub fetchTableInfo
-{
-    my @tables = (@LJ::USER_TABLES, @LJ::USER_TABLES_LOCAL);
-    my $memkey = "moveucluster:" . Digest::MD5::md5_hex(join(",",@tables));
-    my $tinfo = LJ::MemCache::get($memkey) || {};
-    foreach my $table (@tables) {
-        next if grep { $_ eq $table } qw(events s1stylecache cmdbuffer captcha_session recentactions pendcomments active_user random_user_set);
-        next if $tinfo->{$table};  # no need to load this one
-
-        # find the index we'll use
-        my $idx;     # the index name we'll be using
-        my $idxcol;  # "userid" or "journalid"
-
-        my $sth = $dbo->prepare("SHOW INDEX FROM $table");
-        $sth->execute;
-        my @pris;
-
-        while (my $r = $sth->fetchrow_hashref) {
-            push @pris, $r->{'Column_name'} if $r->{'Key_name'} eq "PRIMARY";
-            next unless $r->{'Seq_in_index'} == 1;
-            next if $idx;
-            if ($r->{'Column_name'} eq "journalid" ||
-                $r->{'Column_name'} eq "userid" ||
-                $r->{'Column_name'} eq "uid" ||
-                $r->{'Column_name'} eq "commid") {
-                $idx = $r->{'Key_name'};
-                $idxcol = $r->{'Column_name'};
-            }
-        }
-
-        shift @pris if @pris && ($pris[0] eq "journalid" || $pris[0] eq "userid");
-        my $verifykey = join(",", @pris);
-
-        die "can't find index for table $table\n" unless $idx;
-
-        $tinfo->{$table}{idx} = $idx;
-        $tinfo->{$table}{idxcol} = $idxcol;
-        $tinfo->{$table}{verifykey} = $verifykey;
-
-        my $cols = $tinfo->{$table}{cols} = [];
-        my $colnum = 0;
-        $sth = $dboa->prepare("DESCRIBE $table");
-        $sth->execute;
-        while (my $r = $sth->fetchrow_hashref) {
-            push @$cols, $r->{'Field'};
-            if ($r->{'Field'} eq $idxcol) {
-                $tinfo->{$table}{pripos} = $colnum;
-            }
-            $colnum++;
-        }
-    }
-    LJ::MemCache::set($memkey, $tinfo, 90);  # not for long, but quick enough to speed a series of moves
-    return $tinfo;
-}
-
-### FUNCTION: abortWithUsage( $message )
-### Abort the program showing usage message.
-sub abortWithUsage {
-    my $msg = join '', @_;
-
-    if ( $msg ) {
-        pod2usage( -verbose => 1, -exitval => 1, -message => "$msg" );
-    } else {
-        pod2usage( -verbose => 1, -exitval => 1 );
-    }
-}
-

Deleted: trunk/bin/moveuclusterd.pl
===================================================================
--- trunk/bin/moveuclusterd.pl	2011-10-13 22:08:12 UTC (rev 20309)
+++ trunk/bin/moveuclusterd.pl	2011-10-14 00:22:16 UTC (rev 20310)
@@ -1,2751 +0,0 @@
-#!/usr/bin/perl
-##############################################################################
-
-=head1 NAME
-
-moveuclusterd - User-mover task coordinator daemon
-
-=head1 SYNOPSIS
-
-  $ moveuclusterd OPTIONS
-
-=head2 OPTIONS
-
-=over 4
-
-=item -d, --debug
-
-Output debugging information in addition to normal progress messages. May be
-specified more than once to increase debug level.
-
-=item -D, --daemon
-
-Background the program.
-
-=item -h, --help
-
-Output a help message and exit.
-
-=item -H, --host=HOST
-
-Listen on the specified I<HOST> instead of the default '0.0.0.0'.
-
-=item -m, --maxlocktime=SECONDS
-
-Set the number of seconds that is targeted as the timespan to keep jobs locked
-before assigning them. If the oldest job in a cluster's queue is older than this
-value (120 by default), no users will be locked for that queue until the next
-check.
-
-=item -p, --port=PORT
-
-Listen to the given I<PORT> instead of the default 2789.
-
-=item -r, --defaultrate=INTEGER
-
-Set the default rate limit for any source cluster which has not had its rate set
-to I<INTEGER>. The default rate is 1.
-
-=item -s, --lockscale=INTEGER
-
-Set the lock-scaling factor to I<INTEGER>. The lock scaling factor is used to
-decide how many users to lock per source cluster; a scaling factor of C<3> (the
-default) would cause the jobserver to try to maintain 3 x the number of jobs as
-there are allowed connections for a given cluster, modulo the C<maxlocktime>.
-
-=item -v, --verbose
-
-Output the jobserver's log to STDERR.
-
-=back
-
-=head1 REQUIRES
-
-I<Token requires line>
-
-=head1 DESCRIPTION
-
-None yet.
-
-=head1 AUTHOR
-
-Michael Granger E<lt>ged@danga.comE<gt>
-
-Copyright (c) 2004 Danga Interactive. All rights reserved.
-
-This module is free software. You may use, modify, and/or redistribute this
-software under the terms of the Perl Artistic License. (See
-http://language.perl.com/misc/Artistic.html)
-
-=cut
-
-##############################################################################
-package moveuclusterd;
-use strict;
-use warnings qw{all};
-
-
-###############################################################################
-###  I N I T I A L I Z A T I O N
-###############################################################################
-BEGIN {
-
-    # Turn STDOUT buffering off
-    $| = 1;
-
-    # Versioning stuff and custom includes
-    use vars qw{$VERSION $RCSID};
-    $VERSION    = do { my @r = (q$Revision$ =~ /\d+/g); sprintf "%d."."%02d" x $#r, @r };
-    $RCSID      = q$Id$;
-
-    # Define some constants
-    use constant TRUE   => 1;
-    use constant FALSE  => 0;
-
-    use lib "$ENV{LJHOME}/cgi-bin";
-    require "ljlib.pl";
-
-    # Modules
-    use Carp                qw{croak confess};
-    use Getopt::Long        qw{GetOptions};
-    use Pod::Usage          qw{pod2usage};
-
-    Getopt::Long::Configure( 'bundling' );
-}
-
-
-###############################################################################
-### C O N F I G U R A T I O N   G L O B A L S
-###############################################################################
-
-### Main body
-sub MAIN {
-    my (
-        $debugLevel,            # Debugging level to set in server
-        $helpFlag,              # User requested help?
-        $daemonFlag,            # Background after starting?
-        $defaultRate,           # Default src cluster rate cmdline setting
-        $verboseFlag,           # Output the log or no?
-        $server,                # JobServer object
-        %config,                # JobServer configuration
-        $port,                  # Port to listen on
-        $host,                  # Address to listen on
-        $lockScale,             # Lock scaling factor
-        $maxLockTime,           # Max time to keep users locked
-       );
-
-    # Print the program header and read in command line options
-    GetOptions(
-        'D|daemon'        => \$daemonFlag,
-        'H|host=s'        => \$host,
-        'd|debug+'        => \$debugLevel,
-        'h|help'          => \$helpFlag,
-        'm|maxlocktime=i' => \$maxLockTime,
-        'p|port=i'        => \$port,
-        'r|defaultrate=i' => \$defaultRate,
-        's|lockscale=i'   => \$lockScale,
-        'v|verbose'       => \$verboseFlag,
-       ) or abortWithUsage();
-
-    # If the -h flag was given, just show the usage and quit
-    helpMode() and exit if $helpFlag;
-
-    # Build the configuration hash
-    $config{host} = $host if $host;
-    $config{port} = $port if $port;
-    $config{daemon} = $daemonFlag;
-    $config{debugLevel} = $debugLevel || 0;
-    $config{defaultRate} = $defaultRate if $defaultRate;
-    $config{lockScale} = $lockScale if $lockScale;
-    $config{maxLockTime} = $maxLockTime if defined $maxLockTime;
-
-    # Create a new daemon object
-    $server = new JobServer ( %config );
-
-    # Add a simple log handler if they've requested verbose output
-    if ( $verboseFlag ) {
-        my $tmplogger = sub {
-            my ( $level, $msg ) = @_;
-            print STDERR "[$level] $msg\n";
-        };
-        $server->addHandler( 'log', 'verboselogger', $tmplogger );
-    }
-
-    # Start the server
-    $server->start();
-}
-
-
-### FUNCTION: helpMode()
-### Exit normally after printing the usage message
-sub helpMode {
-    pod2usage( -verbose => 1, -exitval => 0 );
-}
-
-
-### FUNCTION: abortWithUsage( $message )
-### Abort the program showing usage message.
-sub abortWithUsage {
-    my $msg = @_ ? join('', @_) : "";
-
-    if ( $msg ) {
-        pod2usage( -verbose => 1, -exitval => 1, -message => "$msg" );
-    } else {
-        pod2usage( -verbose => 1, -exitval => 1 );
-    }
-}
-
-
-
-### If run from the command line, run the server.
-if ( $0 eq __FILE__ ) { MAIN() }
-
-
-#####################################################################
-###	T I M E D   B U F F E R   C L A S S
-#####################################################################
-package TimedBuffer;
-
-BEGIN {
-    use Carp qw{croak confess};
-}
-
-our $DefaultExpiration = 120;
-
-### (CONSTRUCTOR) METHOD: new( $seconds )
-### Create a new timed buffer which will remove entries the specified number of
-### I<seconds> after being added.
-sub new {
-    my $proto = shift;
-    my $class = ref $proto || $proto;
-    my $seconds = shift || $DefaultExpiration;
-
-    my $self = bless {
-        buffer  => [],
-        seconds => $seconds,
-    }, $class;
-
-    return $self;
-}
-
-
-### METHOD: add( @items )
-### Add the given I<items> to the buffer, shifting off older ones if they are
-### expired.
-sub add {
-    my $self = shift or confess "Cannot be used as a function";
-    my @items = @_;
-
-    my $expiration = time - $self->{seconds};
-    my $buffer = $self->{buffer};
-
-    # Expire old entries and add the new ones
-    @$buffer = grep { $_->[1] > $expiration } @$buffer;
-    push @$buffer, map {[ $_, time ]} @items;
-
-    return scalar @$buffer;
-}
-
-
-### METHOD: get( [@indices] )
-### Return the items in the buffer at the specified I<indices>, or all items in
-### the buffer if no I<indices> are given.
-sub get {
-    my $self = shift or confess "Cannot be used as a function";
-
-    my $expiration = time - $self->{seconds};
-    my $buffer = $self->{buffer};
-
-    # Expire old entries
-    @$buffer = grep { $_->[1] > $expiration } @$buffer;
-
-    # Return just the values from the buffer, either in a slice if they
-    # specified indexes, or the whole thing if not.
-    if ( @_ ) {
-        return map { $_->[0] } @{$buffer}[ @_ ];
-    } else {
-        return map { $_->[0] } @$buffer;
-    }
-}
-
-
-
-#####################################################################
-### D A E M O N   C L A S S
-#####################################################################
-package JobServer;
-
-BEGIN {
-    use IO::Socket      qw{};
-    use Data::Dumper    qw{Dumper};
-    use Carp            qw{croak confess};
-    use Time::HiRes     qw{gettimeofday tv_interval};
-    use POSIX           qw{};
-
-    use fields (
-        'clients',              # Connected client objects
-        'config',               # Configuration hash
-        'listener',             # The listener socket
-        'handlers',             # Client event handlers
-        'jobs',                 # Mover jobs
-        'totaljobs',            # Count of jobs processed
-        'assignments',          # Jobs that have been assigned
-        'users',                # Users in the queue
-        'ratelimits',           # Cached cluster ratelimits
-        'raterules',            # Rules for building ratelimit table
-        'jobcounts',            # Counts per cluster of running jobs
-        'starttime',            # Server startup epoch time
-        'recentmoves',          # Timed buffer of recently-completed jobs
-       );
-
-    use lib "$ENV{LJHOME}/cgi-bin";
-    require 'ljlib.pl';
-
-    use base qw{fields};
-}
-
-
-### Class globals
-
-# Default configuration
-our ( %DefaultConfig, %LogLevels );
-
-INIT {
-
-    # Default server configuration; this is merged with any config args the user
-    # specifies in the call to the constructor. Most of these correspond with
-    # command-line flags, so see that section of the POD header for more
-    # information.
-    %DefaultConfig = (
-        port         => 2789,           # Port to listen on
-        host         => '0.0.0.0',      # Host to bind to
-        listenQueue  => 5,              # Listen queue depth
-        daemon       => 0,              # Daemonize or not?
-        debugLevel   => 0,              # Debugging log level
-        defaultRate  => 1,              # The default src cluster rate
-        lockScale    => 3,              # Scaling factor for locking users
-        maxLockTime  => 120,            # Max seconds to keep users locked
-       );
-
-    my $level = 0;
-    %LogLevels = map {
-        $_    => $level++,
-    } qw{debug info notice warn crit fatal};
-
-
-    $Data::Dumper::Terse = 1;
-    $Data::Dumper::Indent = 1;
-}
-
-#
-# Datastructures of class members:
-#
-# clients:     Hashref of connected clients, keyed by fdno
-#
-# jobs:        A hash of arrays of JobServer::Job objects:
-#              {
-#                <srcclusterid> => [ $job1, $job2, ... ],
-#                ...
-#              }
-#
-# users:       A hash index into the inner arrays of 'jobs', keyed by
-#              userid.
-#
-# assignments: A hash of arrays; when a job is assigned to a mover, the
-#              corresponding JobServer::Job is moved into this hash,
-#              keyed by the fdno of the mover responsible.
-#
-# handlers:    Hash of hashes; this is used to register callbacks for clients that
-#              want to monitor the server, receiving log or debugging messages,
-#              new job notifications, etc.
-#
-# totaljobs:   Count of total jobs added to the daemon.
-#
-# raterules:   Maximum number of jobs which can be run against source clusters,
-#              keyed by clusterid. If a global rate limit has been set, this
-#              hash also contains a special key 'global' to contain it.
-#
-# ratelimits:  Cached ratelimits for clusters -- this is rebuilt whenever a
-#              ratelimit rule is added, and is partially rebuilt when new jobs
-#              are added.
-#
-# jobcounts:   Count of jobs running against source clusters, keyed by
-#              source clusterid.
-
-### (CONSTRUCTOR) METHOD: new( %config )
-### Create a new JobServer object with the given I<config>.
-sub new {
-    my JobServer $self = shift;
-    my %config = @_;
-
-    $self = fields::new( $self ) unless ref $self;
-
-    # Client and job queues
-    $self->{clients}     = {};  # fd => client obj
-    $self->{jobs}        = {};  # pending jobs: srcluster => [ jobs ]
-    $self->{users}       = {};  # by-userid hash of jobs
-    $self->{assignments} = {};  # fd => job object
-    $self->{totaljobs}   = 0;   # Count of total jobs added
-    $self->{raterules}   = {};  # User-set rate-limit rules
-    $self->{ratelimits}  = {};  # Cached rate limits by srcclusterid
-    $self->{jobcounts}   = {};  # Count of jobs by srcclusterid
-
-    # Create a timed buffer to contain the jobs which have completed in the last
-    # 6 minutes.
-    $self->{recentmoves} = new TimedBuffer 360;
-
-    # Merge the user-specified configuration with the defaults, with the user's
-    # overriding.
-    $self->{config}      =  {
-        %DefaultConfig,
-        %config,
-    };                          # merge
-
-    # These two get set by start()
-    $self->{listener}    = undef;
-    $self->{starttime}   = undef;
-
-    # CODE refs for handling various events. Keyed by event name, each subhash
-    # contains registrations for event callbacks. Each subhash is keyed by the
-    # fdno of the client that requested it, or an arbitrary string if the
-    # handler belongs to something other than a client.
-    $self->{handlers}    =  {
-        debug        => {},
-        log          => {},
-    };
-
-    return $self;
-}
-
-
-
-### METHOD: start()
-### Start the event loop.
-sub start {
-    my JobServer $self = shift;
-
-    # Start the listener socket
-    my $listener = new IO::Socket::INET
-        Proto       => 'tcp',
-        LocalAddr   => $self->{config}{host},
-        LocalPort   => $self->{config}{port},
-        Listen      => $self->{config}{listenQueue},
-        ReuseAddr   => 1,
-        Blocking    => 0
-            or die "new socket: $!";
-
-    # Log the server startup, then daemonize if it's called for
-    $self->logMsg( 'notice', "Server listening on %s:%d\n",
-                   $listener->sockhost, $listener->sockport );
-    $self->{listener} = $listener;
-    $self->daemonize if $self->{config}{daemon};
-
-    # Remember the startup time
-    $self->{starttime} = time;
-
-    # I don't understand this design -- the Client class is where the event loop
-    # is? Weird. Thanks to SPUD, though, for the example code.
-    JobServer::Client->OtherFds( $listener->fileno => sub {$self->createClient} );
-    JobServer::Client->EventLoop();
-
-    return 1;
-}
-
-
-
-### METHOD: createClient( undef )
-### Listener socket readable callback. Accepts a new client socket and wraps a
-### JobServer::Client around it.
-sub createClient {
-    my JobServer $self = shift;
-
-    my (
-        $csock,                 # Client socket
-        $client,                # JobServer::Client object
-        $fd,                    # File descriptor for client
-       );
-
-    # Get the client socket and set it nonblocking
-    $csock = $self->{listener}->accept or return;
-    $csock->blocking(0);
-    $fd = fileno( $csock );
-
-    $self->logMsg( 'info', 'Client %d connect: %s:%d',
-                   $fd, $csock->peerhost, $csock->peerport );
-
-    # Wrap a client object around it, tell it to watch for input, and send the
-    # greeting.
-    $client = JobServer::Client->new( $self, $csock );
-    $client->watch_read( 1 );
-    $client->write( "Ready.\r\n" );
-
-    return $self->{clients}{$fd} = $client;
-}
-
-
-### METHOD: disconnectClient( $client=JobServer::Client[, $requeue] )
-### Disconnect the specified I<client> from the server. If I<requeue> is true,
-### the job belonging to the client (if any) will be put back into the queue of
-### pending jobs.
-sub disconnectClient {
-    my JobSe...
 (truncated)
Tags: gariev, livejournal, pl, pm
Subscribe
  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 0 comments