Типа я (zilogic) wrote in changelog,
Типа я
zilogic
changelog

[TheSchwartz] r159: LJSUP-12232: Enable pingback mechanism f...

Committer: amyshkin
LJSUP-12232: Enable pingback mechanism for gazeta.ru and championat.com
U   trunk/lib/TheSchwartz.pm
Modified: trunk/lib/TheSchwartz.pm
===================================================================
--- trunk/lib/TheSchwartz.pm	2012-05-30 10:38:09 UTC (rev 158)
+++ trunk/lib/TheSchwartz.pm	2012-06-04 14:46:27 UTC (rev 159)
@@ -271,6 +271,62 @@
             my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
                       @$worker_classes;
 
+            @jobs = $driver->search(
+                'TheSchwartz::Job' =>
+                {
+                    funcid        => \@ids,
+                    run_after     => \ "<= $unixtime",
+                    grabbed_until => \ "<= $unixtime",
+                },
+                {
+                    limit => $FIND_JOB_BATCH_SIZE,
+                    (
+                        $client->prioritize
+                            ? (
+                                'sort'      => 'priority',
+                                'direction' => 'descend',
+                              )
+                            : ()
+                    )
+                }
+            );
+        };
+        if ($@) {
+            unless (OK_ERRORS->{ $driver->last_error || 0 }) {
+                $client->mark_database_as_dead($hashdsn);
+            }
+        }
+
+        # for test harness race condition testing
+        $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
+
+        my $job = $client->_grab_a_job($hashdsn, @jobs);
+        return $job if $job;
+    }
+}
+
+sub find_jobs_for_workers {
+    my TheSchwartz $client = shift;
+    my($worker_classes) = @_;
+    $worker_classes ||= $client->{current_abilities};
+
+    for my $hashdsn ($client->shuffled_databases) {
+        ## If the database is dead, skip it.
+        next if $client->is_database_dead($hashdsn);
+
+        my $driver = $client->driver_for($hashdsn);
+        my $unixtime = $driver->dbd->sql_for_unixtime;
+
+        my @jobs;
+        eval {
+            ## Search for jobs in this database where:
+            ## 1. funcname is in the list of abilities this $client supports;
+            ## 2. the job is scheduled to be run (run_after is in the past);
+            ## 3. no one else is working on the job (grabbed_until is in
+            ##    the past).
+            my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
+                      @$worker_classes;
+
             @jobs = $driver->search('TheSchwartz::Job' => {
                     funcid        => \@ids,
                     run_after     => \ "<= $unixtime",
@@ -290,11 +346,15 @@
         # for test harness race condition testing
         $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
 
-        my $job = $client->_grab_a_job($hashdsn, @jobs);
-        return $job if $job;
+        @jobs = $client->_grab_jobs($hashdsn, @jobs) if @jobs;
+
+        return @jobs if @jobs;
     }
+
+    return ();
 }
 
+
 sub get_server_time {
     my TheSchwartz $client = shift;
     my($driver) = @_;
@@ -346,7 +406,51 @@
     return undef;
 }
 
+sub _grab_jobs {
+    my TheSchwartz $client = shift;
+    my $hashdsn = shift;
+    my $driver = $client->driver_for($hashdsn);
 
+    ## Got some jobs! Randomize them to avoid contention between workers.
+    my @jobs = shuffle(@_);
+    my @out = ();
+
+  JOB:
+    while (my $job = shift @jobs) {
+        ## Convert the funcid to a funcname, based on this database's map.
+        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
+
+        ## Update the job's grabbed_until column so that
+        ## no one else takes it.
+        my $worker_class = $job->funcname;
+        my $old_grabbed_until = $job->grabbed_until;
+
+        my $server_time = $client->get_server_time($driver)
+            or die "expected a server time";
+
+        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
+
+        ## Update the job in the database, and end the transaction.
+        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
+            ## We lost the race to get this particular job--another worker must
+            ## have got it and already updated it. Move on to the next job.
+            $T_LOST_RACE->() if $T_LOST_RACE;
+            next JOB;
+        }
+
+        ## Now prepare the job, and add it to the list of grabbed jobs.
+        my $handle = TheSchwartz::JobHandle->new({
+            dsn_hashed => $hashdsn,
+            jobid      => $job->jobid,
+        });
+        $handle->client($client);
+        $job->handle($handle);
+        push @out, $job;
+    }
+
+    return @out;
+}
+
 sub shuffled_databases {
     my TheSchwartz $client = shift;
     my @dsns = keys %{ $client->{databases} };
@@ -553,8 +657,10 @@
         $job = $client->find_job_for_workers;
     }
 
-    my $class = $job ? $job->funcname : undef;
-    if ($job) {
+    my $class;
+
+    if ( $job ) {
+        $class = $job->funcname;
         my $priority = $job->priority ? ", priority " . $job->priority : "";
         $job->debug("TheSchwartz::work_once got job of class '$class'$priority");
     } else {
@@ -577,6 +683,50 @@
     return 1;
 }
 
+## Returns true if it did something, false if no jobs were found
+sub work_batch {
+    my TheSchwartz $client = shift;
+    my @jobs = @_;  # optional specific job to work on
+
+    ## Look for a job with our current set of abilities. Note that the
+    ## list of current abilities may not be equal to the full set of
+    ## abilities, to allow for even distribution between jobs.
+    @jobs = $client->find_jobs_for_workers unless @jobs;
+
+    ## If we didn't find anything, restore our full abilities, and try
+    ## again.
+    if (! @jobs &&
+        @{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
+        $client->restore_full_abilities;
+        @jobs = $client->find_jobs_for_workers;
+    }
+
+    my $class;
+
+    if ( @jobs ) {
+        for my $job ( @jobs ) {
+            $class = $job->funcname;
+            my $priority = $job->priority ? ", priority " . $job->priority : "";
+            $job->debug("$$ TheSchwartz::work_batch got job of class '$class'$priority");
+
+            ## Now that we found a job for this particular funcname, remove it
+            ## from our list of current abilities. So the next time we look for a
+            ## job, we'll find a job for a different funcname. This prevents starvation of
+            ## high funcid values because of the way MySQL's indexes work.
+            $client->temporarily_remove_ability($class);
+        }
+    } else {
+        $client->debug("$$ TheSchwartz::work_batch found no jobs");
+        return;
+    }
+
+    $class->work_safely(@jobs);
+
+    ## We got a job, so return 1 so work_until_done (which calls this method)
+    ## knows to keep looking for jobs.
+    return 1;
+}
+
 sub funcid_to_name {
     my TheSchwartz $client = shift;
     my($driver, $hashdsn, $funcid) = @_;

Tags: amyshkin, pm, theschwartz, zilogic
Subscribe
  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 0 comments