Skip to content

Commit

Permalink
Merge pull request #7 from xeptore/xeptore/5-Add-usage-data-poll-time…
Browse files Browse the repository at this point in the history
…stamp-to-ingested-records

Add usage data poll timestamp to ingested records
  • Loading branch information
xeptore authored Mar 23, 2023
2 parents 70f35bf + b465f48 commit 76dd6e9
Show file tree
Hide file tree
Showing 7 changed files with 300 additions and 248 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ build-clean: clean build
test:
go test -trimpath -buildvcs=false -ldflags '-extldflags "-static" -s -w -buildid=' -race -failfast -vet=all -v ./...
.PHONY: test

gen:
$(MAKE) -C ./ingest
.PHONY: gen
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.18 h1:DOKFKCQ7FNG2L1rbrmstDN4QVRdS89Nkh85u68Uwp98=
github.com/mattn/go-isatty v0.0.18/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mdlayher/genetlink v1.3.1 h1:roBiPnual+eqtRkKX2Jb8UQN5ZPWnhDCGj/wR6Jlz2w=
Expand Down
3 changes: 3 additions & 0 deletions ingest/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
gen:
mockgen -source ingest.go -destination mocks/ingest.go -package mocks
.PHONY: gen
11 changes: 6 additions & 5 deletions ingest/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,11 @@ func (m *storeMongo) LoadBeforeRestartUsage(ctx context.Context) (map[string]ing
return out, nil
}

func (m *storeMongo) IngestUsage(ctx context.Context, peersUsage []ingest.PeerUsage) error {
func (m *storeMongo) IngestUsage(ctx context.Context, peersUsage []ingest.PeerUsage, gatheredAt time.Time) error {
models := funcutils.Map(peersUsage, func(p ingest.PeerUsage) mongo.WriteModel {
return mongo.NewUpdateOneModel().
SetFilter(bson.M{"publicKey": p.PublicKey}).
SetUpdate(bson.M{"$push": bson.M{"usage": bson.M{"upload": p.Upload, "download": p.Download}}}).
SetUpdate(bson.M{"$push": bson.M{"usage": bson.M{"upload": p.Upload, "download": p.Download, "at": gatheredAt.UnixMilli()}}}).
SetUpsert(true)
})
opts := options.BulkWrite().SetOrdered(false).SetBypassDocumentValidation(true)
Expand All @@ -186,10 +186,11 @@ type wgPeers struct {
ctrl *wgctrl.Client
}

func (wg *wgPeers) Usage(ctx context.Context) ([]ingest.PeerUsage, error) {
func (wg *wgPeers) Usage(ctx context.Context) ([]ingest.PeerUsage, time.Time, error) {
dev, err := wg.ctrl.Device(wgDeviceName)
gatheredAt := time.Now()
if nil != err {
return nil, err
return nil, gatheredAt, err
}

out := funcutils.Map(dev.Peers, func(p wgtypes.Peer) ingest.PeerUsage {
Expand All @@ -200,7 +201,7 @@ func (wg *wgPeers) Usage(ctx context.Context) ([]ingest.PeerUsage, error) {
}
})

return out, nil
return out, gatheredAt, nil
}

type restartMarkFileReadRemover struct{}
Expand Down
9 changes: 5 additions & 4 deletions ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"time"

"github.com/rs/zerolog"
)
Expand All @@ -17,11 +18,11 @@ type PeerUsage struct {

type Store interface {
LoadBeforeRestartUsage(ctx context.Context) (map[string]PeerUsage, error)
IngestUsage(ctx context.Context, peersUsage []PeerUsage) error
IngestUsage(ctx context.Context, peersUsage []PeerUsage, gatheredAt time.Time) error
}

type WgPeers interface {
Usage(ctx context.Context) ([]PeerUsage, error)
Usage(ctx context.Context) (peersUsage []PeerUsage, gatheredAt time.Time, err error)
}

type RestartMarkFileReadRemover interface {
Expand Down Expand Up @@ -57,7 +58,7 @@ func (e *Engine) Run(ctx context.Context, tick <-chan struct{}, restartMarkFileN
case <-ctx.Done():
return ctx.Err()
default:
peersUsage, err := e.wgPeers.Usage(ctx)
peersUsage, gatheredAt, err := e.wgPeers.Usage(ctx)
if nil != err {
e.logger.Error().Err(err).Msg("failed to get wireguard peers usage data")
continue
Expand Down Expand Up @@ -86,7 +87,7 @@ func (e *Engine) Run(ctx context.Context, tick <-chan struct{}, restartMarkFileN
}

if len(peersUsage) > 0 {
if err := e.store.IngestUsage(ctx, peersUsage); nil != err {
if err := e.store.IngestUsage(ctx, peersUsage, gatheredAt); nil != err {
e.logger.Error().Err(err).Msg("failed to ingest peers usage data")
continue
}
Expand Down
Loading

0 comments on commit 76dd6e9

Please sign in to comment.