Skip to content

Commit

Permalink
Provider checker mode
Browse files Browse the repository at this point in the history
Signed-off-by: Anton Litvinov <[email protected]>
  • Loading branch information
Zensey committed May 14, 2024
1 parent c97221a commit 83a6cde
Show file tree
Hide file tree
Showing 16 changed files with 365 additions and 5 deletions.
1 change: 1 addition & 0 deletions cmd/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (di *Dependencies) bootstrapTequilapi(nodeOptions node.Options, listener ne
tequilapi_endpoints.AddRoutesForAuthentication(di.Authenticator, di.JWTAuthenticator, di.SSOMystnodes),
tequilapi_endpoints.AddRoutesForIdentities(di.IdentityManager, di.IdentitySelector, di.IdentityRegistry, di.ConsumerBalanceTracker, di.AddressProvider, di.HermesChannelRepository, di.BCHelper, di.Transactor, di.BeneficiaryProvider, di.IdentityMover, di.BeneficiaryAddressStorage, di.HermesMigrator),
tequilapi_endpoints.AddRoutesForConnection(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.AddressProvider),
tequilapi_endpoints.AddRoutesForConnectionDiag(di.MultiConnectionManager, di.StateKeeper, di.ProposalRepository, di.IdentityRegistry, di.EventBus, di.EventBus, di.AddressProvider, di.IdentitySelector, nodeOptions),
tequilapi_endpoints.AddRoutesForSessions(di.SessionStorage),
tequilapi_endpoints.AddRoutesForConnectionLocation(di.IPResolver, di.LocationResolver, di.LocationResolver),
tequilapi_endpoints.AddRoutesForProposals(di.ProposalRepository, di.PricingHelper, di.LocationResolver, di.FilterPresetStorage, di.NATProber),
Expand Down
16 changes: 13 additions & 3 deletions cmd/di.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ type Dependencies struct {
NodeStatusTracker *monitoring.StatusTracker
NodeStatsTracker *node.StatsTracker
uiVersionConfig versionmanager.NodeUIVersionConfig

provPinger *connection.ProviderChecker
}

// Bootstrap initiates all container dependencies
Expand Down Expand Up @@ -287,7 +289,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
return err
}

