vadvs (vadvs) wrote in changelog,
vadvs
vadvs
changelog

[TheSchwartz-Worker-SendEmail] r13: LJSUP-7095: optimize send mail process

Committer: vad
LJSUP-7095: optimize send mail process
U   trunk/lib/TheSchwartz/Worker/SendEmail.pm
Modified: trunk/lib/TheSchwartz/Worker/SendEmail.pm
===================================================================
--- trunk/lib/TheSchwartz/Worker/SendEmail.pm	2010-07-20 03:32:26 UTC (rev 12)
+++ trunk/lib/TheSchwartz/Worker/SendEmail.pm	2010-10-08 09:54:24 UTC (rev 13)
@@ -41,6 +41,7 @@
 use Net::DNS qw(mx);
 use Storable;
 use LJ::User::Email;
+use LJ::DoSendEmail;
 
 our $VERSION = '1.00';
 
@@ -59,19 +60,6 @@
 
 =cut
 
-# status_code:
-#   undef   - OK
-#   0       - cannot connect to MX-host or email domain.
-#   5xx     - smtp-status
-#
-sub log_complete_status {
-    my $status_code = shift;
-    my $emails      = shift;    # One email if scalar or list of emails if array ref.
-    my $message     = shift;
-
-    LJ::User::Email->mark($status_code, $emails, $message);
-}
-
 sub set_resolver {
     $resolver = $_[1];
 }
