Skip to content

Commit

Permalink
Merge pull request #85 from mailgun/thrawn/develop
Browse files Browse the repository at this point in the history
Added WrappedContext and mongoutil.Config
  • Loading branch information
thrawn01 authored Aug 5, 2021
2 parents 4efdeb2 + b5ebe7c commit 0635e8b
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 1 deletion.
53 changes: 52 additions & 1 deletion cancel/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package cancel

import (
"context"
"sync"
"time"
)

type Context interface {
context.Context
Wrap(context.Context) context.Context
Cancel()
}

Expand All @@ -15,7 +17,7 @@ type cancelCtx struct {
cancel context.CancelFunc
}

// Creates a context that wraps the given context and returns an obj that can be cancelled.
// New creates a context that wraps the given context and returns an obj that can be cancelled.
// This allows an object which desires to cancel a long running operation to store a single
// cancel.Context in it's struct variables instead of having to store both the context.Context
// and context.CancelFunc.
Expand All @@ -36,3 +38,52 @@ func (c *cancelCtx) Deadline() (deadline time.Time, ok bool) { return c.ctx.Dead
func (c *cancelCtx) Done() <-chan struct{} { return c.ctx.Done() }
func (c *cancelCtx) Err() error { return c.ctx.Err() }
func (c *cancelCtx) Value(key interface{}) interface{} { return c.ctx.Value(key) }

// Wrap returns a Context that will be cancelled when either cancel.Context or the passed context is cancelled
func (c *cancelCtx) Wrap(ctx context.Context) context.Context {
return NewWrappedContext(ctx, c)
}

// NewWrappedContext returns a Context that will be cancelled when either of the passed contexts are cancelled
func NewWrappedContext(left, right context.Context) context.Context {
w := WrappedContext{
doneCh: make(chan struct{}),
}
// Wait for either ctx to be cancelled and propagate to the wrapped context
go func() {
select {
case <-left.Done():
w.Reason(left.Err())
case <-right.Done():
w.Reason(right.Err())
}
}()
return &w
}

type WrappedContext struct {
context.Context

mutex sync.Mutex
doneCh chan struct{}
err error
}

func (w *WrappedContext) Reason(err error) {
w.mutex.Lock()
defer w.mutex.Unlock()
close(w.doneCh)
w.err = err
}

func (w *WrappedContext) Done() <-chan struct{} {
w.mutex.Lock()
defer w.mutex.Unlock()
return w.doneCh
}

func (w *WrappedContext) Err() error {
w.mutex.Lock()
defer w.mutex.Unlock()
return w.err
}
58 changes: 58 additions & 0 deletions cancel/context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package cancel_test

import (
"context"
"testing"
"time"

"github.com/mailgun/holster/v4/cancel"
)

func TestWrapFirst(t *testing.T) {
// First context
firstCtx := cancel.New(context.Background())
// Second context
secondCtx, cancel := context.WithCancel(context.Background())
defer cancel()

// Now if either firstCtx or secondCtx is cancelled 'ctx' should cancel
ctx := firstCtx.Wrap(secondCtx)

done := make(chan struct{})
go func() {
<-ctx.Done()
close(done)
}()

firstCtx.Cancel()

select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("timeout waiting for context to cancel")
}
}