if err := di.bootstrapQualityComponents(nodeOptions.Quality); err != nil {
if err := di.bootstrapQualityComponents(nodeOptions.Quality, nodeOptions); err != nil {
return err
}

Expand All @@ -299,6 +301,7 @@ func (di *Dependencies) Bootstrap(nodeOptions node.Options) error {
if err = di.handleConnStateChange(); err != nil {
return err
}

if err := di.Node.Start(); err != nil {
return err
}
Expand Down Expand Up @@ -581,6 +584,8 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil
di.bootstrapBeneficiarySaver(nodeOptions)

di.ConnectionRegistry = connection.NewRegistry()

log.Error().Msg("DI > > > > > > > > > > > > > > > > > > > > > > >")
di.MultiConnectionManager = connection.NewMultiConnectionManager(func() connection.Manager {
return connection.NewManager(
pingpong.ExchangeFactoryFunc(
Expand All @@ -604,6 +609,7 @@ func (di *Dependencies) bootstrapNodeComponents(nodeOptions node.Options, tequil
di.P2PDialer,
di.allowTrustedDomainBypassTunnel,
di.disallowTrustedDomainBypassTunnel,
di.provPinger,
)
})

Expand Down Expand Up @@ -883,7 +889,7 @@ func (di *Dependencies) bootstrapIdentityComponents(options node.Options) error
return nil
}

func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality) (err error) {
func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality, nodeOptions node.Options) (err error) {
if err := di.AllowURLAccess(options.Address); err != nil {
return err
}
Expand Down Expand Up @@ -924,6 +930,10 @@ func (di *Dependencies) bootstrapQualityComponents(options node.OptionsQuality)
return err
}

if nodeOptions.ProvChecker {
di.provPinger = connection.NewProviderChecker(di.EventBus)
}

return nil
}

Expand Down Expand Up @@ -1065,7 +1075,7 @@ func (di *Dependencies) handleConnStateChange() error {

latestState := connectionstate.NotConnected
return di.EventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, func(e connectionstate.AppEventConnectionState) {
if config.GetBool(config.FlagProxyMode) || config.GetBool(config.FlagDVPNMode) {
if config.GetBool(config.FlagProxyMode) || config.GetBool(config.FlagDVPNMode) || config.GetBool(config.FlagProvCheckerMode) {
return // Proxy mode doesn't establish system wide tunnels, no reconnect required.
}

Expand Down
10 changes: 10 additions & 0 deletions config/flags_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,13 @@ var (
Value: false,
}

// FlagProvCheckerMode allows running node under current user as a provider checker agent.
FlagProvCheckerMode = cli.BoolFlag{
Name: "provchecker",
Usage: "",
Value: false,
}

// FlagUserspace allows running a node without privileged permissions.
FlagUserspace = cli.BoolFlag{
Name: "userspace",
Expand Down Expand Up @@ -349,6 +356,7 @@ func RegisterFlagsNode(flags *[]cli.Flag) error {
&FlagUserMode,
&FlagDVPNMode,
&FlagProxyMode,
&FlagProvCheckerMode,
&FlagUserspace,
&FlagVendorID,
&FlagLauncherVersion,
Expand Down Expand Up @@ -411,6 +419,8 @@ func ParseFlagsNode(ctx *cli.Context) {
Current.ParseBoolFlag(ctx, FlagUserMode)
Current.ParseBoolFlag(ctx, FlagDVPNMode)
Current.ParseBoolFlag(ctx, FlagProxyMode)
Current.ParseBoolFlag(ctx, FlagProvCheckerMode)

Current.ParseBoolFlag(ctx, FlagUserspace)
Current.ParseStringFlag(ctx, FlagVendorID)
Current.ParseStringFlag(ctx, FlagLauncherVersion)
Expand Down
4 changes: 4 additions & 0 deletions core/connection/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Connection interface {
Statistics() (connectionstate.Statistics, error)
}

type ConnectionDiag interface {
Diag() bool
}

// StateChannel is the channel we receive state change events on
type StateChannel chan connectionstate.State

Expand Down
10 changes: 10 additions & 0 deletions core/connection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ type connectionManager struct {
statsTracker statsTracker

uuid string

provChecker *ProviderChecker
}

// NewManager creates connection manager with given dependencies
Expand All @@ -184,6 +186,7 @@ func NewManager(
validator validator,
p2pDialer p2p.Dialer,
preReconnect, postReconnect func(),
provChecker *ProviderChecker,
) *connectionManager {
uuid, err := uuid.NewV4()
if err != nil {
Expand All @@ -207,6 +210,7 @@ func NewManager(
preReconnect: preReconnect,
postReconnect: postReconnect,
uuid: uuid.String(),
provChecker: provChecker,
}

m.eventBus.SubscribeAsync(connectionstate.AppTopicConnectionState, m.reconnectOnHold)
Expand Down Expand Up @@ -301,6 +305,10 @@ func (m *connectionManager) Connect(consumerID identity.Identity, hermesID commo
return nil
})

if m.provChecker != nil {
go m.provChecker.Diag(m, proposal.ProviderID)
}

go m.consumeConnectionStates(m.activeConnection.State())
go m.checkSessionIP(m.channel, m.connectOptions.ConsumerID, m.connectOptions.SessionID, originalPublicIP)

Expand Down Expand Up @@ -801,6 +809,8 @@ func (m *connectionManager) Cancel() {
}

func (m *connectionManager) Disconnect() error {
log.Trace().Msg("connectionManager) Disconnect >>>>>>>>>>>>>>>>>")

if m.Status().State == connectionstate.NotConnected {
return ErrNoConnection
}
Expand Down
1 change: 1 addition & 0 deletions core/connection/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func (tc *testContext) SetupTest() {
&mockValidator{},
tc.mockP2P,
func() {}, func() {},
tc.connManager.provChecker,
)
tc.connManager.timeGetter = func() time.Time {
return tc.mockTime
Expand Down
34 changes: 34 additions & 0 deletions core/connection/pinger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package connection

import (
"time"

"github.com/mysteriumnetwork/node/core/quality"
"github.com/mysteriumnetwork/node/eventbus"
"github.com/rs/zerolog/log"
)

type ProviderChecker struct {
bus eventbus.Publisher
}

func NewProviderChecker(bus eventbus.Publisher) *ProviderChecker {
return &ProviderChecker{
bus: bus,
}
}

func (p *ProviderChecker) Diag(cm *connectionManager, providerID string) {
c, ok := cm.activeConnection.(ConnectionDiag)
res := false
if ok {
log.Debug().Msgf("Check provider> %v", providerID)

time.Sleep(1 * time.Second)
res = c.Diag()
time.Sleep(1 * time.Second)
cm.Disconnect()
}
ev := quality.DiagEvent{ProviderID: providerID, Result: res}
p.bus.Publish(quality.AppTopicConnectionDiagRes, ev)
}
6 changes: 4 additions & 2 deletions core/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ type Options struct {

Payments OptionsPayments

Consumer bool
Mobile bool
Consumer bool
Mobile bool
ProvChecker bool

SwarmDialerDNSHeadstart time.Duration
PilvytisAddress string
Expand Down Expand Up @@ -205,6 +206,7 @@ func GetOptions() *Options {
SSE: OptionsSSE{
Enabled: config.GetBool(config.FlagSSEEnable),
},
ProvChecker: config.GetBool(config.FlagProvCheckerMode),
}
}

Expand Down
7 changes: 7 additions & 0 deletions core/quality/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ type PingEvent struct {
Duration time.Duration `json:"duration"`
}

type DiagEvent struct {
ProviderID string
Result bool
}

const (
// AppTopicConnectionEvents represents event bus topic for the connection events.
AppTopicConnectionEvents = "connection_events"
Expand All @@ -111,4 +116,6 @@ const (

// AppTopicProviderPingP2P represents event bus topic for provider p2p pings to consumer.
AppTopicProviderPingP2P = "provider_ping_p2p"

AppTopicConnectionDiagRes = "connection_diag"
)
6 changes: 6 additions & 0 deletions services/wireguard/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (c *Connection) State() <-chan connectionstate.State {
return c.stateCh
}

func (c *Connection) Diag() bool {
return c.connectionEndpoint.Diag()
}

// Statistics returns connection statistics channel.
func (c *Connection) Statistics() (connectionstate.Statistics, error) {
stats, err := c.connectionEndpoint.PeerStats()
Expand All @@ -110,6 +114,8 @@ func (c *Connection) Reconnect(ctx context.Context, options connection.ConnectOp
}

func (c *Connection) start(ctx context.Context, start startConn, options connection.ConnectOptions) (err error) {
log.Info().Msg("+++++++++++++++++++++++++++++++++++++++++++++++++++++ *Connection) start")

var config wg.ServiceConfig
if err = json.Unmarshal(options.SessionConfig, &config); err != nil {
return errors.Wrap(err, "failed to unmarshal connection config")
Expand Down
3 changes: 3 additions & 0 deletions services/wireguard/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func (mce *mockConnectionEndpoint) ConfigureRoutes(_ net.IP) error { retur
func (mce *mockConnectionEndpoint) PeerStats() (wgcfg.Stats, error) {
return wgcfg.Stats{LastHandshake: time.Now(), BytesSent: 10, BytesReceived: 11}, nil
}
func (mce *mockConnectionEndpoint) Diag() bool {
return true
}

type mockHandshakeWaiter struct {
err error
Expand Down
1 change: 1 addition & 0 deletions services/wireguard/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,5 @@ type ConnectionEndpoint interface {
Config() (ServiceConfig, error)
InterfaceName() string
Stop() error
Diag() bool
}
10 changes: 10 additions & 0 deletions services/wireguard/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ type connectionEndpoint struct {
wgClient WgClient
}

func (ce *connectionEndpoint) Diag() bool {
c, ok := ce.wgClient.(WgClientDiag)
if ok {
return c.Diag()
}
return false
}

// StartConsumerMode starts and configure wireguard network interface running in consumer mode.
func (ce *connectionEndpoint) StartConsumerMode(cfg wgcfg.DeviceConfig) error {
if err := ce.cleanAbandonedInterfaces(); err != nil {
Expand Down Expand Up @@ -80,6 +88,8 @@ func (ce *connectionEndpoint) StartConsumerMode(cfg wgcfg.DeviceConfig) error {
}
return errors.Wrap(err, "could not configure device")
}

// ce.wgClient.Diag()
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions services/wireguard/endpoint/wg_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/rs/zerolog/log"

"github.com/mysteriumnetwork/node/config"
"github.com/mysteriumnetwork/node/services/wireguard/endpoint/diagclient"

Check failure on line 27 in services/wireguard/endpoint/wg_client.go

View workflow job for this annotation

GitHub Actions / e2e-nat

no required module provides package github.com/mysteriumnetwork/node/services/wireguard/endpoint/diagclient; to add it:

Check failure on line 27 in services/wireguard/endpoint/wg_client.go

View workflow job for this annotation

GitHub Actions / e2e-nat

no required module provides package github.com/mysteriumnetwork/node/services/wireguard/endpoint/diagclient; to add it:

Check failure on line 27 in services/wireguard/endpoint/wg_client.go

View workflow job for this annotation

GitHub Actions / e2e-basic

no required module provides package github.com/mysteriumnetwork/node/services/wireguard/endpoint/diagclient; to add it:
"github.com/mysteriumnetwork/node/services/wireguard/endpoint/dvpnclient"
"github.com/mysteriumnetwork/node/services/wireguard/endpoint/kernelspace"
netstack_provider "github.com/mysteriumnetwork/node/services/wireguard/endpoint/netstack-provider"
Expand All @@ -43,6 +44,10 @@ type WgClient interface {
Close() error
}

type WgClientDiag interface {
Diag() bool
}

// WgClientFactory represents WireGuard client factory.
type WgClientFactory struct {
once sync.Once
Expand All @@ -56,6 +61,10 @@ func NewWGClientFactory() *WgClientFactory {

// NewWGClient returns a new wireguard client.
func (wcf *WgClientFactory) NewWGClient() (WgClient, error) {

if config.GetBool(config.FlagProvCheckerMode) {
return diagclient.New()
}
if config.GetBool(config.FlagDVPNMode) {
return dvpnclient.New()
}
Expand Down
7 changes: 7 additions & 0 deletions tequilapi/contract/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ func NewConnectionInfoDTO(session connectionstate.Status) ConnectionInfoDTO {
return response
}

// swagger:model ConnectionDiagInfoDTO
type ConnectionDiagInfoDTO struct {
Status bool `json:"status"`
Error interface{} `json:"error"`
ProviderID string `json:"provider_id"`
}

// ConnectionInfoDTO holds partial consumer connection details.
// swagger:model ConnectionInfoDTO
type ConnectionInfoDTO struct {
Expand Down
Loading

0 comments on commit 83a6cde

Please sign in to comment.