Skip to content

Commit

Permalink
Support email plugin status sink (#48044)
Browse files Browse the repository at this point in the history
* Support email plugin status sink

* Use constant

* Address minor feedback

* Fix typo
  • Loading branch information
bernardjkim authored Oct 30, 2024
1 parent dddd9ea commit 2d5ec57
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 8 deletions.
6 changes: 6 additions & 0 deletions integrations/access/email/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (a *App) run(ctx context.Context) error {

// init inits plugin
func (a *App) init(ctx context.Context) error {
log := logger.Get(ctx)
ctx, cancel := context.WithTimeout(ctx, initTimeout)
defer cancel()

Expand All @@ -146,6 +147,11 @@ func (a *App) init(ctx context.Context) error {
return trace.Wrap(err)
}

log.Debug("Starting client connection health check...")
if err = a.client.CheckHealth(ctx); err != nil {
return trace.Wrap(err, "client connection health check failed")
}
log.Debug("Client connection health check finished ok")
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions integrations/access/email/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin
}

if conf.Mailgun != nil {
mailer = NewMailgunMailer(*conf.Mailgun, conf.Delivery.Sender, clusterName)
mailer = NewMailgunMailer(*conf.Mailgun, conf.StatusSink, conf.Delivery.Sender, clusterName, conf.RoleToRecipients[types.Wildcard])
logger.Get(ctx).WithField("domain", conf.Mailgun.Domain).Info("Using Mailgun as email transport")
}

if conf.SMTP != nil {
mailer = NewSMTPMailer(*conf.SMTP, conf.Delivery.Sender, clusterName)
mailer = NewSMTPMailer(*conf.SMTP, conf.StatusSink, conf.Delivery.Sender, clusterName)
logger.Get(ctx).WithFields(logger.Fields{
"host": conf.SMTP.Host,
"port": conf.SMTP.Port,
Expand All @@ -79,6 +79,11 @@ func NewClient(ctx context.Context, conf Config, clusterName, webProxyAddr strin
}, nil
}

// CheckHealth checks if the Email client connection is healthy.
func (c *Client) CheckHealth(ctx context.Context) error {
return trace.Wrap(c.mailer.CheckHealth(ctx))
}

// SendNewThreads sends emails on new requests. Returns EmailData.
func (c *Client) SendNewThreads(ctx context.Context, recipients []string, reqID string, reqData RequestData) ([]EmailThread, error) {
var threads []EmailThread
Expand Down
4 changes: 4 additions & 0 deletions integrations/access/email/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type Config struct {
Delivery DeliveryConfig `toml:"delivery"`
RoleToRecipients common.RawRecipientsMap `toml:"role_to_recipients"`
Log logger.Config `toml:"log"`

// StatusSink receives any status updates from the plugin for
// further processing. Status updates will be ignored if not set.
StatusSink common.StatusSink
}

// LoadConfig reads the config file, initializes a new Config struct object, and returns it.
Expand Down
122 changes: 116 additions & 6 deletions integrations/access/email/mailers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/rand"
"encoding/binary"
"fmt"
"net/http"
"os"
"strconv"
"strings"
Expand All @@ -29,42 +30,93 @@ import (
"github.com/gravitational/trace"
"github.com/mailgun/mailgun-go/v4"
"gopkg.in/mail.v2"

"github.com/gravitational/teleport/integrations/access/common"
"github.com/gravitational/teleport/integrations/lib/logger"
)

const (
// statusEmitTimeout specifies the max timeout to emit status.
statusEmitTimeout = 10 * time.Second
// mailgunHTTPTimeout specifies the max timeout for mailgun api send request.
mailgunHTTPTimeout = 10 * time.Second
// smtpDialerTimeout specifies the max timeout for smtp dialer read/write operations.
smtpDialerTimeout = 10 * time.Second
)

// Mailer is an interface to mail sender
type Mailer interface {
Send(ctx context.Context, id, recipient, body, references string) (string, error)
// CheckHealth checks if the Email client connection is healthy.
CheckHealth(ctx context.Context) error
}

// SMTPMailer implements SMTP mailer
type SMTPMailer struct {
dialer *mail.Dialer
sender string
clusterName string
sink common.StatusSink
}

// MailgunMailer implements mailgun mailer
type MailgunMailer struct {
mailgun *mailgun.MailgunImpl
sender string
clusterName string

// fallbackRecipients specifies the list of default recipients.
// This is only used for initial health check.
fallbackRecipients []string
}

// NewSMTPMailer inits new SMTP mailer
func NewSMTPMailer(c SMTPConfig, sender, clusterName string) Mailer {
func NewSMTPMailer(c SMTPConfig, sink common.StatusSink, sender, clusterName string) Mailer {
dialer := mail.NewDialer(c.Host, c.Port, c.Username, c.Password)
dialer.StartTLSPolicy = c.MailStartTLSPolicy
dialer.Timeout = smtpDialerTimeout

return &SMTPMailer{dialer, sender, clusterName}
return &SMTPMailer{
dialer: dialer,
sender: sender,
clusterName: clusterName,
sink: sink,
}
}

// NewMailgunMailer inits new Mailgun mailer
func NewMailgunMailer(c MailgunConfig, sender, clusterName string) Mailer {
func NewMailgunMailer(c MailgunConfig, sink common.StatusSink, sender, clusterName string, fallbackRecipients []string) Mailer {
m := mailgun.NewMailgun(c.Domain, c.PrivateKey)
if c.APIBase != "" {
m.SetAPIBase(c.APIBase)
}
return &MailgunMailer{m, sender, clusterName}
client := &http.Client{
Transport: &statusSinkTransport{
RoundTripper: http.DefaultTransport,
sink: sink,
},
}
m.SetClient(client)
return &MailgunMailer{
mailgun: m,
sender: sender,
clusterName: clusterName,
fallbackRecipients: fallbackRecipients,
}
}

// CheckHealth checks the health of the SMTP service.
func (m *SMTPMailer) CheckHealth(ctx context.Context) error {
log := logger.Get(ctx)
client, err := m.dialer.Dial()
m.emitStatus(ctx, err)
if err != nil {
return trace.Wrap(err)
}
if err := client.Close(); err != nil {
log.Debug("Failed to close client connection after health check")
}
return nil
}

// Send sends email via SMTP
Expand All @@ -91,10 +143,10 @@ func (m *SMTPMailer) Send(ctx context.Context, id, recipient, body, references s
}

err = m.dialer.DialAndSend(msg)
m.emitStatus(ctx, err)
if err != nil {
return "", trace.Wrap(err)
}

return id, nil
}

Expand Down Expand Up @@ -123,6 +175,38 @@ func (m *SMTPMailer) base36(input uint64) string {
return strings.ToUpper(strconv.FormatUint(input, 36))
}

// emitStatus emits status based on provided statusErr.
func (m *SMTPMailer) emitStatus(ctx context.Context, statusErr error) {
if m.sink == nil {
return
}

ctx, cancel := context.WithTimeout(ctx, statusEmitTimeout)
defer cancel()

log := logger.Get(ctx)
code := http.StatusOK
if statusErr != nil {
// Returned error is undocumented. Using generic error code for all errors.
code = http.StatusInternalServerError
}
if err := m.sink.Emit(ctx, common.StatusFromStatusCode(code)); err != nil {
log.WithError(err).Error("Error while emitting Email plugin status")
}
}

// CheckHealth checks the health of the Mailgun service.
func (m *MailgunMailer) CheckHealth(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout)
defer cancel()

msg := m.mailgun.NewMessage(m.sender, "Health Check", "Testing Mailgun API connection...", m.fallbackRecipients...)
msg.SetRequireTLS(true)
msg.EnableTestMode() // Test message submission without delivering to recipients.
_, _, err := m.mailgun.Send(ctx, msg)
return trace.Wrap(err)
}

// Send sends email via Mailgun
func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, references string) (string, error) {
subject := fmt.Sprintf("%v Role Request %v", m.clusterName, id)
Expand All @@ -136,7 +220,7 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference
msg.AddHeader("In-Reply-To", refHeader)
}

ctx, cancel := context.WithTimeout(ctx, time.Second*10)
ctx, cancel := context.WithTimeout(ctx, mailgunHTTPTimeout)
defer cancel()

_, id, err := m.mailgun.Send(ctx, msg)
Expand All @@ -147,3 +231,29 @@ func (m *MailgunMailer) Send(ctx context.Context, id, recipient, body, reference

return id, nil
}

// statusSinkTransport wraps the Mailgun client transport and
// emits plugin status.
type statusSinkTransport struct {
http.RoundTripper
sink common.StatusSink
}

// RoundTrip implements the http.RoundTripper interface.
func (t *statusSinkTransport) RoundTrip(req *http.Request) (*http.Response, error) {
log := logger.Get(req.Context())
resp, err := t.RoundTripper.RoundTrip(req)
if err != nil {
return nil, trace.Wrap(err)
}
if t.sink != nil {
ctx, cancel := context.WithTimeout(req.Context(), statusEmitTimeout)
defer cancel()

status := common.StatusFromStatusCode(resp.StatusCode)
if err := t.sink.Emit(ctx, status); err != nil {
log.WithError(err).Error("Error while emitting Email plugin status")
}
}
return resp, nil
}
7 changes: 7 additions & 0 deletions integrations/access/email/testlib/mock_mailgun.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ func newMockMailgunServer(concurrency int) *mockMailgunServer {

id := uuid.New().String()

// The testmode flag is only used during health check.
// Do no create message when in testmode.
if r.PostFormValue("o:testmode") == "yes" {
fmt.Fprintf(w, `{"id": "%v"}`, id)
return
}

message := mockMailgunMessage{
ID: id,
Sender: r.PostFormValue("from"),
Expand Down

0 comments on commit 2d5ec57

Please sign in to comment.