Committer: vsukhanov
LJSUP-7095: optimize send mail process
A trunk/cgi-bin/LJ/DoSendEmail.pm
U trunk/cgi-bin/ljmail.pl
Added: trunk/cgi-bin/LJ/DoSendEmail.pm =================================================================== --- trunk/cgi-bin/LJ/DoSendEmail.pm (rev 0) +++ trunk/cgi-bin/LJ/DoSendEmail.pm 2010-10-08 09:53:50 UTC (rev 17520) @@ -0,0 +1,314 @@ +package LJ::DoSendEmail; +use Net::DNS qw(mx); +use LJ::User::Email; + +## Class prop +my $resolver; +my $status = ''; +my $code = ''; +my $error = ''; +my $details = ''; + +## Class accessors +sub error { + my $class = shift; + $error = $_[0] if @_ > 0; + return $error; +} + +sub status { + my $class = shift; + $status = $_[0] if @_ > 0; + return $status; +} + +sub code { + my $class = shift; + $code = $_[0] if @_ > 0; + return $code; +} + + +sub details { + my $class = shift; + $details = $_[0] if @_ > 0; + return $details; +} + +## +## Send function +## +# status_code: +# undef - OK +# 0 - cannot connect to MX-host or email domain. +# 5xx - smtp-status +# +#sub log_complete_status { +# my $status_code = shift; +# my $emails = shift; # One email if scalar or list of emails if array ref. +# my $message = shift; +# +# LJ::User::Email->mark($status_code, $emails, $message); +#} + +sub set_resolver { $resolver = $_[1] } +sub resolver { $resolver ||= Net::DNS::Resolver->new() } + + +use constant OK => 0; +use constant NO_RCPT => 1; +use constant NO_SUPPORTED_RCPT => 2; +use constant CONNECTION_FAILED => 3; +use constant SMTP_ERROR_NO_RCPT_ON_SERVER => 4; +use constant SMTP_ERROR_PERMANENT => 5; +use constant SMTP_ERROR_GENERAL => 6; + +## +## ->send( +## $rcpt, +## { +## from = From +## data = raw email with headers and bod +## timeout = Maximum time, in seconds, to wait for a response from the SMTP server (perldoc Net::SMTP). Default: 300 +## sender_id = ... +## hello_domain = ... (optional) +## } +## ) +## +## Returns one of constants defined above. 
+sub send { + my $class = shift; + my ($rcpt, $opts) = @_; + + ## read params + my $from = $opts->{from}; # Envelope From + my $data = $opts->{data}; + my $timeout = $opts->{timeout} || 300; + my $hello_domain = $opts->{hello_domain} || $LJ::DOMAIN; + + ## flush class properties + $class->status(''); + $class->code(''); + $class->error(''); + $class->details(''); + + ## is there other side? )) + return NO_RCPT unless $rcpt; + + my ($host) = $rcpt =~ /\@(.+?)$/; + return NO_SUPPORTED_RCPT unless $host; + + my @ex = (); + if ($LJ::IS_DEV_SERVER){ + @ex = ('127.0.0.1'); ## use local relay + @ex = ('172.19.1.1'); + } else { + ## give me the numbers! + my @mailhosts = mx(resolver(), $host); + @ex = map { $_->exchange } @mailhosts; + } + + # seen in wild: no MX records, but port 25 of domain is an SMTP server. think it's in SMTP spec too? + @ex = ($host) unless @ex; + + my $smtp = Net::SMTP::BetterConnecting->new( + \@ex, + Hello => $hello_domain, + PeerPort => 25, + ConnectTimeout => 4, + ); + unless ($smtp) { + $class->error("Connection failed to domain '$host', MXes: [@ex]"); + LJ::User::Email->mark(0, $rcpt, $class->error); + return CONNECTION_FAILED; + } + + ## Maximum time, in seconds, to wait for a response from the SMTP server + $smtp->timeout($timeout); + # FIXME: need to detect timeouts to log to errors, so people with ridiculous timeouts can see that's why we're not delivering mail + + my ($this_domain) = $from =~ /\@(.+)/; + + # remove bcc + my $body = $data; + $body =~ s/^(.+?\r?\n\r?\n)//s; + my $headers = $1; + $headers =~ s/^bcc:.+\r?\n//mig; ## remove + + ## sender_id should provide as much info for debug as possible. + ## For emails that send TheSchwartz worker is may be a + ## $job->handle->as_string. + ## + ## Also $sender_id is used as mail id. + my $sender_id = $opts->{sender_id}; + unless ($sender_id){ + ## generate it. + require Sys::Hostname; + $sender_id = Sys::Hostname::hostname(); + $sender_id =~ s/[^-]+//; + + $sender_id .= "-" . $$ . "-" . 
time(); + } + + # unless they specified a message ID, let's prepend our own: + unless ($headers =~ m!^message-id:.+!mi) { + $headers = "Message-ID: <sch-$sender_id\@$this_domain>\r\n" . $headers; + } + + ## _do_send returns nothing on success or failed command on error. + my $res = $class->_do_send($smtp, $from, $rcpt, $sender_id, + $headers, $body); + $class->status($smtp->status); + eval { $class->code( $smtp->code ) }; + my $details = eval { $smtp->code . " " . $smtp->message }; + $smtp->quit; ## + + if ($res){ ## ERROR + ## handle 5xx errors + # ... + # #? $class->on_5xx_rcpt($job, $rcpt, $details->()); + + $class->error("Permanent failure during $failed_phase phase to [$rcpt]: $details \n"); + + ## log error + LJ::User::Email->mark(5, $rcpts, $err_msg); + + ## handle other errors + if ($failed_phase eq "TO"){ + ## Permanent error + ## no need to retry attempts + return SMTP_ERROR_NO_RCPT_ON_SERVER; + } + + if ($class->status == 5){ + return SMTP_ERROR_PERMANENT; + } + + return SMTP_ERROR_GENERAL; + } + + + ## flush errors if they are. + LJ::User::Email->mark(undef, $rcpt, "OK"); + + ## + return OK; +} + +## Send SMTP commands to server. +## On success returns nothing +## On error returns a command that failed. +sub _do_send { + my $class = shift; + my ($smtp, $env_from, $rcpt, $mail_id, $headers, $body) = @_; + + ## Send command MAIL to server. + my $res = $smtp->mail($env_from); + + ## In case of error return name of command that failed. + return "MAIL" unless $res; + + ## Provide recipient to server + $res = $smtp->to($rcpt); + return "TO" unless $res; # return error + + # have to add a fake "Received: " line in here, otherwise some + # stupid over-strict MTAs like bellsouth.net reject it thinking + # it's spam that was sent directly (it was). Called + # "NoHopsNoAuth". 
+ $mail_id =~ s/-/00/; # not sure if hyphen is allowed in + my $date = _rfc2822_date(time()); + my $rcvd = qq{Received: from localhost (theschwartz [127.0.0.1]) + by $this_domain (TheSchwartzMTA) with ESMTP id $mail_id; + $date + }; + $rcvd =~ s/\s+$//; + $rcvd =~ s/\n\s+/\r\n\t/g; + $rcvd .= "\r\n"; + + ## Send commands to server. On error returns the stage name. + return "DATA" unless $smtp->data; + return "DATASEND" unless $smtp->datasend($rcvd . $headers . $body); + return "DATAEND" unless $smtp->dataend; + + return; # OK +} + + +sub _rfc2822_date { + my $time = shift; + my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday) = + gmtime($time); + my @days = qw(Sun Mon Tue Wed Thu Fri Sat Sun); + my @mon = qw(Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec); + return sprintf("%s, %d %s %4d %02d:%02d:%02d +0000 (UTC)", + $days[$wday], $mday, $mon[$mon], $year+1900, $hour, $min, $sec); +} + +package Net::SMTP::BetterConnecting; +use strict; +use base 'Net::SMTP'; +use Net::Config; +use Net::Cmd; + +# Net::SMTP's constructor could use improvement, so this is it: +# -- retry hosts, even if they connect and say "4xx service too busy", etc. +# -- let you specify different connect timeout vs. command timeout +sub new { + my $self = shift; + my $type = ref($self) || $self; + my ($host, %arg); + if (@_ % 2) { + $host = shift; + %arg = @_; + } else { + %arg = @_; + $host = delete $arg{Host}; + } + + my $hosts = defined $host ? $host : $NetConfig{smtp_hosts}; + my $obj; + my $timeout = $arg{Timeout} || 120; + my $connect_timeout = $arg{ConnectTimeout} || $timeout; + + my $h; + foreach $h (@{ref($hosts) ? $hosts : [ $hosts ]}) { + $obj = $type->IO::Socket::INET::new(PeerAddr => ($host = $h), + PeerPort => $arg{Port} || 'smtp(25)', + LocalAddr => $arg{LocalAddr}, + LocalPort => $arg{LocalPort}, + Proto => 'tcp', + Timeout => $connect_timeout, + ) + or next; + + $obj->timeout($timeout); # restore the original timeout + $obj->autoflush(1); + $obj->debug(exists $arg{Debug} ? 
$arg{Debug} : undef); + + my $res = $obj->response(); + unless ($res == CMD_OK) { + $obj->close(); + $obj = undef; + next; + } + + last if $obj; + } + + return undef unless $obj; + + ${*$obj}{'net_smtp_exact_addr'} = $arg{ExactAddresses}; + ${*$obj}{'net_smtp_host'} = $host; + (${*$obj}{'net_smtp_banner'}) = $obj->message; + (${*$obj}{'net_smtp_domain'}) = $obj->message =~ /\A\s*(\S+)/; + + unless ($obj->hello($arg{Hello} || "")) { + $obj->close(); + return undef; + } + + return $obj; +} + +1; Modified: trunk/cgi-bin/ljmail.pl =================================================================== --- trunk/cgi-bin/ljmail.pl 2010-10-07 09:00:43 UTC (rev 17519) +++ trunk/cgi-bin/ljmail.pl 2010-10-08 09:53:50 UTC (rev 17520) @@ -13,6 +13,7 @@ use Encode qw//; use MIME::Base64 qw//; +use LJ::DoSendEmail; use Class::Autouse qw( IO::Socket::INET @@ -183,48 +184,82 @@ } LJ::note_recent_action(undef, $log_action); - - my $enqueue = sub { - my $starttime = [Time::HiRes::gettimeofday()]; - ## '_reuse_any_existing_connection' will return 'mass' schwartz handle - ## when called from 'mass' workers and will return 'default' for the rest. - my $sclient = LJ::theschwartz({ 'role' => $opt->{'_schwartz_role'} }) - or die "Misconfiguration in mail. Can't go into TheSchwartz."; - my $host; - if (@rcpts == 1) { - $rcpts[0] =~ /(.+)@(.+)$/; - $host = lc($2) . '@' . lc($1); # we store it reversed in database + + + ## Start sending process: + ## 1. At stage One we try to send email right now from this process + ## 2. If stage one faults for some reason, at stage Two add task to TheSchwartz queue. + ## + + ## Stage 1. + ## We never should send emails online when user waits for response. + ## Workers send emails too and exactly in this case + ## we spend worker's time to try to send email directly from this process. + ## This approach intended to reduce TheSchwartz workload. 
+ unless (LJ::is_web_context()){ + foreach my $rcpt (@rcpts){ + my $res = LJ::DoSendEmail->send($rcpt, { + from => $from, ## Envelope From + data => $message_text, + + ## Optional params + # sender_id => "", ## stored in email headers. for debug. + # timeout => 300, ## Default timeout for sending email is 300 sec. + }); + ## handle result + if ($res eq LJ::DoSendEmail::OK){ + ## email succeffully sent + $rcpt = ""; # forget about this rcpt + } else { + ## handle error + + ## 5xx errors + my $details = LJ::DoSendEmail->details; + LJ::errobj('DieString', message => "send_email to $rcpt failed: $details")->log + if LJ::DoSendEmail->status eq 5; + } + } - my $job = TheSchwartz::Job->new(funcname => "TheSchwartz::Worker::SendEmail", - arg => { - env_from => $from, - rcpts => \@rcpts, - data => $message_text, - }, - coalesce => $host, - ); - my $h = $sclient->insert($job); - LJ::blocking_report( 'the_schwartz', 'send_mail', - Time::HiRes::tv_interval($starttime)); + ## empty rcpt means that email was successfully sent + @rcpts = grep {$_} @rcpts; + } - return $h ? 1 : 0; - }; + ## Do we still have someone to notify? + return 1 unless @rcpts; - if ($LJ::IS_DEV_SERVER) { - ## SMTP or sendmail case, dev servers only. Code is loosely taken from MIME::Lite->send - ## Sendmail command line option -t may be used to take recipiens from message headers - ## instead of specifying them in command-line - my $command_line = "/usr/lib/sendmail -oi -oem -f '$from' " . join(" ", map {"'$_'"} @rcpts); - open( my $fh, "| $command_line" ) - or die "Can't run sendmail ($command_line): $!"; - print $fh $message_text; - close $fh; - return 1; - } else { - ## TODO: if ($async_caller) { $success = send_mail_directly() }; $enqueue->() unless $success; - return $enqueue->(); + ## Stage 2. + ## Ok. We've tried to avoid this... But delayed sending. + ## Deligate this job to SendMail worker. 
+ + my $starttime = [Time::HiRes::gettimeofday()]; + + ## '_reuse_any_existing_connection' will return 'mass' schwartz handle + ## when called from 'mass' workers and will return 'default' for the rest. + my $sclient = LJ::theschwartz({ 'role' => $opt->{'_schwartz_role'} }) + or die "Misconfiguration in mail. Can't go into TheSchwartz."; + + ## coalesce param. + my $coalesce = ''; + if (@rcpts == 1) { + $rcpts[0] =~ /(.+)@(.+)$/; + $coalesce = lc($2) . '@' . lc($1); # we store it reversed in database } + + my $job = TheSchwartz::Job->new(funcname => "TheSchwartz::Worker::SendEmail", + arg => { + env_from => $from, + rcpts => \@rcpts, + data => $message_text, + }, + coalesce => $coalesce, + ); + my $h = $sclient->insert($job); + + LJ::blocking_report( 'the_schwartz', 'send_mail', + Time::HiRes::tv_interval($starttime)); + return $h ? 1 : 0; + } 1;