[livejournal] r17718: LJSUP-7319: Antispam: massfriending
Committer: vsukhanov
LJSUP-7319: Antispam: massfriending
U trunk/bin/upgrading/proplists.dat
U trunk/bin/upgrading/update-db-general.pl
A trunk/bin/worker/friending_queue
U trunk/cgi-bin/LJ/Event/Befriended.pm
A trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm
U trunk/cgi-bin/LJ/Event/Defriended.pm
A trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm
A trunk/cgi-bin/LJ/Event/friendedDelayed.pm
U trunk/cgi-bin/LJ/Event.pm
A trunk/cgi-bin/LJ/FriendQueue.pm
U trunk/cgi-bin/LJ/User.pm
U trunk/cgi-bin/ljlib.pl
U trunk/cgi-bin/ljprotocol.pl
Modified: trunk/bin/upgrading/proplists.dat
===================================================================
--- trunk/bin/upgrading/proplists.dat 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/bin/upgrading/proplists.dat 2010-11-18 09:01:32 UTC (rev 17718)
@@ -1580,3 +1580,21 @@
ratelist.usermessage:
des: Logged when a users sends a message to another user
+userproplist.sys_base_friending_notif_delay:
+ datatype: num
+ des: Base delay, in seconds, before sending a notification to the friended/defriended user (applies to the first action in the queue).
+ cldversion: 8
+ prettyname: Base system friending notification delay
+
+userproplist.sys_friending_notif_delay:
+ datatype: num
+ des: Additional delay, in seconds, per action in the queue before sending a notification to the friended/defriended user.
+ cldversion: 8
+ prettyname: System friending notification delay
+
+userproplist.sys_limit_friending_notif_delay:
+ datatype: num
+ des: Maximum delay, in seconds, of sending notifications. All of the user's actions are flushed from the queue when the delay reaches or exceeds this limit.
+ cldversion: 8
+ prettyname: System limit of friending notification delay
+
Modified: trunk/bin/upgrading/update-db-general.pl
===================================================================
--- trunk/bin/upgrading/update-db-general.pl 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/bin/upgrading/update-db-general.pl 2010-11-18 09:01:32 UTC (rev 17718)
@@ -3377,6 +3377,22 @@
) TYPE=InnoDB
EOC
+## Queue of delayed Befriending/Defriending events: one row per pending action, read and emptied through LJ::FriendQueue.
+register_tablecreate("friending_actions_q", <<'EOC');
+CREATE TABLE friending_actions_q (
+ rec_id INT UNSIGNED NOT NULL AUTO_INCREMENT,
+ userid INT UNSIGNED NOT NULL,
+ friendid INT UNSIGNED NOT NULL,
+ action CHAR(1),
+ etime INT,
+ jobid BIGINT(20) UNSIGNED, -- appropriate schwartz job
+
+ PRIMARY KEY(rec_id),
+ INDEX(userid)
+
+ ) Type=InnoDB
+EOC
+
### changes
register_alter(sub {
Added: trunk/bin/worker/friending_queue
===================================================================
--- trunk/bin/worker/friending_queue (rev 0)
+++ trunk/bin/worker/friending_queue 2010-11-18 09:01:32 UTC (rev 17718)
@@ -0,0 +1,102 @@
+#!/usr/bin/perl
+package TheSchwartz::Worker::FriendingQueue;
+use strict;
+use lib "$ENV{LJHOME}/cgi-bin";
+use base 'LJ::NewWorker::TheSchwartz';
+require 'ljlib.pl';
+
+## Names of the job classes (both defined further down in this file) served by this worker.
+sub capabilities { return qw(LJ::Worker::HandleUserFriendingActions LJ::Worker::FlushUserFriendingActions) }
+__PACKAGE__->start;
+
+package LJ::Worker::HandleUserFriendingActions;
+use strict;
+use base 'TheSchwartz::Worker';
+use LJ::FriendQueue;
+
+## Drain one user's queue of delayed friend/defriend actions and fire the
+## matching Befriended/Defriended notifications.
+##   $job->arg: [ userid, time the job was created ]
+## Only the job attached to the newest queue record does the work; jobs of
+## older records complete at once and leave the queue untouched.
+sub work {
+    my ($class, $job) = @_;
+    my ($uid) = @{ $job->arg || [] };
+
+    my $u = LJ::load_userid($uid)
+        or return $job->failed("Can't load user #" . ($uid || 0));
+
+    my @actions = LJ::FriendQueue->load($u->userid);
+    return $job->completed unless @actions; ## everything is processed already
+
+    ## Process the queue only if its last record belongs to this job. The
+    ## return matters: without it an older job would send the notifications early.
+    my $last_jobid  = $actions[-1]->{jobid};
+    my $last_rec_id = $actions[-1]->{rec_id};
+    return $job->completed unless defined $last_jobid && $last_jobid eq $job->jobid;
+
+    my $sclient = LJ::theschwartz();
+    unless ($sclient) {
+        $job->failed("Can't get TheSchwartz client");
+        return;
+    }
+
+    ## make sure the event classes are loaded in this worker process
+    require LJ::Event::Befriended;
+    require LJ::Event::Defriended;
+
+    my @jobs;
+    foreach my $action (_drop_cancelled_pairs(@actions)) {
+        my $event_class = $action->{action} eq 'A' ? 'LJ::Event::Befriended' : 'LJ::Event::Defriended';
+        ## list context + grep: an event that yields no job must not hand
+        ## undef to insert_jobs
+        push @jobs, grep { defined $_ }
+            $event_class->new($action->{friendid}, $action->{userid})->fire_job;
+    }
+    $sclient->insert_jobs(@jobs) if @jobs;
+
+    LJ::FriendQueue->empty($u->userid, $last_rec_id);
+
+    $job->completed;
+}
+
+## Remove actions that cancel each other out (friend X + defriend X, or vice
+## versa), pairing every action with the oldest pending opposite action for
+## the same friendid. Returns the surviving actions in their original order.
+sub _drop_cancelled_pairs {
+    my @actions = @_;
+    my (%pending, %dropped);   # "friendid-action" => [indexes]; index => 1
+    foreach my $i (0 .. $#actions) {
+        my ($friendid, $act) = @{ $actions[$i] }{qw(friendid action)};
+        my $opposite = $friendid . '-' . ($act eq 'A' ? 'D' : 'A');
+        if (@{ $pending{$opposite} || [] }) {
+            ## - vs + = 0: drop both this action and its counterpart
+            $dropped{$_} = 1 for $i, shift @{ $pending{$opposite} };
+        } else {
+            push @{ $pending{"$friendid-$act"} }, $i;
+        }
+    }
+    return @actions[ grep { !$dropped{$_} } 0 .. $#actions ];
+}
+
+package LJ::Worker::FlushUserFriendingActions;
+use strict;
+use base 'TheSchwartz::Worker';
+use LJ::FriendQueue;
+
+## Drop every queued friend/defriend action of a user without sending any
+## notification. $job->arg is [ userid, time the job was created ].
+sub work {
+    my ($class, $job) = @_;
+    my ($uid) = @{ $job->arg || [] };
+
+    ## fail the job with a message instead of dying on an unknown userid
+    my $u = LJ::load_userid($uid)
+        or return $job->failed("Can't load user #" . ($uid || 0));
+    ## TODO: write the flush to a log
+    LJ::FriendQueue->empty($u->userid); ## no rec_id limit: removes all records
+    $job->completed;
+}
+
+
+1;
Property changes on: trunk/bin/worker/friending_queue
___________________________________________________________________
Added: svn:executable
+ *
Modified: trunk/cgi-bin/LJ/Event/Befriended.pm
===================================================================
--- trunk/cgi-bin/LJ/Event/Befriended.pm 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/cgi-bin/LJ/Event/Befriended.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -6,11 +6,10 @@
sub new {
my ($class, $u, $fromu) = @_;
- foreach ($u, $fromu) {
- croak 'Not an LJ::User' unless blessed $_ && $_->isa("LJ::User");
- }
- return $class->SUPER::new($u, $fromu->{userid});
+ my $uid = LJ::want_userid($u); ## friendid
+ my $fromuid = LJ::want_userid($fromu);
+ return $class->SUPER::new($uid, $fromuid);
}
sub is_common { 0 }
Added: trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm
===================================================================
--- trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm (rev 0)
+++ trunk/cgi-bin/LJ/Event/BefriendedDelayed.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -0,0 +1,10 @@
+package LJ::Event::BefriendedDelayed;
+use strict;
+use base 'LJ::Event::friendedDelayed';
+
+## Class method: hands all of its arguments to the parent class's send(),
+## with the action name 'add' placed in front of them.
+sub send {
+    return shift->SUPER::send('add', @_);
+}
+1;
Modified: trunk/cgi-bin/LJ/Event/Defriended.pm
===================================================================
--- trunk/cgi-bin/LJ/Event/Defriended.pm 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/cgi-bin/LJ/Event/Defriended.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -6,11 +6,11 @@
sub new {
my ($class, $u, $fromu) = @_;
- foreach ($u, $fromu) {
- croak 'Not an LJ::User' unless blessed $_ && $_->isa("LJ::User");
- }
- return $class->SUPER::new($u, $fromu->{userid});
+ my $uid = LJ::want_userid($u); ## friendid
+ my $fromuid = LJ::want_userid($fromu);
+
+ return $class->SUPER::new($uid, $fromuid);
}
sub is_common { 0 }
Added: trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm
===================================================================
--- trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm (rev 0)
+++ trunk/cgi-bin/LJ/Event/DefriendedDelayed.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -0,0 +1,9 @@
+package LJ::Event::DefriendedDelayed;
+use strict;
+use base 'LJ::Event::friendedDelayed';
+
+## Class method: hands its arguments to the parent's send(), with the action name 'del' in front.
+sub send {
+    return shift->SUPER::send('del', @_);
+}
+1;
Added: trunk/cgi-bin/LJ/Event/friendedDelayed.pm
===================================================================
--- trunk/cgi-bin/LJ/Event/friendedDelayed.pm (rev 0)
+++ trunk/cgi-bin/LJ/Event/friendedDelayed.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -0,0 +1,70 @@
+package LJ::Event::friendedDelayed;
+use strict;
+use LJ::FriendQueue;
+
+## Fallback delays in seconds, used when the 'system' user property named on the right is not set.
+use constant BASE_DELAY       => 1800;          # sys_base_friending_notif_delay
+use constant DELAY_PER_ACTION => 600;           # sys_friending_notif_delay
+use constant DELAY_LIMIT      => 24 * 60 * 60;  # sys_limit_friending_notif_delay
+
+## Queue a friend/defriend action so that its notification is sent later.
+##   $action - 'add' or 'del'
+##   $tou    - user (LJ::User object or userid) who was friended/defriended
+##   $fromu  - user (LJ::User object or userid) who performed the action
+## Inserts a TheSchwartz job that will handle $fromu's queue and records the
+## action, together with the id of that job, in LJ::FriendQueue.
+## Dies on a wrong action, a missing user or a missing TheSchwartz client.
+sub send {
+    my ($class, $action, $tou, $fromu) = @_;
+
+    ## The alternation must be grouped: /^add|del$/ also accepted 'address' or 'model'.
+    $action = '' unless defined $action;
+    die "Wrong action parameter: $action" unless $action =~ /\A(?:add|del)\z/;
+
+    my $touid   = LJ::want_userid($tou);
+    my $fromuid = LJ::want_userid($fromu);
+    die "Both the target and the source user are required" unless $touid && $fromuid;
+
+    ## Delay settings: properties of the 'system' user override the constants.
+    my ($base_delay, $delay_per_action, $max_delay) = (BASE_DELAY, DELAY_PER_ACTION, DELAY_LIMIT);
+    if (my $systemu = LJ::load_user('system')) {
+        LJ::load_user_props($systemu, qw/sys_base_friending_notif_delay
+                                         sys_friending_notif_delay
+                                         sys_limit_friending_notif_delay/);
+        $base_delay       = $systemu->prop('sys_base_friending_notif_delay')  || $base_delay;
+        $delay_per_action = $systemu->prop('sys_friending_notif_delay')       || $delay_per_action;
+        $max_delay        = $systemu->prop('sys_limit_friending_notif_delay') || $max_delay;
+    }
+
+    ## The more actions the user already has in the queue, the longer the delay.
+    my $actions_in_q = LJ::FriendQueue->count($fromuid) || 0;
+    my $delay        = (1 + $actions_in_q) * $delay_per_action + $base_delay; # sec
+    my $funcname     = "LJ::Worker::HandleUserFriendingActions";
+
+    ## Once the delay reaches the limit, flush the user's whole queue instead of notifying.
+    if ($delay >= $max_delay) {
+        $funcname = "LJ::Worker::FlushUserFriendingActions";
+        $delay    = 0; # flush right now
+    }
+
+    my $sclient = LJ::theschwartz() or die "Can't get TheSchwartz client";
+
+    my $time = time();
+    my $job  = TheSchwartz::Job->new(
+        funcname  => $funcname,
+        arg       => [$fromuid, $time],
+        run_after => $time + $delay,
+    );
+    $sclient->insert_jobs($job);
+    ## NOTE(review): a flush job may run before the record below is stored,
+    ## leaving that record queued until the user's next action - confirm.
+    LJ::FriendQueue->push(
+        userid   => $fromuid,
+        friendid => $touid,
+        action   => $action,
+        etime    => $time,
+        jobid    => $job->jobid, ## defined only after insert_jobs
+    );
+}
+
+1;
Modified: trunk/cgi-bin/LJ/Event.pm
===================================================================
--- trunk/cgi-bin/LJ/Event.pm 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/cgi-bin/LJ/Event.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -75,7 +75,7 @@
confess("too many args") if @args > 4;
return bless {
- userid => $u ? $u->id : 0,
+ userid => LJ::want_userid($u),
args => \@args,
}, $class;
}
Added: trunk/cgi-bin/LJ/FriendQueue.pm
===================================================================
--- trunk/cgi-bin/LJ/FriendQueue.pm (rev 0)
+++ trunk/cgi-bin/LJ/FriendQueue.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -0,0 +1,87 @@
+package LJ::FriendQueue;
+use strict;
+
+## Per-user queue of delayed friend/defriend actions, stored in the
+## friending_actions_q table on the user's cluster. All public subs are
+## class methods and take plain userids.
+
+## Returns a handle to the cluster database of user $userid: the master
+## when $want_master is true, a reader otherwise. Dies if the user or the
+## database is not available.
+sub _cluster_db {
+    my ($userid, $want_master) = @_;
+    my $u = LJ::load_userid($userid)
+        or die "Can't load user for friending_actions_q";
+    my $db = $want_master ? LJ::get_cluster_master($u->clusterid)
+                          : LJ::get_cluster_reader($u->clusterid);
+    return $db || die "Can't get cluster database for friending_actions_q";
+}
+
+## Number of actions user $userid currently has in the queue.
+## Warns and returns undef if the query fails.
+sub count {
+    my ($class, $userid) = @_;
+    ## TODO: use memcached?
+    my $dbcr = _cluster_db($userid);
+    my ($count) = $dbcr->selectrow_array(
+        "SELECT COUNT(*) FROM friending_actions_q WHERE userid = ?",
+        undef, $userid)
+        or warn "Can't select from friending_actions_q: " . $dbcr->errstr;
+
+    return $count;
+}
+
+## Append an action to the queue. Named arguments:
+##   userid   - user who performed the action
+##   friendid - user who was friended/defriended
+##   action   - 'add' is stored as 'A', anything else as 'D'
+##   etime    - time of the action (defaults to now)
+##   jobid    - id of the TheSchwartz job that will handle the queue
+## Dies if the record can't be stored.
+sub push {
+    my ($class, %opts) = @_;
+    my $userid = $opts{userid};
+    my $action = $opts{action} eq 'add' ? 'A' : 'D';
+    my $etime  = $opts{etime} || time();
+
+    my $dbcw = _cluster_db($userid, 'master');
+    $dbcw->do(
+        "INSERT INTO friending_actions_q (userid, friendid, action, etime, jobid) " .
+        "VALUES (?,?,?,?,?)",
+        undef, $userid, $opts{friendid}, $action, $etime, $opts{jobid})
+        or die "Can't insert into friending_actions_q: " . $dbcw->errstr;
+}
+
+## Returns all queued actions of user $userid, oldest first, as a list of
+## hashrefs keyed by column name (rec_id, userid, friendid, action, etime,
+## jobid). Dies if the query fails.
+sub load {
+    my ($class, $userid) = @_;
+    my $dbcr = _cluster_db($userid);
+    my $rows = $dbcr->selectall_arrayref(
+        "SELECT rec_id, userid, friendid, action, etime, jobid " .
+        "FROM friending_actions_q WHERE userid = ? ORDER BY rec_id",
+        { Slice => {} }, $userid)
+        or die "Can't select from friending_actions_q: " . $dbcr->errstr;
+
+    return @$rows;
+}
+
+## Remove queued actions of user $userid: those with rec_id <= $rec_id
+## when $rec_id is given, all of them otherwise. Returns 1; dies on failure.
+sub empty {
+    my ($class, $userid, $rec_id) = @_;
+    ## the rec_id limit is bound as a placeholder, not interpolated into the SQL
+    my $sql = "DELETE FROM friending_actions_q WHERE userid = ?";
+    $sql   .= " AND rec_id <= ?" if $rec_id;
+    my @bind = ($userid, $rec_id ? int($rec_id) : ());
+
+    my $dbcw = _cluster_db($userid, 'master');
+    $dbcw->do($sql, undef, @bind)
+        or die "Can't flush records from friending_actions_q: " . $dbcw->errstr;
+
+    return 1;
+}
+
+1;
+
Modified: trunk/cgi-bin/LJ/User.pm
===================================================================
--- trunk/cgi-bin/LJ/User.pm 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/cgi-bin/LJ/User.pm 2010-11-18 09:01:32 UTC (rev 17718)
@@ -7996,7 +7996,8 @@
my $friender = LJ::load_userid($userid);
my $friendee = LJ::load_userid($fid);
if ($notify && !$friendee->is_banned($friender)) {
- push @jobs, LJ::Event::Befriended->new($friendee, $friender)->fire_job;
+ require LJ::Event::BefriendedDelayed;
+ LJ::Event::BefriendedDelayed->send($friendee, $friender);
}
push @jobs, TheSchwartz::Job->new(
@@ -8062,7 +8063,8 @@
# only fire event if the friender is a person and not banned and visible
if ($notify && !$friendee->has_banned($u)) {
- push @jobs, LJ::Event::Defriended->new($friendee, $u)->fire_job;
+ require LJ::Event::DefriendedDelayed;
+ LJ::Event::DefriendedDelayed->send($friendee, $u);
}
push @jobs, TheSchwartz::Job->new(
Modified: trunk/cgi-bin/ljlib.pl
===================================================================
--- trunk/cgi-bin/ljlib.pl 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/cgi-bin/ljlib.pl 2010-11-18 09:01:32 UTC (rev 17718)
@@ -103,6 +103,7 @@
"logprop_history",
"comet_history", "pingrel",
"eventrates", "eventratescounters",
+ "friending_actions_q",
);
# keep track of what db locks we have out
Modified: trunk/cgi-bin/ljprotocol.pl
===================================================================
--- trunk/cgi-bin/ljprotocol.pl 2010-11-18 08:55:46 UTC (rev 17717)
+++ trunk/cgi-bin/ljprotocol.pl 2010-11-18 09:01:32 UTC (rev 17718)
@@ -6,7 +6,6 @@
use LJ::Constants;
use Class::Autouse qw(
- LJ::Console
LJ::Event::JournalNewEntry
LJ::Event::UserNewEntry
LJ::Event::Befriended
@@ -2876,10 +2875,13 @@
LJ::memcache_kill($friendid, 'friendofs2');
if ($sclient && !$currently_is_friend && !$currently_is_banned) {
+ ## Delay the notification so that bursts of the user's friending activity are accumulated. The class has no constructor: send() is its entry point.
+ require LJ::Event::BefriendedDelayed;
+ LJ::Event::BefriendedDelayed->send(
+ LJ::load_userid($friendid), ## to user
+ LJ::load_userid($userid) ## from user
+ );
my @jobs;
- push @jobs, LJ::Event::Befriended->new(LJ::load_userid($friendid), LJ::load_userid($userid))->fire_job
- if !$LJ::DISABLED{esn};
-
push @jobs, TheSchwartz::Job->new(
funcname => "LJ::Worker::FriendChange",
arg => [$userid, 'add', $friendid],
@@ -3315,6 +3317,7 @@
};
my $cmdout = $res->{'results'} = [];
+ require LJ::Console;
foreach my $cmd (@{$req->{'commands'}}) {
# callee can pre-parse the args, or we can do it bash-style
my @args = ref $cmd eq "ARRAY" ? @$cmd
