forked from edgexfoundry/device-sdk-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice.go
288 lines (244 loc) · 8.69 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
// -*- Mode: Go; indent-tabs-mode: t -*-
//
// Copyright (C) 2017-2018 Canonical Ltd
// Copyright (C) 2018 IOTech Ltd
// Copyright (c) 2019 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0
// Package device provides a basic EdgeX Foundry device service implementation
// meant to be embedded in an application, similar in approach to the builtin
// net/http package.
package device
import (
"context"
"fmt"
"net/http"
"os"
"strconv"
"time"
"github.com/edgexfoundry/device-sdk-go/internal/cache"
"github.com/edgexfoundry/device-sdk-go/internal/clients"
"github.com/edgexfoundry/device-sdk-go/internal/common"
configLoader "github.com/edgexfoundry/device-sdk-go/internal/config"
"github.com/edgexfoundry/device-sdk-go/internal/controller"
"github.com/edgexfoundry/device-sdk-go/internal/provision"
ds_models "github.com/edgexfoundry/device-sdk-go/pkg/models"
"github.com/edgexfoundry/go-mod-core-contracts/clients/types"
"github.com/edgexfoundry/go-mod-core-contracts/models"
"github.com/google/uuid"
)
var (
	// svc is the package-level Service instance. It is assigned once by
	// NewService, handed out by RunningService, and read by the registration
	// helpers in this file (selfRegister, createNewDeviceService,
	// makeNewAddressable).
	svc *Service
)
// A Service listens for requests and routes them to the right command
//
// A Service is created by NewService and brought up with Start. The package
// keeps a single instance of it (see svc / RunningService).
type Service struct {
	// svcInfo points at the Service section of the loaded configuration;
	// NewService sets it, and its Host/Port/Labels/Timeout are read in this file.
	svcInfo *common.ServiceInfo
	// discovery is the value returned by Discovery(); it is not assigned
	// anywhere in the visible portion of this file.
	discovery ds_models.ProtocolDiscovery
	// initAttempts is not referenced in the visible portion of this file.
	initAttempts int
	// initialized is set to true by selfRegister once registration completes.
	initialized bool
	// stopped is set to true by Stop.
	stopped bool
	// cw is assigned the result of newWatchers() during Start.
	cw *Watchers
	// asyncCh is created in Start only when EnableAsyncReadings is configured
	// (buffered with AsyncBufferSize); it is passed to Driver.Initialize and is
	// nil otherwise.
	asyncCh chan *ds_models.AsyncValues
	// startTime is captured at the top of NewService and used by Start to log
	// how long start-up took.
	startTime time.Time
}
// Name reports the device service name held in common.ServiceName
// (the value NewService stored there).
func (s *Service) Name() string {
	name := common.ServiceName
	return name
}
// Version reports the device service version held in common.ServiceVersion
// (the value NewService stored there).
func (s *Service) Version() string {
	version := common.ServiceVersion
	return version
}
// Discovery returns the ProtocolDiscovery held by this Service. The result is
// whatever is stored in the discovery field, which may be nil.
func (s *Service) Discovery() ds_models.ProtocolDiscovery {
	discovery := s.discovery
	return discovery
}
// AsyncReadings reports whether asynchronous readings are enabled, i.e. the
// EnableAsyncReadings flag of the Service section of the current configuration.
func (s *Service) AsyncReadings() bool {
	enabled := common.CurrentConfig.Service.EnableAsyncReadings
	return enabled
}
// Start brings the device service up. In order, it:
//
//  1. initializes the dependency clients,
//  2. starts listening for configuration changes when a registry client exists,
//  3. registers the service with Core Metadata (selfRegister),
//  4. initializes the cache and loads the pre-defined profiles and devices,
//  5. creates the watchers and, when async readings are enabled, the async
//     channel plus its processing goroutine,
//  6. initializes the protocol driver, and
//  7. serves the REST API on the configured port.
//
// The HTTP server runs in its own goroutine; whatever http.ListenAndServe
// returns is sent on errChan. Start returns nil once that goroutine has been
// launched, or a non-nil error as soon as any earlier step fails. Errors from
// the individual steps are included in the returned error so the root cause is
// not lost.
func (s *Service) Start(errChan chan error) (err error) {
	err = clients.InitDependencyClients()
	if err != nil {
		return err
	}

	// If useRegistry selected then configLoader.RegistryClient will not be nil
	if configLoader.RegistryClient != nil {
		// Logging has now been initialized so can start listening for configuration changes.
		go configLoader.ListenForConfigChanges()
	}

	err = selfRegister()
	if err != nil {
		return fmt.Errorf("Couldn't register to metadata service: %v", err)
	}

	// initialize devices, objects & profiles
	cache.InitCache()
	err = provision.LoadProfiles(common.CurrentConfig.Device.ProfilesDir)
	if err != nil {
		return fmt.Errorf("Failed to create the pre-defined Device Profiles: %v", err)
	}

	err = provision.LoadDevices(common.CurrentConfig.DeviceList)
	if err != nil {
		return fmt.Errorf("Failed to create the pre-defined Devices: %v", err)
	}

	s.cw = newWatchers()

	// initialize driver
	if common.CurrentConfig.Service.EnableAsyncReadings {
		s.asyncCh = make(chan *ds_models.AsyncValues, common.CurrentConfig.Service.AsyncBufferSize)
		go processAsyncResults()
	}
	err = common.Driver.Initialize(common.LoggingClient, s.asyncCh)
	if err != nil {
		return fmt.Errorf("Driver.Initialize failure: %v", err)
	}

	// Setup REST API
	r := controller.InitRestRoutes()
	//scheduler.StartScheduler()

	// http.TimeoutHandler only has an effect if the handler it returns is the
	// one being served, so wrap the router with it. A non-positive timeout
	// would make every request time out immediately; in that case the router
	// is served unwrapped.
	var handler http.Handler = r
	if s.svcInfo.Timeout > 0 {
		handler = http.TimeoutHandler(r, time.Millisecond*time.Duration(s.svcInfo.Timeout), "Request timed out")
	}

	common.LoggingClient.Info(fmt.Sprintf("*Service Start() called, name=%s, version=%s", common.ServiceName, common.ServiceVersion))

	// Read the port once so the goroutine and the log line below agree on it.
	port := strconv.Itoa(s.svcInfo.Port)
	go func() {
		errChan <- http.ListenAndServe(common.Colon+port, handler)
	}()

	common.LoggingClient.Info("Listening on port: " + port)
	common.LoggingClient.Info("Service started in: " + time.Since(s.startTime).String())
	common.LoggingClient.Debug("*Service Start() exit")

	return nil
}
// selfRegister looks this device service up in Core Metadata by name and, when
// the lookup fails with an HTTP 404 service-client error, creates it via
// createNewDeviceService.
//
// On success the resulting record is stored in common.CurrentDeviceService and
// the package-level service is marked initialized. Any other lookup error, and
// any error from creating the record, is returned to the caller and nothing is
// marked initialized.
func selfRegister() error {
	common.LoggingClient.Debug("Trying to find Device Service: " + common.ServiceName)

	ctx := context.WithValue(context.Background(), common.CorrelationHeader, uuid.New().String())
	ds, err := common.DeviceServiceClient.DeviceServiceForName(common.ServiceName, ctx)
	if err != nil {
		errsc, ok := err.(*types.ErrServiceClient)
		if !ok || errsc.StatusCode != http.StatusNotFound {
			common.LoggingClient.Error(fmt.Sprintf("DeviceServicForName failed: %v", err))
			return err
		}

		// The lookup returned an error, so ds cannot be relied on here; log the
		// name that was asked for instead of ds.Name.
		common.LoggingClient.Info(fmt.Sprintf("Device Service %s doesn't exist, creating a new one", common.ServiceName))
		ds, err = createNewDeviceService()
		if err != nil {
			// Do not record a half-built service or claim to be initialized.
			return err
		}
	} else {
		common.LoggingClient.Info(fmt.Sprintf("Device Service %s exists", ds.Name))
	}

	common.LoggingClient.Debug(fmt.Sprintf("Device Service in Core MetaData: %v", ds))
	common.CurrentDeviceService = ds
	svc.initialized = true
	return nil
}
// createNewDeviceService builds a DeviceService record for this service and
// adds it to Core Metadata.
//
// The record uses common.ServiceName, the configured labels, the addressable
// obtained from makeNewAddressable, operating state "ENABLED", admin state
// "UNLOCKED" and the current time in milliseconds as its origin. It returns
// the record with the Id assigned by the Add call, or a zero DeviceService and
// the error when obtaining the addressable, adding the record, or verifying
// the returned id fails.
func createNewDeviceService() (models.DeviceService, error) {
	var none models.DeviceService

	addressable, err := makeNewAddressable()
	if err != nil {
		common.LoggingClient.Error(fmt.Sprintf("makeNewAddressable failed: %v", err))
		return none, err
	}

	// Origin timestamp: nanoseconds since the epoch scaled down to milliseconds.
	originMillis := time.Now().UnixNano() / int64(time.Millisecond)

	service := models.Service{
		Name:           common.ServiceName,
		Labels:         svc.svcInfo.Labels,
		OperatingState: "ENABLED",
		Addressable:    *addressable,
	}
	service.Origin = originMillis

	created := models.DeviceService{
		Service:    service,
		AdminState: "UNLOCKED",
	}

	ctx := context.WithValue(context.Background(), common.CorrelationHeader, uuid.New().String())
	newID, err := common.DeviceServiceClient.Add(&created, ctx)
	if err != nil {
		common.LoggingClient.Error(fmt.Sprintf("Add Deviceservice: %s; failed: %v", common.ServiceName, err))
		return none, err
	}

	err = common.VerifyIdFormat(newID, "Device Service")
	if err != nil {
		return none, err
	}

	// NOTE - this differs from Addressable and Device objects,
	// neither of which require the '.Service'prefix
	created.Service.Id = newID
	common.LoggingClient.Debug("New deviceservice Id: " + created.Service.Id)

	return created, nil
}
// makeNewAddressable returns the Addressable registered under
// common.ServiceName, creating it first when the lookup fails with an HTTP 404
// service-client error.
//
// A newly created addressable is an HTTP POST endpoint at the configured
// host/port with path common.APICallbackRoute. Any other lookup error, a
// failed Add, or an id that fails common.VerifyIdFormat yields (nil, err).
func makeNewAddressable() (*models.Addressable, error) {
	// check whether there has been an existing addressable
	ctx := context.WithValue(context.Background(), common.CorrelationHeader, uuid.New().String())
	addr, err := common.AddressableClient.AddressableForName(common.ServiceName, ctx)
	if err == nil {
		common.LoggingClient.Info(fmt.Sprintf("Addressable %s exists", common.ServiceName))
		return &addr, nil
	}

	// Only a "not found" answer from the service client means we should create one.
	clientErr, isClientErr := err.(*types.ErrServiceClient)
	if !isClientErr || clientErr.StatusCode != http.StatusNotFound {
		common.LoggingClient.Error(fmt.Sprintf("AddressableForName failed: %v", err))
		return nil, err
	}

	common.LoggingClient.Info(fmt.Sprintf("Addressable %s doesn't exist, creating a new one", common.ServiceName))

	originMillis := time.Now().UnixNano() / int64(time.Millisecond)
	addr = models.Addressable{
		BaseObject: models.BaseObject{
			Origin: originMillis,
		},
		Name:       common.ServiceName,
		HTTPMethod: http.MethodPost,
		Protocol:   common.HttpProto,
		Address:    svc.svcInfo.Host,
		Port:       svc.svcInfo.Port,
		Path:       common.APICallbackRoute,
	}

	newID, addErr := common.AddressableClient.Add(&addr, ctx)
	if addErr != nil {
		common.LoggingClient.Error(fmt.Sprintf("Add addressable failed %v, error: %v", addr, addErr))
		return nil, addErr
	}

	if idErr := common.VerifyIdFormat(newID, "Addressable"); idErr != nil {
		return nil, idErr
	}

	addr.Id = newID
	return &addr, nil
}
// Stop shuts down the Service: it marks the service as stopped and asks the
// protocol driver to stop, passing force through unchanged.
//
// The error reported by the driver's Stop is returned to the caller instead of
// being discarded; nil means the driver stopped without reporting a problem.
func (s *Service) Stop(force bool) error {
	s.stopped = true
	//scheduler.StopScheduler()
	return common.Driver.Stop(force)
}
// NewService creates the device service instance from the given service name,
// version number, config profile, config directory, whether to use the
// registry, and Driver, which cannot be nil.
//
// All arguments are validated before any package-level state is touched or the
// configuration is loaded, so a rejected call leaves no side effects behind.
// A configuration load failure is written to stderr and returned as an error
// rather than terminating the process, leaving the decision to exit with the
// caller.
//
// Note - this function manages a singleton: once a call has succeeded, every
// further call returns an error.
func NewService(serviceName string, serviceVersion string, confProfile string, confDir string, useRegistry bool, proto ds_models.ProtocolDriver) (*Service, error) {
	startTime := time.Now()

	if svc != nil {
		return nil, fmt.Errorf("NewService: service already exists!\n")
	}
	if len(serviceName) == 0 {
		return nil, fmt.Errorf("NewService: empty name specified\n")
	}
	if len(serviceVersion) == 0 {
		return nil, fmt.Errorf("NewService: empty version number specified\n")
	}
	if proto == nil {
		return nil, fmt.Errorf("NewService: no Driver specified\n")
	}

	// The service name is published before the configuration is loaded, as in
	// the original ordering (presumably the loader reads it — TODO confirm).
	common.ServiceName = serviceName

	config, err := configLoader.LoadConfig(useRegistry, confProfile, confDir)
	if err != nil {
		// Keep the stderr diagnostic the original emitted, but let the caller
		// decide whether to exit.
		fmt.Fprintf(os.Stderr, "error loading config file: %v\n", err)
		return nil, fmt.Errorf("NewService: error loading config file: %v", err)
	}

	common.CurrentConfig = config
	common.ServiceVersion = serviceVersion
	common.Driver = proto

	svc = &Service{
		startTime: startTime,
		svcInfo:   &config.Service,
	}
	return svc, nil
}
// RunningService returns the package-level Service instance created by
// NewService. The result is nil if no service has been created yet.
func RunningService() *Service {
	current := svc
	return current
}
// DriverConfigs retrieves the driver specific configuration, i.e. the Driver
// section of the current configuration, as a string-to-string map.
func DriverConfigs() map[string]string {
	driverCfg := common.CurrentConfig.Driver
	return driverCfg
}