From 99f0fd97fecfaa81b0c7c8bf53f2c52da04a3594 Mon Sep 17 00:00:00 2001 From: Samantha Date: Thu, 6 Feb 2025 16:18:10 -0500 Subject: [PATCH 01/18] email: Initial Pardot client implementation --- email/pardot.go | 189 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 email/pardot.go diff --git a/email/pardot.go b/email/pardot.go new file mode 100644 index 00000000000..c77d37ad229 --- /dev/null +++ b/email/pardot.go @@ -0,0 +1,189 @@ +package email + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "sync" + "time" + + "github.com/jmhodges/clock" + "github.com/letsencrypt/boulder/core" +) + +const ( + // tokenPath is the path to the Salesforce OAuth2 token endpoint. + tokenPath = "/services/oauth2/token" + + // prospectsPath is the path to the Pardot v5 Prospects endpoint. This + // endpoint will create a new Prospect if one does not already exist. + prospectsPath = "/api/v5/prospects" + + // maxAttempts is the maximum number of attempts to retry a request. + maxAttempts = 3 + + // retryBackoffBase is the base for exponential backoff. + retryBackoffBase = 2.0 + + // retryBackoffMax is the maximum backoff time. + retryBackoffMax = 10 * time.Second + + // retryBackoffMin is the minimum backoff time. + retryBackoffMin = 200 * time.Millisecond +) + +// oAuthToken holds the OAuth2 access token and its expiration. +type oAuthToken struct { + sync.Mutex + + accessToken string + expiresAt time.Time +} + +// PardotClient handles authentication and sending contacts to Pardot. +type PardotClient struct { + businessUnit string + clientId string + clientSecret string + prospectsURL string + tokenURL string + token *oAuthToken + clk clock.Clock +} + +// NewPardotClient creates a new PardotClient. +func NewPardotClient(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClient, error) { + prospectsURL, err := url.JoinPath(pardotBaseURL, prospectsPath) + if err != nil { + return nil, fmt.Errorf("failed to join upsert path: %w", err) + } + tokenURL, err := url.JoinPath(oauthbaseURL, tokenPath) + if err != nil { + return nil, fmt.Errorf("failed to join token path: %w", err) + } + + return &PardotClient{ + businessUnit: businessUnit, + clientId: clientId, + clientSecret: clientSecret, + prospectsURL: prospectsURL, + tokenURL: tokenURL, + + token: &oAuthToken{}, + clk: clk, + }, nil +} + +// updateToken updates the OAuth token if necessary. 
+func (pc *PardotClient) updateToken() error { + pc.token.Lock() + defer pc.token.Unlock() + + now := pc.clk.Now() + if now.Before(pc.token.expiresAt.Add(-5*time.Minute)) && pc.token.accessToken != "" { + return nil + } + + resp, err := http.PostForm(pc.tokenURL, url.Values{ + "grant_type": {"client_credentials"}, + "client_id": {pc.clientId}, + "client_secret": {pc.clientSecret}, + }) + if err != nil { + return fmt.Errorf("failed to retrieve token: %w", err) + } + defer resp.Body.Close() + + var respJSON struct { + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + } + + if resp.StatusCode == http.StatusOK { + err = json.NewDecoder(resp.Body).Decode(&respJSON) + if err != nil { + return fmt.Errorf("failed to decode token response: %w", err) + } + pc.token.accessToken = respJSON.AccessToken + pc.token.expiresAt = pc.clk.Now().Add(time.Duration(respJSON.ExpiresIn) * time.Second) + return nil + } + + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("token request failed with status %d; while reading body: %w", resp.StatusCode, readErr) + } + return fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, body) +} + +// redactEmail replaces all occurrences of an email address in a response body +// with "[REDACTED]". +func redactEmail(body []byte, email string) string { + return string(bytes.ReplaceAll(body, []byte(email), []byte("[REDACTED]"))) +} + +// UpsertEmail sends an email to the Pardot Prospects endpoint, retrying up +// to 3 times with exponential backoff. +func (pc *PardotClient) UpsertEmail(email string) error { + var finalErr error + + for attempt := 1; attempt <= maxAttempts; attempt++ { + err := pc.updateToken() + if err != nil { + finalErr = err + } else { + break + } + + if attempt < maxAttempts { + time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) + } + } + + if finalErr != nil { + return finalErr + } + + payload, err := json.Marshal(map[string]interface{}{ + "prospect": map[string]string{"email": email}, + }) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + + for attempt := 1; attempt <= maxAttempts; attempt++ { + req, err := http.NewRequest("POST", pc.prospectsURL, bytes.NewReader(payload)) + if err != nil { + return fmt.Errorf("failed to create upsert request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+pc.token.accessToken) + req.Header.Set("Pardot-Business-Unit-Id", pc.businessUnit) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + finalErr = fmt.Errorf("upsert request failed: %w", err) + } else { + defer resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + finalErr = fmt.Errorf("upsert request returned status %d; while reading body: %w", resp.StatusCode, err) + } else { + finalErr = fmt.Errorf("upsert request returned status %d: %s", resp.StatusCode, redactEmail(body, email)) + } + } + + if attempt < maxAttempts { + time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) + } + } + + return finalErr +} From fd3921080de9bf4cb0be825f6c4ddaf0b57065cb Mon Sep 17 00:00:00 2001 From: Samantha Date: Thu, 6 Feb 2025 16:19:43 -0500 Subject: [PATCH 02/18] email: Initial Exporter implementation --- email/exporter.go | 169 ++++++++ email/proto/exporter.pb.go | 155 ++++++++ email/proto/exporter.proto | 14 + 
email/proto/exporter_grpc.pb.go | 111 ++++++ go.mod | 1 + go.sum | 2 + vendor/golang.org/x/time/LICENSE | 27 ++ vendor/golang.org/x/time/PATENTS | 22 ++ vendor/golang.org/x/time/rate/rate.go | 426 +++++++++++++++++++++ vendor/golang.org/x/time/rate/sometimes.go | 67 ++++ vendor/modules.txt | 3 + 11 files changed, 997 insertions(+) create mode 100644 email/exporter.go create mode 100644 email/proto/exporter.pb.go create mode 100644 email/proto/exporter.proto create mode 100644 email/proto/exporter_grpc.pb.go create mode 100644 vendor/golang.org/x/time/LICENSE create mode 100644 vendor/golang.org/x/time/PATENTS create mode 100644 vendor/golang.org/x/time/rate/rate.go create mode 100644 vendor/golang.org/x/time/rate/sometimes.go diff --git a/email/exporter.go b/email/exporter.go new file mode 100644 index 00000000000..ad0cd89c23f --- /dev/null +++ b/email/exporter.go @@ -0,0 +1,169 @@ +package email + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/letsencrypt/boulder/core" + exporterpb "github.com/letsencrypt/boulder/email/proto" + berrors "github.com/letsencrypt/boulder/errors" + blog "github.com/letsencrypt/boulder/log" +) + +const ( + // Our daily limit is determined by the tier of our Salesforce account. For + // more information, see: + // https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits + + // ratelimit represents our daily limit of 50,000 requests. + rateLimit = 50000.0 / 86400.0 + + // numWorkers is the number of concurrent workers processing the email + // queue. We also use this as the burst limit for the rate limiter. + numWorkers = 5 + + // queueCap enforces a maximum stack size to prevent unbounded growth. + queueCap = 10000 +) + +var ErrQueueFull = errors.New("email export queue is full") + +// ExporterImpl implements the gRPC server and processes email exports. +type ExporterImpl struct { + sync.RWMutex + + toSend []string + client *PardotClient + log blog.Logger +} + +// NewExporterImpl creates a new ExporterImpl. +func NewExporterImpl(client *PardotClient, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl { + impl := &ExporterImpl{ + toSend: make([]string, 0, queueCap), + client: client, + log: logger, + } + + queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "email_exporter_queue_length", + Help: "Current length of the email export queue", + }, func() float64 { + impl.RLock() + defer impl.RUnlock() + return float64(len(impl.toSend)) + }) + scope.MustRegister(queueGauge) + + return impl +} + +// UpsertEmails enqueues the provided email addresses. If the queue is near +// capacity, only enqueues as many emails as can fit. Returns ErrQueueFull if +// some or all emails were dropped. +func (impl *ExporterImpl) UpsertEmails(ctx context.Context, req *exporterpb.UpsertEmailsRequest) (*emptypb.Empty, error) { + if core.IsAnyNilOrZero(req, req.Emails) { + return nil, berrors.InternalServerError("Incomplete UpsertEmails request") + } + + impl.Lock() + defer impl.Unlock() + + spotsLeft := queueCap - len(impl.toSend) + if spotsLeft <= 0 { + return nil, ErrQueueFull + } + + toAdd := req.Emails + if len(toAdd) > spotsLeft { + toAdd = toAdd[:spotsLeft] + } + + impl.toSend = append(impl.toSend, toAdd...) 
+ + if len(toAdd) < len(req.Emails) { + impl.log.Errf("Dropped %d emails due to queue capacity", len(req.Emails)-len(toAdd)) + return nil, ErrQueueFull + } + + return &emptypb.Empty{}, nil +} + +// takeEmail pops an email from the slice (LIFO). +func (impl *ExporterImpl) takeEmail() (string, bool) { + impl.Lock() + defer impl.Unlock() + + if len(impl.toSend) == 0 { + return "", false + } + + email := impl.toSend[len(impl.toSend)-1] + impl.toSend = impl.toSend[:len(impl.toSend)-1] + + return email, true +} + +// Start begins asynchronous processing of the email queue. When the parent +// daemonCtx is cancelled switches into a draining mode. +func (impl *ExporterImpl) Start(daemonCtx context.Context) { + limiter := rate.NewLimiter(rate.Limit(rateLimit), numWorkers) + var wg sync.WaitGroup + + worker := func() { + defer wg.Done() + draining := false + for { + if daemonCtx.Err() != nil { + draining = true + } + + if draining { + err := limiter.Wait(context.Background()) + if err != nil { + // This should never happen, we're using a background + // context. + impl.log.Errf("While draining: limiter wait error: %s", err) + } + } else { + err := limiter.Wait(daemonCtx) + if err != nil { + if errors.Is(err, context.Canceled) { + draining = true + continue + } + impl.log.Errf("While running: unexpected limiter wait error: %s", err) + continue + } + } + + email, ok := impl.takeEmail() + if !ok { + if draining { + return + } + // No emails to process, avoid busy-waiting. + time.Sleep(100 * time.Millisecond) + continue + } + + err := impl.client.UpsertEmail(email) + if err != nil { + impl.log.Errf("Failed to upsert email: %s", err) + } + } + } + + for range numWorkers { + wg.Add(1) + go worker() + } + <-daemonCtx.Done() + wg.Wait() +} diff --git a/email/proto/exporter.pb.go b/email/proto/exporter.pb.go new file mode 100644 index 00000000000..d028d8699e8 --- /dev/null +++ b/email/proto/exporter.pb.go @@ -0,0 +1,155 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.34.1 +// protoc v3.20.1 +// source: exporter.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type UpsertEmailsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Emails []string `protobuf:"bytes,1,rep,name=emails,proto3" json:"emails,omitempty"` +} + +func (x *UpsertEmailsRequest) Reset() { + *x = UpsertEmailsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_exporter_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpsertEmailsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpsertEmailsRequest) ProtoMessage() {} + +func (x *UpsertEmailsRequest) ProtoReflect() protoreflect.Message { + mi := &file_exporter_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpsertEmailsRequest.ProtoReflect.Descriptor instead. +func (*UpsertEmailsRequest) Descriptor() ([]byte, []int) { + return file_exporter_proto_rawDescGZIP(), []int{0} +} + +func (x *UpsertEmailsRequest) GetEmails() []string { + if x != nil { + return x.Emails + } + return nil +} + +var File_exporter_proto protoreflect.FileDescriptor + +var file_exporter_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x65, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2d, 0x0a, 0x13, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x45, 0x6d, + 0x61, 0x69, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x65, + 0x6d, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x65, 0x6d, 0x61, + 0x69, 0x6c, 0x73, 0x32, 0x4e, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x72, 0x12, + 0x42, 0x0a, 0x0c, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x73, 0x12, + 0x1a, 0x2e, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2e, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x45, 0x6d, + 0x61, 0x69, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6c, 0x65, 0x74, 0x73, 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x2f, 0x62, 0x6f, + 0x75, 0x6c, 0x64, 0x65, 0x72, 0x2f, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_exporter_proto_rawDescOnce sync.Once + file_exporter_proto_rawDescData = file_exporter_proto_rawDesc +) + +func file_exporter_proto_rawDescGZIP() []byte { + file_exporter_proto_rawDescOnce.Do(func() { + file_exporter_proto_rawDescData = protoimpl.X.CompressGZIP(file_exporter_proto_rawDescData) + }) + return file_exporter_proto_rawDescData +} + +var file_exporter_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_exporter_proto_goTypes = []interface{}{ + (*UpsertEmailsRequest)(nil), // 0: email.UpsertEmailsRequest + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty +} +var file_exporter_proto_depIdxs = []int32{ + 0, // 0: email.Exporter.UpsertEmails:input_type -> email.UpsertEmailsRequest + 1, // 
1: email.Exporter.UpsertEmails:output_type -> google.protobuf.Empty + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_exporter_proto_init() } +func file_exporter_proto_init() { + if File_exporter_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_exporter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UpsertEmailsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_exporter_proto_rawDesc, + NumEnums: 0, + NumMessages: 1, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_exporter_proto_goTypes, + DependencyIndexes: file_exporter_proto_depIdxs, + MessageInfos: file_exporter_proto_msgTypes, + }.Build() + File_exporter_proto = out.File + file_exporter_proto_rawDesc = nil + file_exporter_proto_goTypes = nil + file_exporter_proto_depIdxs = nil +} diff --git a/email/proto/exporter.proto b/email/proto/exporter.proto new file mode 100644 index 00000000000..c2a29527014 --- /dev/null +++ b/email/proto/exporter.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package email; +option go_package = "github.com/letsencrypt/boulder/email/proto"; + +import "google/protobuf/empty.proto"; + +service Exporter { + rpc UpsertEmails (UpsertEmailsRequest) returns (google.protobuf.Empty); +} + +message UpsertEmailsRequest { + repeated string emails = 1; +} diff --git a/email/proto/exporter_grpc.pb.go b/email/proto/exporter_grpc.pb.go new file mode 100644 index 00000000000..3732398069d --- /dev/null +++ b/email/proto/exporter_grpc.pb.go @@ -0,0 +1,111 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.20.1 +// source: exporter.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + Exporter_UpsertEmails_FullMethodName = "/email.Exporter/UpsertEmails" +) + +// ExporterClient is the client API for Exporter service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ExporterClient interface { + UpsertEmails(ctx context.Context, in *UpsertEmailsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) +} + +type exporterClient struct { + cc grpc.ClientConnInterface +} + +func NewExporterClient(cc grpc.ClientConnInterface) ExporterClient { + return &exporterClient{cc} +} + +func (c *exporterClient) UpsertEmails(ctx context.Context, in *UpsertEmailsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, Exporter_UpsertEmails_FullMethodName, in, out, cOpts...) 
+ if err != nil { + return nil, err + } + return out, nil +} + +// ExporterServer is the server API for Exporter service. +// All implementations must embed UnimplementedExporterServer +// for forward compatibility +type ExporterServer interface { + UpsertEmails(context.Context, *UpsertEmailsRequest) (*emptypb.Empty, error) + mustEmbedUnimplementedExporterServer() +} + +// UnimplementedExporterServer must be embedded to have forward compatible implementations. +type UnimplementedExporterServer struct { +} + +func (UnimplementedExporterServer) UpsertEmails(context.Context, *UpsertEmailsRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpsertEmails not implemented") +} +func (UnimplementedExporterServer) mustEmbedUnimplementedExporterServer() {} + +// UnsafeExporterServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ExporterServer will +// result in compilation errors. +type UnsafeExporterServer interface { + mustEmbedUnimplementedExporterServer() +} + +func RegisterExporterServer(s grpc.ServiceRegistrar, srv ExporterServer) { + s.RegisterService(&Exporter_ServiceDesc, srv) +} + +func _Exporter_UpsertEmails_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpsertEmailsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ExporterServer).UpsertEmails(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Exporter_UpsertEmails_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ExporterServer).UpsertEmails(ctx, req.(*UpsertEmailsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Exporter_ServiceDesc is the grpc.ServiceDesc for Exporter service. 
+// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Exporter_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "email.Exporter", + HandlerType: (*ExporterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "UpsertEmails", + Handler: _Exporter_UpsertEmails_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "exporter.proto", +} diff --git a/go.mod b/go.mod index b0375f565ac..f4be00e5fe5 100644 --- a/go.mod +++ b/go.mod @@ -85,6 +85,7 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/sys v0.29.0 // indirect + golang.org/x/time v0.10.0 golang.org/x/tools v0.22.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240903143218-8af14fe29dc1 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/go.sum b/go.sum index 9f3c973976e..82e4bac3917 100644 --- a/go.sum +++ b/go.sum @@ -422,6 +422,8 @@ golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/golang.org/x/time/LICENSE b/vendor/golang.org/x/time/LICENSE new file mode 100644 index 00000000000..2a7cf70da6e --- /dev/null +++ b/vendor/golang.org/x/time/LICENSE @@ -0,0 +1,27 @@ +Copyright 2009 The Go Authors. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google LLC nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/vendor/golang.org/x/time/PATENTS b/vendor/golang.org/x/time/PATENTS new file mode 100644 index 00000000000..733099041f8 --- /dev/null +++ b/vendor/golang.org/x/time/PATENTS @@ -0,0 +1,22 @@ +Additional IP Rights Grant (Patents) + +"This implementation" means the copyrightable works distributed by +Google as part of the Go project. + +Google hereby grants to You a perpetual, worldwide, non-exclusive, +no-charge, royalty-free, irrevocable (except as stated in this section) +patent license to make, have made, use, offer to sell, sell, import, +transfer and otherwise run, modify and propagate the contents of this +implementation of Go, where such license applies only to those patent +claims, both currently owned or controlled by Google and acquired in +the future, licensable by Google that are necessarily infringed by this +implementation of Go. This grant does not include claims that would be +infringed only as a consequence of further modification of this +implementation. If you or your agent or exclusive licensee institute or +order or agree to the institution of patent litigation against any +entity (including a cross-claim or counterclaim in a lawsuit) alleging +that this implementation of Go or any code incorporated within this +implementation of Go constitutes direct or contributory patent +infringement, or inducement of patent infringement, then any patent +rights granted to you under this License for this implementation of Go +shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/time/rate/rate.go b/vendor/golang.org/x/time/rate/rate.go new file mode 100644 index 00000000000..ec5f0cdd0c0 --- /dev/null +++ b/vendor/golang.org/x/time/rate/rate.go @@ -0,0 +1,426 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package rate provides a rate limiter. +package rate + +import ( + "context" + "fmt" + "math" + "sync" + "time" +) + +// Limit defines the maximum frequency of some events. +// Limit is represented as number of events per second. +// A zero Limit allows no events. +type Limit float64 + +// Inf is the infinite rate limit; it allows all events (even if burst is zero). +const Inf = Limit(math.MaxFloat64) + +// Every converts a minimum time interval between events to a Limit. +func Every(interval time.Duration) Limit { + if interval <= 0 { + return Inf + } + return 1 / Limit(interval.Seconds()) +} + +// A Limiter controls how frequently events are allowed to happen. +// It implements a "token bucket" of size b, initially full and refilled +// at rate r tokens per second. +// Informally, in any large enough time interval, the Limiter limits the +// rate to r tokens per second, with a maximum burst size of b events. +// As a special case, if r == Inf (the infinite rate), b is ignored. +// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. +// +// The zero value is a valid Limiter, but it will reject all events. +// Use NewLimiter to create non-zero Limiters. +// +// Limiter has three main methods, Allow, Reserve, and Wait. +// Most callers should use Wait. +// +// Each of the three methods consumes a single token. +// They differ in their behavior when no token is available. +// If no token is available, Allow returns false. +// If no token is available, Reserve returns a reservation for a future token +// and the amount of time the caller must wait before using it. 
+// If no token is available, Wait blocks until one can be obtained +// or its associated context.Context is canceled. +// +// The methods AllowN, ReserveN, and WaitN consume n tokens. +// +// Limiter is safe for simultaneous use by multiple goroutines. +type Limiter struct { + mu sync.Mutex + limit Limit + burst int + tokens float64 + // last is the last time the limiter's tokens field was updated + last time.Time + // lastEvent is the latest time of a rate-limited event (past or future) + lastEvent time.Time +} + +// Limit returns the maximum overall event rate. +func (lim *Limiter) Limit() Limit { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.limit +} + +// Burst returns the maximum burst size. Burst is the maximum number of tokens +// that can be consumed in a single call to Allow, Reserve, or Wait, so higher +// Burst values allow more events to happen at once. +// A zero Burst allows no events, unless limit == Inf. +func (lim *Limiter) Burst() int { + lim.mu.Lock() + defer lim.mu.Unlock() + return lim.burst +} + +// TokensAt returns the number of tokens available at time t. +func (lim *Limiter) TokensAt(t time.Time) float64 { + lim.mu.Lock() + _, tokens := lim.advance(t) // does not mutate lim + lim.mu.Unlock() + return tokens +} + +// Tokens returns the number of tokens available now. +func (lim *Limiter) Tokens() float64 { + return lim.TokensAt(time.Now()) +} + +// NewLimiter returns a new Limiter that allows events up to rate r and permits +// bursts of at most b tokens. +func NewLimiter(r Limit, b int) *Limiter { + return &Limiter{ + limit: r, + burst: b, + tokens: float64(b), + } +} + +// Allow reports whether an event may happen now. +func (lim *Limiter) Allow() bool { + return lim.AllowN(time.Now(), 1) +} + +// AllowN reports whether n events may happen at time t. +// Use this method if you intend to drop / skip events that exceed the rate limit. +// Otherwise use Reserve or Wait. +func (lim *Limiter) AllowN(t time.Time, n int) bool { + return lim.reserveN(t, n, 0).ok +} + +// A Reservation holds information about events that are permitted by a Limiter to happen after a delay. +// A Reservation may be canceled, which may enable the Limiter to permit additional events. +type Reservation struct { + ok bool + lim *Limiter + tokens int + timeToAct time.Time + // This is the Limit at reservation time, it can change later. + limit Limit +} + +// OK returns whether the limiter can provide the requested number of tokens +// within the maximum wait time. If OK is false, Delay returns InfDuration, and +// Cancel does nothing. +func (r *Reservation) OK() bool { + return r.ok +} + +// Delay is shorthand for DelayFrom(time.Now()). +func (r *Reservation) Delay() time.Duration { + return r.DelayFrom(time.Now()) +} + +// InfDuration is the duration returned by Delay when a Reservation is not OK. +const InfDuration = time.Duration(math.MaxInt64) + +// DelayFrom returns the duration for which the reservation holder must wait +// before taking the reserved action. Zero duration means act immediately. +// InfDuration means the limiter cannot grant the tokens requested in this +// Reservation within the maximum wait time. +func (r *Reservation) DelayFrom(t time.Time) time.Duration { + if !r.ok { + return InfDuration + } + delay := r.timeToAct.Sub(t) + if delay < 0 { + return 0 + } + return delay +} + +// Cancel is shorthand for CancelAt(time.Now()). 
+func (r *Reservation) Cancel() { + r.CancelAt(time.Now()) +} + +// CancelAt indicates that the reservation holder will not perform the reserved action +// and reverses the effects of this Reservation on the rate limit as much as possible, +// considering that other reservations may have already been made. +func (r *Reservation) CancelAt(t time.Time) { + if !r.ok { + return + } + + r.lim.mu.Lock() + defer r.lim.mu.Unlock() + + if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(t) { + return + } + + // calculate tokens to restore + // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved + // after r was obtained. These tokens should not be restored. + restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) + if restoreTokens <= 0 { + return + } + // advance time to now + t, tokens := r.lim.advance(t) + // calculate new number of tokens + tokens += restoreTokens + if burst := float64(r.lim.burst); tokens > burst { + tokens = burst + } + // update state + r.lim.last = t + r.lim.tokens = tokens + if r.timeToAct == r.lim.lastEvent { + prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) + if !prevEvent.Before(t) { + r.lim.lastEvent = prevEvent + } + } +} + +// Reserve is shorthand for ReserveN(time.Now(), 1). +func (lim *Limiter) Reserve() *Reservation { + return lim.ReserveN(time.Now(), 1) +} + +// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. +// The Limiter takes this Reservation into account when allowing future events. +// The returned Reservation’s OK() method returns false if n exceeds the Limiter's burst size. +// Usage example: +// +// r := lim.ReserveN(time.Now(), 1) +// if !r.OK() { +// // Not allowed to act! Did you remember to set lim.burst to be > 0 ? +// return +// } +// time.Sleep(r.Delay()) +// Act() +// +// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. +// If you need to respect a deadline or cancel the delay, use Wait instead. +// To drop or skip events exceeding rate limit, use Allow instead. +func (lim *Limiter) ReserveN(t time.Time, n int) *Reservation { + r := lim.reserveN(t, n, InfDuration) + return &r +} + +// Wait is shorthand for WaitN(ctx, 1). +func (lim *Limiter) Wait(ctx context.Context) (err error) { + return lim.WaitN(ctx, 1) +} + +// WaitN blocks until lim permits n events to happen. +// It returns an error if n exceeds the Limiter's burst size, the Context is +// canceled, or the expected wait time exceeds the Context's Deadline. +// The burst limit is ignored if the rate limit is Inf. +func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { + // The test code calls lim.wait with a fake timer generator. + // This is the real timer generator. + newTimer := func(d time.Duration) (<-chan time.Time, func() bool, func()) { + timer := time.NewTimer(d) + return timer.C, timer.Stop, func() {} + } + + return lim.wait(ctx, n, time.Now(), newTimer) +} + +// wait is the internal implementation of WaitN. 
+func (lim *Limiter) wait(ctx context.Context, n int, t time.Time, newTimer func(d time.Duration) (<-chan time.Time, func() bool, func())) error { + lim.mu.Lock() + burst := lim.burst + limit := lim.limit + lim.mu.Unlock() + + if n > burst && limit != Inf { + return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst) + } + // Check if ctx is already cancelled + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // Determine wait limit + waitLimit := InfDuration + if deadline, ok := ctx.Deadline(); ok { + waitLimit = deadline.Sub(t) + } + // Reserve + r := lim.reserveN(t, n, waitLimit) + if !r.ok { + return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) + } + // Wait if necessary + delay := r.DelayFrom(t) + if delay == 0 { + return nil + } + ch, stop, advance := newTimer(delay) + defer stop() + advance() // only has an effect when testing + select { + case <-ch: + // We can proceed. + return nil + case <-ctx.Done(): + // Context was canceled before we could proceed. Cancel the + // reservation, which may permit other events to proceed sooner. + r.Cancel() + return ctx.Err() + } +} + +// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). +func (lim *Limiter) SetLimit(newLimit Limit) { + lim.SetLimitAt(time.Now(), newLimit) +} + +// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated +// or underutilized by those which reserved (using Reserve or Wait) but did not yet act +// before SetLimitAt was called. +func (lim *Limiter) SetLimitAt(t time.Time, newLimit Limit) { + lim.mu.Lock() + defer lim.mu.Unlock() + + t, tokens := lim.advance(t) + + lim.last = t + lim.tokens = tokens + lim.limit = newLimit +} + +// SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). +func (lim *Limiter) SetBurst(newBurst int) { + lim.SetBurstAt(time.Now(), newBurst) +} + +// SetBurstAt sets a new burst size for the limiter. +func (lim *Limiter) SetBurstAt(t time.Time, newBurst int) { + lim.mu.Lock() + defer lim.mu.Unlock() + + t, tokens := lim.advance(t) + + lim.last = t + lim.tokens = tokens + lim.burst = newBurst +} + +// reserveN is a helper method for AllowN, ReserveN, and WaitN. +// maxFutureReserve specifies the maximum reservation wait duration allowed. +// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. +func (lim *Limiter) reserveN(t time.Time, n int, maxFutureReserve time.Duration) Reservation { + lim.mu.Lock() + defer lim.mu.Unlock() + + if lim.limit == Inf { + return Reservation{ + ok: true, + lim: lim, + tokens: n, + timeToAct: t, + } + } + + t, tokens := lim.advance(t) + + // Calculate the remaining number of tokens resulting from the request. + tokens -= float64(n) + + // Calculate the wait duration + var waitDuration time.Duration + if tokens < 0 { + waitDuration = lim.limit.durationFromTokens(-tokens) + } + + // Decide result + ok := n <= lim.burst && waitDuration <= maxFutureReserve + + // Prepare reservation + r := Reservation{ + ok: ok, + lim: lim, + limit: lim.limit, + } + if ok { + r.tokens = n + r.timeToAct = t.Add(waitDuration) + + // Update state + lim.last = t + lim.tokens = tokens + lim.lastEvent = r.timeToAct + } + + return r +} + +// advance calculates and returns an updated state for lim resulting from the passage of time. +// lim is not changed. +// advance requires that lim.mu is held. 
+func (lim *Limiter) advance(t time.Time) (newT time.Time, newTokens float64) { + last := lim.last + if t.Before(last) { + last = t + } + + // Calculate the new number of tokens, due to time that passed. + elapsed := t.Sub(last) + delta := lim.limit.tokensFromDuration(elapsed) + tokens := lim.tokens + delta + if burst := float64(lim.burst); tokens > burst { + tokens = burst + } + return t, tokens +} + +// durationFromTokens is a unit conversion function from the number of tokens to the duration +// of time it takes to accumulate them at a rate of limit tokens per second. +func (limit Limit) durationFromTokens(tokens float64) time.Duration { + if limit <= 0 { + return InfDuration + } + + duration := (tokens / float64(limit)) * float64(time.Second) + + // Cap the duration to the maximum representable int64 value, to avoid overflow. + if duration > float64(math.MaxInt64) { + return InfDuration + } + + return time.Duration(duration) +} + +// tokensFromDuration is a unit conversion function from a time duration to the number of tokens +// which could be accumulated during that duration at a rate of limit tokens per second. +func (limit Limit) tokensFromDuration(d time.Duration) float64 { + if limit <= 0 { + return 0 + } + return d.Seconds() * float64(limit) +} diff --git a/vendor/golang.org/x/time/rate/sometimes.go b/vendor/golang.org/x/time/rate/sometimes.go new file mode 100644 index 00000000000..6ba99ddb67b --- /dev/null +++ b/vendor/golang.org/x/time/rate/sometimes.go @@ -0,0 +1,67 @@ +// Copyright 2022 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package rate + +import ( + "sync" + "time" +) + +// Sometimes will perform an action occasionally. The First, Every, and +// Interval fields govern the behavior of Do, which performs the action. +// A zero Sometimes value will perform an action exactly once. +// +// # Example: logging with rate limiting +// +// var sometimes = rate.Sometimes{First: 3, Interval: 10*time.Second} +// func Spammy() { +// sometimes.Do(func() { log.Info("here I am!") }) +// } +type Sometimes struct { + First int // if non-zero, the first N calls to Do will run f. + Every int // if non-zero, every Nth call to Do will run f. + Interval time.Duration // if non-zero and Interval has elapsed since f's last run, Do will run f. + + mu sync.Mutex + count int // number of Do calls + last time.Time // last time f was run +} + +// Do runs the function f as allowed by First, Every, and Interval. +// +// The model is a union (not intersection) of filters. The first call to Do +// always runs f. Subsequent calls to Do run f if allowed by First or Every or +// Interval. +// +// A non-zero First:N causes the first N Do(f) calls to run f. +// +// A non-zero Every:M causes every Mth Do(f) call, starting with the first, to +// run f. +// +// A non-zero Interval causes Do(f) to run f if Interval has elapsed since +// Do last ran f. +// +// Specifying multiple filters produces the union of these execution streams. +// For example, specifying both First:N and Every:M causes the first N Do(f) +// calls and every Mth Do(f) call, starting with the first, to run f. See +// Examples for more. +// +// If Do is called multiple times simultaneously, the calls will block and run +// serially. Therefore, Do is intended for lightweight operations. +// +// Because a call to Do may block until f returns, if f causes Do to be called, +// it will deadlock. 
+func (s *Sometimes) Do(f func()) { + s.mu.Lock() + defer s.mu.Unlock() + if s.count == 0 || + (s.First > 0 && s.count < s.First) || + (s.Every > 0 && s.count%s.Every == 0) || + (s.Interval > 0 && time.Since(s.last) >= s.Interval) { + f() + s.last = time.Now() + } + s.count++ +} diff --git a/vendor/modules.txt b/vendor/modules.txt index b0f479e9144..8194d1d23aa 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -409,6 +409,9 @@ golang.org/x/text/secure/bidirule golang.org/x/text/transform golang.org/x/text/unicode/bidi golang.org/x/text/unicode/norm +# golang.org/x/time v0.10.0 +## explicit; go 1.18 +golang.org/x/time/rate # golang.org/x/tools v0.22.0 ## explicit; go 1.19 golang.org/x/tools/go/gcexportdata From 9dc403307e4f756dfe679952faccc9ab45dbffe4 Mon Sep 17 00:00:00 2001 From: Samantha Date: Thu, 6 Feb 2025 16:20:46 -0500 Subject: [PATCH 03/18] cmd/email-exporter: Initial implementation --- cmd/email-exporter/main.go | 103 +++++++++++++++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 cmd/email-exporter/main.go diff --git a/cmd/email-exporter/main.go b/cmd/email-exporter/main.go new file mode 100644 index 00000000000..9f7bdb9852a --- /dev/null +++ b/cmd/email-exporter/main.go @@ -0,0 +1,103 @@ +package notmain + +import ( + "context" + "flag" + "os" + + "github.com/letsencrypt/boulder/cmd" + "github.com/letsencrypt/boulder/email" + emailpb "github.com/letsencrypt/boulder/email/proto" + bgrpc "github.com/letsencrypt/boulder/grpc" +) + +// Config holds the configuration for the email-exporter service. +type Config struct { + EmailExporter struct { + cmd.ServiceConfig + + // PardotBusinessUnit is the Pardot business unit to use. + PardotBusinessUnit string `validate:"required"` + + // ClientId is the OAuth API client ID provided by Salesforce. + ClientId cmd.PasswordConfig + + // ClientSecret is the OAuth API client secret provided by Salesforce. + ClientSecret cmd.PasswordConfig + + // SalesforceBaseURL is the base URL for the Salesforce API. (e.g., + // "https://login.salesforce.com") + SalesforceBaseURL string `validate:"required"` + + // PardotBaseURL is the base URL for the Pardot API. 
(e.g., + // "https://pi.pardot.com") + PardotBaseURL string `validate:"required"` + } + Syslog cmd.SyslogConfig + OpenTelemetry cmd.OpenTelemetryConfig +} + +func main() { + configFile := flag.String("config", "", "Path to configuration file") + grpcAddr := flag.String("addr", "", "gRPC listen address override") + debugAddr := flag.String("debug-addr", "", "Debug server address override") + flag.Parse() + + if *configFile == "" { + flag.Usage() + os.Exit(1) + } + + var c Config + err := cmd.ReadConfigFile(*configFile, &c) + cmd.FailOnError(err, "Reading JSON config file into config structure") + + if *grpcAddr != "" { + c.EmailExporter.ServiceConfig.GRPC.Address = *grpcAddr + } + if *debugAddr != "" { + c.EmailExporter.ServiceConfig.DebugAddr = *debugAddr + } + + scope, logger, oTelShutdown := cmd.StatsAndLogging(c.Syslog, c.OpenTelemetry, c.EmailExporter.ServiceConfig.DebugAddr) + defer oTelShutdown(context.Background()) + + logger.Info(cmd.VersionString()) + + clk := cmd.Clock() + clientId, err := c.EmailExporter.ClientId.Pass() + cmd.FailOnError(err, "Loading client ID") + clientSecret, err := c.EmailExporter.ClientSecret.Pass() + cmd.FailOnError(err, "Loading client secret") + + pardotClient, err := email.NewPardotClient( + clk, + c.EmailExporter.PardotBusinessUnit, + clientId, + clientSecret, + c.EmailExporter.SalesforceBaseURL, + c.EmailExporter.PardotBaseURL, + ) + cmd.FailOnError(err, "Creating Pardot client") + exporterServer := email.NewExporterImpl(pardotClient, scope, logger) + + daemonCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Begin asynchronous processing of the email queue. + go exporterServer.Start(daemonCtx) + + tlsConfig, err := c.EmailExporter.TLS.Load(scope) + cmd.FailOnError(err, "Loading TLS config") + + start, err := bgrpc.NewServer(c.EmailExporter.GRPC, logger).Add( + &emailpb.Exporter_ServiceDesc, exporterServer).Build(tlsConfig, scope, clk) + cmd.FailOnError(err, "Configuring gRPC server") + + // Start the gRPC service. + cmd.FailOnError(start(), "email-exporter gRPC service failed to start") +} + +func init() { + cmd.RegisterCommand("email-exporter", main, &cmd.ConfigValidator{Config: &Config{}}) +} From e8ff50054fd0d5121bfeb57e9cb2be50a73555d8 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 16:37:11 -0500 Subject: [PATCH 04/18] Adjust proto. 
--- email/proto/exporter.pb.go | 53 +++++++++++++++++---------------- email/proto/exporter.proto | 4 +-- email/proto/exporter_grpc.pb.go | 28 ++++++++--------- 3 files changed, 43 insertions(+), 42 deletions(-) diff --git a/email/proto/exporter.pb.go b/email/proto/exporter.pb.go index d028d8699e8..97b4328d1de 100644 --- a/email/proto/exporter.pb.go +++ b/email/proto/exporter.pb.go @@ -21,7 +21,7 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -type UpsertEmailsRequest struct { +type CreateProspectsRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields @@ -29,8 +29,8 @@ type UpsertEmailsRequest struct { Emails []string `protobuf:"bytes,1,rep,name=emails,proto3" json:"emails,omitempty"` } -func (x *UpsertEmailsRequest) Reset() { - *x = UpsertEmailsRequest{} +func (x *CreateProspectsRequest) Reset() { + *x = CreateProspectsRequest{} if protoimpl.UnsafeEnabled { mi := &file_exporter_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -38,13 +38,13 @@ func (x *UpsertEmailsRequest) Reset() { } } -func (x *UpsertEmailsRequest) String() string { +func (x *CreateProspectsRequest) String() string { return protoimpl.X.MessageStringOf(x) } -func (*UpsertEmailsRequest) ProtoMessage() {} +func (*CreateProspectsRequest) ProtoMessage() {} -func (x *UpsertEmailsRequest) ProtoReflect() protoreflect.Message { +func (x *CreateProspectsRequest) ProtoReflect() protoreflect.Message { mi := &file_exporter_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -56,12 +56,12 @@ func (x *UpsertEmailsRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use UpsertEmailsRequest.ProtoReflect.Descriptor instead. -func (*UpsertEmailsRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use CreateProspectsRequest.ProtoReflect.Descriptor instead. 
+func (*CreateProspectsRequest) Descriptor() ([]byte, []int) { return file_exporter_proto_rawDescGZIP(), []int{0} } -func (x *UpsertEmailsRequest) GetEmails() []string { +func (x *CreateProspectsRequest) GetEmails() []string { if x != nil { return x.Emails } @@ -74,18 +74,19 @@ var file_exporter_proto_rawDesc = []byte{ 0x0a, 0x0e, 0x65, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2d, 0x0a, 0x13, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x45, 0x6d, - 0x61, 0x69, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x65, - 0x6d, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, 0x65, 0x6d, 0x61, - 0x69, 0x6c, 0x73, 0x32, 0x4e, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x72, 0x12, - 0x42, 0x0a, 0x0c, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x45, 0x6d, 0x61, 0x69, 0x6c, 0x73, 0x12, - 0x1a, 0x2e, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2e, 0x55, 0x70, 0x73, 0x65, 0x72, 0x74, 0x45, 0x6d, - 0x61, 0x69, 0x6c, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x6c, 0x65, 0x74, 0x73, 0x65, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x2f, 0x62, 0x6f, - 0x75, 0x6c, 0x64, 0x65, 0x72, 0x2f, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x30, 0x0a, 0x16, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, + 0x6f, 0x73, 0x70, 0x65, 0x63, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, + 0x0a, 0x06, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x06, + 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x73, 0x32, 0x54, 0x0a, 0x08, 0x45, 0x78, 0x70, 0x6f, 0x72, 0x74, + 0x65, 0x72, 0x12, 0x48, 0x0a, 0x0f, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x73, + 0x70, 0x65, 0x63, 0x74, 0x73, 0x12, 0x1d, 0x2e, 0x65, 0x6d, 0x61, 0x69, 0x6c, 0x2e, 0x43, 0x72, + 0x65, 0x61, 0x74, 0x65, 0x50, 0x72, 0x6f, 0x73, 0x70, 0x65, 0x63, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x42, 0x2c, 0x5a, 0x2a, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x65, 0x74, 0x73, 0x65, + 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x2f, 0x62, 0x6f, 0x75, 0x6c, 0x64, 0x65, 0x72, 0x2f, 0x65, + 0x6d, 0x61, 0x69, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -102,12 +103,12 @@ func file_exporter_proto_rawDescGZIP() []byte { var file_exporter_proto_msgTypes = make([]protoimpl.MessageInfo, 1) var file_exporter_proto_goTypes = []interface{}{ - (*UpsertEmailsRequest)(nil), // 0: email.UpsertEmailsRequest - (*emptypb.Empty)(nil), // 1: google.protobuf.Empty + (*CreateProspectsRequest)(nil), // 0: email.CreateProspectsRequest + (*emptypb.Empty)(nil), // 1: google.protobuf.Empty } var file_exporter_proto_depIdxs = []int32{ - 0, // 0: email.Exporter.UpsertEmails:input_type -> email.UpsertEmailsRequest - 1, // 1: email.Exporter.UpsertEmails:output_type -> 
google.protobuf.Empty + 0, // 0: email.Exporter.CreateProspects:input_type -> email.CreateProspectsRequest + 1, // 1: email.Exporter.CreateProspects:output_type -> google.protobuf.Empty 1, // [1:2] is the sub-list for method output_type 0, // [0:1] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name @@ -122,7 +123,7 @@ func file_exporter_proto_init() { } if !protoimpl.UnsafeEnabled { file_exporter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*UpsertEmailsRequest); i { + switch v := v.(*CreateProspectsRequest); i { case 0: return &v.state case 1: diff --git a/email/proto/exporter.proto b/email/proto/exporter.proto index c2a29527014..62fed89a818 100644 --- a/email/proto/exporter.proto +++ b/email/proto/exporter.proto @@ -6,9 +6,9 @@ option go_package = "github.com/letsencrypt/boulder/email/proto"; import "google/protobuf/empty.proto"; service Exporter { - rpc UpsertEmails (UpsertEmailsRequest) returns (google.protobuf.Empty); + rpc CreateProspects (CreateProspectsRequest) returns (google.protobuf.Empty); } -message UpsertEmailsRequest { +message CreateProspectsRequest { repeated string emails = 1; } diff --git a/email/proto/exporter_grpc.pb.go b/email/proto/exporter_grpc.pb.go index 3732398069d..02a00fe51e8 100644 --- a/email/proto/exporter_grpc.pb.go +++ b/email/proto/exporter_grpc.pb.go @@ -20,14 +20,14 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - Exporter_UpsertEmails_FullMethodName = "/email.Exporter/UpsertEmails" + Exporter_CreateProspects_FullMethodName = "/email.Exporter/CreateProspects" ) // ExporterClient is the client API for Exporter service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type ExporterClient interface { - UpsertEmails(ctx context.Context, in *UpsertEmailsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + CreateProspects(ctx context.Context, in *CreateProspectsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } type exporterClient struct { @@ -38,10 +38,10 @@ func NewExporterClient(cc grpc.ClientConnInterface) ExporterClient { return &exporterClient{cc} } -func (c *exporterClient) UpsertEmails(ctx context.Context, in *UpsertEmailsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { +func (c *exporterClient) CreateProspects(ctx context.Context, in *CreateProspectsRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(emptypb.Empty) - err := c.cc.Invoke(ctx, Exporter_UpsertEmails_FullMethodName, in, out, cOpts...) + err := c.cc.Invoke(ctx, Exporter_CreateProspects_FullMethodName, in, out, cOpts...) 
if err != nil { return nil, err } @@ -52,7 +52,7 @@ func (c *exporterClient) UpsertEmails(ctx context.Context, in *UpsertEmailsReque // All implementations must embed UnimplementedExporterServer // for forward compatibility type ExporterServer interface { - UpsertEmails(context.Context, *UpsertEmailsRequest) (*emptypb.Empty, error) + CreateProspects(context.Context, *CreateProspectsRequest) (*emptypb.Empty, error) mustEmbedUnimplementedExporterServer() } @@ -60,8 +60,8 @@ type ExporterServer interface { type UnimplementedExporterServer struct { } -func (UnimplementedExporterServer) UpsertEmails(context.Context, *UpsertEmailsRequest) (*emptypb.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method UpsertEmails not implemented") +func (UnimplementedExporterServer) CreateProspects(context.Context, *CreateProspectsRequest) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method CreateProspects not implemented") } func (UnimplementedExporterServer) mustEmbedUnimplementedExporterServer() {} @@ -76,20 +76,20 @@ func RegisterExporterServer(s grpc.ServiceRegistrar, srv ExporterServer) { s.RegisterService(&Exporter_ServiceDesc, srv) } -func _Exporter_UpsertEmails_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(UpsertEmailsRequest) +func _Exporter_CreateProspects_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(CreateProspectsRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ExporterServer).UpsertEmails(ctx, in) + return srv.(ExporterServer).CreateProspects(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: Exporter_UpsertEmails_FullMethodName, + FullMethod: Exporter_CreateProspects_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ExporterServer).UpsertEmails(ctx, req.(*UpsertEmailsRequest)) + return srv.(ExporterServer).CreateProspects(ctx, req.(*CreateProspectsRequest)) } return interceptor(ctx, in, info, handler) } @@ -102,8 +102,8 @@ var Exporter_ServiceDesc = grpc.ServiceDesc{ HandlerType: (*ExporterServer)(nil), Methods: []grpc.MethodDesc{ { - MethodName: "UpsertEmails", - Handler: _Exporter_UpsertEmails_Handler, + MethodName: "CreateProspects", + Handler: _Exporter_CreateProspects_Handler, }, }, Streams: []grpc.StreamDesc{}, From ccb76c2e640a224f161369b63913f07a1e794047 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 16:38:03 -0500 Subject: [PATCH 05/18] email/pardot: Address comments --- email/pardot.go | 96 ++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 50 deletions(-) diff --git a/email/pardot.go b/email/pardot.go index c77d37ad229..f637b3aea9e 100644 --- a/email/pardot.go +++ b/email/pardot.go @@ -19,8 +19,9 @@ const ( tokenPath = "/services/oauth2/token" // prospectsPath is the path to the Pardot v5 Prospects endpoint. This - // endpoint will create a new Prospect if one does not already exist. - prospectsPath = "/api/v5/prospects" + // endpoint will create a new Prospect if one does not already exist with + // the same email address. + prospectsPath = "/api/v5/objects/prospects" // maxAttempts is the maximum number of attempts to retry a request. 
maxAttempts = 3 @@ -58,7 +59,7 @@ type PardotClient struct { func NewPardotClient(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClient, error) { prospectsURL, err := url.JoinPath(pardotBaseURL, prospectsPath) if err != nil { - return nil, fmt.Errorf("failed to join upsert path: %w", err) + return nil, fmt.Errorf("failed to join prospects path: %w", err) } tokenURL, err := url.JoinPath(oauthbaseURL, tokenPath) if err != nil { @@ -102,21 +103,22 @@ func (pc *PardotClient) updateToken() error { ExpiresIn int `json:"expires_in"` } - if resp.StatusCode == http.StatusOK { - err = json.NewDecoder(resp.Body).Decode(&respJSON) - if err != nil { - return fmt.Errorf("failed to decode token response: %w", err) + if resp.StatusCode != http.StatusOK { + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return fmt.Errorf("token request failed with status %d; while reading body: %w", resp.StatusCode, readErr) } - pc.token.accessToken = respJSON.AccessToken - pc.token.expiresAt = pc.clk.Now().Add(time.Duration(respJSON.ExpiresIn) * time.Second) - return nil + return fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, body) } - body, readErr := io.ReadAll(resp.Body) - if readErr != nil { - return fmt.Errorf("token request failed with status %d; while reading body: %w", resp.StatusCode, readErr) + err = json.NewDecoder(resp.Body).Decode(&respJSON) + if err != nil { + return fmt.Errorf("failed to decode token response: %w", err) } - return fmt.Errorf("token request failed with status %d: %s", resp.StatusCode, body) + pc.token.accessToken = respJSON.AccessToken + pc.token.expiresAt = pc.clk.Now().Add(time.Duration(respJSON.ExpiresIn) * time.Second) + + return nil } // redactEmail replaces all occurrences of an email address in a response body @@ -125,39 +127,35 @@ func redactEmail(body []byte, email string) string { return string(bytes.ReplaceAll(body, []byte(email), []byte("[REDACTED]"))) } -// UpsertEmail sends an email to the Pardot Prospects endpoint, retrying up +// CreateProspect submits an email to the Pardot Prospects endpoint, retrying up // to 3 times with exponential backoff. 
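+// A fresh OAuth token is obtained first (with its own retry loop) before the
+// prospect request is sent.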
-func (pc *PardotClient) UpsertEmail(email string) error { - var finalErr error - - for attempt := 1; attempt <= maxAttempts; attempt++ { - err := pc.updateToken() +func (pc *PardotClient) CreateProspect(email string) error { + var err error + for attempt := range maxAttempts { + time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) + err = pc.updateToken() if err != nil { - finalErr = err - } else { - break - } - - if attempt < maxAttempts { - time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) + continue } + break } - - if finalErr != nil { - return finalErr + if err != nil { + return fmt.Errorf("failed to update token: %w", err) } - payload, err := json.Marshal(map[string]interface{}{ - "prospect": map[string]string{"email": email}, - }) + payload, err := json.Marshal(map[string]string{"email": email}) if err != nil { return fmt.Errorf("failed to marshal payload: %w", err) } - for attempt := 1; attempt <= maxAttempts; attempt++ { + var finalErr error + for attempt := range maxAttempts { + time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) + req, err := http.NewRequest("POST", pc.prospectsURL, bytes.NewReader(payload)) if err != nil { - return fmt.Errorf("failed to create upsert request: %w", err) + finalErr = fmt.Errorf("failed to create prospects request: %w", err) + continue } req.Header.Set("Content-Type", "application/json") req.Header.Set("Authorization", "Bearer "+pc.token.accessToken) @@ -165,24 +163,22 @@ func (pc *PardotClient) UpsertEmail(email string) error { resp, err := http.DefaultClient.Do(req) if err != nil { - finalErr = fmt.Errorf("upsert request failed: %w", err) - } else { - defer resp.Body.Close() - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - return nil - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - finalErr = fmt.Errorf("upsert request returned status %d; while reading body: %w", resp.StatusCode, err) - } else { - finalErr = fmt.Errorf("upsert request returned status %d: %s", resp.StatusCode, redactEmail(body, email)) - } + finalErr = fmt.Errorf("prospects request failed: %w", err) + continue + } + + defer resp.Body.Close() + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return nil } - if attempt < maxAttempts { - time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) + body, err := io.ReadAll(resp.Body) + if err != nil { + finalErr = fmt.Errorf("prospects request returned status %d; while reading body: %w", resp.StatusCode, err) + continue } + finalErr = fmt.Errorf("prospects request returned status %d: %s", resp.StatusCode, redactEmail(body, email)) + continue } return finalErr From cdff8d1954087bda11fbe45d2e82b0cb27ed2bf2 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 16:39:22 -0500 Subject: [PATCH 06/18] email/exporter: Address comments --- email/exporter.go | 91 ++++++++++++++++++++++------------------------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/email/exporter.go b/email/exporter.go index ad0cd89c23f..56c48b05197 100644 --- a/email/exporter.go +++ b/email/exporter.go @@ -36,19 +36,32 @@ var ErrQueueFull = errors.New("email export queue is full") // ExporterImpl implements the gRPC server and processes email exports. 
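+// Addresses are buffered in an in-memory queue and drained by a small pool of
+// background workers.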
type ExporterImpl struct { + exporterpb.UnsafeExporterServer + sync.RWMutex + drainWG sync.WaitGroup - toSend []string - client *PardotClient - log blog.Logger + toSend []string + client *PardotClient + emailsHandledCounter prometheus.Counter + log blog.Logger } +var _ exporterpb.ExporterServer = (*ExporterImpl)(nil) + // NewExporterImpl creates a new ExporterImpl. func NewExporterImpl(client *PardotClient, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl { + emailsHandledCounter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "email_exporter_emails_handled", + Help: "Total number of emails handled by the email exporter", + }) + scope.MustRegister(emailsHandledCounter) + impl := &ExporterImpl{ - toSend: make([]string, 0, queueCap), - client: client, - log: logger, + toSend: make([]string, 0, queueCap), + client: client, + emailsHandledCounter: emailsHandledCounter, + log: logger, } queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ @@ -64,10 +77,10 @@ func NewExporterImpl(client *PardotClient, scope prometheus.Registerer, logger b return impl } -// UpsertEmails enqueues the provided email addresses. If the queue is near +// CreateProspects enqueues the provided email addresses. If the queue is near // capacity, only enqueues as many emails as can fit. Returns ErrQueueFull if // some or all emails were dropped. -func (impl *ExporterImpl) UpsertEmails(ctx context.Context, req *exporterpb.UpsertEmailsRequest) (*emptypb.Empty, error) { +func (impl *ExporterImpl) CreateProspects(ctx context.Context, req *exporterpb.CreateProspectsRequest) (*emptypb.Empty, error) { if core.IsAnyNilOrZero(req, req.Emails) { return nil, berrors.InternalServerError("Incomplete UpsertEmails request") } @@ -76,26 +89,14 @@ func (impl *ExporterImpl) UpsertEmails(ctx context.Context, req *exporterpb.Upse defer impl.Unlock() spotsLeft := queueCap - len(impl.toSend) - if spotsLeft <= 0 { - return nil, ErrQueueFull - } - - toAdd := req.Emails - if len(toAdd) > spotsLeft { - toAdd = toAdd[:spotsLeft] - } - - impl.toSend = append(impl.toSend, toAdd...) - - if len(toAdd) < len(req.Emails) { - impl.log.Errf("Dropped %d emails due to queue capacity", len(req.Emails)-len(toAdd)) + if spotsLeft < len(req.Emails) { return nil, ErrQueueFull } + impl.toSend = append(impl.toSend, req.Emails...) return &emptypb.Empty{}, nil } -// takeEmail pops an email from the slice (LIFO). func (impl *ExporterImpl) takeEmail() (string, bool) { impl.Lock() defer impl.Unlock() @@ -111,41 +112,31 @@ func (impl *ExporterImpl) takeEmail() (string, bool) { } // Start begins asynchronous processing of the email queue. When the parent -// daemonCtx is cancelled switches into a draining mode. +// daemonCtx is cancelled the queue will be drained and the workers will exit. func (impl *ExporterImpl) Start(daemonCtx context.Context) { limiter := rate.NewLimiter(rate.Limit(rateLimit), numWorkers) - var wg sync.WaitGroup worker := func() { - defer wg.Done() - draining := false + defer impl.drainWG.Done() for { - if daemonCtx.Err() != nil { - draining = true + if daemonCtx.Err() != nil && len(impl.toSend) == 0 { + return } - if draining { - err := limiter.Wait(context.Background()) - if err != nil { - // This should never happen, we're using a background - // context. 
- impl.log.Errf("While draining: limiter wait error: %s", err) - } - } else { - err := limiter.Wait(daemonCtx) - if err != nil { - if errors.Is(err, context.Canceled) { - draining = true - continue - } - impl.log.Errf("While running: unexpected limiter wait error: %s", err) + err := limiter.Wait(daemonCtx) + if err != nil { + if errors.Is(err, context.Canceled) { + // Keep processing emails until the queue is drained. continue } + impl.log.Errf("Unexpected limiter wait error: %s", err) + continue } email, ok := impl.takeEmail() if !ok { - if draining { + if daemonCtx.Err() != nil { + // Exit immediately. return } // No emails to process, avoid busy-waiting. @@ -153,17 +144,21 @@ func (impl *ExporterImpl) Start(daemonCtx context.Context) { continue } - err := impl.client.UpsertEmail(email) + err = impl.client.CreateProspect(email) if err != nil { impl.log.Errf("Failed to upsert email: %s", err) } + impl.emailsHandledCounter.Inc() } } for range numWorkers { - wg.Add(1) + impl.drainWG.Add(1) go worker() } - <-daemonCtx.Done() - wg.Wait() +} + +// Drain blocks until all workers have finished processing the email queue. +func (impl *ExporterImpl) Drain() { + impl.drainWG.Wait() } From da59011ce21c6803c9fb9affa1032a4999db57fe Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 16:39:46 -0500 Subject: [PATCH 07/18] cmd/email-exporter: Address comments --- cmd/email-exporter/main.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/cmd/email-exporter/main.go b/cmd/email-exporter/main.go index 9f7bdb9852a..c546a48de44 100644 --- a/cmd/email-exporter/main.go +++ b/cmd/email-exporter/main.go @@ -81,21 +81,20 @@ func main() { cmd.FailOnError(err, "Creating Pardot client") exporterServer := email.NewExporterImpl(pardotClient, scope, logger) - daemonCtx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Begin asynchronous processing of the email queue. - go exporterServer.Start(daemonCtx) - tlsConfig, err := c.EmailExporter.TLS.Load(scope) cmd.FailOnError(err, "Loading TLS config") + daemonCtx, shutdownExporterServer := context.WithCancel(context.Background()) + go exporterServer.Start(daemonCtx) + start, err := bgrpc.NewServer(c.EmailExporter.GRPC, logger).Add( &emailpb.Exporter_ServiceDesc, exporterServer).Build(tlsConfig, scope, clk) cmd.FailOnError(err, "Configuring gRPC server") - // Start the gRPC service. 
- cmd.FailOnError(start(), "email-exporter gRPC service failed to start") + err = start() + shutdownExporterServer() + exporterServer.Drain() + cmd.FailOnError(err, "email-exporter gRPC service failed to start") } func init() { From 289423267550b3558d88ee3e87e0df7af595e065 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 17:00:33 -0500 Subject: [PATCH 08/18] cmd: Add email-exporter subcommand --- cmd/boulder/main.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/boulder/main.go b/cmd/boulder/main.go index fcaeb7c0f3e..dcacdf6208a 100644 --- a/cmd/boulder/main.go +++ b/cmd/boulder/main.go @@ -19,6 +19,7 @@ import ( _ "github.com/letsencrypt/boulder/cmd/crl-checker" _ "github.com/letsencrypt/boulder/cmd/crl-storer" _ "github.com/letsencrypt/boulder/cmd/crl-updater" + _ "github.com/letsencrypt/boulder/cmd/email-exporter" _ "github.com/letsencrypt/boulder/cmd/expiration-mailer" _ "github.com/letsencrypt/boulder/cmd/id-exporter" _ "github.com/letsencrypt/boulder/cmd/log-validator" From dfce9771c7433f72f890f74fe5c92d3e0c52b129 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 17:04:04 -0500 Subject: [PATCH 09/18] test: Add email-exporter configuration --- test/certs/generate.sh | 2 +- test/config-next/email-exporter.json | 39 +++++++++++++++++++++++++++ test/config/email-exporter.json | 39 +++++++++++++++++++++++++++ test/consul/config.hcl | 8 ++++++ test/secrets/salesforce_client_id | 1 + test/secrets/salesforce_client_secret | 1 + 6 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 test/config-next/email-exporter.json create mode 100644 test/config/email-exporter.json create mode 100644 test/secrets/salesforce_client_id create mode 100644 test/secrets/salesforce_client_secret diff --git a/test/certs/generate.sh b/test/certs/generate.sh index 3f0e03d2c37..fbcaa96288b 100755 --- a/test/certs/generate.sh +++ b/test/certs/generate.sh @@ -42,7 +42,7 @@ ipki() ( # Used by Boulder gRPC services as both server and client mTLS certificates. 
for SERVICE in admin expiration-mailer ocsp-responder consul \ wfe akamai-purger bad-key-revoker crl-updater crl-storer \ - health-checker rocsp-tool sfe; do + health-checker rocsp-tool sfe email-exporter; do minica -domains "${SERVICE}.boulder" & done diff --git a/test/config-next/email-exporter.json b/test/config-next/email-exporter.json new file mode 100644 index 00000000000..7874d0bc077 --- /dev/null +++ b/test/config-next/email-exporter.json @@ -0,0 +1,39 @@ +{ + "emailExporter": { + "debugAddr": ":8114", + "grpc": { + "maxConnectionAge": "30s", + "address": ":9603", + "services": { + "email.Exporter": { + "clientNames": [ + "wfe.boulder" + ] + }, + "grpc.health.v1.Health": { + "clientNames": [ + "health-checker.boulder" + ] + } + } + }, + "tls": { + "caCertFile": "test/certs/ipki/minica.pem", + "certFile": "test/certs/ipki/email-exporter.boulder/cert.pem", + "keyFile": "test/certs/ipki/email-exporter.boulder/key.pem" + }, + "pardotBusinessUnit": "test-business-unit", + "clientId": { + "passwordFile": "test/secrets/salesforce_client_id" + }, + "clientSecret": { + "passwordFile": "test/secrets/salesforce_client_secret" + }, + "salesforceBaseURL": "http://localhost:9601", + "pardotBaseURL": "http://localhost:9602" + }, + "syslog": { + "stdoutlevel": 6, + "sysloglevel": -1 + } +} diff --git a/test/config/email-exporter.json b/test/config/email-exporter.json new file mode 100644 index 00000000000..7874d0bc077 --- /dev/null +++ b/test/config/email-exporter.json @@ -0,0 +1,39 @@ +{ + "emailExporter": { + "debugAddr": ":8114", + "grpc": { + "maxConnectionAge": "30s", + "address": ":9603", + "services": { + "email.Exporter": { + "clientNames": [ + "wfe.boulder" + ] + }, + "grpc.health.v1.Health": { + "clientNames": [ + "health-checker.boulder" + ] + } + } + }, + "tls": { + "caCertFile": "test/certs/ipki/minica.pem", + "certFile": "test/certs/ipki/email-exporter.boulder/cert.pem", + "keyFile": "test/certs/ipki/email-exporter.boulder/key.pem" + }, + "pardotBusinessUnit": "test-business-unit", + "clientId": { + "passwordFile": "test/secrets/salesforce_client_id" + }, + "clientSecret": { + "passwordFile": "test/secrets/salesforce_client_secret" + }, + "salesforceBaseURL": "http://localhost:9601", + "pardotBaseURL": "http://localhost:9602" + }, + "syslog": { + "stdoutlevel": 6, + "sysloglevel": -1 + } +} diff --git a/test/consul/config.hcl b/test/consul/config.hcl index d00af2681de..2d5ea5f2ca1 100644 --- a/test/consul/config.hcl +++ b/test/consul/config.hcl @@ -33,6 +33,14 @@ services { tags = ["tcp"] // Required for SRV RR support in gRPC DNS resolution. } +services { + id = "email-exporter-a" + name = "email-exporter" + address = "10.77.77.77" + port = 9603 + tags = ["tcp"] // Required for SRV RR support in gRPC DNS resolution. 
+} + services { id = "boulder-a" name = "boulder" diff --git a/test/secrets/salesforce_client_id b/test/secrets/salesforce_client_id new file mode 100644 index 00000000000..0020d21da80 --- /dev/null +++ b/test/secrets/salesforce_client_id @@ -0,0 +1 @@ +test-client-id diff --git a/test/secrets/salesforce_client_secret b/test/secrets/salesforce_client_secret new file mode 100644 index 00000000000..dec23d7014d --- /dev/null +++ b/test/secrets/salesforce_client_secret @@ -0,0 +1 @@ +you-shall-not-pass From 5b74c9681b814194c2d3dc8890c010e0e8f825a0 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 17:24:20 -0500 Subject: [PATCH 10/18] test/pardot-test-srv: Initial implementation of mock API server --- Makefile | 2 +- test/config-next/pardot-test-srv.json | 7 + test/config/pardot-test-srv.json | 7 + test/pardot-test-srv/README.md | 96 +++++++++ test/pardot-test-srv/main.go | 279 ++++++++++++++++++++++++++ tools/make-assets.sh | 2 +- 6 files changed, 391 insertions(+), 2 deletions(-) create mode 100644 test/config-next/pardot-test-srv.json create mode 100644 test/config/pardot-test-srv.json create mode 100644 test/pardot-test-srv/README.md create mode 100644 test/pardot-test-srv/main.go diff --git a/Makefile b/Makefile index 9522b89a72f..d93aae1830d 100644 --- a/Makefile +++ b/Makefile @@ -6,7 +6,7 @@ VERSION ?= 1.0.0 EPOCH ?= 1 MAINTAINER ?= "Community" -CMDS = admin boulder ceremony ct-test-srv +CMDS = admin boulder ceremony ct-test-srv pardot-test-srv CMD_BINS = $(addprefix bin/, $(CMDS) ) OBJECTS = $(CMD_BINS) diff --git a/test/config-next/pardot-test-srv.json b/test/config-next/pardot-test-srv.json new file mode 100644 index 00000000000..c0b494abe1c --- /dev/null +++ b/test/config-next/pardot-test-srv.json @@ -0,0 +1,7 @@ +{ + "oauthPort": 9601, + "pardotPort": 9602, + "expectedClientId": "test-client-id", + "expectedClientSecret": "you-shall-not-pass", + "developmentMode" : true +} diff --git a/test/config/pardot-test-srv.json b/test/config/pardot-test-srv.json new file mode 100644 index 00000000000..c0b494abe1c --- /dev/null +++ b/test/config/pardot-test-srv.json @@ -0,0 +1,7 @@ +{ + "oauthPort": 9601, + "pardotPort": 9602, + "expectedClientId": "test-client-id", + "expectedClientSecret": "you-shall-not-pass", + "developmentMode" : true +} diff --git a/test/pardot-test-srv/README.md b/test/pardot-test-srv/README.md new file mode 100644 index 00000000000..b982828d10f --- /dev/null +++ b/test/pardot-test-srv/README.md @@ -0,0 +1,96 @@ +# pardot-test-srv + +`pardot-test-srv` is a lightweight mock server for integration testing with the Salesforce Pardot API and OAuth authentication. + +## Features + +- Simulates Salesforce OAuth2 authentication with configurable credentials. +- Issues randomly generated Bearer tokens for API authentication. +- Provides a mock Pardot API that validates Bearer tokens and requires a business unit header. +- Exposes an endpoint to query submitted emails by business unit (in development mode). +- Allows forced Bearer token expiration for testing authentication flows (in development mode). 
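+
+For example, with the development configuration used by Boulder's integration tests (OAuth endpoint on `localhost:9601`, client ID `test-client-id`, client secret `you-shall-not-pass`), a token can be requested directly with `curl`. This is only an illustrative smoke test; substitute the ports and credentials from your own configuration:
+
+```sh
+curl -X POST http://localhost:9601/services/oauth2/token \
+  -d client_id=test-client-id \
+  -d client_secret=you-shall-not-pass
+```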
+
+## Usage
+
+Run `pardot-test-srv` with a configuration file:
+```sh
+go run test/pardot-test-srv/main.go --config config.json
+```
+
+### Example Configuration (`config.json`)
+
+```json
+{
+  "oauthPort": 8080,
+  "pardotPort": 9090,
+  "expectedClientId": "my-client-id",
+  "expectedClientSecret": "my-client-secret",
+  "developmentMode": false
+}
+```
+
+## API Endpoints
+
+### OAuth Token Request
+
+**Endpoint:** `POST /services/oauth2/token`
+**Parameters (Form Data):**
+- `client_id`
+- `client_secret`
+
+**Response:**
+```json
+{
+  "access_token": "randomly-generated-token",
+  "token_type": "Bearer",
+  "expires_in": 3600
+}
+```
+
+### Create Prospect
+
+**Endpoint:** `POST /api/v5/objects/prospects`
+**Headers:**
+- `Authorization: Bearer <access_token>`
+- `Pardot-Business-Unit-Id: <business_unit_id>`
+
+**Payload Example:**
+```json
+{
+  "email": "email@example.com"
+}
+```
+
+**Response:**
+```json
+{
+  "status": "success"
+}
+```
+
+### Query Submitted Prospects (Development Mode Only)
+
+**Endpoint:** `GET /query_prospects`
+**Query Parameter:**
+- `pardot_business_unit_id=<business_unit_id>`
+
+**Response:**
+```json
+{
+  "prospects": [
+    "email1@example.com",
+    "email2@example.com"
+  ]
+}
+```
+
+### Force Token Expiration (Development Mode Only)
+
+**Endpoint:** `GET /expire_token`
+
+**Response:**
+```json
+{
+  "status": "token expired"
+}
+```
\ No newline at end of file
diff --git a/test/pardot-test-srv/main.go b/test/pardot-test-srv/main.go
new file mode 100644
index 00000000000..d022e4bf213
--- /dev/null
+++ b/test/pardot-test-srv/main.go
@@ -0,0 +1,279 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"io"
+	"log"
+	"maps"
+	"math/rand/v2"
+	"net/http"
+	"os"
+	"slices"
+	"sync"
+	"time"
+
+	"github.com/letsencrypt/boulder/cmd"
+)
+
+type config struct {
+	// OAuthPort is the port on which the OAuth server will listen.
+	OAuthPort int
+
+	// PardotPort is the port on which the Pardot server will listen.
+	PardotPort int
+
+	// ExpectedClientID is the client ID that the server expects to receive in
+	// requests to the /services/oauth2/token endpoint.
+	ExpectedClientID string
+
+	// ExpectedClientSecret is the client secret that the server expects to
+	// receive in requests to the /services/oauth2/token endpoint.
+	ExpectedClientSecret string
+
+	// DevelopmentMode is a flag that indicates whether the server is running in
+	// development mode. In development mode, the server will:
+	//   - provide an endpoint to expire the current token,
+	//   - store prospects in memory, and
+	//   - provide an endpoint to query the stored prospects.
+	//
+	// Only set this flag to true if you are running the server for testing
+	// (e.g. within docker-compose) or local development purposes.
+	DevelopmentMode bool
+}
+
+type token struct {
+	sync.Mutex
+
+	// active is the currently active token. If this field is empty, it means
+	// that the token has been manually expired.
+	active string
+}
+
+type prospectsByBusinessUnitId map[string]map[string]struct{}
+
+type prospects struct {
+	sync.RWMutex
+
+	// byBusinessUnitId is a map from business unit ID to a unique set of
+	// prospects. Prospects are only stored in memory if the server is running
+	// in development mode.
+	byBusinessUnitId prospectsByBusinessUnitId
+}
+
+type testServer struct {
+	expectedClientID     string
+	expectedClientSecret string
+	token                token
+	prospects            prospects
+	developmentMode      bool
+}
+
+// generateToken generates a new random token.
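+// The token is 32 random bytes, hex-encoded; this test-only server does not
+// need cryptographic randomness, so math/rand/v2 is sufficient.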
+func generateToken() string { + bytes := make([]byte, 32) + for i := range bytes { + bytes[i] = byte(rand.IntN(256)) + } + return fmt.Sprintf("%x", bytes) +} + +func (ts *testServer) getTokenHandler(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + http.Error(w, "Invalid request", http.StatusBadRequest) + return + } + + clientID := r.FormValue("client_id") + clientSecret := r.FormValue("client_secret") + + if clientID != ts.expectedClientID || clientSecret != ts.expectedClientSecret { + http.Error(w, "Invalid credentials", http.StatusUnauthorized) + return + } + + ts.token.Lock() + defer ts.token.Unlock() + if ts.token.active == "" { + ts.token.active = generateToken() + } + + response := map[string]interface{}{ + "access_token": ts.token.active, + "token_type": "Bearer", + "expires_in": 3600, + } + + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(response) + if err != nil { + log.Printf("Failed to encode token response: %v", err) + http.Error(w, "Failed to encode token response", http.StatusInternalServerError) + } +} + +func (ts *testServer) expireTokenHandler(w http.ResponseWriter, r *http.Request) { + ts.token.Lock() + ts.token.active = "" + ts.token.Unlock() + + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(map[string]string{"status": "token expired"}) + if err != nil { + log.Printf("Failed to encode expire token response: %v", err) + http.Error(w, "Failed to encode expire token response", http.StatusInternalServerError) + } +} + +func (ts *testServer) createProspectsHandler(w http.ResponseWriter, r *http.Request) { + ts.token.Lock() + validToken := ts.token.active + ts.token.Unlock() + + token := r.Header.Get("Authorization") + businessUnitId := r.Header.Get("Pardot-Business-Unit-Id") + + if token != "Bearer "+validToken { + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + body, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read request body", http.StatusInternalServerError) + return + } + + type prospectData struct { + Email string `json:"email"` + } + + var prospect prospectData + err = json.Unmarshal(body, &prospect) + if err != nil { + http.Error(w, "Failed to parse request body", http.StatusBadRequest) + return + } + + if prospect.Email == "" { + http.Error(w, "Missing 'email' field in request body", http.StatusBadRequest) + return + } + + if ts.developmentMode { + ts.prospects.Lock() + _, exists := ts.prospects.byBusinessUnitId[businessUnitId] + if !exists { + ts.prospects.byBusinessUnitId[businessUnitId] = make(map[string]struct{}) + } + ts.prospects.byBusinessUnitId[businessUnitId][prospect.Email] = struct{}{} + ts.prospects.Unlock() + } + + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(map[string]string{"status": "success"}) + if err != nil { + http.Error(w, "Failed to encode response", http.StatusInternalServerError) + return + } +} + +func (ts *testServer) queryProspectsHandler(w http.ResponseWriter, r *http.Request) { + buid := r.URL.Query().Get("pardot_business_unit_id") + if buid == "" { + http.Error(w, "Missing 'pardot_business_unit_id' parameter", http.StatusBadRequest) + return + } + + ts.prospects.RLock() + prospectsForBuid, exists := ts.prospects.byBusinessUnitId[buid] + ts.prospects.RUnlock() + + var requested []string + if exists { + for p := range maps.Keys(prospectsForBuid) { + requested = append(requested, p) + } + + } + slices.Sort(requested) + + 
w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(map[string]interface{}{"prospects": requested}) + if err != nil { + log.Printf("Failed to encode prospects query response: %v", err) + http.Error(w, "Failed to encode prospects query response", http.StatusInternalServerError) + } +} + +func main() { + configFile := flag.String("config", "", "Path to configuration file") + flag.Parse() + + if *configFile == "" { + flag.Usage() + os.Exit(1) + } + + file, err := os.Open(*configFile) + cmd.FailOnError(err, "Failed to open configuration file") + defer file.Close() + decoder := json.NewDecoder(file) + var c config + err = decoder.Decode(&c) + cmd.FailOnError(err, "Failed to decode configuration file") + + ts := &testServer{ + expectedClientID: c.ExpectedClientID, + expectedClientSecret: c.ExpectedClientSecret, + prospects: prospects{ + byBusinessUnitId: make(prospectsByBusinessUnitId), + }, + token: token{ + active: generateToken(), + }, + developmentMode: c.DevelopmentMode, + } + + // Oauth API + oauthMux := http.NewServeMux() + oauthMux.HandleFunc("/services/oauth2/token", ts.getTokenHandler) + if c.DevelopmentMode { + oauthMux.HandleFunc("/expire_token", ts.expireTokenHandler) + } + oauthServer := &http.Server{ + Addr: fmt.Sprintf(":%d", c.OAuthPort), + Handler: oauthMux, + ReadTimeout: 30 * time.Second, + } + log.Printf("pardot-test-srv oauth server running on port %d", c.OAuthPort) + go func() { + err := oauthServer.ListenAndServe() + if err != nil { + log.Fatalf("Failed to start OAuth server: %s", err) + } + }() + + // Pardot API + pardotMux := http.NewServeMux() + pardotMux.HandleFunc("/api/v5/objects/prospects", ts.createProspectsHandler) + if c.DevelopmentMode { + pardotMux.HandleFunc("/query_prospects", ts.queryProspectsHandler) + } + pardotServer := &http.Server{ + Addr: fmt.Sprintf(":%d", c.PardotPort), + Handler: pardotMux, + ReadTimeout: 30 * time.Second, + } + log.Printf("pardot-test-srv pardot server running on port %d", c.PardotPort) + go func() { + err := pardotServer.ListenAndServe() + if err != nil { + log.Fatalf("Failed to start Pardot server: %s", err) + } + }() + + cmd.WaitForSignal() +} diff --git a/tools/make-assets.sh b/tools/make-assets.sh index ff1c4e10439..3e9b0c1c94c 100755 --- a/tools/make-assets.sh +++ b/tools/make-assets.sh @@ -41,7 +41,7 @@ TARGET="${BUILD}/opt/boulder" COMMIT_ID="$(git rev-parse --short=8 HEAD)" mkdir -p "${TARGET}/bin" -for NAME in admin boulder ceremony ct-test-srv ; do +for NAME in admin boulder ceremony ct-test-srv pardot-test-srv ; do cp -a "bin/${NAME}" "${TARGET}/bin/" done From e40db1c236ea6182f3cea600184c0d9f23011843 Mon Sep 17 00:00:00 2001 From: Samantha Date: Wed, 12 Feb 2025 17:24:56 -0500 Subject: [PATCH 11/18] startservers: Add email-exporter and pardot-test-srv --- test/startservers.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/test/startservers.py b/test/startservers.py index 2d94c53e5df..9b029caeb28 100644 --- a/test/startservers.py +++ b/test/startservers.py @@ -125,10 +125,20 @@ 8112, None, None, ('./bin/boulder', 'nonce-service', '--config', os.path.join(config_dir, 'nonce-b.json'), '--addr', '10.77.77.77:9401', '--debug-addr', ':8112',), None), + Service('pardot-test-srv', + # Uses port 9601 to mock Salesforce OAuth2 token API and 9602 to mock + # the Pardot API. 
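+            # These ports must match the salesforceBaseURL and pardotBaseURL
+            # values in email-exporter.json.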
+ 9601, None, None, + ('./bin/pardot-test-srv', '--config', os.path.join(config_dir, 'pardot-test-srv.json'),), + None), + Service('email-exporter', + 8114, None, None, + ('./bin/boulder', 'email-exporter', '--config', os.path.join(config_dir, 'email-exporter.json'), '--addr', ':9603', '--debug-addr', ':8114'), + ('pardot-test-srv',)), Service('boulder-wfe2', 4001, None, None, ('./bin/boulder', 'boulder-wfe2', '--config', os.path.join(config_dir, 'wfe2.json'), '--addr', ':4001', '--tls-addr', ':4431', '--debug-addr', ':8013'), - ('boulder-ra-1', 'boulder-ra-2', 'boulder-sa-1', 'boulder-sa-2', 'nonce-service-taro-1', 'nonce-service-taro-2', 'nonce-service-zinc-1')), + ('boulder-ra-1', 'boulder-ra-2', 'boulder-sa-1', 'boulder-sa-2', 'nonce-service-taro-1', 'nonce-service-taro-2', 'nonce-service-zinc-1', 'email-exporter')), Service('sfe', 4003, None, None, ('./bin/boulder', 'sfe', '--config', os.path.join(config_dir, 'sfe.json'), '--addr', ':4003', '--debug-addr', ':8015'), From 6b60eb3057e03ab2c11f4ef43c437b4fa7400b88 Mon Sep 17 00:00:00 2001 From: Samantha Date: Thu, 13 Feb 2025 12:49:38 -0500 Subject: [PATCH 12/18] Satisfy Typos --- test/pardot-test-srv/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/pardot-test-srv/main.go b/test/pardot-test-srv/main.go index d022e4bf213..276cc0803e4 100644 --- a/test/pardot-test-srv/main.go +++ b/test/pardot-test-srv/main.go @@ -181,27 +181,27 @@ func (ts *testServer) createProspectsHandler(w http.ResponseWriter, r *http.Requ } func (ts *testServer) queryProspectsHandler(w http.ResponseWriter, r *http.Request) { - buid := r.URL.Query().Get("pardot_business_unit_id") - if buid == "" { + businessUnitIdHeader := r.URL.Query().Get("pardot_business_unit_id") + if businessUnitIdHeader == "" { http.Error(w, "Missing 'pardot_business_unit_id' parameter", http.StatusBadRequest) return } ts.prospects.RLock() - prospectsForBuid, exists := ts.prospects.byBusinessUnitId[buid] + requestedProspects, exists := ts.prospects.byBusinessUnitId[businessUnitIdHeader] ts.prospects.RUnlock() - var requested []string + var respProspects []string if exists { - for p := range maps.Keys(prospectsForBuid) { - requested = append(requested, p) + for p := range maps.Keys(requestedProspects) { + respProspects = append(respProspects, p) } } - slices.Sort(requested) + slices.Sort(respProspects) w.Header().Set("Content-Type", "application/json") - err := json.NewEncoder(w).Encode(map[string]interface{}{"prospects": requested}) + err := json.NewEncoder(w).Encode(map[string]interface{}{"prospects": respProspects}) if err != nil { log.Printf("Failed to encode prospects query response: %v", err) http.Error(w, "Failed to encode prospects query response", http.StatusInternalServerError) From 79345c4581fe920c12ad26d11f5806f8b944ecc9 Mon Sep 17 00:00:00 2001 From: Samantha Date: Fri, 14 Feb 2025 15:19:37 -0500 Subject: [PATCH 13/18] wfe: Add email-exporter and provide mocks --- cmd/boulder-wfe2/main.go | 14 +++- cmd/email-exporter/main.go | 2 +- email/pardot.go | 20 +++-- mocks/emailexporter.go | 69 ++++++++++++++++ test/config-next/wfe2.json | 10 +++ wfe2/wfe.go | 48 +++++++++++ wfe2/wfe_test.go | 159 +++++++++++++++++++++++++++++++++++++ 7 files changed, 312 insertions(+), 10 deletions(-) create mode 100644 mocks/emailexporter.go diff --git a/cmd/boulder-wfe2/main.go b/cmd/boulder-wfe2/main.go index c78664f7929..3a2378bcae6 100644 --- a/cmd/boulder-wfe2/main.go +++ b/cmd/boulder-wfe2/main.go @@ -12,6 +12,7 @@ import ( 
"github.com/letsencrypt/boulder/cmd" "github.com/letsencrypt/boulder/config" + emailpb "github.com/letsencrypt/boulder/email/proto" "github.com/letsencrypt/boulder/features" "github.com/letsencrypt/boulder/goodkey" "github.com/letsencrypt/boulder/goodkey/sagoodkey" @@ -59,8 +60,9 @@ type Config struct { TLS cmd.TLSConfig - RAService *cmd.GRPCClientConfig - SAService *cmd.GRPCClientConfig + RAService *cmd.GRPCClientConfig + SAService *cmd.GRPCClientConfig + EmailExporter *cmd.GRPCClientConfig // GetNonceService is a gRPC config which contains a single SRV name // used to lookup nonce-service instances used exclusively for nonce @@ -289,6 +291,13 @@ func main() { cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA") sac := sapb.NewStorageAuthorityReadOnlyClient(saConn) + var eec emailpb.ExporterClient + if c.WFE.EmailExporter != nil { + emailExporterConn, err := bgrpc.ClientSetup(c.WFE.EmailExporter, tlsConfig, stats, clk) + cmd.FailOnError(err, "Failed to load credentials and create gRPC connection to SA") + eec = emailpb.NewExporterClient(emailExporterConn) + } + if c.WFE.RedeemNonceService == nil { cmd.Fail("'redeemNonceService' must be configured.") } @@ -375,6 +384,7 @@ func main() { pendingAuthorizationLifetime, rac, sac, + eec, gnc, rnc, noncePrefixKey, diff --git a/cmd/email-exporter/main.go b/cmd/email-exporter/main.go index c546a48de44..6abd7ebf924 100644 --- a/cmd/email-exporter/main.go +++ b/cmd/email-exporter/main.go @@ -70,7 +70,7 @@ func main() { clientSecret, err := c.EmailExporter.ClientSecret.Pass() cmd.FailOnError(err, "Loading client secret") - pardotClient, err := email.NewPardotClient( + pardotClient, err := email.NewPardotClientImpl( clk, c.EmailExporter.PardotBusinessUnit, clientId, diff --git a/email/pardot.go b/email/pardot.go index f637b3aea9e..ac13ed4782c 100644 --- a/email/pardot.go +++ b/email/pardot.go @@ -36,6 +36,10 @@ const ( retryBackoffMin = 200 * time.Millisecond ) +type PardotClient interface { + CreateProspect(email string) error +} + // oAuthToken holds the OAuth2 access token and its expiration. type oAuthToken struct { sync.Mutex @@ -44,8 +48,8 @@ type oAuthToken struct { expiresAt time.Time } -// PardotClient handles authentication and sending contacts to Pardot. -type PardotClient struct { +// PardotClientImpl handles authentication and sending contacts to Pardot. +type PardotClientImpl struct { businessUnit string clientId string clientSecret string @@ -55,8 +59,10 @@ type PardotClient struct { clk clock.Clock } -// NewPardotClient creates a new PardotClient. -func NewPardotClient(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClient, error) { +var _ PardotClient = &PardotClientImpl{} + +// NewPardotClientImpl creates a new PardotClientImpl. +func NewPardotClientImpl(clk clock.Clock, businessUnit, clientId, clientSecret, oauthbaseURL, pardotBaseURL string) (*PardotClientImpl, error) { prospectsURL, err := url.JoinPath(pardotBaseURL, prospectsPath) if err != nil { return nil, fmt.Errorf("failed to join prospects path: %w", err) @@ -66,7 +72,7 @@ func NewPardotClient(clk clock.Clock, businessUnit, clientId, clientSecret, oaut return nil, fmt.Errorf("failed to join token path: %w", err) } - return &PardotClient{ + return &PardotClientImpl{ businessUnit: businessUnit, clientId: clientId, clientSecret: clientSecret, @@ -79,7 +85,7 @@ func NewPardotClient(clk clock.Clock, businessUnit, clientId, clientSecret, oaut } // updateToken updates the OAuth token if necessary. 
-func (pc *PardotClient) updateToken() error { +func (pc *PardotClientImpl) updateToken() error { pc.token.Lock() defer pc.token.Unlock() @@ -129,7 +135,7 @@ func redactEmail(body []byte, email string) string { // CreateProspect submits an email to the Pardot Prospects endpoint, retrying up // to 3 times with exponential backoff. -func (pc *PardotClient) CreateProspect(email string) error { +func (pc *PardotClientImpl) CreateProspect(email string) error { var err error for attempt := range maxAttempts { time.Sleep(core.RetryBackoff(attempt, retryBackoffMin, retryBackoffMax, retryBackoffBase)) diff --git a/mocks/emailexporter.go b/mocks/emailexporter.go new file mode 100644 index 00000000000..8290aef36a2 --- /dev/null +++ b/mocks/emailexporter.go @@ -0,0 +1,69 @@ +package mocks + +import ( + "context" + "fmt" + "sync" + + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/emptypb" + + "github.com/letsencrypt/boulder/email" + emailpb "github.com/letsencrypt/boulder/email/proto" +) + +// MockPardotClientImpl is a mock implementation of PardotClient. +type MockPardotClientImpl struct { + sync.Mutex + CreatedProspects []string + ForceCreateError bool +} + +// NewMockPardotClientImpl returns a MockPardotClientImpl, implementing the +// PardotClient interface. Both refer to the same instance, with the interface +// for mock interaction and the struct for state inspection and modification. +func NewMockPardotClientImpl() (email.PardotClient, *MockPardotClientImpl) { + mockImpl := &MockPardotClientImpl{ + CreatedProspects: []string{}, + ForceCreateError: false, + } + return mockImpl, mockImpl +} + +// CreateProspect adds an email to CreatedProspects. Returns an error if +// ForceCreateError is set. +func (m *MockPardotClientImpl) CreateProspect(email string) error { + m.Lock() + defer m.Unlock() + + if m.ForceCreateError { + return fmt.Errorf("error creating prospect") + } + + m.CreatedProspects = append(m.CreatedProspects, email) + return nil +} + +// MockExporterClientImpl is a mock implementation of ExporterClient. +type MockExporterClientImpl struct { + PardotClient email.PardotClient +} + +// NewMockExporterImpl returns a MockExporterClientImpl as an +// ExporterClient. +func NewMockExporterImpl(pardotClient email.PardotClient) emailpb.ExporterClient { + return &MockExporterClientImpl{ + PardotClient: pardotClient, + } +} + +// CreateProspects submits emails to the inner PardotClient, returning an error +// if any fail. 
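+// The first failure stops processing; any remaining emails are not submitted.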
+func (m *MockExporterClientImpl) CreateProspects(ctx context.Context, req *emailpb.CreateProspectsRequest, _ ...grpc.CallOption) (*emptypb.Empty, error) { + for _, e := range req.Emails { + if err := m.PardotClient.CreateProspect(e); err != nil { + return nil, err + } + } + return &emptypb.Empty{}, nil +} diff --git a/test/config-next/wfe2.json b/test/config-next/wfe2.json index 0d67cf568d6..c11c86dcc90 100644 --- a/test/config-next/wfe2.json +++ b/test/config-next/wfe2.json @@ -37,6 +37,16 @@ "noWaitForReady": true, "hostOverride": "sa.boulder" }, + "emailExporter": { + "dnsAuthority": "consul.service.consul", + "srvLookup": { + "service": "email-exporter", + "domain": "service.consul" + }, + "timeout": "15s", + "noWaitForReady": true, + "hostOverride": "email-exporter.boulder" + }, "accountCache": { "size": 9000, "ttl": "5s" diff --git a/wfe2/wfe.go b/wfe2/wfe.go index d15a661e887..fba98cfdabb 100644 --- a/wfe2/wfe.go +++ b/wfe2/wfe.go @@ -26,6 +26,7 @@ import ( "github.com/letsencrypt/boulder/core" corepb "github.com/letsencrypt/boulder/core/proto" + emailpb "github.com/letsencrypt/boulder/email/proto" berrors "github.com/letsencrypt/boulder/errors" "github.com/letsencrypt/boulder/features" "github.com/letsencrypt/boulder/goodkey" @@ -92,6 +93,7 @@ var errIncompleteGRPCResponse = errors.New("incomplete gRPC response message") type WebFrontEndImpl struct { ra rapb.RegistrationAuthorityClient sa sapb.StorageAuthorityReadOnlyClient + ee emailpb.ExporterClient // gnc is a nonce-service client used exclusively for the issuance of // nonces. It's configured to route requests to backends colocated with the // WFE. @@ -187,6 +189,7 @@ func NewWebFrontEndImpl( pendingAuthorizationLifetime time.Duration, rac rapb.RegistrationAuthorityClient, sac sapb.StorageAuthorityReadOnlyClient, + eec emailpb.ExporterClient, gnc nonce.Getter, rnc nonce.Redeemer, rncKey []byte, @@ -228,6 +231,7 @@ func NewWebFrontEndImpl( pendingAuthorizationLifetime: pendingAuthorizationLifetime, ra: rac, sa: sac, + ee: eec, gnc: gnc, rnc: rnc, rncKey: rncKey, @@ -632,6 +636,28 @@ func link(url, relation string) string { return fmt.Sprintf("<%s>;rel=\"%s\"", url, relation) } +// contactsToEmails converts a *[]string of contacts (e.g. mailto: +// person@example.com) to a []string of valid email addresses. Non-email +// contacts or contacts with invalid email addresses are ignored. +func contactsToEmails(contacts *[]string) []string { + if contacts == nil { + return nil + } + var emails []string + for _, c := range *contacts { + if !strings.HasPrefix(c, "mailto:") { + continue + } + address := strings.TrimPrefix(c, "mailto:") + err := policy.ValidEmail(address) + if err != nil { + continue + } + emails = append(emails, address) + } + return emails +} + // checkNewAccountLimits checks whether sufficient limit quota exists for the // creation of a new account. If so, that quota is spent. If an error is // encountered during the check, it is logged but not returned. A refund @@ -844,6 +870,18 @@ func (wfe *WebFrontEndImpl) NewAccount( return } newRegistrationSuccessful = true + + prospects := contactsToEmails(accountCreateRequest.Contact) + if wfe.ee != nil && len(prospects) > 0 { + _, err := wfe.ee.CreateProspects(ctx, &emailpb.CreateProspectsRequest{ + // Note: We are explicitly using the contacts provided by the + // subscriber here, rather than the contacts returned by the RA. 
+ Emails: prospects, + }) + if err != nil { + wfe.log.Warningf("Error creating prospect: %v", err) + } + } } // parseRevocation accepts the payload for a revocation request and parses it @@ -1445,6 +1483,16 @@ func (wfe *WebFrontEndImpl) updateAccount( return nil, probs.ServerInternal("Error updating account") } + prospects := contactsToEmails(accountUpdateRequest.Contact) + if wfe.ee != nil && len(prospects) > 0 { + _, err := wfe.ee.CreateProspects(ctx, &emailpb.CreateProspectsRequest{ + Emails: prospects, + }) + if err != nil { + wfe.log.Warningf("Error creating prospect: %v", err) + } + } + return &updatedReg, nil } diff --git a/wfe2/wfe_test.go b/wfe2/wfe_test.go index cde0d6095d1..7c55d766ebf 100644 --- a/wfe2/wfe_test.go +++ b/wfe2/wfe_test.go @@ -431,6 +431,7 @@ func setupWFE(t *testing.T) (WebFrontEndImpl, clock.FakeClock, requestSigner) { 7*24*time.Hour, &MockRegistrationAuthority{clk: fc}, mockSA, + nil, gnc, rnc, rncKey, @@ -4196,3 +4197,161 @@ func TestNewOrderRateLimits(t *testing.T) { mux.ServeHTTP(responseWriter, r) test.AssertEquals(t, responseWriter.Code, http.StatusCreated) } + +func TestNewAccountCreatesProspects(t *testing.T) { + t.Parallel() + + key := loadKey(t, []byte(test2KeyPrivatePEM)) + _, ok := key.(*rsa.PrivateKey) + test.Assert(t, ok, "Couldn't load test2 key") + + path := newAcctPath + signedURL := fmt.Sprintf("http://localhost%s", path) + + testCases := []struct { + name string + contacts []string + expected []string + }{ + { + name: "No email", + contacts: []string{}, + expected: []string{}, + }, + { + name: "One email", + contacts: []string{"mailto:person@mail.com"}, + expected: []string{"person@mail.com"}, + }, + { + name: "Two emails", + contacts: []string{"mailto:person1@mail.com", "mailto:person2@mail.com"}, + expected: []string{"person1@mail.com", "person2@mail.com"}, + }, + { + name: "Invalid email", + contacts: []string{"mailto:lol@%mail.com"}, + expected: []string{}, + }, + { + name: "One valid email, one invalid email", + contacts: []string{"mailto:person@mail.com", "mailto:lol@%mail.com"}, + expected: []string{"person@mail.com"}, + }, + { + name: "Valid email with non-email prefix", + contacts: []string{"heliograph:person@mail.com"}, + expected: []string{}, + }, + { + name: "Non-email prefix with correct field signal instructions", + contacts: []string{`heliograph:STATION OF RECEPTION: High Ridge above Black Hollow, near Lone Pine. +AZIMUTH TO SIGNAL STATION: Due West, bearing Twin Peaks. +WATCH PERIOD: Third hour post-zenith; observation maintained for 30 minutes. +SIGNAL CODE: Standard Morse, three-flash attention signal. 
+ALTERNATE SITE: If no reply, move to Observation Point B at Broken Cairn.`}, + expected: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + wfe, _, signer := setupWFE(t) + + mockPardotClient, mockImpl := mocks.NewMockPardotClientImpl() + wfe.ee = mocks.NewMockExporterImpl(mockPardotClient) + + contactsJSON, err := json.Marshal(tc.contacts) + test.AssertNotError(t, err, "Failed to marshal contacts") + + payload := fmt.Sprintf(`{"contact":%s,"termsOfServiceAgreed":true}`, contactsJSON) + _, _, body := signer.embeddedJWK(key, signedURL, payload) + request := makePostRequestWithPath(path, body) + + responseWriter := httptest.NewRecorder() + wfe.NewAccount(context.Background(), newRequestEvent(), responseWriter, request) + + for _, email := range tc.expected { + test.AssertSliceContains(t, mockImpl.CreatedProspects, email) + } + }) + } +} + +func TestUpdateAccountCreatesProspects(t *testing.T) { + t.Parallel() + + key := loadKey(t, []byte(test1KeyPrivatePEM)) + _, ok := key.(*rsa.PrivateKey) + test.Assert(t, ok, "Couldn't load RSA key for acct 1") + + testCases := []struct { + name string + contacts []string + expected []string + }{ + { + name: "No email", + contacts: []string{}, + expected: []string{}, + }, + { + name: "One email", + contacts: []string{"mailto:person@mail.com"}, + expected: []string{"person@mail.com"}, + }, + { + name: "Two emails", + contacts: []string{"mailto:person1@mail.com", "mailto:person2@mail.com"}, + expected: []string{"person1@mail.com", "person2@mail.com"}, + }, + { + name: "Invalid email", + contacts: []string{"mailto:lol@%mail.com"}, + expected: []string{}, + }, + { + name: "One valid email, one invalid email", + contacts: []string{"mailto:person@mail.com", "mailto:lol@%mail.com"}, + expected: []string{"person@mail.com"}, + }, + { + name: "Valid email with invalid prefix", + contacts: []string{"pantelegraph:person@mail.com"}, + expected: []string{}, + }, + { + name: "Non-email prefix with correct telegraphic notation", + contacts: []string{`pantelegraph:RECEIVING OFFICE: Bureau Room, Third Floor, Merchant House. +TRANSMISSION LINE: Direct relay, Exchange Circuit No. 
42.`}, + expected: []string{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + wfe, _, signer := setupWFE(t) + mockPardotClient, mockImpl := mocks.NewMockPardotClientImpl() + wfe.ee = mocks.NewMockExporterImpl(mockPardotClient) + + responseWriter := httptest.NewRecorder() + contactsJSON, err := json.Marshal(tc.contacts) + test.AssertNotError(t, err, "Failed to marshal contacts") + + newContact := fmt.Sprintf(`{"contact":%s}`, contactsJSON) + signedURL := "http://localhost/1" + path := "1" + _, _, body := signer.byKeyID(1, key, signedURL, newContact) + request := makePostRequestWithPath(path, body) + wfe.Account(ctx, newRequestEvent(), responseWriter, request) + test.AssertEquals(t, responseWriter.Code, http.StatusOK) + for _, email := range tc.expected { + test.AssertSliceContains(t, mockImpl.CreatedProspects, email) + } + }) + } +} From 081f31fa89e5b75cf56b6980af3a3187afc25467 Mon Sep 17 00:00:00 2001 From: Samantha Date: Fri, 14 Feb 2025 15:23:16 -0500 Subject: [PATCH 14/18] email-exporter: Simplify implementation and make per-day limit configurable --- cmd/email-exporter/main.go | 7 +- email/exporter.go | 117 +++++++++++++-------------- test/config-next/email-exporter.json | 1 + test/config/email-exporter.json | 1 + 4 files changed, 65 insertions(+), 61 deletions(-) diff --git a/cmd/email-exporter/main.go b/cmd/email-exporter/main.go index 6abd7ebf924..cb8f02a9e97 100644 --- a/cmd/email-exporter/main.go +++ b/cmd/email-exporter/main.go @@ -16,6 +16,11 @@ type Config struct { EmailExporter struct { cmd.ServiceConfig + // PerDayLimit is our daily limit as determined by the tier of our + // Salesforce account. For more information, see: + // https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits + PerDayLimit float64 `validate:"required,min=1"` + // PardotBusinessUnit is the Pardot business unit to use. PardotBusinessUnit string `validate:"required"` @@ -79,7 +84,7 @@ func main() { c.EmailExporter.PardotBaseURL, ) cmd.FailOnError(err, "Creating Pardot client") - exporterServer := email.NewExporterImpl(pardotClient, scope, logger) + exporterServer := email.NewExporterImpl(pardotClient, c.EmailExporter.PerDayLimit, scope, logger) tlsConfig, err := c.EmailExporter.TLS.Load(scope) cmd.FailOnError(err, "Loading TLS config") diff --git a/email/exporter.go b/email/exporter.go index 56c48b05197..1c55412400b 100644 --- a/email/exporter.go +++ b/email/exporter.go @@ -4,53 +4,54 @@ import ( "context" "errors" "sync" - "time" "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" "google.golang.org/protobuf/types/known/emptypb" "github.com/letsencrypt/boulder/core" - exporterpb "github.com/letsencrypt/boulder/email/proto" + emailpb "github.com/letsencrypt/boulder/email/proto" berrors "github.com/letsencrypt/boulder/errors" blog "github.com/letsencrypt/boulder/log" ) const ( - // Our daily limit is determined by the tier of our Salesforce account. For - // more information, see: - // https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits - - // ratelimit represents our daily limit of 50,000 requests. - rateLimit = 50000.0 / 86400.0 - - // numWorkers is the number of concurrent workers processing the email - // queue. We also use this as the burst limit for the rate limiter. - numWorkers = 5 + // five is the number of concurrent workers processing the email queue. 
This + // number was chosen specifically to match the number of concurrent + // connections allowed by the Pardot API. + five = 5 // queueCap enforces a maximum stack size to prevent unbounded growth. queueCap = 10000 ) -var ErrQueueFull = errors.New("email export queue is full") +var ErrQueueFull = errors.New("email-exporter queue is full") // ExporterImpl implements the gRPC server and processes email exports. type ExporterImpl struct { - exporterpb.UnsafeExporterServer + emailpb.UnsafeExporterServer - sync.RWMutex + sync.Mutex drainWG sync.WaitGroup + wake *sync.Cond + limiter *rate.Limiter toSend []string - client *PardotClient + client PardotClient emailsHandledCounter prometheus.Counter log blog.Logger } -var _ exporterpb.ExporterServer = (*ExporterImpl)(nil) +var _ emailpb.ExporterServer = (*ExporterImpl)(nil) // NewExporterImpl creates a new ExporterImpl. -func NewExporterImpl(client *PardotClient, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl { +func NewExporterImpl(client PardotClient, perDayLimit float64, scope prometheus.Registerer, logger blog.Logger) *ExporterImpl { + // This limiter enforces the daily Pardot API limit and restricts + // concurrency to the maximum of 5 requests specified in their + // documentation. For more details see: + // https://developer.salesforce.com/docs/marketing/pardot/guide/overview.html?q=rate%20limits + limiter := rate.NewLimiter(rate.Limit(perDayLimit/86400.0), 5) + emailsHandledCounter := prometheus.NewCounter(prometheus.CounterOpts{ Name: "email_exporter_emails_handled", Help: "Total number of emails handled by the email exporter", @@ -58,18 +59,20 @@ func NewExporterImpl(client *PardotClient, scope prometheus.Registerer, logger b scope.MustRegister(emailsHandledCounter) impl := &ExporterImpl{ + limiter: limiter, toSend: make([]string, 0, queueCap), client: client, emailsHandledCounter: emailsHandledCounter, log: logger, } + impl.wake = sync.NewCond(&impl.Mutex) queueGauge := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "email_exporter_queue_length", Help: "Current length of the email export queue", }, func() float64 { - impl.RLock() - defer impl.RUnlock() + impl.Lock() + defer impl.Unlock() return float64(len(impl.toSend)) }) scope.MustRegister(queueGauge) @@ -77,82 +80,76 @@ func NewExporterImpl(client *PardotClient, scope prometheus.Registerer, logger b return impl } -// CreateProspects enqueues the provided email addresses. If the queue is near -// capacity, only enqueues as many emails as can fit. Returns ErrQueueFull if -// some or all emails were dropped. -func (impl *ExporterImpl) CreateProspects(ctx context.Context, req *exporterpb.CreateProspectsRequest) (*emptypb.Empty, error) { +// CreateProspects enqueues the provided email addresses. If the queue cannot +// accommodate the new emails, an ErrQueueFull is returned. +func (impl *ExporterImpl) CreateProspects(ctx context.Context, req *emailpb.CreateProspectsRequest) (*emptypb.Empty, error) { if core.IsAnyNilOrZero(req, req.Emails) { return nil, berrors.InternalServerError("Incomplete UpsertEmails request") } impl.Lock() - defer impl.Unlock() - spotsLeft := queueCap - len(impl.toSend) if spotsLeft < len(req.Emails) { return nil, ErrQueueFull } impl.toSend = append(impl.toSend, req.Emails...) + impl.Unlock() + // Wake waiting workers to process the new emails. 
+ impl.wake.Broadcast() return &emptypb.Empty{}, nil } -func (impl *ExporterImpl) takeEmail() (string, bool) { - impl.Lock() - defer impl.Unlock() - - if len(impl.toSend) == 0 { - return "", false - } - - email := impl.toSend[len(impl.toSend)-1] - impl.toSend = impl.toSend[:len(impl.toSend)-1] - - return email, true -} - // Start begins asynchronous processing of the email queue. When the parent // daemonCtx is cancelled the queue will be drained and the workers will exit. func (impl *ExporterImpl) Start(daemonCtx context.Context) { - limiter := rate.NewLimiter(rate.Limit(rateLimit), numWorkers) + go func() { + <-daemonCtx.Done() + impl.Lock() + // Wake waiting workers to exit. + impl.wake.Broadcast() + impl.Unlock() + }() worker := func() { defer impl.drainWG.Done() for { - if daemonCtx.Err() != nil && len(impl.toSend) == 0 { + impl.Lock() + + for len(impl.toSend) == 0 && daemonCtx.Err() == nil { + // Wait for the queue to be updated or the daemon to exit. + impl.wake.Wait() + } + + if len(impl.toSend) == 0 && daemonCtx.Err() != nil { + // No more emails to process, exit. + impl.Unlock() return } - err := limiter.Wait(daemonCtx) + // Dequeue and dispatch an email. + last := len(impl.toSend) - 1 + email := impl.toSend[last] + impl.toSend = impl.toSend[:last] + impl.Unlock() + + err := impl.limiter.Wait(daemonCtx) if err != nil { - if errors.Is(err, context.Canceled) { - // Keep processing emails until the queue is drained. + if !errors.Is(err, context.Canceled) { + impl.log.Errf("Unexpected limiter.Wait() error: %s", err) continue } - impl.log.Errf("Unexpected limiter wait error: %s", err) - continue - } - - email, ok := impl.takeEmail() - if !ok { - if daemonCtx.Err() != nil { - // Exit immediately. - return - } - // No emails to process, avoid busy-waiting. 
- time.Sleep(100 * time.Millisecond) - continue } err = impl.client.CreateProspect(email) if err != nil { - impl.log.Errf("Failed to upsert email: %s", err) + impl.log.Errf("Sending Prospect to Pardot: %s", err) } impl.emailsHandledCounter.Inc() } } - for range numWorkers { + for range five { impl.drainWG.Add(1) go worker() } diff --git a/test/config-next/email-exporter.json b/test/config-next/email-exporter.json index 7874d0bc077..1eaed9b7933 100644 --- a/test/config-next/email-exporter.json +++ b/test/config-next/email-exporter.json @@ -22,6 +22,7 @@ "certFile": "test/certs/ipki/email-exporter.boulder/cert.pem", "keyFile": "test/certs/ipki/email-exporter.boulder/key.pem" }, + "perDayLimit": 999999, "pardotBusinessUnit": "test-business-unit", "clientId": { "passwordFile": "test/secrets/salesforce_client_id" diff --git a/test/config/email-exporter.json b/test/config/email-exporter.json index 7874d0bc077..1eaed9b7933 100644 --- a/test/config/email-exporter.json +++ b/test/config/email-exporter.json @@ -22,6 +22,7 @@ "certFile": "test/certs/ipki/email-exporter.boulder/cert.pem", "keyFile": "test/certs/ipki/email-exporter.boulder/key.pem" }, + "perDayLimit": 999999, "pardotBusinessUnit": "test-business-unit", "clientId": { "passwordFile": "test/secrets/salesforce_client_id" From 5f38309ca31ece3040c06fb3702583563f414b18 Mon Sep 17 00:00:00 2001 From: Samantha Date: Fri, 14 Feb 2025 15:27:19 -0500 Subject: [PATCH 15/18] integration: Add end-to-end test of the email-exporter --- test/integration/email_exporter_test.go | 170 ++++++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 test/integration/email_exporter_test.go diff --git a/test/integration/email_exporter_test.go b/test/integration/email_exporter_test.go new file mode 100644 index 00000000000..4461940a4b4 --- /dev/null +++ b/test/integration/email_exporter_test.go @@ -0,0 +1,170 @@ +package integration + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "encoding/json" + "fmt" + "net/http" + "net/url" + "testing" + "time" + + "github.com/eggsampler/acme/v3" + + "github.com/letsencrypt/boulder/test" +) + +// randomDomain creates a random domain name for testing. +// +// panics if crypto/rand.Rand.Read fails. +func randomDomain() string { + var bytes [4]byte + _, err := rand.Read(bytes[:]) + if err != nil { + panic(err) + } + return fmt.Sprintf("%x.mail.com", bytes[:]) +} + +// TestProspectsCreatedForNewAccount tests that prospects are dispatched to +// pardot-test-srv by the email-exporter when a new account is created. +func TestProspectsCreatedForNewAccount(t *testing.T) { + t.Parallel() + + domain := randomDomain() + + tests := []struct { + name string + contacts []string + expectProspects []string + }{ + { + name: "Single email", + contacts: []string{"mailto:example@" + domain}, + expectProspects: []string{"example@" + domain}, + }, + { + name: "Multiple emails", + contacts: []string{"mailto:example1@" + domain, "mailto:example2@" + domain}, + expectProspects: []string{"example1@" + domain, "example2@" + domain}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + c, err := acme.NewClient("http://boulder.service.consul:4001/directory") + if err != nil { + t.Fatalf("failed to connect to acme directory: %s", err) + } + + acctKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + t.Fatalf("failed to generate account key: %s", err) + } + + _, err = c.NewAccount(acctKey, false, true, tt.contacts...) 
From 5f38309ca31ece3040c06fb3702583563f414b18 Mon Sep 17 00:00:00 2001
From: Samantha
Date: Fri, 14 Feb 2025 15:27:19 -0500
Subject: [PATCH 15/18] integration: Add end-to-end test of the email-exporter

---
 test/integration/email_exporter_test.go | 170 ++++++++++++++++++++++++
 1 file changed, 170 insertions(+)
 create mode 100644 test/integration/email_exporter_test.go

diff --git a/test/integration/email_exporter_test.go b/test/integration/email_exporter_test.go
new file mode 100644
index 00000000000..4461940a4b4
--- /dev/null
+++ b/test/integration/email_exporter_test.go
@@ -0,0 +1,170 @@
+package integration
+
+import (
+    "crypto/ecdsa"
+    "crypto/elliptic"
+    "crypto/rand"
+    "encoding/json"
+    "fmt"
+    "net/http"
+    "net/url"
+    "testing"
+    "time"
+
+    "github.com/eggsampler/acme/v3"
+
+    "github.com/letsencrypt/boulder/test"
+)
+
+// randomDomain creates a random domain name for testing.
+//
+// panics if crypto/rand.Rand.Read fails.
+func randomDomain() string {
+    var bytes [4]byte
+    _, err := rand.Read(bytes[:])
+    if err != nil {
+        panic(err)
+    }
+    return fmt.Sprintf("%x.mail.com", bytes[:])
+}
+
+// TestProspectsCreatedForNewAccount tests that prospects are dispatched to
+// pardot-test-srv by the email-exporter when a new account is created.
+func TestProspectsCreatedForNewAccount(t *testing.T) {
+    t.Parallel()
+
+    domain := randomDomain()
+
+    tests := []struct {
+        name            string
+        contacts        []string
+        expectProspects []string
+    }{
+        {
+            name:            "Single email",
+            contacts:        []string{"mailto:example@" + domain},
+            expectProspects: []string{"example@" + domain},
+        },
+        {
+            name:            "Multiple emails",
+            contacts:        []string{"mailto:example1@" + domain, "mailto:example2@" + domain},
+            expectProspects: []string{"example1@" + domain, "example2@" + domain},
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            t.Parallel()
+
+            c, err := acme.NewClient("http://boulder.service.consul:4001/directory")
+            if err != nil {
+                t.Fatalf("failed to connect to acme directory: %s", err)
+            }
+
+            acctKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
+            if err != nil {
+                t.Fatalf("failed to generate account key: %s", err)
+            }
+
+            _, err = c.NewAccount(acctKey, false, true, tt.contacts...)
+            if err != nil {
+                t.Fatalf("failed to create initial account: %s", err)
+            }
+
+            // Wait for the prospects to be exported from the email exporter
+            // queue to pardot-test-srv.
+            time.Sleep(100 * time.Millisecond)
+
+            httpClient := http.DefaultClient
+            resp, err := httpClient.Get("http://localhost:9602/query_prospects?" + url.Values{
+                "pardot_business_unit_id": []string{"test-business-unit"}}.Encode(),
+            )
+            test.AssertNotError(t, err, "Failed to query prospects")
+            test.AssertEquals(t, resp.StatusCode, http.StatusOK)
+            defer resp.Body.Close()
+
+            var got struct {
+                Prospects []string `json:"prospects"`
+            }
+            decoder := json.NewDecoder(resp.Body)
+            err = decoder.Decode(&got)
+            test.AssertNotError(t, err, "Failed to decode prospects")
+
+            for _, expectEmail := range tt.expectProspects {
+                test.AssertSliceContains(t, got.Prospects, expectEmail)
+            }
+        })
+    }
+}
+
+// TestProspectsCreatedWhenAccountUpdated tests that prospects are dispatched to
+// pardot-test-srv by the email-exporter when an account is updated.
+func TestProspectsCreatedWhenAccountUpdated(t *testing.T) {
+    t.Parallel()
+
+    domain := randomDomain()
+
+    c, err := acme.NewClient("http://boulder.service.consul:4001/directory")
+    if err != nil {
+        t.Fatalf("failed to connect to acme directory: %s", err)
+    }
+
+    acctKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
+    if err != nil {
+        t.Fatalf("failed to generate account key: %s", err)
+    }
+
+    acct, err := c.NewAccount(acctKey, false, true)
+    if err != nil {
+        t.Fatalf("failed to create initial account: %s", err)
+    }
+
+    tests := []struct {
+        name            string
+        contacts        []string
+        expectProspects []string
+    }{
+        {
+            name:            "Single email",
+            contacts:        []string{"mailto:example@" + domain},
+            expectProspects: []string{"example@" + domain},
+        },
+        {
+            name:            "Multiple emails",
+            contacts:        []string{"mailto:example1@" + domain, "mailto:example2@" + domain},
+            expectProspects: []string{"example1@" + domain, "example2@" + domain},
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            t.Parallel()
+
+            _, err := c.UpdateAccount(acct, tt.contacts...)
+            test.AssertNotError(t, err, "Failed to update account")
+
+            // Wait for the prospects to be exported from the email exporter
+            // queue to pardot-test-srv.
+            time.Sleep(100 * time.Millisecond)
+
+            httpClient := http.DefaultClient
+            resp, err := httpClient.Get("http://localhost:9602/query_prospects?" + url.Values{
+                "pardot_business_unit_id": []string{"test-business-unit"}}.Encode(),
+            )
+            test.AssertNotError(t, err, "Failed to query prospects")
+            test.AssertEquals(t, resp.StatusCode, http.StatusOK)
+            defer resp.Body.Close()
+
+            var got struct {
+                Prospects []string `json:"prospects"`
+            }
+            decoder := json.NewDecoder(resp.Body)
+            err = decoder.Decode(&got)
+            test.AssertNotError(t, err, "Failed to decode prospects")
+
+            for _, expectEmail := range tt.expectProspects {
+                test.AssertSliceContains(t, got.Prospects, expectEmail)
+            }
+        })
+    }
+}

From fab5b5e48d096d9ecdb03e03b1b3f8b32e1a2a45 Mon Sep 17 00:00:00 2001
From: Samantha
Date: Fri, 14 Feb 2025 15:29:48 -0500
Subject: [PATCH 16/18] Lint JSON configs

---
 test/config-next/pardot-test-srv.json | 2 +-
 test/config/pardot-test-srv.json      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/config-next/pardot-test-srv.json b/test/config-next/pardot-test-srv.json
index c0b494abe1c..a265739b146 100644
--- a/test/config-next/pardot-test-srv.json
+++ b/test/config-next/pardot-test-srv.json
@@ -3,5 +3,5 @@
   "pardotPort": 9602,
   "expectedClientId": "test-client-id",
   "expectedClientSecret": "you-shall-not-pass",
-  "developmentMode" : true
+  "developmentMode": true
 }
diff --git a/test/config/pardot-test-srv.json b/test/config/pardot-test-srv.json
index c0b494abe1c..a265739b146 100644
--- a/test/config/pardot-test-srv.json
+++ b/test/config/pardot-test-srv.json
@@ -3,5 +3,5 @@
   "pardotPort": 9602,
   "expectedClientId": "test-client-id",
   "expectedClientSecret": "you-shall-not-pass",
-  "developmentMode" : true
+  "developmentMode": true
 }

From ff049b77a407d5c51f98d05d5bbdf713cd57ad4a Mon Sep 17 00:00:00 2001
From: Samantha
Date: Fri, 14 Feb 2025 16:42:30 -0500
Subject: [PATCH 17/18] Fix integration test.

---
 test/integration/email_exporter_test.go | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/test/integration/email_exporter_test.go b/test/integration/email_exporter_test.go
index 4461940a4b4..673b883777c 100644
--- a/test/integration/email_exporter_test.go
+++ b/test/integration/email_exporter_test.go
@@ -1,3 +1,5 @@
+//go:build integration
+
 package integration
 
 import (
@@ -8,6 +10,7 @@
     "fmt"
     "net/http"
     "net/url"
+    "os"
     "testing"
     "time"
 
@@ -33,6 +36,10 @@
 func TestProspectsCreatedForNewAccount(t *testing.T) {
     t.Parallel()
 
+    if os.Getenv("BOULDER_CONFIG_DIR") != "test/config-next" {
+        t.Skip("Test requires WFE to be configured to use email-exporter")
+    }
+
     domain := randomDomain()
 
     tests := []struct {
@@ -73,7 +80,7 @@
 
             // Wait for the prospects to be exported from the email exporter
             // queue to pardot-test-srv.
-            time.Sleep(100 * time.Millisecond)
+            time.Sleep(1 * time.Second)
 
             httpClient := http.DefaultClient
             resp, err := httpClient.Get("http://localhost:9602/query_prospects?" + url.Values{
@@ -102,6 +109,10 @@
 func TestProspectsCreatedWhenAccountUpdated(t *testing.T) {
     t.Parallel()
 
+    if os.Getenv("BOULDER_CONFIG_DIR") != "test/config-next" {
+        t.Skip("Test requires WFE to be configured to use email-exporter")
+    }
+
     domain := randomDomain()
 
     c, err := acme.NewClient("http://boulder.service.consul:4001/directory")
@@ -145,7 +156,7 @@
 
             // Wait for the prospects to be exported from the email exporter
             // queue to pardot-test-srv.
-            time.Sleep(100 * time.Millisecond)
+            time.Sleep(1 * time.Second)
 
             httpClient := http.DefaultClient
             resp, err := httpClient.Get("http://localhost:9602/query_prospects?" + url.Values{

From 4c788dbe2b9d36d26d0524b0e1aa38432ff50779 Mon Sep 17 00:00:00 2001
From: Samantha
Date: Fri, 14 Feb 2025 18:08:33 -0500
Subject: [PATCH 18/18] email/exporter: Add unit test coverage

---
 email/exporter_test.go | 116 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 116 insertions(+)
 create mode 100644 email/exporter_test.go

diff --git a/email/exporter_test.go b/email/exporter_test.go
new file mode 100644
index 00000000000..9c6f71e6c60
--- /dev/null
+++ b/email/exporter_test.go
@@ -0,0 +1,116 @@
+package email
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/prometheus/client_golang/prometheus"
+
+    emailpb "github.com/letsencrypt/boulder/email/proto"
+    blog "github.com/letsencrypt/boulder/log"
+    "github.com/letsencrypt/boulder/test"
+)
+
+var ctx = context.Background()
+
+// MockPardotClientImpl is a mock implementation of PardotClient.
+type MockPardotClientImpl struct {
+    sync.Mutex
+    CreatedProspects []string
+}
+
+// NewMockPardotClientImpl returns a MockPardotClientImpl, implementing the
+// PardotClient interface. Both refer to the same instance, with the interface
+// for mock interaction and the struct for state inspection and modification.
+func NewMockPardotClientImpl() (PardotClient, *MockPardotClientImpl) {
+    mockImpl := &MockPardotClientImpl{
+        CreatedProspects: []string{},
+    }
+    return mockImpl, mockImpl
+}
+
+// CreateProspect adds an email to CreatedProspects.
+func (m *MockPardotClientImpl) CreateProspect(email string) error {
+    m.Lock()
+    defer m.Unlock()
+
+    m.CreatedProspects = append(m.CreatedProspects, email)
+    return nil
+}
+
+func setup() (*ExporterImpl, *MockPardotClientImpl, func(), func()) {
+    mockClient, clientImpl := NewMockPardotClientImpl()
+    logger := blog.NewMock()
+    scope := prometheus.NewRegistry()
+    exporter := NewExporterImpl(mockClient, 1000000, scope, logger)
+
+    daemonCtx, cancel := context.WithCancel(context.Background())
+
+    return exporter, clientImpl,
+        func() { exporter.Start(daemonCtx) },
+        func() {
+            cancel()
+            exporter.Drain()
+        }
+}
+
+func TestCreateProspects(t *testing.T) {
+    t.Parallel()
+
+    exporter, clientImpl, start, cleanup := setup()
+    start()
+    defer cleanup()
+
+    _, err := exporter.CreateProspects(ctx, &emailpb.CreateProspectsRequest{
+        Emails: []string{"test@example.com", "user@example.com"},
+    })
+
+    // Wait for the queue to be processed.
+    time.Sleep(100 * time.Millisecond)
+
+    test.AssertNotError(t, err, "Error creating prospects")
+    test.AssertEquals(t, 2, len(clientImpl.CreatedProspects))
+}
+
+func TestCreateProspectsQueueFull(t *testing.T) {
+    t.Parallel()
+
+    exporter, _, _, _ := setup()
+
+    // Fill the queue.
+    exporter.Lock()
+    exporter.toSend = make([]string, queueCap-1)
+    exporter.Unlock()
+
+    _, err := exporter.CreateProspects(ctx, &emailpb.CreateProspectsRequest{
+        Emails: []string{"test@example.com", "user@example.com"},
+    })
+    test.AssertErrorIs(t, err, ErrQueueFull)
+}
+
+func TestCreateProspectsQueueDrains(t *testing.T) {
+    t.Parallel()
+
+    exporter, clientImpl, start, cleanup := setup()
+    start()
+
+    // Add 100 emails to the queue.
+    var emails []string
+    for i := range 100 {
+        emails = append(emails, fmt.Sprintf("test@%d.example.com", i))
+    }
+
+    _, err := exporter.CreateProspects(ctx, &emailpb.CreateProspectsRequest{
+        Emails: emails,
+    })
+    test.AssertNotError(t, err, "Error creating prospects")
+
+    // Drain the queue.
+    cleanup()
+
+    // Check that the queue was drained.
+    test.AssertEquals(t, 100, len(clientImpl.CreatedProspects))
+}
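Both the integration tests (PATCH 17) and the unit tests (PATCH 18) wait on fixed sleeps for the exporter queue to drain. A small polling helper is a common alternative for waits like these; the sketch below is illustrative only, and the package placement, helper name, and durations are not part of these patches.

package email

import (
    "testing"
    "time"
)

// assertEventually polls check until it returns true or timeout elapses,
// failing the test otherwise. Purely illustrative; not part of the patches.
func assertEventually(t *testing.T, timeout time.Duration, check func() bool) {
    t.Helper()

    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if check() {
            return
        }
        time.Sleep(10 * time.Millisecond)
    }
    t.Fatalf("condition not met within %s", timeout)
}

A test such as TestCreateProspects could then wait with, for example, assertEventually(t, 2*time.Second, func() bool { clientImpl.Lock(); defer clientImpl.Unlock(); return len(clientImpl.CreatedProspects) == 2 }) instead of sleeping for a fixed interval.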