vadvs (vadvs) wrote in changelog,
vadvs
vadvs
changelog

[livejournal] r17520: LJSUP-7095: optimize send mail process

Committer: vsukhanov
LJSUP-7095: optimize send mail process
A   trunk/cgi-bin/LJ/DoSendEmail.pm
U   trunk/cgi-bin/ljmail.pl
Added: trunk/cgi-bin/LJ/DoSendEmail.pm
===================================================================
--- trunk/cgi-bin/LJ/DoSendEmail.pm	                        (rev 0)
+++ trunk/cgi-bin/LJ/DoSendEmail.pm	2010-10-08 09:53:50 UTC (rev 17520)
@@ -0,0 +1,314 @@
+package LJ::DoSendEmail;
+use Net::DNS qw(mx);
+use LJ::User::Email;
+
+## Class properties: file-scoped state shared by every caller of this package
+my $resolver;       # DNS resolver; set via set_resolver() or created on demand by resolver()
+my $status  = '';   # read/written through the status() accessor
+my $code    = '';   # read/written through the code() accessor
+my $error   = '';   # read/written through the error() accessor
+my $details = '';   # read/written through the details() accessor
+
+## Class accessors
+sub error {
+    my ($class, @value) = @_;           # getter/setter for the last error text
+    $error = $value[0] if @value;       # assign only when a value was passed
+    return $error;
+}
+
+sub status {
+    shift;                              # drop the invocant; only the value matters
+    ($status) = @_ if @_;               # an optional new status replaces the stored one
+    return $status;
+}
+
+sub code {
+    my (undef, @new_code) = @_;         # the invocant is not needed
+    return $code unless @new_code;      # plain read
+    return $code = $new_code[0];        # write, then hand back the stored value
+}
+
+
+sub details {
+    my $class = shift;                  # getter/setter for the detail text
+    if (scalar @_) { $details = shift } # store the new value when one was supplied
+    return $details;
+}
+
+##
+## Send function
+##
+# status_code:
+#   undef   - OK
+#   0       - cannot connect to MX-host or email domain.
+#   5xx     - smtp-status
+#
+#sub log_complete_status {
+#    my $status_code = shift;
+#    my $emails      = shift;    # One email if scalar or list of emails if array ref.
+#    my $message     = shift;
+#
+#    LJ::User::Email->mark($status_code, $emails, $message);
+#}
+
+sub set_resolver { my (undef, $new_resolver) = @_; return $resolver = $new_resolver }  # replace the shared resolver
+sub resolver { $resolver = Net::DNS::Resolver->new() unless $resolver; return $resolver }  # build on first use
+
+
+use constant {                                     # result codes returned by send()
+    OK => 0, NO_RCPT => 1, NO_SUPPORTED_RCPT => 2, # sent / no recipient / recipient without host part
+    CONNECTION_FAILED            => 3,
+    SMTP_ERROR_NO_RCPT_ON_SERVER => 4,
+    SMTP_ERROR_PERMANENT         => 5,
+    SMTP_ERROR_GENERAL           => 6,
+};
+
+##
+## ->send(
+##      $rcpt,
+##      {
+##          from         = envelope From address
+##          data         = raw email with headers and body
+##          timeout      = Maximum time, in seconds, to wait for a response from the SMTP server (perldoc Net::SMTP). Default: 300
+##          sender_id    = id used for debugging and as mail id (optional; generated when missing)
+##          hello_domain = domain for the SMTP greeting (optional; defaults to $LJ::DOMAIN)
+##      }
+## )
+##
+## Returns one of constants defined above; on failure error() and details() describe the problem.
+sub send {
+    my $class = shift;
+    my ($rcpt, $opts) = @_;
+
+    ## read params
+    my $from         = $opts->{from}; # Envelope From
+    my $data         = $opts->{data};
+    my $timeout      = $opts->{timeout} || 300;
+    my $hello_domain = $opts->{hello_domain} || $LJ::DOMAIN;
+
+    ## flush class properties
+    $class->status('');
+    $class->code('');
+    $class->error('');
+    $class->details('');
+
+    ## is there other side? ))
+    return NO_RCPT unless $rcpt;
+
+    my ($host) = $rcpt =~ /\@(.+?)$/;
+    return NO_SUPPORTED_RCPT unless $host;
+
+    my @ex = ();
+    if ($LJ::IS_DEV_SERVER){
+        ## Dev servers skip the MX lookup; an earlier 127.0.0.1 assignment here was a dead store.
+        @ex = ('172.19.1.1');
+    } else {
+        ## give me the numbers!
+        my @mailhosts = mx(resolver(), $host);
+        @ex = map { $_->exchange } @mailhosts;
+    }
+
+    # seen in wild:  no MX records, but port 25 of domain is an SMTP server.  think it's in SMTP spec too?
+    @ex = ($host) unless @ex;
+
+    my $smtp = Net::SMTP::BetterConnecting->new(
+                                                \@ex,
+                                                Hello          => $hello_domain,
+                                                Port           => 25, ## this constructor reads 'Port'; 'PeerPort' was ignored
+                                                ConnectTimeout => 4,
+                                                );
+    unless ($smtp) {
+        $class->error("Connection failed to domain '$host', MXes: [@ex]");
+        LJ::User::Email->mark(0, $rcpt, $class->error);
+        return CONNECTION_FAILED;
+    }
+
+    ## Maximum time, in seconds, to wait for a response from the SMTP server
+    $smtp->timeout($timeout);
+    # FIXME: need to detect timeouts to log to errors, so people with ridiculous timeouts can see that's why we're not delivering mail
+
+    my ($this_domain) = $from =~ /\@(.+)/;
+
+    # Split headers from body and remove bcc. $1 is read only when the substitution
+    # matched; after a failed match it would still hold a stale capture from an earlier regex.
+    my $body    = $data;
+    my $headers = ($body =~ s/^(.+?\r?\n\r?\n)//s) ? $1 : '';
+       $headers =~ s/^bcc:.+\r?\n//mig; ## remove
+
+    ## sender_id should provide as much info for debug as possible.
+    ## For emails sent by a TheSchwartz worker it may be a
+    ##      $job->handle->as_string.
+    ##
+    ## Also $sender_id is used as mail id.
+    my $sender_id = $opts->{sender_id};
+    unless ($sender_id){
+        ## generate it.
+        require Sys::Hostname;
+        $sender_id = Sys::Hostname::hostname();
+        $sender_id =~ s/[^-]+//; ## drops the first run of non-hyphen characters
+
+        $sender_id .= "-" . $$ . "-" . time();
+    }
+
+    # unless they specified a message ID, let's prepend our own:
+    unless ($headers =~ m!^message-id:.+!mi) {
+        $headers = "Message-ID: <sch-$sender_id\@$this_domain>\r\n" . $headers;
+    }
+
+    ## _do_send returns nothing on success or the name of the failed command on error.
+    my $failed_phase = $class->_do_send($smtp, $from, $rcpt, $sender_id,
+                                        $headers, $body);
+    $class->status($smtp->status);
+    eval { $class->code( $smtp->code ) };
+    my $details = eval { $smtp->code . " " . $smtp->message };
+    $smtp->quit; ##
+
+    if ($failed_phase){ ## ERROR
+        ## keep the server reply available to callers through ->details
+        $details = '' unless defined $details;
+        $class->details($details);
+
+        $class->error("Permanent failure during $failed_phase phase to [$rcpt]: $details \n");
+
+        ## log error
+        LJ::User::Email->mark(5, $rcpt, $class->error);
+
+        ## handle other errors
+        if ($failed_phase eq "TO"){
+        ## Permanent error
+        ## no need to retry attempts
+            return SMTP_ERROR_NO_RCPT_ON_SERVER;
+        }
+
+        if ($class->status == 5){
+            return SMTP_ERROR_PERMANENT;
+        }
+
+        return SMTP_ERROR_GENERAL;
+    }
+
+
+    ## flush errors if they are.
+    LJ::User::Email->mark(undef, $rcpt, "OK");
+
+    ##
+    return OK;
+}
+
+## Send SMTP commands to server.
+##      On success returns nothing
+##      On error returns the name of the command that failed (MAIL, TO, DATA, DATASEND or DATAEND).
+sub _do_send {
+    my $class = shift;
+    my ($smtp, $env_from, $rcpt, $mail_id, $headers, $body) = @_;
+
+    ## Send command MAIL to server; on error return the name of the failed command.
+    return "MAIL" unless $smtp->mail($env_from);
+
+    ## Provide recipient to server
+    return "TO" unless $smtp->to($rcpt); # return error
+
+    ## The Received line names our own domain, taken from the envelope sender.
+    ## ($this_domain is a lexical of send() and is not visible in this sub.)
+    my ($this_domain) = $env_from =~ /\@(.+)/;
+
+    # have to add a fake "Received: " line in here, otherwise some
+    # stupid over-strict MTAs like bellsouth.net reject it thinking
+    # it's spam that was sent directly (it was).  Called
+    # "NoHopsNoAuth".
+    $mail_id =~ s/-/00/g;  # not sure if hyphen is allowed in the id, so replace every one
+    my $date = _rfc2822_date(time());
+    my $rcvd = qq{Received: from localhost (theschwartz [127.0.0.1])
+                      by $this_domain (TheSchwartzMTA) with ESMTP id $mail_id;
+                      $date
+                  };
+    $rcvd =~ s/\s+$//;
+    $rcvd =~ s/\n\s+/\r\n\t/g;
+    $rcvd .= "\r\n";
+
+    ## Send commands to server. On error returns the stage name.
+    return "DATA"     unless $smtp->data;
+    return "DATASEND" unless $smtp->datasend($rcvd . $headers . $body);
+    return "DATAEND"  unless $smtp->dataend;
+
+    return; # OK
+}
+
+
+sub _rfc2822_date {
+    my ($epoch) = @_;                   # seconds since the epoch, formatted as UTC
+    my @day_names   = qw(Sun Mon Tue Wed Thu Fri Sat);
+    my @month_names = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec);
+    my ($ss, $mi, $hh, $day_of_month, $month_idx, $year_offset, $weekday) = gmtime($epoch);
+    my $stamp = sprintf("%s, %d %s %4d %02d:%02d:%02d +0000 (UTC)", $day_names[$weekday],
+                        $day_of_month, $month_names[$month_idx], $year_offset + 1900, $hh, $mi, $ss);
+    return $stamp;
+}
+
+package Net::SMTP::BetterConnecting;
+use strict;
+use base 'Net::SMTP';
+use Net::Config;
+use Net::Cmd;
+
+# Replacement for Net::SMTP's constructor. Args: an optional host (or array ref of hosts) followed by options
+# (Host, Port, LocalAddr, LocalPort, Timeout, ConnectTimeout, Debug, ExactAddresses, Hello). Returns the object or undef.
+#     -- retries hosts, even if they connect and say "4xx service too busy", etc.; separate connect vs. command timeout
+sub new {
+    my $self = shift;
+    my $type = ref($self) || $self;
+    my ($host, %arg);
+    if (@_ % 2) {   # odd argument count: the first argument is the host (or array ref of hosts)
+        $host = shift;
+        %arg  = @_;
+    } else {        # even argument count: the host comes from the Host option
+        %arg  = @_;
+        $host = delete $arg{Host};
+    }
+
+    my $hosts = defined $host ? $host : $NetConfig{smtp_hosts};   # fall back to Net::Config's smtp_hosts
+    my $obj;
+    my $timeout         = $arg{Timeout} || 120;                   # command timeout, in seconds
+    my $connect_timeout = $arg{ConnectTimeout} || $timeout;       # connect timeout defaults to the command timeout
+
+    my $h;
+    foreach $h (@{ref($hosts) ? $hosts : [ $hosts ]}) {   # try each candidate host in order
+        $obj = $type->IO::Socket::INET::new(PeerAddr => ($host = $h),
+                                            PeerPort => $arg{Port} || 'smtp(25)',   # port comes from the 'Port' option
+                                            LocalAddr => $arg{LocalAddr},
+                                            LocalPort => $arg{LocalPort},
+                                            Proto    => 'tcp',
+                                            Timeout  => $connect_timeout,
+                                            )
+            or next;   # could not connect: try the next host
+
+        $obj->timeout($timeout);  # restore the original timeout
+        $obj->autoflush(1);
+        $obj->debug(exists $arg{Debug} ? $arg{Debug} : undef);
+
+        my $res = $obj->response();
+        unless ($res == CMD_OK) {   # the greeting was not a success reply: drop this host and try the next
+            $obj->close();
+            $obj = undef;
+            next;
+        }
+
+        last if $obj;   # connected and greeted: stop searching
+    }
+
+    return undef unless $obj;   # no host accepted the connection
+
+    ${*$obj}{'net_smtp_exact_addr'} = $arg{ExactAddresses};   # per-connection data is kept in the glob's hash
+    ${*$obj}{'net_smtp_host'}       = $host;                  # the host that was finally used
+    (${*$obj}{'net_smtp_banner'})   = $obj->message;
+    (${*$obj}{'net_smtp_domain'})   = $obj->message =~ /\A\s*(\S+)/;   # first word of the banner
+
+    unless ($obj->hello($arg{Hello} || "")) {   # hello failed: give up without trying further hosts
+        $obj->close();
+        return undef;
+    }
+
+    return $obj;
+}
+
+1;

