Skip to content

Commit 9eeb8a5

Browse files
committed
init node location updater:
- add updater package - expose GetState from registerer - fix longitude spelling
1 parent 54fb807 commit 9eeb8a5

File tree

5 files changed

+153
-39
lines changed

5 files changed

+153
-39
lines changed

cmds/modules/noded/main.go

+23-13
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,20 @@ import (
1111
"github.com/urfave/cli/v2"
1212

1313
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
14-
"github.com/threefoldtech/zos4/pkg/app"
15-
"github.com/threefoldtech/zos4/pkg/capacity"
16-
"github.com/threefoldtech/zos4/pkg/environment"
17-
"github.com/threefoldtech/zos4/pkg/events"
18-
"github.com/threefoldtech/zos4/pkg/monitord"
19-
"github.com/threefoldtech/zos4/pkg/perf"
20-
"github.com/threefoldtech/zos4/pkg/perf/cpubench"
21-
"github.com/threefoldtech/zos4/pkg/perf/healthcheck"
22-
"github.com/threefoldtech/zos4/pkg/perf/iperf"
23-
"github.com/threefoldtech/zos4/pkg/perf/publicip"
24-
"github.com/threefoldtech/zos4/pkg/registrar"
25-
"github.com/threefoldtech/zos4/pkg/stubs"
26-
"github.com/threefoldtech/zos4/pkg/utils"
14+
"github.com/threefoldtech/zos/pkg/app"
15+
"github.com/threefoldtech/zos/pkg/capacity"
16+
"github.com/threefoldtech/zos/pkg/environment"
17+
"github.com/threefoldtech/zos/pkg/events"
18+
"github.com/threefoldtech/zos/pkg/monitord"
19+
"github.com/threefoldtech/zos/pkg/perf"
20+
"github.com/threefoldtech/zos/pkg/perf/cpubench"
21+
"github.com/threefoldtech/zos/pkg/perf/healthcheck"
22+
"github.com/threefoldtech/zos/pkg/perf/iperf"
23+
"github.com/threefoldtech/zos/pkg/perf/publicip"
24+
"github.com/threefoldtech/zos/pkg/registrar"
25+
"github.com/threefoldtech/zos/pkg/stubs"
26+
"github.com/threefoldtech/zos/pkg/updater"
27+
"github.com/threefoldtech/zos/pkg/utils"
2728

2829
"github.com/rs/zerolog/log"
2930

@@ -156,6 +157,15 @@ func action(cli *cli.Context) error {
156157
WithVirtualized(len(hypervisor) != 0)
157158

158159
go registerationServer(ctx, msgBrokerCon, env, info)
160+
161+
log.Info().Msg("start node updater")
162+
nodeUpdater := updater.NewUpdater(redis)
163+
if err != nil {
164+
return errors.Wrap(err, "failed to create new node updater")
165+
}
166+
167+
go nodeUpdater.Start(ctx)
168+
159169
log.Info().Msg("start perf scheduler")
160170

161171
perfMon, err := perf.NewPerformanceMonitor(msgBrokerCon)

pkg/registrar.go

+10
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,17 @@ package pkg
44

55
//go:generate zbusc -module registrar -version 0.0.1 -name registrar -package stubs github.com/threefoldtech/zos4/pkg+Registrar stubs/registrar_stub.go
66

7+
type RegistrationState string
8+
9+
type State struct {
10+
NodeID uint32
11+
TwinID uint32
12+
State RegistrationState
13+
Msg string
14+
}
15+
716
type Registrar interface {
817
NodeID() (uint32, error)
918
TwinID() (uint32, error)
19+
GetState() State
1020
}

pkg/registrar/registrar.go

+19-26
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,18 @@ import (
1010
"github.com/pkg/errors"
1111
"github.com/rs/zerolog/log"
1212
"github.com/threefoldtech/zbus"
13-
"github.com/threefoldtech/zos4/pkg/app"
14-
"github.com/threefoldtech/zos4/pkg/environment"
15-
"github.com/threefoldtech/zos4/pkg/stubs"
13+
"github.com/threefoldtech/zos/pkg"
14+
"github.com/threefoldtech/zos/pkg/app"
15+
"github.com/threefoldtech/zos/pkg/environment"
16+
"github.com/threefoldtech/zos/pkg/stubs"
1617
)
1718

1819
// should any of this be moved to pkg?
19-
type RegistrationState string
2020

2121
const (
22-
Failed RegistrationState = "Failed"
23-
InProgress RegistrationState = "InProgress"
24-
Done RegistrationState = "Done"
22+
Failed pkg.RegistrationState = "Failed"
23+
InProgress pkg.RegistrationState = "InProgress"
24+
Done pkg.RegistrationState = "Done"
2525

2626
monitorAccountEvery = 30 * time.Minute
2727
updateNodeInfoInterval = 24 * time.Hour
@@ -32,33 +32,26 @@ var (
3232
ErrFailed = errors.New("registration failed")
3333
)
3434

35-
type State struct {
36-
NodeID uint32
37-
TwinID uint32
38-
State RegistrationState
39-
Msg string
40-
}
41-
42-
func FailedState(err error) State {
43-
return State{
35+
func FailedState(err error) pkg.State {
36+
return pkg.State{
4437
0,
4538
0,
4639
Failed,
4740
err.Error(),
4841
}
4942
}
5043

51-
func InProgressState() State {
52-
return State{
44+
func InProgressState() pkg.State {
45+
return pkg.State{
5346
0,
5447
0,
5548
InProgress,
5649
"",
5750
}
5851
}
5952

60-
func DoneState(nodeID uint32, twinID uint32) State {
61-
return State{
53+
func DoneState(nodeID uint32, twinID uint32) pkg.State {
54+
return pkg.State{
6255
nodeID,
6356
twinID,
6457
Done,
@@ -67,13 +60,13 @@ func DoneState(nodeID uint32, twinID uint32) State {
6760
}
6861

6962
type Registrar struct {
70-
state State
63+
state pkg.State
7164
mutex sync.RWMutex
7265
}
7366

7467
func NewRegistrar(ctx context.Context, cl zbus.Client, env environment.Environment, info RegistrationInfo) *Registrar {
7568
r := Registrar{
76-
State{
69+
pkg.State{
7770
0,
7871
0,
7972
InProgress,
@@ -86,13 +79,13 @@ func NewRegistrar(ctx context.Context, cl zbus.Client, env environment.Environme
8679
return &r
8780
}
8881

89-
func (r *Registrar) setState(s State) {
82+
func (r *Registrar) setState(s pkg.State) {
9083
r.mutex.Lock()
9184
defer r.mutex.Unlock()
9285
r.state = s
9386
}
9487

95-
func (r *Registrar) getState() State {
88+
func (r *Registrar) GetState() pkg.State {
9689
r.mutex.RLock()
9790
defer r.mutex.RUnlock()
9891
return r.state
@@ -167,11 +160,11 @@ func (r *Registrar) reActivate(ctx context.Context, cl zbus.Client, env environm
167160
}
168161

169162
func (r *Registrar) NodeID() (uint32, error) {
170-
return r.returnIfDone(r.getState().NodeID)
163+
return r.returnIfDone(r.GetState().NodeID)
171164
}
172165

173166
func (r *Registrar) TwinID() (uint32, error) {
174-
return r.returnIfDone(r.getState().TwinID)
167+
return r.returnIfDone(r.GetState().TwinID)
175168
}
176169

177170
func (r *Registrar) returnIfDone(v uint32) (uint32, error) {

pkg/stubs/registrar_stub.go

+17
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package stubs
77
import (
88
"context"
99
zbus "github.com/threefoldtech/zbus"
10+
pkg "github.com/threefoldtech/zos/pkg"
1011
)
1112

1213
type RegistrarStub struct {
@@ -26,6 +27,22 @@ func NewRegistrarStub(client zbus.Client) *RegistrarStub {
2627
}
2728
}
2829

30+
func (s *RegistrarStub) GetState(ctx context.Context) (ret0 pkg.State) {
31+
args := []interface{}{}
32+
result, err := s.client.RequestContext(ctx, s.module, s.object, "GetState", args...)
33+
if err != nil {
34+
panic(err)
35+
}
36+
result.PanicOnError()
37+
loader := zbus.Loader{
38+
&ret0,
39+
}
40+
if err := result.Unmarshal(&loader); err != nil {
41+
panic(err)
42+
}
43+
return
44+
}
45+
2946
func (s *RegistrarStub) NodeID(ctx context.Context) (ret0 uint32, ret1 error) {
3047
args := []interface{}{}
3148
result, err := s.client.RequestContext(ctx, s.module, s.object, "NodeID", args...)

pkg/updater/updater.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package updater
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"reflect"
7+
"time"
8+
9+
"github.com/rs/zerolog/log"
10+
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
11+
"github.com/threefoldtech/zbus"
12+
13+
"github.com/threefoldtech/zos/pkg/geoip"
14+
"github.com/threefoldtech/zos/pkg/registrar"
15+
"github.com/threefoldtech/zos/pkg/stubs"
16+
)
17+
18+
const (
19+
updateInterval = 24 * time.Hour
20+
)
21+
22+
type Updater struct {
23+
substrateGateway *stubs.SubstrateGatewayStub
24+
registrar *stubs.RegistrarStub
25+
}
26+
27+
func NewUpdater(bus zbus.Client) *Updater {
28+
return &Updater{
29+
substrateGateway: stubs.NewSubstrateGatewayStub(bus),
30+
registrar: stubs.NewRegistrarStub(bus),
31+
}
32+
}
33+
34+
func (u *Updater) Start(ctx context.Context) {
35+
for {
36+
if u.registrar.GetState(ctx).State == registrar.Done {
37+
if err := u.updateLocation(); err != nil {
38+
log.Error().Err(err).Msg("updating location failed")
39+
}
40+
log.Info().Msg("node location updated")
41+
}
42+
43+
select {
44+
case <-ctx.Done():
45+
log.Info().Msg("stop node updater. context cancelled")
46+
return
47+
case <-time.After(updateInterval):
48+
continue
49+
}
50+
}
51+
}
52+
53+
func (u *Updater) updateLocation() error {
54+
nodeId, err := u.registrar.NodeID(context.Background())
55+
if err != nil {
56+
return fmt.Errorf("failed to get node id: %w", err)
57+
}
58+
59+
node, err := u.substrateGateway.GetNode(context.Background(), nodeId)
60+
if err != nil {
61+
return fmt.Errorf("failed to get node from chain: %w", err)
62+
}
63+
64+
loc, err := geoip.Fetch()
65+
if err != nil {
66+
return fmt.Errorf("failed to fetch location info: %w", err)
67+
}
68+
69+
newLoc := substrate.Location{
70+
City: loc.City,
71+
Country: loc.Country,
72+
Latitude: fmt.Sprintf("%f", loc.Latitude),
73+
Longitude: fmt.Sprintf("%f", loc.Longitude),
74+
}
75+
76+
if !reflect.DeepEqual(newLoc, node.Location) {
77+
node.Location = newLoc
78+
if _, err := u.substrateGateway.UpdateNode(context.Background(), node); err != nil {
79+
return fmt.Errorf("failed to update node on chain: %w", err)
80+
}
81+
}
82+
83+
return nil
84+
}

0 commit comments

Comments
 (0)