[livejournal] r21733: LJSUP-10900: Implement MessagePack proto...
Committer: afedorov
LJSUP-10900: Implement MessagePack protocol driver to access the Relation Service.
U trunk/cgi-bin/LJ/RelationService/MysqlAPI.pm
U trunk/cgi-bin/LJ/RelationService.pm
U trunk/cgi-bin/ljrelation.pl
Modified: trunk/cgi-bin/LJ/RelationService/MysqlAPI.pm
===================================================================
--- trunk/cgi-bin/LJ/RelationService/MysqlAPI.pm 2012-04-17 11:05:53 UTC (rev 21732)
+++ trunk/cgi-bin/LJ/RelationService/MysqlAPI.pm 2012-04-17 13:23:12 UTC (rev 21733)
@@ -810,4 +810,138 @@
return 1;
}
+sub set_rel_multi { # sets every edge in the given list; thin wrapper over _mod_rel_multi
+ my $class = shift; # first argument (the invocant when called as a method); not used below
+ my $edges = shift; # arrayref of edges, each [userid, targetid, type]
+ return _mod_rel_multi({ mode => 'set', edges => $edges }); # plain function call; result is returned unchanged
+}
+
+sub clear_rel_multi { # clears every edge in the given list; thin wrapper over _mod_rel_multi
+ my $class = shift; # first argument (the invocant when called as a method); not used below
+ my $edges = shift; # arrayref of edges, each [userid, targetid, type]
+ return _mod_rel_multi({ mode => 'clear', edges => $edges }); # plain function call; result is returned unchanged
+}
+
+# <LJFUNC>
+# name: LJ::RelationService::MysqlAPI::_mod_rel_multi
+# des: Sets/Clears relationship edges for lists of user tuples.
+# args: keys, edges
+# des-keys: keys: mode => {clear|set}.
+# des-edges: edges => array of arrayrefs of edges to set: [userid, targetid, type]
+# Where:
+# userid: source userid, or a user hash;
+# targetid: target userid, or a user hash;
+# type: type of the relationship.
+# returns: 1 if all updates succeeded, otherwise undef
+# </LJFUNC>
+sub _mod_rel_multi # called as a plain function with one hashref: { mode => ..., edges => [...] }
+{
+ my $opts = shift;
+ return undef unless @{$opts->{edges}}; # an empty edge list is reported as undef, not as success
+
+ my $mode = $opts->{mode} eq 'clear' ? 'clear' : 'set'; # any mode other than 'clear' is treated as 'set'
+ my $memval = $mode eq 'set' ? 1 : 0; # value handed to LJ::_set_rel_memcache: 1 when setting, 0 when clearing
+
+ my @reluser = (); # edges for the global reluser table: [userid, targetid, type]
+ my @reluser2 = (); # edges for the clustered reluser2 table: [userid, targetid, typeid]
+ foreach my $edge (@{$opts->{edges}}) {
+ my ($userid, $targetid, $type) = @$edge;
+ $userid = LJ::want_userid($userid);
+ $targetid = LJ::want_userid($targetid);
+ next unless $type && $userid && $targetid; # incomplete edges are skipped silently and do not affect the return value
+
+ my $typeid = LJ::get_reluser_id($type)+0; # +0 forces a number, so a missing id becomes 0
+ my $eff_type = $typeid || $type; # numeric id when there is one, otherwise the type as given
+
+ # a non-zero typeid routes the edge to reluser2, anything else to reluser
+ push @{$typeid ? \@reluser2 : \@reluser}, [$userid, $targetid, $eff_type];
+ }
+
+ # now group reluser2 edges by the clusterid of the source user
+ my %reluser2 = (); # cid => [ [userid, targetid, type], ... ]
+ my $users = LJ::load_userids(map { $_->[0] } @reluser2);
+ foreach (@reluser2) {
+ my $cid = $users->{$_->[0]}->{clusterid} or next; # edges whose source user has no clusterid are dropped silently
+ push @{$reluser2{$cid}}, $_;
+ }
+ @reluser2 = (); # the flat list is no longer needed; the edges now live in %reluser2
+
+ # try to get all required cluster masters before we start doing database updates
+ my %cache_dbcm = ();
+ foreach my $cid (keys %reluser2) {
+ next unless @{$reluser2{$cid}};
+
+ # return undef immediately, before anything is written, if any cluster master is unavailable
+ $cache_dbcm{$cid} = LJ::get_cluster_master($cid)
+ or return undef;
+ }
+
+ # if an error occurs on one cluster, we skip that cluster and keep going with
+ # the others, since some database updates have probably been applied by then
+ # and cannot be taken back here; $ret is set to undef so the caller learns
+ # that not everything went smoothly
+ my $ret = 1;
+
+ # do clustered reluser2 updates
+ foreach my $cid (keys %cache_dbcm) {
+ # array of arrayrefs: [userid, targetid, type]
+ my @edges = @{$reluser2{$cid}};
+
+ # write the database first, then memcache, one cluster at a time
+ my $dbcm = $cache_dbcm{$cid};
+
+ my @vals = map { @$_ } @edges; # flattened bind values, three per edge, in edge order
+
+ if ($mode eq 'set') {
+ my $bind = join(",", map { "(?,?,?)" } @edges); # one placeholder group per edge
+ $dbcm->do("REPLACE INTO reluser2 (userid, targetid, type) VALUES $bind",
+ undef, @vals);
+ }
+
+ if ($mode eq 'clear') {
+ my $where = join(" OR ", map { "(userid=? AND targetid=? AND type=?)" } @edges);
+ $dbcm->do("DELETE FROM reluser2 WHERE $where", undef, @vals);
+ }
+
+ # don't update memcache if db update failed for this cluster
+ if ($dbcm->err) {
+ $ret = undef;
+ next;
+ }
+
+ # updates to this cluster succeeded, set memcache
+ LJ::_set_rel_memcache(@$_, $memval) foreach @edges;
+ }
+
+ # do global reluser updates
+ if (@reluser) {
+
+ # nothing to do after this block but return, so we can
+ # immediately return undef from here if there's a problem
+ my $dbh = LJ::get_db_writer()
+ or return undef;
+
+ my @vals = map { @$_ } @reluser; # flattened bind values, three per edge, in edge order
+
+ if ($mode eq 'set') {
+ my $bind = join(",", map { "(?,?,?)" } @reluser); # one placeholder group per edge
+ $dbh->do("REPLACE INTO reluser (userid, targetid, type) VALUES $bind",
+ undef, @vals);
+ }
+
+ if ($mode eq 'clear') {
+ my $where = join(" OR ", map { "userid=? AND targetid=? AND type=?" } @reluser); # no parentheses, unlike the reluser2 query; correct only because SQL AND binds tighter than OR
+ $dbh->do("DELETE FROM reluser WHERE $where", undef, @vals);
+ }
+
+ # don't update memcache if the update on the global writer failed
+ return undef if $dbh->err;
+
+ # $_ = [userid, targetid, type] for each iteration
+ LJ::_set_rel_memcache(@$_, $memval) foreach @reluser;
+ }
+
+ return $ret; # 1 unless a cluster update failed above
+}
+
1;
Modified: trunk/cgi-bin/LJ/RelationService.pm
===================================================================
--- trunk/cgi-bin/LJ/RelationService.pm 2012-04-17 11:05:53 UTC (rev 21732)
+++ trunk/cgi-bin/LJ/RelationService.pm 2012-04-17 13:23:12 UTC (rev 21733)
@@ -229,4 +229,38 @@
return $interface->delete_and_purge_completely($u, %opts);
}
+sub clear_rel_multi { # class method: clears a list of relation edges through the configured API
+ my $class = shift;
+ my $edges = shift; # expected to be an array reference of edges
+
+ return undef unless ref $edges eq 'ARRAY'; # anything but a plain array reference is rejected
+
+ if ($class->_load_alt_api('write', 'B')) { # NOTE(review): presumably true when an alternate backend is enabled for writes - confirm
+ my $alt = $class->alt_api();
+ if ($alt) {
+ $alt->clear_rel_multi($edges); # result of the alternate API call is discarded
+ }
+ }
+
+ my $interface = $class->relation_api();
+ return $interface->clear_rel_multi($edges); # only this call's result reaches the caller
+}
+
+sub set_rel_multi { # class method: sets a list of relation edges through the configured API
+ my $class = shift;
+ my $edges = shift; # expected to be an array reference of edges
+
+ return undef unless ref $edges eq 'ARRAY'; # anything but a plain array reference is rejected
+
+ if ($class->_load_alt_api('write', 'B')) { # NOTE(review): presumably true when an alternate backend is enabled for writes - confirm
+ my $alt = $class->alt_api();
+ if ($alt) {
+ $alt->set_rel_multi($edges); # result of the alternate API call is discarded
+ }
+ }
+
+ my $interface = $class->relation_api();
+ return $interface->set_rel_multi($edges); # only this call's result reaches the caller
+}
+
1;
Modified: trunk/cgi-bin/ljrelation.pl
===================================================================
--- trunk/cgi-bin/ljrelation.pl 2012-04-17 11:05:53 UTC (rev 21732)
+++ trunk/cgi-bin/ljrelation.pl 2012-04-17 13:23:12 UTC (rev 21733)
@@ -300,7 +300,7 @@
# returns: 1 if all sets succeeded, otherwise undef
# </LJFUNC>
sub set_rel_multi {
- return _mod_rel_multi({ mode => 'set', edges => \@_ });
+ return LJ::RelationService->set_rel_multi( \@_ ); # the whole argument list is handed over as one array reference
}
# <LJFUNC>
@@ -315,133 +315,10 @@
# returns: 1 if all clears succeeded, otherwise undef
# </LJFUNC>
sub clear_rel_multi {
- return _mod_rel_multi({ mode => 'clear', edges => \@_ });
+ return LJ::RelationService->clear_rel_multi( \@_ ); # the whole argument list is handed over as one array reference
}
# <LJFUNC>
-# name: LJ::_mod_rel_multi
-# des: Sets/Clears relationship edges for lists of user tuples.
-# args: keys, edges
-# des-keys: keys: mode => {clear|set}.
-# des-edges: edges => array of arrayrefs of edges to set: [userid, targetid, type]
-# Where:
-# userid: source userid, or a user hash;
-# targetid: target userid, or a user hash;
-# type: type of the relationship.
-# returns: 1 if all updates succeeded, otherwise undef
-# </LJFUNC>
-sub _mod_rel_multi
-{
- my $opts = shift;
- return undef unless @{$opts->{edges}};
-
- my $mode = $opts->{mode} eq 'clear' ? 'clear' : 'set';
- my $memval = $mode eq 'set' ? 1 : 0;
-
- my @reluser = (); # [userid, targetid, type]
- my @reluser2 = ();
- foreach my $edge (@{$opts->{edges}}) {
- my ($userid, $targetid, $type) = @$edge;
- $userid = LJ::want_userid($userid);
- $targetid = LJ::want_userid($targetid);
- next unless $type && $userid && $targetid;
-
- my $typeid = LJ::get_reluser_id($type)+0;
- my $eff_type = $typeid || $type;
-
- # working on reluser or reluser2?
- push @{$typeid ? \@reluser2 : \@reluser}, [$userid, $targetid, $eff_type];
- }
-
- # now group reluser2 edges by clusterid
- my %reluser2 = (); # cid => [userid, targetid, type]
- my $users = LJ::load_userids(map { $_->[0] } @reluser2);
- foreach (@reluser2) {
- my $cid = $users->{$_->[0]}->{clusterid} or next;
- push @{$reluser2{$cid}}, $_;
- }
- @reluser2 = ();
-
- # try to get all required cluster masters before we start doing database updates
- my %cache_dbcm = ();
- foreach my $cid (keys %reluser2) {
- next unless @{$reluser2{$cid}};
-
- # return undef immediately if we won't be able to do all the updates
- $cache_dbcm{$cid} = LJ::get_cluster_master($cid)
- or return undef;
- }
-
- # if any error occurs with a cluster, we'll skip over that cluster and continue
- # trying to process others since we've likely already done some amount of db
- # updates already, but we'll return undef to signify that everything did not
- # go smoothly
- my $ret = 1;
-
- # do clustered reluser2 updates
- foreach my $cid (keys %cache_dbcm) {
- # array of arrayrefs: [userid, targetid, type]
- my @edges = @{$reluser2{$cid}};
-
- # set in database, then in memcache. keep the two atomic per clusterid
- my $dbcm = $cache_dbcm{$cid};
-
- my @vals = map { @$_ } @edges;
-
- if ($mode eq 'set') {
- my $bind = join(",", map { "(?,?,?)" } @edges);
- $dbcm->do("REPLACE INTO reluser2 (userid, targetid, type) VALUES $bind",
- undef, @vals);
- }
-
- if ($mode eq 'clear') {
- my $where = join(" OR ", map { "(userid=? AND targetid=? AND type=?)" } @edges);
- $dbcm->do("DELETE FROM reluser2 WHERE $where", undef, @vals);
- }
-
- # don't update memcache if db update failed for this cluster
- if ($dbcm->err) {
- $ret = undef;
- next;
- }
-
- # updates to this cluster succeeded, set memcache
- LJ::_set_rel_memcache(@$_, $memval) foreach @edges;
- }
-
- # do global reluser updates
- if (@reluser) {
-
- # nothing to do after this block but return, so we can
- # immediately return undef from here if there's a problem
- my $dbh = LJ::get_db_writer()
- or return undef;
-
- my @vals = map { @$_ } @reluser;
-
- if ($mode eq 'set') {
- my $bind = join(",", map { "(?,?,?)" } @reluser);
- $dbh->do("REPLACE INTO reluser (userid, targetid, type) VALUES $bind",
- undef, @vals);
- }
-
- if ($mode eq 'clear') {
- my $where = join(" OR ", map { "userid=? AND targetid=? AND type=?" } @reluser);
- $dbh->do("DELETE FROM reluser WHERE $where", undef, @vals);
- }
-
- # don't update memcache if db update failed for this cluster
- return undef if $dbh->err;
-
- # $_ = [userid, targetid, type] for each iteration
- LJ::_set_rel_memcache(@$_, $memval) foreach @reluser;
- }
-
- return $ret;
-}
-
-
-# <LJFUNC>
# name: LJ::clear_rel
# des: Deletes a relationship between two users or all relationships of a particular type
# for one user, on either side of the relationship.