func TestWrapSecond(t *testing.T) {
// First context
firstCtx := cancel.New(context.Background())
// Second context
secondCtx, cancel := context.WithCancel(context.Background())

// Now if either firstCtx or secondCtx is cancelled 'ctx' should cancel
ctx := firstCtx.Wrap(secondCtx)

done := make(chan struct{})
go func() {
<-ctx.Done()
close(done)
}()

cancel()

select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("timeout waiting for context to cancel")
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ require (
google.golang.org/grpc v1.38.0
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,5 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
107 changes: 107 additions & 0 deletions mongoutil/uri.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package mongoutil

import (
"bytes"
"fmt"
"os"
"strconv"
"strings"
"unicode"
)

type Config struct {
Servers []string `json:"servers"`
Database string `json:"database"`
URI string `json:"uri"`
Options []map[string]interface{} `json:"options"`
}

func MongoURI() string {
mongoURI := os.Getenv("MONGO_URI")
if mongoURI == "" {
return "mongodb://127.0.0.1:27017/mg_test"
}
return mongoURI
}

func (c Config) URIWithOptions() string {
URI := c.URI

// Create an URI using the Servers list and Database if provided
if len(c.Servers) != 0 && c.Database != "" {
URI = fmt.Sprintf("mongodb://%s/%s", strings.Join(c.Servers, ","), c.Database)
}

type opt struct {
key string
value string
}
adjustedURI := URI
var options []opt

// Parse options from the URI.
qmIdx := strings.Index(URI, "?")
if qmIdx > 0 {
adjustedURI = URI[:qmIdx]
for _, pair := range strings.Split(URI[qmIdx+1:], "&") {
eqIdx := strings.Index(pair, "=")
if eqIdx > 0 {
options = append(options, opt{key: pair[:eqIdx], value: pair[eqIdx+1:]})
}
}
}

// NOTE: The options are an ordered list because mongo cares
// about the order of some options like replica tag order.

// Override URI options with config options.
for _, o := range c.Options {
for optName, optVal := range o {
switch optVal := optVal.(type) {
case int:
options = append(options, opt{key: toCamelCase(optName), value: strconv.Itoa(optVal)})
case float64:
options = append(options, opt{key: toCamelCase(optName), value: strconv.Itoa(int(optVal))})
case string:
options = append(options, opt{key: toCamelCase(optName), value: optVal})
}
}
}

// Construct a URI as recognized by mgo.Dial
firstOpt := true
var buf bytes.Buffer
buf.WriteString(adjustedURI)

for i := range options {
o := options[i]
if firstOpt {
buf.WriteRune('?')
firstOpt = false
} else {
buf.WriteRune('&')
}
buf.WriteString(o.key)
buf.WriteRune('=')
buf.WriteString(o.value)
}
return buf.String()
}

func toCamelCase(s string) string {
var buf bytes.Buffer
capitalize := false
for _, ch := range s {
if ch == '_' {
capitalize = true
continue
}
if capitalize {
capitalize = false
buf.WriteRune(unicode.ToUpper(ch))
continue
}
buf.WriteRune(ch)
}
return buf.String()
}
107 changes: 107 additions & 0 deletions mongoutil/uri_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package mongoutil_test

import (
"encoding/json"
"testing"

"github.com/mailgun/holster/v4/mongoutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/yaml"
)

func TestMongoConfig_URIWithOptions(t *testing.T) {
for _, tt := range []struct {
cfg mongoutil.Config
name string
uri string
}{{
name: "Bare minimum config",
cfg: mongoutil.Config{
URI: "mongodb://127.0.0.1:27017/foo",
},
uri: "mongodb://127.0.0.1:27017/foo",
}, {
name: "URI parameters appended after URI encoded parameters",
cfg: mongoutil.Config{
URI: "mongodb://127.0.0.1:27017/foo?replicaSet=bazz&blah=wow",
Options: []map[string]interface{}{
{"replica_set": "bar"},
{"max_pool_size": 5},
},
},
uri: "mongodb://127.0.0.1:27017/foo?replicaSet=bazz&blah=wow&replicaSet=bar&maxPoolSize=5",
}, {
name: "Read preference provided",
cfg: mongoutil.Config{
URI: "mongodb://127.0.0.1:27017/foo",
Options: []map[string]interface{}{
{"read_preference": "secondaryPreferred"},
},
},
uri: "mongodb://127.0.0.1:27017/foo?readPreference=secondaryPreferred",
}, {
name: "Servers and Database provided",
cfg: mongoutil.Config{
Servers: []string{
"mongodb-n01:27017",
"mongodb-n02:28017",
},
Database: "foo",
Options: []map[string]interface{}{
{"read_preference": "secondaryPreferred"},
},
},
uri: "mongodb://mongodb-n01:27017,mongodb-n02:28017/foo?readPreference=secondaryPreferred",
}} {
t.Run(tt.name, func(t *testing.T) {
uri := tt.cfg.URIWithOptions()
assert.Equal(t, tt.uri, uri)
})
}
}

func TestMongoURIFromJSON(t *testing.T) {
cfgJSON := []byte(`{
"uri": "mongodb://127.0.0.1:27017/foo",
"options": [
{"compressors": "snappy,zlib"},
{"replica_set": "v34_queue"},
{"read_preference": "secondaryPreferred"},
{"max_pool_size": 5}
]
}`)
var conf mongoutil.Config
// When
err := json.Unmarshal(cfgJSON, &conf)
// Then
require.NoError(t, err)
require.Equal(t,
"mongodb://127.0.0.1:27017/foo?compressors=snappy,zlib&replicaSet=v34_queue&"+
"readPreference=secondaryPreferred&maxPoolSize=5", conf.URIWithOptions())
}

func TestMongoURIFromYAML(t *testing.T) {
cfgYAML := []byte(`servers:
- mongo-routes-n01-us-east-1.postgun.com:27017
- mongo-routes-n02-us-east-1.postgun.com:27017
- mongo-routes-n03-us-east-1.postgun.com:27017
database: mg_prod
options:
- ssl: true
- tlsCertificateKeyFile: /etc/mailgun/ssl/mongo.pem
- tlsCAFile: /etc/mailgun/ssl/mongo-ca.crt
- replicaSet: routes
- readPreferenceTags: "dc:use1"
- readPreferenceTags: "dc:usw2"
`)
var conf mongoutil.Config
// When
err := yaml.Unmarshal(cfgYAML, &conf)
// Then
require.NoError(t, err)
require.Equal(t, "mongodb://mongo-routes-n01-us-east-1.postgun.com:27017,"+
"mongo-routes-n02-us-east-1.postgun.com:27017,mongo-routes-n03-us-east-1.postgun.com:27017/mg_prod?"+
"tlsCertificateKeyFile=/etc/mailgun/ssl/mongo.pem&tlsCAFile=/etc/mailgun/ssl/mongo-ca.crt&"+
"replicaSet=routes&readPreferenceTags=dc:use1&readPreferenceTags=dc:usw2", conf.URIWithOptions())
}

0 comments on commit 0635e8b

Please sign in to comment.