forked from cloudfoundry/auctioneer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service_client.go
110 lines (87 loc) · 2.53 KB
/
service_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package auctioneer
import (
"encoding/json"
"errors"
"time"
"github.com/cloudfoundry-incubator/consuladapter"
"github.com/cloudfoundry-incubator/locket"
"github.com/pivotal-golang/clock"
"github.com/pivotal-golang/lager"
"github.com/tedsuo/ifrit"
)
const LockSchemaKey = "auctioneer_lock"
func LockSchemaPath() string {
return locket.LockSchemaPath(LockSchemaKey)
}
type Presence struct {
AuctioneerID string `json:"auctioneer_id"`
AuctioneerAddress string `json:"auctioneer_address"`
}
func NewPresence(id, address string) Presence {
return Presence{
AuctioneerID: id,
AuctioneerAddress: address,
}
}
func (a Presence) Validate() error {
if a.AuctioneerID == "" {
return errors.New("auctioneer_id cannot be blank")
}
if a.AuctioneerAddress == "" {
return errors.New("auctioneer_address cannot be blank")
}
return nil
}
type ServiceClient interface {
NewAuctioneerLockRunner(logger lager.Logger, presence Presence, retryInterval, lockTTL time.Duration) (ifrit.Runner, error)
CurrentAuctioneer() (Presence, error)
CurrentAuctioneerAddress() (string, error)
}
type serviceClient struct {
consulClient consuladapter.Client
clock clock.Clock
}
func NewServiceClient(consulClient consuladapter.Client, clock clock.Clock) ServiceClient {
return serviceClient{
consulClient: consulClient,
clock: clock,
}
}
func (c serviceClient) NewAuctioneerLockRunner(logger lager.Logger, presence Presence, retryInterval, lockTTL time.Duration) (ifrit.Runner, error) {
if err := presence.Validate(); err != nil {
return nil, err
}
payload, err := json.Marshal(presence)
if err != nil {
return nil, err
}
return locket.NewLock(logger, c.consulClient, LockSchemaPath(), payload, c.clock, retryInterval, lockTTL), nil
}
func (c serviceClient) CurrentAuctioneer() (Presence, error) {
presence := Presence{}
value, err := c.getAcquiredValue(LockSchemaPath())
if err != nil {
return presence, err
}
if err := json.Unmarshal(value, &presence); err != nil {
return presence, err
}
if err := presence.Validate(); err != nil {
return presence, err
}
return presence, nil
}
func (c serviceClient) CurrentAuctioneerAddress() (string, error) {
presence, err := c.CurrentAuctioneer()
return presence.AuctioneerAddress, err
}
func (c serviceClient) getAcquiredValue(key string) ([]byte, error) {
kvPair, _, err := c.consulClient.KV().Get(key, nil)
if err != nil {
return nil, err
}
if kvPair == nil || kvPair.Session == "" {
return nil, consuladapter.NewKeyNotFoundError(key)
}
return kvPair.Value, nil
}