Committer: vsukhanov
LJSUP-7319: Antispam: massfriending

U trunk/bin/upgrading/proplists.dat
U trunk/bin/upgrading/update-db-general.pl
A trunk/bin/worker/friending_queue
U trunk/cgi-bin/LJ/Event/Befriended.pm
A trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm
U trunk/cgi-bin/LJ/Event/Defriended.pm
A trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm
A trunk/cgi-bin/LJ/Event/friendedDelayed.pm
U trunk/cgi-bin/LJ/Event.pm
A trunk/cgi-bin/LJ/FriendQueue.pm
U trunk/cgi-bin/LJ/User.pm
U trunk/cgi-bin/ljlib.pl
U trunk/cgi-bin/ljprotocol.pl
Modified: trunk/bin/upgrading/proplists.dat =================================================================== --- trunk/bin/upgrading/proplists.dat 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/bin/upgrading/proplists.dat 2010-11-18 09:01:32 UTC (rev 17718) @@ -1580,3 +1580,21 @@ ratelist.usermessage: des: Logged when a users sends a message to another user +userproplist.sys_base_friending_notif_delay: + datatype: num + des: Base delay of sending notification to friended/defrended user (for first action in queue). + cldversion: 8 + prettyname: Base system friending notification delay + +userproplist.sys_friending_notif_delay: + datatype: num + des: Delay (per action per action in queue) of sending notification to friended/defrended user. + cldversion: 8 + prettyname: System friending notification delay + +userproplist.sys_limit_friending_notif_delay: + datatype: num + des: Maximum delay of sending notifications. Flush all users action from queue when delay is exceed the limit. + cldversion: 8 + prettyname: System limit of friending notification delay + Modified: trunk/bin/upgrading/update-db-general.pl =================================================================== --- trunk/bin/upgrading/update-db-general.pl 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/bin/upgrading/update-db-general.pl 2010-11-18 09:01:32 UTC (rev 17718) @@ -3377,6 +3377,22 @@ ) TYPE=InnoDB EOC +## Queue of delayed Befriending/Defriending events +register_tablecreate("friending_actions_q", <<'EOC'); +CREATE TABLE friending_actions_q ( + rec_id INT UNSIGNED NOT NULL AUTO_INCREMENT, + userid INT UNSIGNED NOT NULL, + friendid INT UNSIGNED NOT NULL, + action CHAR(1), + etime INT, + jobid BIGINT(20) UNSIGNED, -- appropriate schwartz job + + PRIMARY KEY(rec_id), + INDEX(userid) + + ) Type=InnoDB +EOC + ### changes register_alter(sub { Added: trunk/bin/worker/friending_queue =================================================================== --- trunk/bin/worker/friending_queue (rev 0) +++ 
trunk/bin/worker/friending_queue 2010-11-18 09:01:32 UTC (rev 17718) @@ -0,0 +1,102 @@ +#!/usr/bin/perl +package TheSchwartz::Worker::FriendingQueue; +use strict; +use lib "$ENV{LJHOME}/cgi-bin"; +use base 'LJ::NewWorker::TheSchwartz'; +require 'ljlib.pl'; + +sub capabilities { 'LJ::Worker::HandleUserFriendingActions', "LJ::Worker::FlushUserFriendingActions" } + +__PACKAGE__->start; + +package LJ::Worker::HandleUserFriendingActions; +use strict; +use base 'TheSchwartz::Worker'; +use LJ::FriendQueue; + +sub work { + my ($class, $job) = @_; + my $args = $job->arg; + + my ($uid, $time) = @$args; + + my $u = LJ::load_userid($uid); + + my @actions = LJ::FriendQueue->load($u->userid); + return $job->completed unless @actions; ## everything processed yet + + my $last_jobid = $actions[-1]->{jobid}; + my $last_rec_id = $actions[-1]->{rec_id}; + + ## we should process queue only if the last record is for this job + $job->completed if $actions[-1]->{jobid} ne $job->jobid; + + ## get rid of pair actions: friendA, defriendA... or visa versa + my %filtered = (); + ## scenario assumes that events may be added to queue not in order of their occurance. + foreach my $action (@actions){ + my $friendid = $action->{friendid}; + my $act = $action->{action}; # add | del + my $neg_act = $act eq 'A' ? 
'D' : 'A'; + + if ($filtered{"$friendid-$neg_act"}){ + my $skiped_action = shift @{ $filtered{"$friendid-$neg_act"} }; + + ## do not hold empty arrays + delete $filtered{"$friendid-$neg_act"} + if @{ $filtered{"$friendid-$neg_act"} } < 1; + + ## - vs + = 0; + $skiped_action->{skiped} = 1; ## set flag + $action->{skiped} = 1; + + next; + } else { + push @{ $filtered{"$friendid-$act"} ||= [] } => $action; + } + } + @actions = grep { not $_->{skiped} } @actions; + + my $sclient = LJ::theschwartz(); + unless ($sclient){ + $job->failed("Can't get TheSchwartz client"); + return; + } + + ## + foreach my $action (@actions){ + my $userid = $action->{userid}; + my $friendid = $action->{friendid}; + + my $bfjob = $action->{action} eq 'A' + ? LJ::Event::Befriended->new($friendid, $userid)->fire_job + : LJ::Event::Defriended->new($friendid, $userid)->fire_job; + $sclient->insert_jobs($bfjob); + } + + LJ::FriendQueue->empty($u->userid, $last_rec_id); + + $job->completed; +} + +package LJ::Worker::FlushUserFriendingActions; +use strict; +use base 'TheSchwartz::Worker'; +use LJ::FriendQueue; + +sub work { + my ($class, $job) = @_; + my $args = $job->arg; + + my ($uid, $time) = @$args; + + my $u = LJ::load_userid($uid); + ## remove all + LJ::FriendQueue->empty($u->userid); + #LJ::FriendQueue->log_ + + $job->completed; +} + + +1; Property changes on: trunk/bin/worker/friending_queue ___________________________________________________________________ Added: svn:executable + * Modified: trunk/cgi-bin/LJ/Event/Befriended.pm =================================================================== --- trunk/cgi-bin/LJ/Event/Befriended.pm 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/cgi-bin/LJ/Event/Befriended.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -6,11 +6,10 @@ sub new { my ($class, $u, $fromu) = @_; - foreach ($u, $fromu) { - croak 'Not an LJ::User' unless blessed $_ && $_->isa("LJ::User"); - } - return $class->SUPER::new($u, $fromu->{userid}); + my $uid = LJ::want_userid($u); ## 
friendid + my $fromuid = LJ::want_userid($fromu); + return $class->SUPER::new($uid, $fromuid); } sub is_common { 0 } Added: trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm =================================================================== --- trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm (rev 0) +++ trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -0,0 +1,10 @@ +package LJ::Event::BefriendedDelayed; +use strict; +use base 'LJ::Event::friendedDelayed'; + +sub send { + my $class = shift; + $class->SUPER::send(add => @_); +} + +1; Modified: trunk/cgi-bin/LJ/Event/Defriended.pm =================================================================== --- trunk/cgi-bin/LJ/Event/Defriended.pm 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/cgi-bin/LJ/Event/Defriended.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -6,11 +6,11 @@ sub new { my ($class, $u, $fromu) = @_; - foreach ($u, $fromu) { - croak 'Not an LJ::User' unless blessed $_ && $_->isa("LJ::User"); - } - return $class->SUPER::new($u, $fromu->{userid}); + my $uid = LJ::want_userid($u); ## friendid + my $fromuid = LJ::want_userid($fromu); + + return $class->SUPER::new($uid, $fromuid); } sub is_common { 0 } Added: trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm =================================================================== --- trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm (rev 0) +++ trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -0,0 +1,9 @@ +package LJ::Event::DefriendedDelayed; +use strict; +use base 'LJ::Event::friendedDelayed'; + +sub send { + my $class = shift; + $class->SUPER::send(del => @_); +} +1; Added: trunk/cgi-bin/LJ/Event/friendedDelayed.pm =================================================================== --- trunk/cgi-bin/LJ/Event/friendedDelayed.pm (rev 0) +++ trunk/cgi-bin/LJ/Event/friendedDelayed.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -0,0 +1,70 @@ +package LJ::Event::friendedDelayed; +use strict; +use LJ::FriendQueue; + +use constant 
BASE_DELAY => 1800; # sec +use constant DELAY_PER_ACTION => 600; # sec +use constant DELAY_LIMIT => 24 * 60 * 60; # sec + +sub send { + my $class = shift; + + my $action = shift; + my $tou = shift; + my $fromu = shift; + + die "Wrong action parameter: $action" + unless $action =~ /^add|del$/; + + ## 1. create TheSchwartz job + ## 1.1. calculate delay + ## + ## 2. add info to FriendQueue + + ## delay values + my $systemu = LJ::load_user('system'); + LJ::load_user_props($systemu, qw/sys_base_friending_notif_delay + sys_friending_notif_delay + sys_limit_friending_notif_delay + /); + my $base_delay = $systemu->prop('sys_base_friending_notif_delay') || BASE_DELAY; + my $delay_per_action = $systemu->prop('sys_friending_notif_delay') || DELAY_PER_ACTION; + my $max_delay = $systemu->prop('sys_limit_friending_notif_delay') || DELAY_LIMIT; + + ## delay for a job + my $actions_in_q = LJ::FriendQueue->count($fromu->userid); + my $delay = (1 + $actions_in_q) * DELAY_PER_ACTION + BASE_DELAY; # sec + my $funcname = "LJ::Worker::HandleUserFriendingActions"; + + ## After each user's activities (friend/defriend) we increase the delay. + ## If the delay exceeded the allowed level (DELAY_LIMIT) than flush all records + ## and add info about it to log. 
+ if ($delay >= DELAY_LIMIT){ + $funcname = "LJ::Worker::FlushUserFriendingActions"; + $delay = 0; # flush right now )) + } + + ## + my $time = time(); + my $job = TheSchwartz::Job->new( + funcname => $funcname, + arg => [$fromu->userid, $time], + run_after => $time + $delay, + ); + + my $sclient = LJ::theschwartz(); + $sclient->insert_jobs($job); + + my $jobid = $job->jobid; ## defined aftere insert_jobs + + LJ::FriendQueue->push( + userid => $fromu->userid, + friendid => $tou->userid, + action => $action, + etime => $time, + jobid => $jobid, + ); + +} + +1; Modified: trunk/cgi-bin/LJ/Event.pm =================================================================== --- trunk/cgi-bin/LJ/Event.pm 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/cgi-bin/LJ/Event.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -75,7 +75,7 @@ confess("too many args") if @args > 4; return bless { - userid => $u ? $u->id : 0, + userid => LJ::want_userid($u), args => \@args, }, $class; } Added: trunk/cgi-bin/LJ/FriendQueue.pm =================================================================== --- trunk/cgi-bin/LJ/FriendQueue.pm (rev 0) +++ trunk/cgi-bin/LJ/FriendQueue.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -0,0 +1,87 @@ +package LJ::FriendQueue; +use strict; + +sub count { + my $class = shift; + my $userid = shift; + + ## TODO: use memcached? + + my $u = LJ::load_userid($userid); + my $dbcr = LJ::get_cluster_reader($u->clusterid); + + my ($count) = $dbcr->selectrow_array(" + SELECT count(1) + FROM friending_actions_q + WHERE + userid = ? + ", undef, $u->userid) + or warn "Can't select from friend_actions_q: " . DBI->errstr; + + return $count; +} + + +sub push { + my $class = shift; + my %opts = @_; + my $userid = $opts{userid}; + my $friendid = $opts{friendid}; + my $action = $opts{action} eq 'add' ? 
'A' : 'D'; + my $etime = $opts{etime} || time(); + my $jobid = $opts{jobid}; + + my $u = LJ::load_userid($userid); + my $dbcw = LJ::get_cluster_master($u->clusterid); + + $dbcw->do("INSERT INTO friending_actions_q + (userid, friendid, action, etime, jobid) + VALUES + (?,?,?,?,?)", undef, + $userid, $friendid, $action, $etime, $jobid) + or die "Can't insert into friend_actions_q: " . DBI->errstr; +} + +sub load { + my $class = shift; + my $userid = shift; + + my $u = LJ::load_userid($userid); + my $dbcr = LJ::get_cluster_reader($u->clusterid); + my $sth = $dbcr->prepare(" + SELECT * + FROM friending_actions_q + WHERE + userid = ? + ORDER BY rec_id + "); + $sth->execute($u->userid); + my @actions = (); + while (my $h = $sth->fetchrow_hashref){ + push @actions => $h; + } + + return @actions; +} + +sub empty { + my $class = shift; + my $userid = shift; + my $rec_id = shift; + + my $u = LJ::load_userid($userid); + my $dbcw = LJ::get_cluster_master($u->clusterid); + my $rec_id_st = $rec_id ? " AND rec_id <= " . int($rec_id) : ""; + $dbcw->do("DELETE + FROM friending_actions_q + WHERE + userid = ? + $rec_id_st + ", undef, $userid) + or die "Can't flush records from friending_actions_q: " . 
DBI->errstr; + + return 1; +} + +1; + Modified: trunk/cgi-bin/LJ/User.pm =================================================================== --- trunk/cgi-bin/LJ/User.pm 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/cgi-bin/LJ/User.pm 2010-11-18 09:01:32 UTC (rev 17718) @@ -7996,7 +7996,8 @@ my $friender = LJ::load_userid($userid); my $friendee = LJ::load_userid($fid); if ($notify && !$friendee->is_banned($friender)) { - push @jobs, LJ::Event::Befriended->new($friendee, $friender)->fire_job; + require LJ::Event::BefriendedDelayed; + LJ::Event::BefriendedDelayed->send($friendee, $friender); } push @jobs, TheSchwartz::Job->new( @@ -8062,7 +8063,8 @@ # only fire event if the friender is a person and not banned and visible if ($notify && !$friendee->has_banned($u)) { - push @jobs, LJ::Event::Defriended->new($friendee, $u)->fire_job; + require LJ::Event::DefriendedDelayed; + LJ::Event::DefriendedDelayed->send($friendee, $u); } push @jobs, TheSchwartz::Job->new( Modified: trunk/cgi-bin/ljlib.pl =================================================================== --- trunk/cgi-bin/ljlib.pl 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/cgi-bin/ljlib.pl 2010-11-18 09:01:32 UTC (rev 17718) @@ -103,6 +103,7 @@ "logprop_history", "comet_history", "pingrel", "eventrates", "eventratescounters", + "friending_actions_q", ); # keep track of what db locks we have out Modified: trunk/cgi-bin/ljprotocol.pl =================================================================== --- trunk/cgi-bin/ljprotocol.pl 2010-11-18 08:55:46 UTC (rev 17717) +++ trunk/cgi-bin/ljprotocol.pl 2010-11-18 09:01:32 UTC (rev 17718) @@ -6,7 +6,6 @@ use LJ::Constants; use Class::Autouse qw( - LJ::Console LJ::Event::JournalNewEntry LJ::Event::UserNewEntry LJ::Event::Befriended @@ -2876,10 +2875,13 @@ LJ::memcache_kill($friendid, 'friendofs2'); if ($sclient && !$currently_is_friend && !$currently_is_banned) { + ## delay event to accumulate users activity + require LJ::Event::BefriendedDelayed; + 
LJ::Event::BefriendedDelayed->new( + LJ::load_userid($friendid), ## to user + LJ::load_userid($userid) ## from user + ); my @jobs; - push @jobs, LJ::Event::Befriended->new(LJ::load_userid($friendid), LJ::load_userid($userid))->fire_job - if !$LJ::DISABLED{esn}; - push @jobs, TheSchwartz::Job->new( funcname => "LJ::Worker::FriendChange", arg => [$userid, 'add', $friendid], @@ -3315,6 +3317,7 @@ }; my $cmdout = $res->{'results'} = []; + require LJ::Console; foreach my $cmd (@{$req->{'commands'}}) { # callee can pre-parse the args, or we can do it bash-style my @args = ref $cmd eq "ARRAY" ? @$cmd