@@ -113,10 +101,45 @@
 
 sub work {
     my ($class, $job) = @_;
-    my $args = $job->arg;
+    my $args   = $job->arg;
     my $client = $job->handle->client;
-    my $rcpts    = $args->{rcpts};     # arrayref of recipients
+    my $rcpts  = $args->{rcpts}; # arrayref of recipients OR a single recipient address
 
+use Data::Dumper;
+warn "RCPTS: start=" . Dumper($rcpts);
+
+    ##
+    if (ref $rcpts eq 'ARRAY' and @$rcpts > 1){
+warn "SPLITTING: " . Dumper($rcpts);
+        $0 = "send-email [splitting]";
+        my @new_jobs;
+        foreach my $rcpt (@$rcpts) {
+            my $new_args = Storable::dclone($args);
+            $new_args->{rcpts} = $rcpt;
+        
+            my ($host) = $rcpt =~ /\@(.+?)$/;
+            next unless $host;
+            $host = lc $host;
+
+            my $new_job = TheSchwartz::Job->new(
+                                                funcname => 'TheSchwartz::Worker::SendEmail',
+                                                arg      => $new_args,
+                                                coalesce => "$host\@",
+                                                );
+            push @new_jobs, $new_job;
+        }
+        $job->replace_with(@new_jobs);
+        return;
+    }
+    
+    ## Only one rcpt
+    my $rcpt = ref $rcpts eq 'ARRAY' ? $rcpts->[0] : $rcpts;
+warn "rcpt: $rcpt";
+    my ($host) = $rcpt =~ /\@(.+?)$/;
+warn "host: $host";
+    return $job->completed unless $host;
+
+=head
     my %dom_rcpts;  # domain -> [ $rcpt, ... ]
     foreach my $to (@$rcpts) {
         my ($host) = $to =~ /\@(.+?)$/;
@@ -154,142 +177,44 @@
 
     # all rcpts on same server, proceed...
     (my($host), $rcpts) = %dom_rcpts;   # (there's only one key)
+=cut
+
     $0 = "send-email [$host]";
 
-    my @mailhosts = mx(resolver(), $host);
-
-    my @ex = map { $_->exchange } @mailhosts;
-
-    # seen in wild:  no MX records, but port 25 of domain is an SMTP server.  think it's in SMTP spec too?
-    @ex = ($host) unless @ex;
-
-    my $smtp = Net::SMTP::BetterConnecting->new(
-                                                \@ex,
-                                                Hello          => $hello_domain,
-                                                PeerPort       => 25,
-                                                ConnectTimeout => 4,
-                                                );
-    unless ($smtp) {
-        my $err_msg = "Connection failed to domain '$host', MXes: [@ex]";
-        LJ::User::Email->mark(0, $rcpts, $err_msg);
-        die "$err_msg\n";
-    }
-
-    $smtp->timeout(300);
-    # FIXME: need to detect timeouts to log to errors, so people with ridiculous timeouts can see that's why we're not delivering mail
-
-    my $done = 0;
-    while ($job && $class->_send_job_on_connection($smtp, $job) && ++$done < 50) {
-        # added per Abe's request -bt@Nov262008
-        last;
-        my $job1 = $job;
-        $job = $client->find_job_with_coalescing_prefix(__PACKAGE__, "$host\@");
-
-        my $handle = '<nothing>';
-        if ($job) {
-            $job->set_as_current;
-            $handle = $job->handle->as_string;
-            die "RSET failed" unless $smtp->reset;
-        }
-
-        $job1->debug("sent successfully.  trying another.  found: " . $handle);
-    }
-
-    $smtp->quit;
-}
-
-sub _send_job_on_connection {
-    my ($class, $smtp, $job) = @_;
-
+    ## 
     my $args = $job->arg;
-    my $hstr = $job->handle->as_string;
-
-    if ($ENV{DEBUG}) {
-        require Data::Dumper;
-        warn "sending email on $smtp: " . Data::Dumper::Dumper($args);
-    }
-
     my $env_from = $args->{env_from};  # Envelope From
-    my $rcpts    = $args->{rcpts};     # arrayref of recipients
-    my $body     = $args->{data};
-    my $headers;
+    my $message  = $args->{data};
+    my $hstr     = $job->handle->as_string;
 
-    my ($this_domain) = $env_from =~ /\@(.+)/;
-
-    # remove bcc
-    $body =~ s/^(.+?\r?\n\r?\n)//s;
-    $headers = $1;
-    $headers =~ s/^bcc:.+\r?\n//mig;
-
-    # unless they specified a message ID, let's prepend our own:
-    unless ($headers =~ m!^message-id:.+!mi) {
-        $headers = "Message-ID: <sch-$hstr\@$this_domain>\r\n" . $headers;
-    }
-
-    my $details = sub {
-        return eval {
-            $smtp->code . " " . $smtp->message;
+    my $res = LJ::DoSendEmail->send($rcpt, {
+                        from      => $env_from,
+                        data      => $message,
+                        sender_id => $hstr,
+                        });
+    unless ($res eq LJ::DoSendEmail::OK){
+    ## handle error 
+        
+        ## 5xx errors
+        my $details = LJ::DoSendEmail->details;
+        if (LJ::DoSendEmail->status eq 5){
+            $class->on_5xx_rcpt($job, $rcpt, $details);
+            #$job->completed; ## no sense to repeat
+            die LJ::DoSendEmail->error;
         }
-    };
 
-    my $not_ok = sub {
-        my $cmd = shift;
-        my $err_msg = "during $cmd phase to [@$rcpts]: " . $details->();
-        if ($smtp->status == 5) {
-            $err_msg = "Permanent failure " . $err_msg;
-            $job->permanent_failure($err_msg);
-            LJ::User::Email->mark(5, $rcpts, $err_msg);
-            return 0;  # let's not re-use this connection anymore.
+        if ($res eq LJ::DoSendEmail::NO_SUPPORTED_RCPT){
+        ## permanent error
+            return $job->completed;
         }
-        $err_msg = "Error " . $err_msg;
-        LJ::User::Email->mark(5, $rcpts, $err_msg);
-        die "$err_msg\n";
-    };
 
-    return $not_ok->("MAIL")     unless $smtp->mail($env_from);
-
-    my $got_an_okay = 0;
-    foreach my $rcpt (@$rcpts) {
-        if ($smtp->to($rcpt)) {
-            $got_an_okay = 1;
-            next;
-        }
-        if ($smtp->status == 5) {
-            $class->on_5xx_rcpt($job, $rcpt, $details->());
-            next;
-        }
-        my $err_msg = "Error during TO phase to [@$rcpts]: " . $details->();
-        LJ::User::Email->mark(5, $rcpts, $err_msg);
-        die "$err_msg\n";
+        ## general error
+        die LJ::DoSendEmail->error;
     }
 
-    unless ($got_an_okay) {
-        $job->permanent_failure("Permanent failure TO [@$rcpts]: " . $details->() . "\n");
-        return 0;
-    }
+    ## SUCCESS
+    return $job->completed;
 
-    # have to add a fake "Received: " line in here, otherwise some
-    # stupid over-strict MTAs like bellsouth.net reject it thinking
-    # it's spam that was sent directly (it was).  Called
-    # "NoHopsNoAuth".
-    my $mailid = $hstr;
-    $mailid =~ s/-/00/;  # not sure if hyphen is allowed in
-    my $date = _rfc2822_date(time());
-    my $rcvd = qq{Received: from localhost (theschwartz [127.0.0.1])
-                      by $this_domain (TheSchwartzMTA) with ESMTP id $mailid;
-                      $date
-                  };
-    $rcvd =~ s/\s+$//;
-    $rcvd =~ s/\n\s+/\r\n\t/g;
-    $rcvd .= "\r\n";
-
-    return $not_ok->("DATA")     unless $smtp->data;
-    return $not_ok->("DATASEND") unless $smtp->datasend($rcvd . $headers . $body);
-    return $not_ok->("DATAEND")  unless $smtp->dataend;
-
-    $job->completed;
-    LJ::User::Email->mark(undef, $rcpts, "OK");
-    return 1;
 }
 
 sub on_5xx_rcpt {
@@ -316,98 +241,7 @@
     # .... run subref to, say, put in LJ db that this email is undeliverable
 }
 
-sub _rfc2822_date {
-    my $time = shift;
-    my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday) =
-        gmtime($time);
-    my @days = qw(Sun Mon Tue Wed Thu Fri Sat Sun);
-    my @mon  = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
-    return sprintf("%s, %d %s %4d %02d:%02d:%02d +0000 (UTC)",
-                   $days[$wday], $mday, $mon[$mon], $year+1900, $hour, $min, $sec);
-}
 
-package Net::SMTP::BetterConnecting;
-use strict;
-use base 'Net::SMTP';
-use Net::Config;
-use Net::Cmd;
 
-# Net::SMTP's constructor could use improvement, so this is it:
-#     -- retry hosts, even if they connect and say "4xx service too busy", etc.
-#     -- let you specify different connect timeout vs. command timeout
-sub new {
-    my $self = shift;
-    my $type = ref($self) || $self;
-    my ($host, %arg);
-    if (@_ % 2) {
-        $host = shift;
-        %arg  = @_;
-    } else {
-        %arg  = @_;
-        $host = delete $arg{Host};
-    }
 
-    my $hosts = defined $host ? $host : $NetConfig{smtp_hosts};
-    my $obj;
-    my $timeout         = $arg{Timeout} || 120;
-    my $connect_timeout = $arg{ConnectTimeout} || $timeout;
-
-    my $h;
-    foreach $h (@{ref($hosts) ? $hosts : [ $hosts ]}) {
-        $obj = $type->IO::Socket::INET::new(PeerAddr => ($host = $h),
-                                            PeerPort => $arg{Port} || 'smtp(25)',
-                                            LocalAddr => $arg{LocalAddr},
-                                            LocalPort => $arg{LocalPort},
-                                            Proto    => 'tcp',
-                                            Timeout  => $connect_timeout,
-                                            )
-            or next;
-
-        $obj->timeout($timeout);  # restore the original timeout
-        $obj->autoflush(1);
-        $obj->debug(exists $arg{Debug} ? $arg{Debug} : undef);
-
-        my $res = $obj->response();
-        unless ($res == CMD_OK) {
-            $obj->close();
-            $obj = undef;
-            next;
-        }
-
-        last if $obj;
-    }
-
-    return undef unless $obj;
-
-    ${*$obj}{'net_smtp_exact_addr'} = $arg{ExactAddresses};
-    ${*$obj}{'net_smtp_host'}       = $host;
-    (${*$obj}{'net_smtp_banner'})   = $obj->message;
-    (${*$obj}{'net_smtp_domain'})   = $obj->message =~ /\A\s*(\S+)/;
-
-    unless ($obj->hello($arg{Hello} || "")) {
-        $obj->close();
-        return undef;
-    }
-
-    return $obj;
-}
-
-=head1 AUTHOR
-
-Brad Fitzpatrick -- brad@danga.com
-
-=head1 COPYRIGHT, LICENSE, and WARRANTY
-
-Copyright 2006-2007, SixApart, Ltd.
-
-License to use under the same terms as Perl itself.
-
-This software comes with no warranty of any kind.
-
-=head1 SEE ALSO
-
-L<TheSchwartz>
-
-=cut
-
 1;

Tags: pm, theschwartz-worker-sendemail, vadvs
Subscribe
  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 2 comments