diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d28c0e --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.idea +*.iml +tmp +settings.dev.yml +go.sum +*.log diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fd11bdc --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Jason + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..03b47d3 --- /dev/null +++ b/README.md @@ -0,0 +1,51 @@ +# eth-stats +可用于实时监控分布自不同区域的ethereum节点,操作简单,一目了然。 +该项目可监控基于ethereum的大多数项目,包括L2、L3等等,具体细节,就需要自行探索了 + +## 功能 +1. 每个节点名称不得重复 +2. 支持实时上传节点信息 +3. 节点异常时,实时邮件反馈 +4. 定时邮件发送节点简报 +5. server和client强稳定性,运行期间,除非强制杀进程或者bug,否则程序不会因为任何逻辑问题停止运行,降低了运维复杂度 +6. 可通过命令行传入参或者通过配置文件启动`client、server`,不建议同时使用两种方式,选择其中一种即可 +7. 本项目没有前端页面,主要是不会用前端语言,也设计不了。。。本项目在server/app/service/api中提供了socket数据出口,只要前端使用socket调用,即可渲染在前端。 + 1. 前端通过socket的emit可读取:`stats 节点信息`、`latency 延迟`、`node-ping ping`三类数据 + +## 使用方式 +分为客户端和服务器端,客户端安装在每台需要监控的节点上,服务器端找台有ip的稳定机子部署就行 + +### client +```shell +cd client +go build -o client ./client.go +CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o client client.go + +# 配置方式启动 +client start -c config/setting.yml + +# 命令行方式启动 +./client start --name test --secret 123456 --server-url 链地址,如:ws://127.0.0.1:30303 +``` + +### server +```shell +cd server +go build server.go -o server +CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -o server server.go + +# 配置方式启动 +server start -c config/setting.yml + +# 命令行方式启动 +./server start --name ethereum-server --secret 123456 --host 0.0.0.0 --port 3000 --email-subject-prefix ethereum --email-host 邮箱服务地址 --email-port 465 --email-username 发件邮箱账户 --email-password 邮箱密钥 --email-from 发件邮箱账户--email-to 收件邮箱账户(多个逗号隔开) +``` + +## 参考 +[1] [goerli-ethstats-server](https://github.com/goerli/ethstats-server) +[2] [goerli-ethstats-client](https://github.com/goerli/ethstats-client) +[3] [AvileneRausch2001-ethstats](https://github.com/AvileneRausch2001/ethstats) +[4] [AvileneRausch2001-ethstats](https://github.com/AvileneRausch2001/ethstats) +[5] [maticnetwork-ethstats-backend](https://github.com/maticnetwork/ethstats-backend) + + diff --git a/client/app/app.go b/client/app/app.go new file mode 100644 index 0000000..729c454 --- /dev/null +++ b/client/app/app.go @@ -0,0 +1,270 @@ +package app + +import ( + "context" + "encoding/json" + "errors" + "ethstats/client/app/model" + "ethstats/client/config" + "ethstats/common/util/connutil" + "github.com/bitxx/ethutil" + "github.com/bitxx/logger/logbase" + "os" + "os/signal" + "runtime" + "strconv" + "strings" + "time" +) + +type App struct { + node model.Node + readyCh chan struct{} + pongCh chan struct{} + logger *logbase.Helper +} + +func NewApp() *App { + node := model.Node{ + Id: config.ApplicationConfig.Name, + Name: config.ApplicationConfig.Name, + Contact: config.ApplicationConfig.Contract, + ChainPort: config.ChainConfig.Port, + OSPlatform: runtime.GOARCH, + OS: runtime.GOOS, + Client: config.ApplicationConfig.Version, + } + + return &App{ + node: node, + readyCh: make(chan struct{}), + pongCh: make(chan struct{}), + logger: logbase.NewHelper(logbase.DefaultLogger), + } +} + +func (a *App) Start() { + // logbase.NewHelper(core.Runtime.GetLogger()) + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + var err error + isInterrupt := false + + conn := &connutil.ConnWrapper{} + readTicker := time.NewTimer(0) + latencyTicker := time.NewTimer(0) + + defer func() { + a.close(conn, readTicker, latencyTicker) + // if not interrupt,restart the client + if !isInterrupt { + time.Sleep(5 * time.Second) + a.Start() + } + }() + + conn, err = connutil.NewDialConn(config.ApplicationConfig.ServerUrl) + if err != nil { + a.logger.Warn("dial error: ", err) + return + } + + for { + select { + case <-readTicker.C: + //after customer,change the time + readTicker.Reset(10 * time.Second) + latencyTicker.Reset(2 * time.Second) + + //login + login := map[string][]interface{}{ + "emit": {"hello", map[string]string{ + "id": a.node.Name, + "secret": config.ApplicationConfig.Secret, + }}, + } + err = conn.WriteJSON(login) + if err != nil { + return + } + + //read info + go a.readLoop(conn) + + select { + case <-latencyTicker.C: + if err = a.reportLatency(conn); err != nil { + a.logger.Warn("requested latency report failed: ", err) + } + case <-a.readyCh: + //登录成功,上传数据 + if err = a.reportStats(conn); err != nil { + a.logger.Warn("stats info report failed: ", err) + } + } + case <-interrupt: + a.close(conn, readTicker, latencyTicker) + isInterrupt = true + return + } + } +} + +func (a *App) readLoop(conn *connutil.ConnWrapper) { + for { + blob := json.RawMessage{} + if err := conn.ReadJSON(&blob); err != nil { + a.logger.Warn("received and decode message error: ", err) + return + } + // If the network packet is a system ping, respond to it directly + var ping string + if err := json.Unmarshal(blob, &ping); err == nil && strings.HasPrefix(ping, "primus::ping::") { + if err := conn.WriteJSON(strings.Replace(ping, "ping", "pong", -1)); err != nil { + a.logger.Warn("failed to respond to system ping message: ", err) + return + } + continue + } + // Not a system ping, try to decode an actual state message + var msg map[string][]interface{} + if err := json.Unmarshal(blob, &msg); err != nil { + a.logger.Warn("failed to decode message: ", err) + return + } + + if len(msg["emit"]) == 0 { + a.logger.Warn("received message invalid: ", msg) + return + } + msgType, ok := msg["emit"][0].(string) + if !ok { + a.logger.Warn("received invalid message type: ", msg["emit"][0]) + return + } + a.logger.Trace("received message type: ", msgType) + + switch msgType { + case "ready": + //只有接收到了ready信息,才初始化获取数据 + a.logger.Info("connect success!") + a.readyCh <- struct{}{} + case "un-authorization": + if len(msg["emit"]) >= 2 { + if errMsg, ok := msg["emit"][1].(string); ok { + a.logger.Warn(errMsg) + } + } + return + case "node-pong": + a.pongCh <- struct{}{} + } + + } +} + +func (a *App) reportLatency(conn *connutil.ConnWrapper) error { + start := time.Now() + + ping := map[string][]interface{}{ + "emit": {"node-ping", map[string]string{ + "id": config.ApplicationConfig.Name, + "clientTime": start.String(), + }}, + } + + if err := conn.WriteJSON(ping); err != nil { + return err + } + // Wait for the pong request to arrive back + select { + case <-a.pongCh: + // Pong delivered, report the latency + case <-time.After(10 * time.Second): + // MsgPing timeout, abort + return errors.New("ping timed out") + } + latency := strconv.Itoa(int((time.Since(start) / time.Duration(2)).Nanoseconds() / 1000000)) + + // Send back the measured latency + a.logger.Trace("sending measured latency: ", latency) + + stats := map[string][]interface{}{ + "emit": {"latency", map[string]string{ + "id": config.ApplicationConfig.Name, + "latency": latency, + }}, + } + return conn.WriteJSON(stats) +} + +func (a *App) reportStats(conn *connutil.ConnWrapper) error { + ethClient := ethutil.NewEthClient(config.ChainConfig.Url, config.ChainConfig.Timeout) + chain, err := ethClient.Chain() + if err != nil { + return err + } + c := chain.RemoteRpcClient + // peer count + peerCount, _ := c.PeerCount(context.Background()) + + // is active + active := false + if peerCount > 0 { + active = true + } + + // gas price + gasPrice, _ := c.SuggestGasPrice(context.Background()) + + // is syncing + process, err := c.SyncProgress(context.Background()) + syncing := false + if err == nil && process != nil { + progress := process.CurrentBlock - process.StartingBlock + total := process.HighestBlock - process.StartingBlock + if progress/total < 1 { + syncing = true + } + } + + // latest block + latestBlock, err := c.BlockByNumber(context.Background(), nil) + block := model.Block{} + if err == nil { + block.Number = latestBlock.NumberU64() + block.Hash = latestBlock.Hash().String() + block.Difficulty = latestBlock.Difficulty().Uint64() + block.Time = latestBlock.Time() + //block.Transactions = latestBlock.Transactions() + //block.Uncles = latestBlock.Uncles() + } + pendingCount, _ := c.PendingTransactionCount(context.Background()) + + stats := model.Stats{ + NodeInfo: a.node, + Active: active, + PeerCount: peerCount, + Pending: pendingCount, + GasPrice: gasPrice.Int64(), + Syncing: syncing, + Block: &block, + } + report := map[string][]interface{}{ + "emit": {"stats", stats}, + } + return conn.WriteJSON(report) +} + +func (a *App) close(conn *connutil.ConnWrapper, readTicker, latencyTicker *time.Timer) { + if conn != nil { + _ = conn.Close() + } + if readTicker != nil { + _ = readTicker.Stop() + } + if latencyTicker != nil { + _ = latencyTicker.Stop() + } +} diff --git a/client/app/app_test.go b/client/app/app_test.go new file mode 100644 index 0000000..453cfb7 --- /dev/null +++ b/client/app/app_test.go @@ -0,0 +1,81 @@ +package app + +import ( + "bufio" + "fmt" + "github.com/golang/protobuf/protoc-gen-go/generator" + "golang.org/x/sys/unix" + "net" + "os" + "runtime" + "strconv" + "strings" + "testing" +) + +func TestOS(t *testing.T) { + fmt.Println(runtime.GOOS) + fmt.Println(runtime.GOARCH) + fmt.Println(runtime.Compiler) + + var uts unix.Utsname + if err := unix.Uname(&uts); err != nil { + panic(err) + } + sysname := unix.ByteSliceToString(uts.Sysname[:]) + release := unix.ByteSliceToString(uts.Release[:]) + version := unix.ByteSliceToString(uts.Version[:]) + machine := unix.ByteSliceToString(uts.Machine[:]) + nodeName := unix.ByteSliceToString(uts.Nodename[:]) + fmt.Printf("sysname: %s\nrelease: %s\n", sysname, release) + fmt.Printf("version: %s\nmachine: %s\n", version, machine) + fmt.Printf("nodeName: %s\n", nodeName) + if sysname == "Darwin" { + dotPos := strings.Index(release, ".") + if dotPos == -1 { + fmt.Printf("invalid release value: %s\n", release) + return + } + major := release[:dotPos] + majorVersion, err := strconv.Atoi(major) + if err != nil { + fmt.Printf("invalid release value: %s, %v\n", release, err) + return + } + fmt.Println("macOS >= Big Sur:", majorVersion >= 20) + } +} + +func TestSring(t *testing.T) { + fmt.Println(generator.CamelCase("dfasSDFxadfx")) +} + +func TestSocket(t *testing.T) { + conn, err := net.Dial("tcp", "127.0.0.1:3000") + if err != nil { + fmt.Println("client dial err=", err) + return + } + defer conn.Close() + for { + fmt.Println("请输入信息,回车结束输入") + reader := bufio.NewReader(os.Stdin) + //终端读取用户回车,并准备发送给服务器 + line, err := reader.ReadString('\n') + if err != nil { + fmt.Println("readString err=", err) + } + line = strings.Trim(line, "\r\n") + if line == "exit" { + fmt.Println("客户端退出...") + break + } + line = strings.TrimSpace(line) + //将line发送给服务器 + n, err := conn.Write([]byte(line)) + if err != nil { + fmt.Println("conn.Write err=", err) + } + fmt.Printf("发送的内容了%d文字\n", n) + } +} diff --git a/client/app/model/block.go b/client/app/model/block.go new file mode 100644 index 0000000..3704517 --- /dev/null +++ b/client/app/model/block.go @@ -0,0 +1,12 @@ +package model + +import "github.com/ethereum/go-ethereum/core/types" + +type Block struct { + Number uint64 + Hash string + Difficulty uint64 + Transactions []*types.Transaction + Uncles []*types.Header + Time uint64 +} diff --git a/client/app/model/node.go b/client/app/model/node.go new file mode 100644 index 0000000..a5fae91 --- /dev/null +++ b/client/app/model/node.go @@ -0,0 +1,16 @@ +package model + +type Node struct { + Id string + Name string //名称 + Contact string //联系方式 + Coinbase string //账户地址 + Node string //节点 + Net string //网络 + Protocol string //协议 + Api string //接口 + ChainPort string //端口 + OSPlatform string //平台 + OS string //系统 + Client string //客户端 +} diff --git a/client/app/model/stats.go b/client/app/model/stats.go new file mode 100644 index 0000000..c82efed --- /dev/null +++ b/client/app/model/stats.go @@ -0,0 +1,11 @@ +package model + +type Stats struct { + Active bool + PeerCount uint64 + Pending uint + GasPrice int64 + Syncing bool + NodeInfo Node + Block *Block +} diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..6ff4b7a --- /dev/null +++ b/client/client.go @@ -0,0 +1,13 @@ +package main + +import ( + "ethstats/client/cmd" +) + +//go:generate go env -w GO111MODULE=on +//go:generate go env -w GOPROXY=https://goproxy.cn,direct +//go:generate go mod tidy +//go:generate go mod download +func main() { + cmd.Execute() +} diff --git a/client/cmd/cobra.go b/client/cmd/cobra.go new file mode 100644 index 0000000..a0627ba --- /dev/null +++ b/client/cmd/cobra.go @@ -0,0 +1,34 @@ +package cmd + +import ( + "errors" + "ethstats/client/cmd/run" + "github.com/spf13/cobra" + "os" +) + +var rootCmd = &cobra.Command{ + Use: "client", + Short: "client", + SilenceUsage: true, + Long: "client", + Version: "1.0.0", + Args: func(cmd *cobra.Command, args []string) error { + if len(args) < 1 { + return errors.New("requires at least one arg") + } + return nil + }, + PersistentPreRunE: func(*cobra.Command, []string) error { return nil }, +} + +func init() { + rootCmd.AddCommand(run.StartCmd) +} + +// Execute : apply commands +func Execute() { + if err := rootCmd.Execute(); err != nil { + os.Exit(-1) + } +} diff --git a/client/cmd/run/start.go b/client/cmd/run/start.go new file mode 100644 index 0000000..70cf502 --- /dev/null +++ b/client/cmd/run/start.go @@ -0,0 +1,116 @@ +package run + +import ( + "ethstats/client/app" + "ethstats/client/config" + "github.com/bitxx/load-config/source/file" + "github.com/spf13/cobra" + "log" +) + +var ( + configPath string + StartCmd *cobra.Command +) + +const ( + name = "name" + contract = "contract" + version = "version" + secret = "secret" + serverUrl = "server-url" + logPath = "log-path" + logLevel = "log-level" + logStdout = "log-stdout" + logType = "log-type" + logCap = "log-cap" + chainUrl = "chain-url" + chainPort = "chain-port" +) + +func init() { + StartCmd = &cobra.Command{ + Use: "start", + Short: "run the client", + Example: "client start -c config/settings.yml", + SilenceUsage: true, + PreRun: func(cmd *cobra.Command, args []string) { + config.Setup( + file.NewSource(file.WithPath(configPath)), + ) + flag := cmd.PersistentFlags() + + if name, _ := flag.GetString(name); name != "" { + config.ApplicationConfig.Name = name + } + if contract, _ := flag.GetString(contract); contract != "" { + config.ApplicationConfig.Contract = contract + } + if version, _ := flag.GetString(version); version != "" && config.ApplicationConfig.Version == "" { + config.ApplicationConfig.Version = version + } + if secret, _ := flag.GetString(secret); secret != "" && config.ApplicationConfig.Secret == "" { + config.ApplicationConfig.Secret = secret + } + if serverUrl, _ := flag.GetString(serverUrl); serverUrl != "" && config.ApplicationConfig.ServerUrl == "" { + config.ApplicationConfig.ServerUrl = serverUrl + } + if logPath, _ := flag.GetString(logPath); logPath != "" && config.LoggerConfig.Path == "" { + config.LoggerConfig.Path = logPath + } + if logLevel, _ := flag.GetString(logLevel); logLevel != "" && config.LoggerConfig.Level == "" { + config.LoggerConfig.Level = logLevel + } + if logStdout, _ := flag.GetString(logStdout); logStdout != "" && config.LoggerConfig.Stdout == "" { + config.LoggerConfig.Stdout = logStdout + } + if logType, _ := flag.GetString(logType); logType != "" && config.LoggerConfig.Type == "" { + config.LoggerConfig.Type = logType + } + if logCap, _ := flag.GetUint(logCap); logCap > 0 && config.LoggerConfig.Cap <= 0 { + config.LoggerConfig.Cap = logCap + } + if chainUrl, _ := flag.GetString(chainUrl); chainUrl != "" { + config.ChainConfig.Url = chainUrl + } + if chainPort, _ := flag.GetString(chainPort); chainPort != "" && config.ChainConfig.Port == "" { + config.ChainConfig.Port = chainPort + } + if config.ApplicationConfig.Name == "" { + log.Fatal("param name can't empty") + } + if config.ApplicationConfig.Secret == "" { + log.Fatal("param secret can't empty") + } + if config.ApplicationConfig.ServerUrl == "" { + log.Fatal("param serverUrl can't empty") + } + if config.ChainConfig.Url == "" { + log.Fatal("param chainUrl can't empty") + } + }, + RunE: func(cmd *cobra.Command, args []string) error { + + return run() + }, + } + cmd := StartCmd.PersistentFlags() + cmd.StringVarP(&configPath, "config", "c", "", "Start server with provided configuration file") + cmd.String(name, "", "node name") + cmd.String(contract, "", "contract") + cmd.String(version, "v1.0.0", "version") + cmd.String(secret, "", "secret") + cmd.String(serverUrl, "", "server url") + cmd.String(logPath, "files/logs", "log path") + cmd.String(logLevel, "trace", "log path") + cmd.String(logStdout, "file", "default,file") + cmd.String(logType, "default", "default、zap、logrus") + cmd.Uint(logCap, 50, "log cap") + cmd.String(chainUrl, "", "chain url with port,eg:https://127.0.0.1:30303") + cmd.String(chainPort, "30303", "chain port,use for report") +} + +func run() error { + app.NewApp().Start() + return nil +} diff --git a/client/config/application.go b/client/config/application.go new file mode 100644 index 0000000..6ade23f --- /dev/null +++ b/client/config/application.go @@ -0,0 +1,11 @@ +package config + +type Application struct { + Name string + Contract string + Version string + Secret string + ServerUrl string +} + +var ApplicationConfig = new(Application) diff --git a/client/config/chain.go b/client/config/chain.go new file mode 100644 index 0000000..f1214ec --- /dev/null +++ b/client/config/chain.go @@ -0,0 +1,9 @@ +package config + +type Chain struct { + Url string + Timeout int64 + Port string +} + +var ChainConfig = new(Chain) diff --git a/client/config/config.go b/client/config/config.go new file mode 100644 index 0000000..cf2c526 --- /dev/null +++ b/client/config/config.go @@ -0,0 +1,57 @@ +package config + +import ( + "fmt" + loadconfig "github.com/bitxx/load-config" + "github.com/bitxx/load-config/source" + "log" +) + +type Config struct { + Application *Application `yaml:"application"` + Logger *Logger `yaml:"logger"` + Chain *Chain `yaml:"chain"` + callbacks []func() +} + +func (e *Config) init() { + e.Logger.Setup() + e.runCallback() +} + +func (e *Config) Init() { + e.init() + log.Println("!!! client config init") +} + +func (e *Config) runCallback() { + for i := range e.callbacks { + e.callbacks[i]() + } +} + +func (e *Config) OnChange() { + e.init() + log.Println("!!! client config change and reload") +} + +// Setup 载入配置文件 +func Setup(s source.Source, + fs ...func()) { + _cfg := &Config{ + Application: ApplicationConfig, + Chain: ChainConfig, + Logger: LoggerConfig, + callbacks: fs, + } + var err error + loadconfig.DefaultConfig, err = loadconfig.NewConfig( + loadconfig.WithSource(s), + loadconfig.WithEntity(_cfg), + ) + if err != nil { + log.Println(fmt.Sprintf("New client config object fail: %s, use default param to start", err.Error())) + return + } + _cfg.Init() +} diff --git a/client/config/logger.go b/client/config/logger.go new file mode 100644 index 0000000..516afbe --- /dev/null +++ b/client/config/logger.go @@ -0,0 +1,28 @@ +package config + +import ( + "github.com/bitxx/logger" +) + +type Logger struct { + Type string + Path string + Level string + Stdout string + EnabledDB bool + Cap uint +} + +// Setup 设置logger +func (e Logger) Setup() { + l := logger.SetupLogger( + logger.WithType(e.Type), + logger.WithPath(e.Path), + logger.WithLevel(e.Level), + logger.WithStdout(e.Stdout), + logger.WithCap(e.Cap), + ) + _ = l.Init() +} + +var LoggerConfig = new(Logger) diff --git a/client/config/settings.yml b/client/config/settings.yml new file mode 100644 index 0000000..bbdb1a6 --- /dev/null +++ b/client/config/settings.yml @@ -0,0 +1,22 @@ +application: + name: test + contact: "邮箱等联系方式" + version: v1.0.0 + secret: "123456" + serverUrl: "ws://localhost:3000" +logger: + # 日志存放路径 + path: files/logs + # 日志输出,file:文件,default:命令行,其他:命令行 + stdout: '' #控制台日志,启用后,不输出到文件 + # 日志等级, trace, debug, info, warn, error, fatal + level: trace + # 支持default、zap、logrus + type: default + # 单文件条数 + cap: 100 +chain: + url: "链地址,如:ws://127.0.0.1:30303" + # second + timeout: 60 + port: "30303" diff --git a/common/util/connutil/conn.go b/common/util/connutil/conn.go new file mode 100644 index 0000000..ce643bc --- /dev/null +++ b/common/util/connutil/conn.go @@ -0,0 +1,77 @@ +package connutil + +import ( + "github.com/gorilla/websocket" + "net" + "net/http" + "sync" + "time" +) + +type ConnWrapper struct { + conn *websocket.Conn + rlock sync.Mutex + wlock sync.Mutex +} + +// NewDialConn 不加读写锁,执行时会出现问题 +func NewDialConn(url string) (*ConnWrapper, error) { + //发现有的节点因为网络问题,默认的45秒有点短,导致总timeout,这里先固定成120s,后续根据需要再考虑要不要可配置化 + dial := websocket.Dialer{ + Proxy: http.ProxyFromEnvironment, + HandshakeTimeout: 120 * time.Second, + } + c, _, err := dial.Dial(url, nil) + if err != nil { + return nil, err + } + return &ConnWrapper{conn: c}, nil +} + +func NewUpgradeConn(upgradeConn websocket.Upgrader, w http.ResponseWriter, r *http.Request) (*ConnWrapper, error) { + c, err := upgradeConn.Upgrade(w, r, nil) + if err != nil { + return nil, err + } + return &ConnWrapper{conn: c}, nil +} + +// WriteJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *ConnWrapper) WriteJSON(v interface{}) error { + w.wlock.Lock() + defer w.wlock.Unlock() + + return w.conn.WriteJSON(v) +} + +// ReadJSON wraps corresponding method on the websocket but is safe for concurrent calling +func (w *ConnWrapper) ReadJSON(v interface{}) error { + w.rlock.Lock() + defer w.rlock.Unlock() + return w.conn.ReadJSON(v) +} + +func (w *ConnWrapper) WriteMessage(messageType int, data []byte) error { + w.wlock.Lock() + defer w.wlock.Unlock() + + return w.conn.WriteMessage(messageType, data) +} + +func (w *ConnWrapper) ReadMessage() (messageType int, p []byte, err error) { + w.rlock.Lock() + defer w.rlock.Unlock() + + return w.conn.ReadMessage() +} + +func (w *ConnWrapper) RemoteAddr() net.Addr { + return w.conn.RemoteAddr() +} + +// Close wraps corresponding method on the websocket but is safe for concurrent calling +func (w *ConnWrapper) Close() error { + // The Close and WriteControl methods can be called concurrently with all other methods, + // so the mutex is not used here + return w.conn.Close() +} diff --git a/common/util/emailutil/email.go b/common/util/emailutil/email.go new file mode 100644 index 0000000..da0988e --- /dev/null +++ b/common/util/emailutil/email.go @@ -0,0 +1,57 @@ +package emailutil + +import ( + "crypto/tls" + "errors" + "ethstats/server/config" + "gopkg.in/gomail.v2" + "strings" + "sync" +) + +var ( + wlock sync.Mutex +) + +func SendEmailDefault(subject, content string) error { + return SendEmail(config.EmailConfig.ToEmail, config.EmailConfig.SubjectPrefix+subject, content, + config.EmailConfig.Username, config.EmailConfig.FromEmail, config.EmailConfig.Host, + config.EmailConfig.Password, config.EmailConfig.ContentType, config.EmailConfig.Port) +} + +// SendEmail send to notification +func SendEmail(toEmailList, subject, content string, username, fromEmail, host, password, contentType string, port int) error { + if len(toEmailList) <= 1 || subject == "" || content == "" || username == "" || fromEmail == "" || host == "" || password == "" || contentType == "" || port <= 0 { + return errors.New("param init error,not send email") + } + wlock.Lock() + defer wlock.Unlock() + + emailSlips := strings.Split(toEmailList, ",") + var emails []string + for _, v := range emailSlips { + emails = append(emails, v) + } + + // smtp 配置 + d := gomail.NewDialer(host, port, username, password) + d.TLSConfig = &tls.Config{InsecureSkipVerify: true} + + // Send emails using d. + toEmails := emails + + m := gomail.NewMessage() + m.SetHeader("From", fromEmail) + m.SetHeader("To", toEmails...) + + // 邮件标题 + m.SetHeader("Subject", subject) + + m.SetBody(contentType, content) + + if err := d.DialAndSend(m); err != nil { + return err + } + + return nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..87e64ed --- /dev/null +++ b/go.mod @@ -0,0 +1,50 @@ +module ethstats + +go 1.20 + +require ( + github.com/bitxx/ethutil v1.0.0 + github.com/bitxx/load-config v1.5.1 + github.com/bitxx/logger v1.5.3 + github.com/ethereum/go-ethereum v1.12.0 + github.com/golang/protobuf v1.5.3 + github.com/gorilla/websocket v1.5.0 + github.com/spf13/cobra v1.7.0 + golang.org/x/sys v0.10.0 + gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df +) + +require ( + github.com/BurntSushi/toml v1.3.2 // indirect + github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect + github.com/bitly/go-simplejson v0.5.1 // indirect + github.com/btcsuite/btcd v0.22.3 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect + github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce // indirect + github.com/deckarep/golang-set/v2 v2.1.0 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/fsnotify/fsnotify v1.6.0 // indirect + github.com/ghodss/yaml v1.0.0 // indirect + github.com/go-ole/go-ole v1.2.1 // indirect + github.com/go-stack/stack v1.8.1 // indirect + github.com/holiman/uint256 v1.2.2-0.20230321075855-87b91420868c // indirect + github.com/imdario/mergo v0.3.13 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/miguelmota/go-ethereum-hdwallet v0.1.1 // indirect + github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/tklauser/go-sysconf v0.3.5 // indirect + github.com/tklauser/numcpus v0.2.2 // indirect + github.com/tyler-smith/go-bip39 v1.1.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.24.0 // indirect + golang.org/x/crypto v0.11.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect + gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect +) diff --git a/server/app/app.go b/server/app/app.go new file mode 100644 index 0000000..51e5466 --- /dev/null +++ b/server/app/app.go @@ -0,0 +1,35 @@ +package app + +import ( + "ethstats/server/app/model" + "ethstats/server/app/service" + "ethstats/server/config" + "github.com/bitxx/logger/logbase" + "net/http" +) + +type App struct { + logger *logbase.Helper + channel *model.Channel +} + +func NewApp() *App { + channel := &model.Channel{ + MsgPing: make(chan []byte), + MsgLatency: make(chan []byte), + Nodes: make(map[string][]byte), + LoginIDs: make(map[string]string), + } + return &App{ + channel: channel, + logger: logbase.NewHelper(logbase.DefaultLogger), + } +} + +func (a *App) Start() { + relay := service.NewRelay(a.channel, a.logger) + api := service.NewApi(a.channel, a.logger) + http.HandleFunc("/", relay.HandleRequest) + http.HandleFunc("/api", api.HandleRequest) + a.logger.Fatal(http.ListenAndServe(config.ApplicationConfig.Host+":"+config.ApplicationConfig.Port, nil)) +} diff --git a/server/app/model/auth_message.go b/server/app/model/auth_message.go new file mode 100644 index 0000000..42bc644 --- /dev/null +++ b/server/app/model/auth_message.go @@ -0,0 +1,20 @@ +package model + +import ( + "ethstats/common/util/connutil" +) + +// AuthMessage is the struct sent by the server on the first connection +type AuthMessage struct { + ID string `json:"id"` + Secret string `json:"secret"` +} + +// SendResponse send the ready response to the node to initiate the communication +func (a *AuthMessage) SendResponse(c *connutil.ConnWrapper) error { + return c.WriteJSON(map[string][]interface{}{"emit": {"ready"}}) +} + +func (a *AuthMessage) SendLoginErrResponse(c *connutil.ConnWrapper, errMsg string) error { + return c.WriteJSON(map[string][]interface{}{"emit": {"un-authorization", errMsg}}) +} diff --git a/server/app/model/channel.go b/server/app/model/channel.go new file mode 100644 index 0000000..3cbb21f --- /dev/null +++ b/server/app/model/channel.go @@ -0,0 +1,15 @@ +package model + +// Channel is the service whereby servers exchange info +type Channel struct { + + // MsgStats is the content of the stats reported by the Ethereum node + MsgPing chan []byte + MsgLatency chan []byte + + // Nodes registered to the relay server + Nodes map[string][]byte + + //use for flag the login client + LoginIDs map[string]string +} diff --git a/server/app/model/ping.go b/server/app/model/ping.go new file mode 100644 index 0000000..f61f59c --- /dev/null +++ b/server/app/model/ping.go @@ -0,0 +1,17 @@ +package model + +import ( + "ethstats/common/util/connutil" +) + +// NodePing contains the last time the node is alive +type NodePing struct { + ID string `json:"id"` + Time string `json:"clientTime"` +} + +// SendResponse send the pong response to the node +func (n *NodePing) SendResponse(c *connutil.ConnWrapper) error { + // message type is always 1 + return c.WriteJSON(map[string][]interface{}{"emit": {"node-pong", n.ID}}) +} diff --git a/server/app/model/type.go b/server/app/model/type.go new file mode 100644 index 0000000..61a9838 --- /dev/null +++ b/server/app/model/type.go @@ -0,0 +1,33 @@ +package model + +import ( + "encoding/json" +) + +// Message contains the Ethereum message +type Message struct { + Content []byte +} + +// GetType return the type of the message sent by the Ethereum node +func (e *Message) GetType() (string, error) { + var content map[string][]interface{} + err := json.Unmarshal([]byte(e.Content), &content) + if err != nil { + return "", err + } + result, _ := content["emit"][0].(string) + return result, nil +} + +// GetValue retrieve the current content of the emitted message by the node +func (e *Message) GetValue() ([]byte, error) { + var content map[string][]interface{} + err := json.Unmarshal([]byte(e.Content), &content) + if err != nil { + return nil, err + } + result, _ := content["emit"][1].(interface{}) + val, err := json.Marshal(result) + return val, err +} diff --git a/server/app/service/api.go b/server/app/service/api.go new file mode 100644 index 0000000..ccfd8d3 --- /dev/null +++ b/server/app/service/api.go @@ -0,0 +1,157 @@ +package service + +import ( + "ethstats/common/util/connutil" + "ethstats/common/util/emailutil" + "ethstats/server/app/model" + "ethstats/server/config" + "fmt" + "github.com/bitxx/logger/logbase" + "github.com/gorilla/websocket" + "net/http" + "regexp" + "strconv" + "strings" + "time" +) + +// Api is the responsible to send node state to registered hub +type Api struct { + logger *logbase.Helper + hub *hub +} + +// NewApi creates a new Api struct with the required service +func NewApi(channel *model.Channel, logger *logbase.Helper) *Api { + hub := &hub{ + register: make(chan *connutil.ConnWrapper), + logger: logger, + close: make(chan interface{}), + clients: make(map[*connutil.ConnWrapper]bool), + channel: channel, + } + go hub.loop() + return &Api{ + logger: logger, + hub: hub, + } +} + +// Close this server and all registered client connections +func (a *Api) Close() { + a.logger.Info("prepared to close all client connections") + a.hub.close <- "close" +} + +// HandleRequest handle all request from hub that are not Ethereum nodes +func (a *Api) HandleRequest(w http.ResponseWriter, r *http.Request) { + upgradeConn := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := connutil.NewUpgradeConn(upgradeConn, w, r) + if err != nil { + a.logger.Errorf("error trying to establish communication with client (addr=%s, host=%s, URI=%s), %s", + r.RemoteAddr, r.Host, r.RequestURI, err) + return + } + a.logger.Infof("connected new client! (host=%s)", r.Host) + a.hub.register <- conn +} + +// hub maintain a list of registered clients to send messages +type hub struct { + register chan *connutil.ConnWrapper + logger *logbase.Helper + close chan interface{} + clients map[*connutil.ConnWrapper]bool + channel *model.Channel +} + +// loop loops as the server is alive and send messages to registered clients +func (h *hub) loop() { + nodesReportTicker := time.NewTicker(15 * time.Second) + nodesMonitorTicker := time.NewTicker(time.Duration(config.EmailConfig.MonitorTime) * time.Second) + defer func() { + nodesReportTicker.Stop() + nodesMonitorTicker.Stop() + }() + //nodesMonitorTicker := time.NewTicker(1440 * time.Second) + for { + select { + case client := <-h.register: + h.clients[client] = true + case ping := <-h.channel.MsgPing: + //debug log for show the ping + //h.logger.Info("debug log show ping = > ", string(ping)) + //use for send to any fronted client + h.writeMessage(ping) + case latency := <-h.channel.MsgLatency: + //debug log for show the latency + //h.logger.Info("debug log show latency = > ", string(latency)) + //use for send to any fronted client + h.writeMessage(latency) + case <-nodesReportTicker.C: + if len(h.clients) <= 0 { + continue + } + for _, v := range h.channel.Nodes { + //debug log for show the node info + //h.logger.Info("debug log show node = > ", string(v)) + //use for send to any fronted client + h.writeMessage(v) + } + case <-nodesMonitorTicker.C: + nodeCount := len(h.channel.Nodes) + nodeInfo := "" + for k, v := range h.channel.Nodes { + //node info like this: + //{"Active":true,"PeerCount":500,"Pending":94,"GasPrice":1097000000000,"Syncing":false,"NodeInfo":{"Id":"test","Name":"test","Contact":"","Coinbase":"","Node":"","Net":"","Protocol":"","Api":"","ChainPort":"30303","OSPlatform":"amd64","OS":"darwin","Client":"v1.0.0"},"Block":{"Number":484645,"Hash":"0x0036faa4d9b83ec836ebbcd6a98699323c41bdcdedf8d96a21c6dd25b9c41b88","Difficulty":0,"Transactions":null,"Uncles":null,"Time":1690774181}} + + idRe := regexp.MustCompile("Id\\\":(.*?),") + idTmp1 := idRe.FindString(string(v)) + idTmp2 := strings.Replace(idTmp1, "Id\":\"", "", -1) + id := strings.Replace(idTmp2, "\",", "", -1) + + re := regexp.MustCompile("Number\\\":(.*?),") + heightTmp1 := re.FindString(string(v)) + heightTmp2 := strings.Replace(heightTmp1, "Number\":", "", -1) + latestHigh := strings.Replace(heightTmp2, ",", "", -1) + + ip := strings.Replace(k, "[::1]", "127.0.0.1", -1) + nodeInfo = nodeInfo + "--节点ID:" + id + ",节点IP:" + ip + ",块高度:" + latestHigh + "\n" + } + content := "节点数量:" + strconv.Itoa(nodeCount) + "\n各节点块高度:\n" + nodeInfo + fmt.Println(content) + _ = emailutil.SendEmailDefault(fmt.Sprintf("%s-节点监控简报\n", time.Now().Format("2006-01-02 15:04:05")), content) + case <-h.close: + h.quit() + break + } + } +} + +// writeMessage to all registered clients. If an error occurs sending a message to a client, +// then these connection is closed and removed from the pool of registered clients +func (h *hub) writeMessage(msg []byte) { + for client := range h.clients { + err := client.WriteMessage(1, msg) + if err != nil { + h.logger.Infof("Closed connection with client: %s", client.RemoteAddr()) + // close and delete the client connection and release + client.Close() + delete(h.clients, client) + } + } +} + +func (h *hub) quit() { + h.logger.Info("Closing all registered clients") + for client := range h.clients { + client.Close() + delete(h.clients, client) + } + close(h.register) + close(h.close) +} diff --git a/server/app/service/relay.go b/server/app/service/relay.go new file mode 100644 index 0000000..fd1f0d0 --- /dev/null +++ b/server/app/service/relay.go @@ -0,0 +1,190 @@ +package service + +import ( + "encoding/json" + "ethstats/common/util/connutil" + "ethstats/common/util/emailutil" + "ethstats/server/app/model" + "ethstats/server/config" + "fmt" + "github.com/bitxx/logger/logbase" + "github.com/gorilla/websocket" + "net/http" + "time" +) + +const ( + messageHello string = "hello" + messagePing string = "node-ping" + messageLatency string = "latency" + messageStats string = "stats" +) + +// NodeRelay contains the secret used to authenticate the communication between +// the Ethereum node and this server +type NodeRelay struct { + secret string + logger *logbase.Helper + channel *model.Channel +} + +// NewRelay creates a new NodeRelay struct with required fields +func NewRelay(channel *model.Channel, logger *logbase.Helper) *NodeRelay { + return &NodeRelay{ + channel: channel, + secret: config.ApplicationConfig.Secret, + logger: logger, + } +} + +// Close closes the connection between this server and all Ethereum nodes connected to it +func (n *NodeRelay) Close() { + close(n.channel.MsgPing) + close(n.channel.MsgLatency) +} + +// HandleRequest is the function to handle all server requests that came from +// Ethereum nodes +func (n *NodeRelay) HandleRequest(w http.ResponseWriter, r *http.Request) { + upgradeConn := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + conn, err := connutil.NewUpgradeConn(upgradeConn, w, r) + if err != nil { + n.logger.Warnf("error establishing node connection: %s", err) + return + } + n.logger.Infof("new node connected! (addr=%s, host=%s)", r.RemoteAddr, r.Host) + go n.loop(conn) +} + +// loop loops as long as the connection is alive and retrieves node packages +func (n *NodeRelay) loop(c *connutil.ConnWrapper) { + // Close connection if an unexpected error occurs and delete the node + // from the map of connected nodes... + defer func(c *connutil.ConnWrapper) { + //remove the error node and login id\ + if n.channel.Nodes[c.RemoteAddr().String()] != nil { + delete(n.channel.Nodes, c.RemoteAddr().String()) + } + if n.channel.LoginIDs[c.RemoteAddr().String()] != "" { + content := "node: [" + n.channel.LoginIDs[c.RemoteAddr().String()] + "-" + c.RemoteAddr().String() + "] error, stop it" + err := emailutil.SendEmailDefault(fmt.Sprintf("%s-异常节点\n", time.Now().Format("2006-01-02 15:04:05")), content) + if err != nil { + n.logger.Trace("email content: ", content, " send error: ", err) + } + if err == nil { + n.logger.Trace("email send success") + } + delete(n.channel.LoginIDs, c.RemoteAddr().String()) + } + err := c.Close() + if err != nil { + n.logger.Error(err) + return + } + n.logger.Warnf("connection with node closed, there are %d connected nodes", len(n.channel.Nodes)) + }(c) + // Client loop + for { + _, content, err := c.ReadMessage() + if err != nil { + n.logger.Errorf("error reading message from client, %s", err) + return + } + // Create emitted message from the node + msg := model.Message{Content: content} + msgType, err := msg.GetType() + if err != nil { + n.logger.Warnf("can't get type of message sent by the node: %s", err) + return + } + switch msgType { + case messageHello: + authMsg, parseError := parseAuthMessage(msg) + if parseError != nil { + n.logger.Warnf("can't parse authorization message sent by node[%s], error: %s", authMsg.ID, parseError) + loginErr := authMsg.SendLoginErrResponse(c, "login data parsing error") + if loginErr != nil { + n.logger.Errorf("error sending authorization response [parse message error info] to node[%s], error: %s", authMsg.ID, loginErr) + return + } + return + } + // first check if the secret is correct + if authMsg.Secret != n.secret { + n.logger.Errorf("invalid secret from node %s, can't get stats", authMsg.ID) + loginErr := authMsg.SendLoginErrResponse(c, "authorization error,invalid secret") + if loginErr != nil { + n.logger.Errorf("error sending authorization response [invalid secret] to node[%s], error: %s", authMsg.ID, loginErr) + return + } + return + } + //判断节点名称是否重复,遍历效率有点低,有时间了在考虑怎么优化,或者伙计们可以帮忙想个简单的法子 + for k, v := range n.channel.LoginIDs { + if v == authMsg.ID && k != c.RemoteAddr().String() { + n.logger.Errorf("the id [%s] has login", authMsg.ID) + loginErr := authMsg.SendLoginErrResponse(c, "the login id has being exist,please change the id name") + if loginErr != nil { + n.logger.Errorf("error sending authorization response [login id is exist] to node[%s], error: %s", authMsg.ID, loginErr) + return + } + return + } + + } + sendError := authMsg.SendResponse(c) + if sendError != nil { + n.logger.Errorf("error sending authorization response to node[%s], error: %s", authMsg.ID, sendError) + return + } + n.channel.LoginIDs[c.RemoteAddr().String()] = authMsg.ID + case messagePing: + // When the node emit a ping message, we need to respond with pong + // before five seconds to authorize that node to sent reports + ping, err := parseNodePingMessage(msg) + if err != nil { + n.logger.Warnf("can't parse ping message sent by node[%s], error: %s", ping.ID, err) + return + } + sendError := ping.SendResponse(c) + if sendError != nil { + n.logger.Errorf("error sending pong response to node[%s], error: %s", ping.ID, sendError) + } + n.channel.MsgPing <- content + case messageLatency: + n.channel.MsgLatency <- content + case messageStats: + // use node addr as identifier to check node availability + n.channel.Nodes[c.RemoteAddr().String()] = content + n.logger.Infof("currently there are %d connected nodes", len(n.channel.Nodes)) + } + } +} + +// parseNodePingMessage parse the current ping message sent bu the Ethereum node +// and creates a message.NodePing struct with that info +func parseNodePingMessage(msg model.Message) (*model.NodePing, error) { + value, err := msg.GetValue() + if err != nil { + return &model.NodePing{}, err + } + var ping model.NodePing + err = json.Unmarshal(value, &ping) + return &ping, err +} + +// parseAuthMessage parse the current byte array and transforms it to an AuthMessage struct. +// If an error occurs when json unmarshal, an error is returned +func parseAuthMessage(msg model.Message) (*model.AuthMessage, error) { + value, err := msg.GetValue() + if err != nil { + return &model.AuthMessage{}, err + } + var detail model.AuthMessage + err = json.Unmarshal(value, &detail) + return &detail, err +} diff --git a/server/cmd/cobra.go b/server/cmd/cobra.go new file mode 100644 index 0000000..6f4f5ee --- /dev/null +++ b/server/cmd/cobra.go @@ -0,0 +1,35 @@ +package cmd + +import ( + "errors" + "ethstats/server/cmd/run" + "ethstats/server/config" + "github.com/spf13/cobra" + "os" +) + +var rootCmd = &cobra.Command{ + Use: "server", + Short: "server", + SilenceUsage: true, + Long: "server", + Version: config.ApplicationConfig.Version, + Args: func(cmd *cobra.Command, args []string) error { + if len(args) < 1 { + return errors.New("requires at least one arg") + } + return nil + }, + PersistentPreRunE: func(*cobra.Command, []string) error { return nil }, +} + +func init() { + rootCmd.AddCommand(run.StartCmd) +} + +// Execute : apply commands +func Execute() { + if err := rootCmd.Execute(); err != nil { + os.Exit(-1) + } +} diff --git a/server/cmd/run/start.go b/server/cmd/run/start.go new file mode 100644 index 0000000..8c80eee --- /dev/null +++ b/server/cmd/run/start.go @@ -0,0 +1,153 @@ +package run + +import ( + "ethstats/server/app" + "ethstats/server/config" + "github.com/bitxx/load-config/source/file" + "github.com/spf13/cobra" + "log" +) + +var ( + configPath string + StartCmd *cobra.Command +) + +const ( + name = "name" + host = "host" + port = "port" + version = "version" + secret = "secret" + logPath = "log-path" + logLevel = "log-level" + logStdout = "log-stdout" + logType = "log-type" + logCap = "log-cap" + emailHost = "email-host" + emailPort = "email-port" + emailUsername = "email-username" + emailPassword = "email-password" + emailFrom = "email-from" + emailContentType = "email-content-type" + emailTo = "email-to" + emailSubjectPrefix = "email-subject-prefix" + monitorTime = "email-monitor-time" +) + +func init() { + StartCmd = &cobra.Command{ + Use: "start", + Short: "run the server", + Example: "server start -c config/settings.yml", + SilenceUsage: true, + PreRun: func(cmd *cobra.Command, args []string) { + config.Setup( + file.NewSource(file.WithPath(configPath)), + ) + flag := cmd.PersistentFlags() + + if name, _ := flag.GetString(name); name != "" { + config.ApplicationConfig.Name = name + } + if host, _ := flag.GetString(host); host != "" { + config.ApplicationConfig.Host = host + } + if port, _ := flag.GetString(port); port != "" { + config.ApplicationConfig.Port = port + } + if version, _ := flag.GetString(version); version != "" && config.ApplicationConfig.Version == "" { + config.ApplicationConfig.Version = version + } + if secret, _ := flag.GetString(secret); secret != "" && config.ApplicationConfig.Secret == "" { + config.ApplicationConfig.Secret = secret + } + if logPath, _ := flag.GetString(logPath); logPath != "" && config.LoggerConfig.Path == "" { + config.LoggerConfig.Path = logPath + } + if logLevel, _ := flag.GetString(logLevel); logLevel != "" && config.LoggerConfig.Level == "" { + config.LoggerConfig.Level = logLevel + } + if logStdout, _ := flag.GetString(logStdout); logStdout != "" && config.LoggerConfig.Stdout == "" { + config.LoggerConfig.Stdout = logStdout + } + if logType, _ := flag.GetString(logType); logType != "" && config.LoggerConfig.Type == "" { + config.LoggerConfig.Type = logType + } + if logCap, _ := flag.GetUint(logCap); logCap > 0 && config.LoggerConfig.Cap <= 0 { + config.LoggerConfig.Cap = logCap + } + if emailHost, _ := flag.GetString(emailHost); emailHost != "" && config.EmailConfig.Host == "" { + config.EmailConfig.Host = emailHost + } + if emailPort, _ := flag.GetInt(emailPort); emailPort > 0 && config.EmailConfig.Port <= 0 { + config.EmailConfig.Port = emailPort + } + if emailContentType, _ := flag.GetString(emailContentType); emailContentType != "" && config.EmailConfig.ContentType == "" { + config.EmailConfig.ContentType = emailContentType + } + if emailUsername, _ := flag.GetString(emailUsername); emailUsername != "" && config.EmailConfig.Username == "" { + config.EmailConfig.Username = emailUsername + } + if emailPassword, _ := flag.GetString(emailPassword); emailPassword != "" && config.EmailConfig.Password == "" { + config.EmailConfig.Password = emailPassword + } + if emailFrom, _ := flag.GetString(emailFrom); emailFrom != "" && config.EmailConfig.FromEmail == "" { + config.EmailConfig.FromEmail = emailFrom + } + if emailTo, _ := flag.GetString(emailTo); emailTo != "" && config.EmailConfig.ToEmail == "" { + config.EmailConfig.ToEmail = emailTo + } + if emailSubjectPrefix, _ := flag.GetString(emailSubjectPrefix); emailSubjectPrefix != "" && config.EmailConfig.SubjectPrefix == "" { + config.EmailConfig.SubjectPrefix = emailSubjectPrefix + } + if monitorTime, _ := flag.GetInt(monitorTime); monitorTime > 0 && config.EmailConfig.MonitorTime <= 0 { + config.EmailConfig.MonitorTime = monitorTime + } + + if config.ApplicationConfig.Name == "" { + log.Fatal("param name can't empty") + } + if config.ApplicationConfig.Host == "" { + log.Fatal("param host can't empty") + } + if config.ApplicationConfig.Port == "" { + log.Fatal("param port can't empty") + } + if config.ApplicationConfig.Secret == "" { + log.Fatal("param secret can't empty") + } + + }, + RunE: func(cmd *cobra.Command, args []string) error { + return run() + }, + } + cmd := StartCmd.PersistentFlags() + cmd.StringVarP(&configPath, "config", "c", "", "Start server with provided configuration file") + + cmd.String(name, "", "name") + cmd.String(host, "", "host") + cmd.String(port, "", "prot") + cmd.String(version, "v1.0.0", "version") + cmd.String(secret, "", "secret") + cmd.String(logPath, "files/logs", "log path") + cmd.String(logLevel, "trace", "log path") + cmd.String(logStdout, "file", "default,file") + cmd.String(logType, "default", "default、zap、logrus") + cmd.Uint(logCap, 50, "log cap") + cmd.String(emailHost, "", "email host") + cmd.Int(emailPort, 0, "email port") + cmd.String(emailContentType, "text/plain", "email content type") + cmd.String(emailUsername, "", "email username") + cmd.String(emailPassword, "", "email password") + cmd.String(emailFrom, "", "email from") + cmd.String(emailTo, "", "email to") + cmd.String(emailSubjectPrefix, "", "email subject prefix") + cmd.Int(monitorTime, 86400, "email monitor time") +} + +func run() error { + app.NewApp().Start() + return nil +} diff --git a/server/config/application.go b/server/config/application.go new file mode 100644 index 0000000..ae6b5a7 --- /dev/null +++ b/server/config/application.go @@ -0,0 +1,11 @@ +package config + +type Application struct { + Name string + Host string + Port string + Version string + Secret string +} + +var ApplicationConfig = new(Application) diff --git a/server/config/config.go b/server/config/config.go new file mode 100644 index 0000000..7ecc9ef --- /dev/null +++ b/server/config/config.go @@ -0,0 +1,57 @@ +package config + +import ( + "fmt" + loadconfig "github.com/bitxx/load-config" + "github.com/bitxx/load-config/source" + "log" +) + +type Config struct { + Application *Application `yaml:"application"` + Logger *Logger `yaml:"logger"` + Email *Email `yaml:"email"` + callbacks []func() +} + +func (e *Config) init() { + e.Logger.Setup() + e.runCallback() +} + +func (e *Config) Init() { + e.init() + log.Println("!!! server config init") +} + +func (e *Config) runCallback() { + for i := range e.callbacks { + e.callbacks[i]() + } +} + +func (e *Config) OnChange() { + e.init() + log.Println("!!! server config change and reload") +} + +// Setup 载入配置文件 +func Setup(s source.Source, + fs ...func()) { + _cfg := &Config{ + Application: ApplicationConfig, + Logger: LoggerConfig, + Email: EmailConfig, + callbacks: fs, + } + var err error + loadconfig.DefaultConfig, err = loadconfig.NewConfig( + loadconfig.WithSource(s), + loadconfig.WithEntity(_cfg), + ) + if err != nil { + log.Println(fmt.Sprintf("New server config object fail: %s, use default param to start", err.Error())) + return + } + _cfg.Init() +} diff --git a/server/config/email.go b/server/config/email.go new file mode 100644 index 0000000..3b82508 --- /dev/null +++ b/server/config/email.go @@ -0,0 +1,15 @@ +package config + +type Email struct { + Host string + Port int + Username string + Password string + FromEmail string + ContentType string + ToEmail string + SubjectPrefix string + MonitorTime int +} + +var EmailConfig = new(Email) diff --git a/server/config/logger.go b/server/config/logger.go new file mode 100644 index 0000000..516afbe --- /dev/null +++ b/server/config/logger.go @@ -0,0 +1,28 @@ +package config + +import ( + "github.com/bitxx/logger" +) + +type Logger struct { + Type string + Path string + Level string + Stdout string + EnabledDB bool + Cap uint +} + +// Setup 设置logger +func (e Logger) Setup() { + l := logger.SetupLogger( + logger.WithType(e.Type), + logger.WithPath(e.Path), + logger.WithLevel(e.Level), + logger.WithStdout(e.Stdout), + logger.WithCap(e.Cap), + ) + _ = l.Init() +} + +var LoggerConfig = new(Logger) diff --git a/server/config/settings.yml b/server/config/settings.yml new file mode 100644 index 0000000..4ce4b82 --- /dev/null +++ b/server/config/settings.yml @@ -0,0 +1,34 @@ +application: + name: test + host: "0.0.0.0" + port: "3000" + version: v1.0.0 + secret: "123456" +logger: + # 日志存放路径 + path: files/logs + # 日志输出,file:文件,default:命令行,其他:命令行 + stdout: '' #控制台日志,启用后,不输出到文件 + # 日志等级, trace, debug, info, warn, error, fatal + level: trace + # 支持default、zap、logrus + type: default + # 单文件条数 + cap: 100 + +# 如果邮件配置中的内容为空,则不会发送邮件,不影响程序运行 +email: + host: 邮箱服务地址 + port: 465 + username: 邮箱账户 + password: 邮箱密钥 + # 发送者邮箱地址 + fromEmail: 发件邮箱 + # 邮件内容格式 + contentType: text/plain + # 邮件标题前缀 + subjectPrefix: ethereum + # 多个地址用英文逗号隔开 + toEmail: 收件邮箱 + # 监控信息简报发送间隔时间,单位秒;每隔指定时间,会将监控设备的节点概要信息发送到邮箱 + monitorTime: 86400 diff --git a/server/frontend/.gitkeep b/server/frontend/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..0db31d2 --- /dev/null +++ b/server/server.go @@ -0,0 +1,13 @@ +package main + +import ( + "ethstats/server/cmd" +) + +//go:generate go env -w GO111MODULE=on +//go:generate go env -w GOPROXY=https://goproxy.cn,direct +//go:generate go mod tidy +//go:generate go mod download +func main() { + cmd.Execute() +}