vadvs (vadvs) wrote in changelog,
vadvs
vadvs
changelog

[livejournal] r15628: LJSUP-4754: update 'reminder' worker to ...

Committer: vsukhanov
LJSUP-4754: update 'reminder' worker to use LJ::NewWorker.

U   trunk/bin/worker/reminders
A   trunk/cgi-bin/LJ/NewWorker/Reminders.pm
Modified: trunk/bin/worker/reminders
===================================================================
--- trunk/bin/worker/reminders	2009-08-24 08:55:39 UTC (rev 15627)
+++ trunk/bin/worker/reminders	2009-08-24 09:15:44 UTC (rev 15628)
@@ -5,9 +5,8 @@
 
 use lib "$ENV{LJHOME}/cgi-bin";
 
-use LJ::Worker::Reminders;
+use LJ::NewWorker::Reminders;
 
 require 'ljlib.pl';
+LJ::NewWorker::Reminders->new()->start();
 
-LJ::Worker::Reminders->new()->run();
-

Added: trunk/cgi-bin/LJ/NewWorker/Reminders.pm
===================================================================
--- trunk/cgi-bin/LJ/NewWorker/Reminders.pm	                        (rev 0)
+++ trunk/cgi-bin/LJ/NewWorker/Reminders.pm	2009-08-24 09:15:44 UTC (rev 15628)
@@ -0,0 +1,222 @@
+package LJ::NewWorker::Reminders;
+
+use strict;
+use warnings;
+
+use lib "$ENV{LJHOME}/cgi-bin";
+
+use base qw(Data::ObjectDriver::BaseObject LJ::NewWorker::Manual);
+
+use Data::ObjectDriver::Driver::DBI;
+use LJ::PersonalStats::AccessControl;
+
+use Carp qw( croak );
+use Digest::MD5 qw( md5_hex );
+use List::Util qw( shuffle );
+
+use TheSchwartz::FuncMap;
+
+require 'ljlib.pl';
+
+use constant MAX_RETRIES   => 5;
+
+__PACKAGE__->install_properties({
+    columns     => [ qw( reminderid userid funcid arg
+                         insert_time scheduled_time run_after
+                         grabbed_timestamp priority retries ) ],
+    datasource  => 'reminders',
+    primary_key => 'reminderid',
+});
+
+sub new {
+    my $class = shift;
+
+    my $self = bless {}, $class;
+
+    for my $db (@LJ::THESCHWARTZ_DBS) {
+        next if exists $db->{role}->{mass} && $db->{role}->{mass}; # Skip mass roles
+        my $full = join '|', map { $db->{$_} || '' } qw( dsn user pass );
+        # FIXME: Different roles can has same $full line!
+        $self->{databases}{ md5_hex($full) } = $db;
+    }
+    
+    return $self;
+}
+
+sub funcid_to_name {
+    my ($self, $hashdsn, $funcid) = @_;
+    my $cache = $self->_funcmap_cache($hashdsn);
+    return $cache->{funcid2name}{$funcid};
+}
+
+sub funcname_to_id {
+    my ($self, $hashdsn, $funcname) = @_;
+    my $cache = $self->_funcmap_cache($hashdsn);
+    unless (exists $cache->{funcname2id}{$funcname}) {
+        my $driver = $self->driver_for($hashdsn);
+        $driver->begin_work;
+        my $map = TheSchwartz::FuncMap->create_or_find($driver, $funcname);
+        $driver->commit;
+        $cache->{funcname2id}{ $map->funcname } = $map->funcid;
+        $cache->{funcid2name}{ $map->funcid }   = $map->funcname;
+    }
+    return $cache->{funcname2id}{$funcname};
+}
+
+sub _funcmap_cache {
+    my ($self, $hashdsn) = @_;
+    unless (exists $self->{funcmap_cache}{$hashdsn}) {
+        my $driver = $self->driver_for($hashdsn);
+        my @maps = $driver->search('TheSchwartz::FuncMap');
+        my $cache = { funcname2id => {}, funcid2name => {} };
+        for my $map (@maps) {
+            $cache->{funcname2id}{ $map->funcname } = $map->funcid;
+            $cache->{funcid2name}{ $map->funcid }   = $map->funcname;
+        }
+        $self->{funcmap_cache}{$hashdsn} = $cache;
+    }
+    return $self->{funcmap_cache}{$hashdsn};
+}
+
+sub driver_for {
+    my ($self, $hashdsn) = @_;
+    my $db = $self->{databases}{$hashdsn}
+        or croak "Ouch, I don't know about a database whose hash is $hashdsn";
+    
+    return Data::ObjectDriver::Driver::DBI->new(
+        dsn      => $db->{dsn},
+        username => $db->{user},
+        password => $db->{pass},
+        ($db->{prefix} ? (prefix   => $db->{prefix}) : ()),
+    );
+}
+
+sub shuffled_databases {
+    my $self = shift;
+    my @dsns = keys %{ $self->{databases} };
+    return shuffle(@dsns);
+}
+
+#### API ##############################
+
+sub schedule {
+    my $self = shift;
+    my $opts = shift;
+
+    return undef unless $opts;
+    return undef unless 'HASH' eq ref $opts;
+
+    my ($userid, $funcid, $function, $arg, $run_after, $priority) =
+        map { delete $opts->{$_} } qw(userid funcid function arg run_after priority);
+
+    # Check is user valid
+    return undef unless $userid;
+
+    if ($arg) {
+        if (ref($arg) eq 'SCALAR') {
+            $arg = Storable::thaw($$arg);
+        } elsif (!ref($arg)) {
+            # if a regular scalar, test to see if it's a storable or not.
+            $arg = _cond_thaw($arg);
+            return undef unless $arg;
+        }
+    }
+
+    for my $hashdsn ($self->shuffled_databases) {
+        unless ($funcid) {
+            return undef unless $function;
+            $funcid = $self->funcname_to_id($hashdsn, $function);
+        }
+
+        $run_after ||= time;
+        $priority ||= 0;
+
+        my $driver = $self->driver_for($hashdsn);
+        my $unixtime = $driver->dbd->sql_for_unixtime;
+
+        my $reminder = (ref $self)->new;
+        $reminder->userid($userid);
+        $reminder->funcid($funcid);
+        $reminder->arg($arg);
+        $reminder->insert_time(time);
+        $reminder->scheduled_time(undef);
+        $reminder->run_after($run_after);
+        $reminder->grabbed_timestamp(undef);
+        $reminder->priority($priority);
+        $reminder->retries(0);
+        $driver->insert($reminder);
+        return $reminder->reminderid;
+    }
+}
+
+#### Worker#############################
+
+sub work {
+    my $self = shift;
+
+    my $count = 0;
+
+    for my $hashdsn ($self->shuffled_databases) {
+        my $driver = $self->driver_for($hashdsn);
+        my $unixtime = $driver->dbd->sql_for_unixtime;
+
+        my @jobs = $driver->search('LJ::NewWorker::Reminders' => {
+            run_after           => \ "<= $unixtime",
+            grabbed_timestamp   => \ 'IS NULL',
+            retries             => { op => '<', value => MAX_RETRIES },
+        },
+        {
+            limit => 100,
+            sort => 'insert_time, scheduled_time, run_after, priority'
+        });
+
+        my $servertime = $driver->rw_handle->selectrow_array("SELECT $unixtime");
+
+        foreach my $job (@jobs) {
+            $job->grabbed_timestamp($servertime);
+            if (1 > $driver->update($job, {
+                reminderid => $job->reminderid,
+                grabbed_timestamp => \ 'IS NULL' } )) {
+                # Cannot grab a job. Somebody grab it already.
+                next;
+            }
+
+            my $funcname = $self->funcid_to_name($hashdsn,$job->funcid);
+            my %funcs_hash  = ( # Hash of authorized functions.
+               'LJ::PersonalStats::AccessControl::work' => 'LJ::PersonalStats::AccessControl::work',
+            );
+
+            eval {
+                no strict 'refs';
+                $funcs_hash{$funcname}->($job->userid);
+                use strict 'refs';
+            };
+            if ($@) {   # Error: increment retries counter and release a job.
+                my $retries = 1 + $job->retries;
+                warn "Error: $@, max retries count exceeded." if $retries >= MAX_RETRIES;
+                $job->retries($retries);
+                $job->grabbed_timestamp(undef); # NULL
+                if (1 > $driver->update($job, { reminderid => $job->reminderid })) {
+                    warn "Cannot release a job";
+                }
+            } else {    # All correct: remove a record from reminderstable.
+                $driver->remove('LJ::NewWorker::Reminders' => {
+                    reminderid  => $job->reminderid,
+                },
+                {
+                    nofetch => 1,
+                });
+            }
+
+            ++$count;
+        }
+    }
+
+    return $count;
+}
+
+sub on_idle {
+    sleep 30;
+}
+
+1;

Subscribe

  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 0 comments