[TheSchwartz] r159: LJSUP-12232: Enable pingback mechanism f...
Committer: amyshkin
LJSUP-12232: Enable pingback mechanism for gazeta.ru and championat.com

U   trunk/lib/TheSchwartz.pm
Modified: trunk/lib/TheSchwartz.pm
===================================================================
--- trunk/lib/TheSchwartz.pm 2012-05-30 10:38:09 UTC (rev 158)
+++ trunk/lib/TheSchwartz.pm 2012-06-04 14:46:27 UTC (rev 159)
@@ -271,6 +271,62 @@
my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
@$worker_classes;
+ @jobs = $driver->search(
+ 'TheSchwartz::Job' =>
+ {
+ funcid => \@ids,
+ run_after => \ "<= $unixtime",
+ grabbed_until => \ "<= $unixtime",
+ },
+ {
+ limit => $FIND_JOB_BATCH_SIZE,
+ (
+ $client->prioritize
+ ? (
+ 'sort' => 'priority',
+ 'direction' => 'descend',
+ )
+ : ()
+ )
+ }
+ );
+ };
+ if ($@) {
+ unless (OK_ERRORS->{ $driver->last_error || 0 }) {
+ $client->mark_database_as_dead($hashdsn);
+ }
+ }
+
+ # for test harness race condition testing
+ $T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
+
+ my $job = $client->_grab_a_job($hashdsn, @jobs);
+ return $job if $job;
+ }
+}
+
+sub find_jobs_for_workers {
+ my TheSchwartz $client = shift;
+ my($worker_classes) = @_;
+ $worker_classes ||= $client->{current_abilities};
+
+ for my $hashdsn ($client->shuffled_databases) {
+ ## If the database is dead, skip it.
+ next if $client->is_database_dead($hashdsn);
+
+ my $driver = $client->driver_for($hashdsn);
+ my $unixtime = $driver->dbd->sql_for_unixtime;
+
+ my @jobs;
+ eval {
+ ## Search for jobs in this database where:
+ ## 1. funcname is in the list of abilities this $client supports;
+ ## 2. the job is scheduled to be run (run_after is in the past);
+ ## 3. no one else is working on the job (grabbed_until is in
+ ## in the past).
+ my @ids = map { $client->funcname_to_id($driver, $hashdsn, $_) }
+ @$worker_classes;
+
@jobs = $driver->search('TheSchwartz::Job' => {
funcid => \@ids,
run_after => \ "<= $unixtime",
@@ -290,11 +346,15 @@
# for test harness race condition testing
$T_AFTER_GRAB_SELECT_BEFORE_UPDATE->() if $T_AFTER_GRAB_SELECT_BEFORE_UPDATE;
- my $job = $client->_grab_a_job($hashdsn, @jobs);
- return $job if $job;
+ @jobs = $client->_grab_jobs($hashdsn, @jobs) if @jobs;
+
+ return @jobs if @jobs;
}
+
+ return ();
}
+
sub get_server_time {
my TheSchwartz $client = shift;
my($driver) = @_;
@@ -346,7 +406,51 @@
return undef;
}
+## Try to claim each candidate job from $hashdsn by bumping its grabbed_until.
+## Returns only the jobs this worker won, each with a JobHandle attached.
+sub _grab_jobs {
+    my TheSchwartz $client = shift;
+    my($hashdsn, @candidates) = @_;
+    return () unless @candidates;
+
+    my $driver = $client->driver_for($hashdsn);
+
+    ## Fetch the server's clock once, before updating any row: one query per
+    ## batch instead of one per job, and a failure here grabs nothing.
+    my $server_time = $client->get_server_time($driver)
+        or die "expected a server time";
+
+    my @out;
+    ## Randomize the order to avoid contention between workers.
+    JOB:
+    for my $job (shuffle(@candidates)) {
+        ## Convert the funcid to a funcname, based on this database's map.
+        $job->funcname( $client->funcid_to_name($driver, $hashdsn, $job->funcid) );
+        my $worker_class = $job->funcname;
+        my $old_grabbed_until = $job->grabbed_until;
+
+        ## The grabbed_until term makes the update conditional on the value we
+        ## read, so a concurrent grab by another worker matches no rows.
+        $job->grabbed_until($server_time + ($worker_class->grab_for || 1));
+        if ($driver->update($job, { grabbed_until => $old_grabbed_until }) < 1) {
+            ## Lost the race: another worker already updated this job.
+            $T_LOST_RACE->() if $T_LOST_RACE;
+            next JOB;
+        }
+
+        my $handle = TheSchwartz::JobHandle->new({
+            dsn_hashed => $hashdsn,
+            jobid      => $job->jobid,
+        });
+        $handle->client($client);
+        $job->handle($handle);
+        push @out, $job;
+    }
+
+    return @out;
+}
+
sub shuffled_databases {
my TheSchwartz $client = shift;
my @dsns = keys %{ $client->{databases} };
@@ -553,8 +657,10 @@
$job = $client->find_job_for_workers;
}
- my $class = $job ? $job->funcname : undef;
- if ($job) {
+ my $class;
+
+ if ( $job ) {
+ $class = $job->funcname;
my $priority = $job->priority ? ", priority " . $job->priority : "";
$job->debug("TheSchwartz::work_once got job of class '$class'$priority");
} else {
@@ -577,6 +683,50 @@
return 1;
}
+## Work on a batch of jobs: the ones passed in, or ones found for our current
+## abilities. Returns 1 if it did something, false if no jobs were found.
+sub work_batch {
+    my TheSchwartz $client = shift;
+    my @jobs = @_; # optional specific jobs to work on
+
+    ## Look for jobs with our current set of abilities. Note that the
+    ## list of current abilities may not be equal to the full set of
+    ## abilities, to allow for even distribution between jobs.
+    @jobs = $client->find_jobs_for_workers unless @jobs;
+
+    ## If we didn't find anything, restore our full abilities, and try again.
+    if (! @jobs &&
+        @{ $client->{current_abilities} } < @{ $client->{all_abilities} }) {
+        $client->restore_full_abilities;
+        @jobs = $client->find_jobs_for_workers;
+    }
+
+    unless (@jobs) {
+        $client->debug("$$ TheSchwartz::work_batch found no jobs");
+        return;
+    }
+
+    ## One batch can hold several funcnames, so bucket the jobs per class
+    ## (first-seen order); each class's work_safely gets only its own jobs.
+    my (@classes, %jobs_of);
+    for my $job (@jobs) {
+        my $class = $job->funcname;
+        my $priority = $job->priority ? ", priority " . $job->priority : "";
+        $job->debug("$$ TheSchwartz::work_batch got job of class '$class'$priority");
+        push @classes, $class unless $jobs_of{$class};
+        push @{ $jobs_of{$class} }, $job;
+        ## Drop this funcname from our current abilities so the next search
+        ## finds a job for a different funcname. This prevents starvation of
+        ## high funcid values because of the way MySQL's indexes work.
+        $client->temporarily_remove_ability($class);
+    }
+
+    for my $class (@classes) { $class->work_safely(@{ $jobs_of{$class} }) }
+
+    ## We did some work, so return 1 so work_until_done keeps looking.
+    return 1;
+}
+
sub funcid_to_name {
my TheSchwartz $client = shift;
my($driver, $hashdsn, $funcid) = @_;
