Committer: sbelyaev
LJSUP-11064: scheduled entries posting worker has been updated.
U trunk/bin/worker/delayed-entries-poster
U trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm
Modified: trunk/bin/worker/delayed-entries-poster =================================================================== --- trunk/bin/worker/delayed-entries-poster 2012-01-26 11:22:34 UTC (rev 21003) +++ trunk/bin/worker/delayed-entries-poster 2012-01-26 11:29:01 UTC (rev 21004) @@ -10,31 +10,23 @@ use base 'LJ::NewWorker::Manual'; require 'ljlib.pl'; require 'ljdb.pl'; +use LJ::DBUtil; use LJ::DelayedEntry::Scheduler; -my $cluster; - sub options { my $self = shift; return ( - 'cluster=i' => \$cluster, $self->SUPER::options(), ); } -sub call_for_cluster { - my $coderef = shift; - my $opts = shift || {}; - - my $dbr = ($LJ::IS_DEV_SERVER) ? - LJ::get_cluster_reader($cluster) : LJ::DBUtil->get_inactive_db($cluster, $opts->{verbose}); - $coderef->($cluster, $dbr); -} - - sub work { - call_for_cluster( sub { LJ::DelayedEntry::Scheduler::on_pulse(@_); } ); + foreach my $cluster_id (@LJ::CLUSTERS) { + my $dbr = LJ::DBUtil->get_inactive_db($cluster_id, __PACKAGE__->verbose); + LJ::DelayedEntry::Scheduler::on_pulse($cluster_id, $dbr); + } + return 1 if __PACKAGE__->should_quit; return 0; } Modified: trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm =================================================================== --- trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm 2012-01-26 11:22:34 UTC (rev 21003) +++ trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm 2012-01-26 11:29:01 UTC (rev 21004) @@ -1,3 +1,45 @@ +package LJ::DelayedEntry::Scheduler::TableLock; +use strict; +use warnings; + +my $DELAYED_ENTRIES_LOCK_NAME = 'delayed_entries_lock'; + +sub new { + my ($class, $dbh) = @_; + + if (try_lock($dbh)) { + return undef; + } + + my $self = bless {}, $class; + $self->{dbh} = $dbh; + + return $self; +} + +sub try_lock { + my ($dbh) = @_; + + my ($free) = + $dbh->selectrow_array("SELECT IS_FREE_LOCK('$DELAYED_ENTRIES_LOCK_NAME')"); + + if (!$free) { + return 0; + } + + my ($result) = + $dbh->selectrow_array("SELECT GET_LOCK('$DELAYED_ENTRIES_LOCK_NAME', 10)"); + return $result; +} + +sub DESTROY { 
+ my ($self) = @_; + my $dbh = $self->{dbh}; + + $dbh->selectrow_array("SELECT RELEASE_LOCK('$DELAYED_ENTRIES_LOCK_NAME')"); +} + + package LJ::DelayedEntry::Scheduler; use LJ::DelayedEntry; use LJ::Text; @@ -4,9 +46,8 @@ use strict; use warnings; -use Data::Dumper; -my $PULSE_TIME = 1 * 60; +my $PULSE_TIME = 1; sub pulse_time { return $PULSE_TIME; @@ -18,7 +59,7 @@ my $list = $dbh->selectall_arrayref("SELECT journalid, delayedid, posterid " . "FROM delayedlog2 ". - "WHERE posttime <= NOW()"); + "WHERE posttime <= NOW() LIMIT 1000"); foreach my $tuple (@$list) { push @entries, LJ::DelayedEntry->load_data($dbh, @@ -46,24 +87,39 @@ }); } + sub on_pulse { my ($clusterid, $dbh) = @_; __assert($dbh); - my $entries = __load_delayed_entries($dbh); - foreach my $entry(@$entries) { - my $post_status = $entry->convert(); + my $lock = new LJ::DelayedEntry::Scheduler::TableLock($dbh); - # do we need to send error - if ( $post_status->{error_message} ) { - __send_error($entry->poster, - $entry->data->{subject}, - $post_status->{error_message}); - } - if ( $post_status->{delete_entry} ) { - $entry->delete(); - } + if (!$lock) { + return; } + + eval { + my $entries = __load_delayed_entries($dbh); + + do { + foreach my $entry(@$entries) { + my $post_status = $entry->convert(); + + # do we need to send error + if ( $post_status->{error_message} ) { + __send_error($entry->poster, + $entry->data->{subject}, + $post_status->{error_message}); + } + if ( $post_status->{delete_entry} ) { + $entry->delete(); + } + } + } while ($entries = __load_delayed_entries($dbh) && @$entries) + }; + if ($@) { + warn 'worker has failed: ' . $@; + } } sub __assert() {