madeon (madeon) wrote in changelog,
madeon
madeon
changelog

[livejournal] r21004: LJSUP-11064: scheduled entries posting w...

Committer: sbelyaev
LJSUP-11064: scheduled entries posting worker has been updated.
U   trunk/bin/worker/delayed-entries-poster
U   trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm
Modified: trunk/bin/worker/delayed-entries-poster
===================================================================
--- trunk/bin/worker/delayed-entries-poster	2012-01-26 11:22:34 UTC (rev 21003)
+++ trunk/bin/worker/delayed-entries-poster	2012-01-26 11:29:01 UTC (rev 21004)
@@ -10,31 +10,23 @@
 use base 'LJ::NewWorker::Manual';
 require 'ljlib.pl';
 require 'ljdb.pl';
+use LJ::DBUtil;
 
 use LJ::DelayedEntry::Scheduler;
 
-my $cluster;
-
 sub options {
     my $self = shift;
     return (
-        'cluster=i'  => \$cluster,
         $self->SUPER::options(),
     );
 }
 
-sub call_for_cluster {
-    my $coderef = shift;
-    my $opts = shift || {};
-
-    my $dbr = ($LJ::IS_DEV_SERVER) ?
-            LJ::get_cluster_reader($cluster) : LJ::DBUtil->get_inactive_db($cluster, $opts->{verbose});
-    $coderef->($cluster, $dbr);
-}
-
-
 sub work {
-    call_for_cluster( sub { LJ::DelayedEntry::Scheduler::on_pulse(@_); } );
+    foreach my $cluster_id (@LJ::CLUSTERS) {
+        my $dbr = LJ::DBUtil->get_inactive_db($cluster_id, __PACKAGE__->verbose);
+        LJ::DelayedEntry::Scheduler::on_pulse($cluster_id, $dbr);
+    }
+
     return 1 if __PACKAGE__->should_quit;
     return 0;
 }

Modified: trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm
===================================================================
--- trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm	2012-01-26 11:22:34 UTC (rev 21003)
+++ trunk/cgi-bin/LJ/DelayedEntry/Scheduler.pm	2012-01-26 11:29:01 UTC (rev 21004)
@@ -1,3 +1,45 @@
+package LJ::DelayedEntry::Scheduler::TableLock;
+use strict;
+use warnings;
+
+my $DELAYED_ENTRIES_LOCK_NAME = 'delayed_entries_lock';
+
+sub new {
+    my ($class, $dbh) = @_;
+
+    if (try_lock($dbh)) {
+        return undef;
+    }
+
+    my $self = bless {}, $class;
+    $self->{dbh} = $dbh;
+
+    return $self;
+}
+
+sub try_lock {
+    my ($dbh) = @_;
+
+    my ($free) = 
+        $dbh->selectrow_array("SELECT IS_FREE_LOCK('$DELAYED_ENTRIES_LOCK_NAME')");
+
+    if (!$free) {
+        return 0;
+    }
+
+    my ($result) = 
+        $dbh->selectrow_array("SELECT GET_LOCK('$DELAYED_ENTRIES_LOCK_NAME', 10)");
+    return $result;
+}
+
+sub DESTROY {
+    my ($self) = @_;
+    my $dbh = $self->{dbh};
+
+    $dbh->selectrow_array("SELECT RELEASE_LOCK('$DELAYED_ENTRIES_LOCK_NAME')");
+}
+
+
 package LJ::DelayedEntry::Scheduler;
 use LJ::DelayedEntry;
 use LJ::Text;
@@ -4,9 +46,8 @@
 
 use strict;
 use warnings;
-use Data::Dumper;
 
-my $PULSE_TIME = 1 * 60; 
+my $PULSE_TIME = 1;
 
 sub pulse_time {
     return $PULSE_TIME;
@@ -18,7 +59,7 @@
     
     my $list = $dbh->selectall_arrayref("SELECT journalid, delayedid, posterid " .
                                         "FROM delayedlog2 ".
-                                        "WHERE posttime <= NOW()");
+                                        "WHERE posttime <= NOW() LIMIT 1000");
 
     foreach my $tuple (@$list) {
         push @entries, LJ::DelayedEntry->load_data($dbh,
@@ -46,24 +87,39 @@
     });    
 }
 
+
 sub on_pulse {
     my ($clusterid, $dbh) = @_;
     __assert($dbh);
-    my $entries = __load_delayed_entries($dbh);
 
-    foreach my $entry(@$entries) {
-        my $post_status = $entry->convert();
+    my $lock = new LJ::DelayedEntry::Scheduler::TableLock($dbh);
 
-        # do we need to send error
-        if ( $post_status->{error_message} ) {
-            __send_error($entry->poster, 
-                         $entry->data->{subject},
-                         $post_status->{error_message});
-        }
-        if ( $post_status->{delete_entry} ) {
-            $entry->delete();
-        }
+    if (!$lock) {
+        return;
     }
+
+    eval {
+        my $entries = __load_delayed_entries($dbh);
+
+        do {
+            foreach my $entry(@$entries) {
+                my $post_status = $entry->convert();
+        
+                # do we need to send error
+                if ( $post_status->{error_message} ) {
+                    __send_error($entry->poster, 
+                                $entry->data->{subject},
+                                $post_status->{error_message});
+                }
+                if ( $post_status->{delete_entry} ) {
+                    $entry->delete();
+                }
+            }
+        } while ($entries = __load_delayed_entries($dbh) && @$entries)
+    };
+    if ($@) {
+        warn 'worker has failed: ' . $@;
+    }
 }
 
 sub __assert() {

Tags: livejournal, madeon, pm, sbelyaev
Subscribe
  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 0 comments