-
Notifications
You must be signed in to change notification settings - Fork 8
/
pubsub.go
110 lines (90 loc) · 3.08 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package pubsub
import (
"context"
"errors"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
"github.com/ThreeDotsLabs/watermill/message"
"go.k6.io/k6/js/modules"
"go.k6.io/k6/lib"
"github.com/mitchellh/mapstructure"
"google.golang.org/api/option"
)
// Register the extension on module initialization, available to
// import from JS as "k6/x/pubsub".
func init() {
modules.Register("k6/x/pubsub", new(PubSub))
}
// PubSub is the k6 extension for a Google Pub/Sub client.
// See https://cloud.google.com/pubsub/docs/overview
type PubSub struct{}
// publisherConf provides a Pub/Sub publisher client configuration. This configuration
// structure can be used on a client side. All parameters are optional.
type publisherConf struct {
ProjectID string
Credentials string
PublishTimeout int
Debug bool
Trace bool
DoNotCreateTopicIfMissing bool
}
// Publisher is the basic wrapper for Google Pub/Sub publisher and uses
// watermill as a client. See https://github.com/ThreeDotsLabs/watermill/
//
// Publisher represents the constructor and creates an instance of
// googlecloud.Publisher with provided projectID and publishTimeout.
// Publisher uses watermill StdLoggerAdapter logger.
func (ps *PubSub) Publisher(config map[string]interface{}) *googlecloud.Publisher {
cnf := &publisherConf{}
err := mapstructure.Decode(config, cnf)
if err != nil {
log.Fatalf("xk6-pubsub: unable to read publisher config: %v", err)
}
if cnf.PublishTimeout < 1 {
cnf.PublishTimeout = 5
}
client, err := googlecloud.NewPublisher(
googlecloud.PublisherConfig{
ProjectID: cnf.ProjectID,
Marshaler: googlecloud.DefaultMarshalerUnmarshaler{},
PublishTimeout: time.Second * time.Duration(cnf.PublishTimeout),
DoNotCreateTopicIfMissing: cnf.DoNotCreateTopicIfMissing,
ClientOptions: withCredentials(cnf.Credentials),
},
watermill.NewStdLogger(cnf.Debug, cnf.Trace),
)
if err != nil {
log.Fatalf("xk6-pubsub: unable to init publisher: %v", err)
}
return client
}
// Publish publishes a message to the provided topic using provided
// googlecloud.Publisher. The msg value must be passed as string
// and will be converted to bytes sequence before publishing.
func (ps *PubSub) Publish(ctx context.Context, p *googlecloud.Publisher, topic, msg string) error {
state := lib.GetState(ctx)
if state == nil {
err := errors.New("xk6-pubsub: state is nil")
ReportError(err, "cannot determine state")
return err
}
err := p.Publish(
topic,
message.NewMessage(watermill.NewShortUUID(), []byte(msg)),
)
if err != nil {
ReportError(err, "xk6-pubsub: unable to publish message")
return err
}
return nil
}
// withCredentials explicitly setup Pub/Sub credentials as option.ClientOption.
func withCredentials(credentials string) []option.ClientOption {
var opt []option.ClientOption
if len(credentials) > 0 {
opt = append(opt, option.WithCredentialsJSON([]byte(credentials)))
}
return opt
}