Modified: trunk/cgi-bin/ljmail.pl
===================================================================
--- trunk/cgi-bin/ljmail.pl	2010-10-07 09:00:43 UTC (rev 17519)
+++ trunk/cgi-bin/ljmail.pl	2010-10-08 09:53:50 UTC (rev 17520)
@@ -13,6 +13,7 @@
 
 use Encode qw//;
 use MIME::Base64 qw//;
+use LJ::DoSendEmail;
 
 use Class::Autouse qw(
                       IO::Socket::INET
@@ -183,48 +184,82 @@
     }
 
     LJ::note_recent_action(undef, $log_action);
- 
-    my $enqueue = sub {
-        my $starttime = [Time::HiRes::gettimeofday()];
-        ## '_reuse_any_existing_connection' will return 'mass' schwartz handle 
-        ## when called from 'mass' workers and will return 'default' for the rest.
-        my $sclient = LJ::theschwartz({ 'role' => $opt->{'_schwartz_role'} }) 
-            or die "Misconfiguration in mail.  Can't go into TheSchwartz.";
-        my $host;
-        if (@rcpts == 1) {
-            $rcpts[0] =~ /(.+)@(.+)$/;
-            $host = lc($2) . '@' . lc($1);   # we store it reversed in database
+
+
+    ## Start sending process:
+    ##  1. At stage One we try to send email right now from this process 
+    ##  2. If stage one faults for some reason, at stage Two add task to TheSchwartz queue.
+    ##
+
+    ## Stage 1.
+    ##  We never should send emails online when user waits for response.
+    ##  Workers send emails too and exactly in this case 
+    ##  we spend worker's time to try to send email directly from this process.
+    ##  This approach intended to reduce TheSchwartz workload.
+    unless (LJ::is_web_context()){
+        foreach my $rcpt (@rcpts){
+            my $res = LJ::DoSendEmail->send($rcpt, {
+                        from    => $from,         ## Envelope From
+                        data    => $message_text,
+
+                        ## Optional params
+                        # sender_id => "",  ## stored in email headers. for debug.
+                        # timeout   => 300, ## Default timeout for sending email is 300 sec.
+                        });
+            ## handle result
+            if ($res eq LJ::DoSendEmail::OK){
+            ## email successfully sent
+                $rcpt = ""; # forget about this rcpt
+            } else {
+            ## handle error 
+
+                ## 5xx errors
+                my $details = LJ::DoSendEmail->details;
+                LJ::errobj('DieString', message => "send_email to $rcpt failed: $details")->log
+                    if LJ::DoSendEmail->status eq 5;
+            }
+
         }
-        my $job = TheSchwartz::Job->new(funcname => "TheSchwartz::Worker::SendEmail",
-                                        arg      => {
-                                            env_from => $from,
-                                            rcpts    => \@rcpts,
-                                            data     => $message_text,
-                                        },
-                                        coalesce => $host,
-                                        );
-        my $h = $sclient->insert($job);
 
-        LJ::blocking_report( 'the_schwartz', 'send_mail',
-                             Time::HiRes::tv_interval($starttime));
+        ## empty rcpt means that email was successfully sent
+        @rcpts = grep {$_} @rcpts;
+    }
 
-        return $h ? 1 : 0;
-    };
+    ## Do we still have someone to notify?
+    return 1 unless @rcpts; 
 
-    if ($LJ::IS_DEV_SERVER) {
-        ## SMTP or sendmail case, dev servers only. Code is loosely taken from MIME::Lite->send
-        ## Sendmail command line option -t may be used to take recipiens from message headers 
-        ## instead of specifying them in command-line
-        my $command_line = "/usr/lib/sendmail -oi -oem -f '$from' " . join(" ", map {"'$_'"} @rcpts);
-        open( my $fh, "| $command_line" ) 
-            or die "Can't run sendmail ($command_line): $!";
-        print $fh $message_text;
-        close $fh;
-        return 1;
-    } else {
-        ## TODO: if ($async_caller) { $success = send_mail_directly() }; $enqueue->() unless $success;
-        return $enqueue->();
+    ## Stage 2.
+    ##  Ok. We've tried to avoid this... But delayed sending.
+    ##  Delegate this job to SendMail worker.
+     
+    my $starttime = [Time::HiRes::gettimeofday()];
+
+    ## '_reuse_any_existing_connection' will return 'mass' schwartz handle 
+    ## when called from 'mass' workers and will return 'default' for the rest.
+    my $sclient = LJ::theschwartz({ 'role' => $opt->{'_schwartz_role'} }) 
+        or die "Misconfiguration in mail.  Can't go into TheSchwartz.";
+
+    ## coalesce param.
+    my $coalesce = '';
+    if (@rcpts == 1) {
+        $rcpts[0] =~ /(.+)@(.+)$/;
+        $coalesce = lc($2) . '@' . lc($1);   # we store it reversed in database
     }
+
+    my $job = TheSchwartz::Job->new(funcname => "TheSchwartz::Worker::SendEmail",
+                                    arg      => {
+                                        env_from => $from,
+                                        rcpts    => \@rcpts,
+                                        data     => $message_text,
+                                    },
+                                    coalesce => $coalesce,
+                                    );
+    my $h = $sclient->insert($job);
+
+    LJ::blocking_report( 'the_schwartz', 'send_mail',
+                         Time::HiRes::tv_interval($starttime));
+    return $h ? 1 : 0;
+
 }
 
 1;

Tags: livejournal, pl, pm, vadvs
Subscribe
  • Post a new comment

    Error

    Anonymous comments are disabled in this journal

    default userpic

    Your reply will be screened

    Your IP address will be recorded 

  • 0 comments