diff --git a/app/cmd/root.go b/app/cmd/root.go index cc7f39f731..a58e2bc21d 100644 --- a/app/cmd/root.go +++ b/app/cmd/root.go @@ -29,20 +29,24 @@ const ( var ( // These values will be injected by the build system - appVersion = "Unknown" - appDate = "Unknown" - appType = "Unknown" // aka channel - appCommit = "Unknown" - appPlatform = "Unknown" - appArch = "Unknown" + appVersion = "Unknown" + appDate = "Unknown" + appType = "Unknown" // aka channel + appToolchain = "Unknown" + appCommit = "Unknown" + appPlatform = "Unknown" + appArch = "Unknown" + libVersion = "Unknown" appVersionLong = fmt.Sprintf("Version:\t%s\n"+ "BuildDate:\t%s\n"+ "BuildType:\t%s\n"+ + "Toolchain:\t%s\n"+ "CommitHash:\t%s\n"+ "Platform:\t%s\n"+ - "Architecture:\t%s", - appVersion, appDate, appType, appCommit, appPlatform, appArch) + "Architecture:\t%s\n"+ + "LibVersion:\t%s", + appVersion, appDate, appType, appToolchain, appCommit, appPlatform, appArch, libVersion) appAboutLong = fmt.Sprintf("%s\n%s\n%s\n\n%s", appLogo, appDesc, appAuthors, appVersionLong) ) diff --git a/app/cmd/server.go b/app/cmd/server.go index 3da748d7bd..a4b8470940 100644 --- a/app/cmd/server.go +++ b/app/cmd/server.go @@ -804,6 +804,9 @@ func (c *serverConfig) fillMasqHandler(hyConfig *server.Config) error { if err != nil { return configError{Field: "masquerade.proxy.url", Err: err} } + if u.Scheme != "http" && u.Scheme != "https" { + return configError{Field: "masquerade.proxy.url", Err: fmt.Errorf("unsupported protocol scheme \"%s\"", u.Scheme)} + } handler = &httputil.ReverseProxy{ Rewrite: func(r *httputil.ProxyRequest) { r.SetURL(u) diff --git a/app/cmd/share.go b/app/cmd/share.go new file mode 100644 index 0000000000..ad96e80cc2 --- /dev/null +++ b/app/cmd/share.go @@ -0,0 +1,55 @@ +package cmd + +import ( + "fmt" + + "github.com/apernet/hysteria/app/v2/internal/utils" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" +) + +var ( + noText bool + withQR bool +) + +// shareCmd represents the share command +var shareCmd = &cobra.Command{ + Use: "share", + Short: "Generate share URI", + Long: "Generate a hysteria2:// URI from a client config for sharing", + Run: runShare, +} + +func init() { + initShareFlags() + rootCmd.AddCommand(shareCmd) +} + +func initShareFlags() { + shareCmd.Flags().BoolVar(&noText, "notext", false, "do not show URI as text") + shareCmd.Flags().BoolVar(&withQR, "qr", false, "show URI as QR code") +} + +func runShare(cmd *cobra.Command, args []string) { + if err := viper.ReadInConfig(); err != nil { + logger.Fatal("failed to read client config", zap.Error(err)) + } + var config clientConfig + if err := viper.Unmarshal(&config); err != nil { + logger.Fatal("failed to parse client config", zap.Error(err)) + } + if _, err := config.Config(); err != nil { + logger.Fatal("failed to load client config", zap.Error(err)) + } + + u := config.URI() + + if !noText { + fmt.Println(u) + } + if withQR { + utils.PrintQR(u) + } +} diff --git a/app/go.mod b/app/go.mod index 7353f6a98d..dab7e51776 100644 --- a/app/go.mod +++ b/app/go.mod @@ -30,7 +30,7 @@ require ( require ( github.com/andybalholm/brotli v1.1.0 // indirect - github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d // indirect + github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7 // indirect github.com/babolivier/go-doh-client v0.0.0-20201028162107-a76cff4cb8b6 // indirect github.com/cloudflare/circl v1.3.9 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/app/go.sum b/app/go.sum index 73ee9638ab..dec519bcc1 100644 --- a/app/go.sum +++ b/app/go.sum @@ -42,8 +42,8 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1 github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/apernet/go-tproxy v0.0.0-20230809025308-8f4723fd742f h1:uVh0qpEslrWjgzx9vOcyCqsOY3c9kofDZ1n+qaw35ZY= github.com/apernet/go-tproxy v0.0.0-20230809025308-8f4723fd742f/go.mod h1:xkkq9D4ygcldQQhKS/w9CadiCKwCngU7K9E3DaKahpM= -github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d h1:KWRCWISqJOgY9/0hhH8Bevjw/k4tCQ7oJlXLyFv8u9s= -github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d/go.mod h1:x0paLlmCzNOUDDQIgmgFWmnpWQIEuH1GNfA6NdgSTuM= +github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7 h1:zO38yBOvQ1dLHbSuaU5BFZ8zalnSDQslj+i/9AGOk9s= +github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7/go.mod h1:LoSUY2chVqNQCDyi4IZGqPpXLy1FuCkE37PKwtJvNGg= github.com/apernet/sing-tun v0.2.6-0.20240323130332-b9f6511036ad h1:QzQ2sKpc9o42HNRR8ukM5uMC/RzR2HgZd/Nvaqol2C0= github.com/apernet/sing-tun v0.2.6-0.20240323130332-b9f6511036ad/go.mod h1:S5IydyLSN/QAfvY+r2GoomPJ6hidtXWm/Ad18sJVssk= github.com/babolivier/go-doh-client v0.0.0-20201028162107-a76cff4cb8b6 h1:4NNbNM2Iq/k57qEu7WfL67UrbPq1uFWxW4qODCohi+0= diff --git a/core/go.mod b/core/go.mod index 8d4cec0d01..beb0372201 100644 --- a/core/go.mod +++ b/core/go.mod @@ -5,7 +5,7 @@ go 1.22 toolchain go1.23.2 require ( - github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d + github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7 github.com/stretchr/testify v1.9.0 go.uber.org/goleak v1.2.1 golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 diff --git a/core/go.sum b/core/go.sum index 4af12ad7d8..d65498b2c7 100644 --- a/core/go.sum +++ b/core/go.sum @@ -1,5 +1,5 @@ -github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d h1:KWRCWISqJOgY9/0hhH8Bevjw/k4tCQ7oJlXLyFv8u9s= -github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d/go.mod h1:x0paLlmCzNOUDDQIgmgFWmnpWQIEuH1GNfA6NdgSTuM= +github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7 h1:zO38yBOvQ1dLHbSuaU5BFZ8zalnSDQslj+i/9AGOk9s= +github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7/go.mod h1:LoSUY2chVqNQCDyi4IZGqPpXLy1FuCkE37PKwtJvNGg= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= diff --git a/core/internal/integration_tests/mocks/mock_TrafficLogger.go b/core/internal/integration_tests/mocks/mock_TrafficLogger.go index 9de44b976e..1ed977efd0 100644 --- a/core/internal/integration_tests/mocks/mock_TrafficLogger.go +++ b/core/internal/integration_tests/mocks/mock_TrafficLogger.go @@ -2,7 +2,12 @@ package mocks -import mock "github.com/stretchr/testify/mock" +import ( + quic "github.com/apernet/quic-go" + mock "github.com/stretchr/testify/mock" + + server "github.com/apernet/hysteria/core/v2/server" +) // MockTrafficLogger is an autogenerated mock type for the TrafficLogger type type MockTrafficLogger struct { @@ -99,6 +104,73 @@ func (_c *MockTrafficLogger_LogTraffic_Call) RunAndReturn(run func(string, uint6 return _c } +// TraceStream provides a mock function with given fields: stream, stats +func (_m *MockTrafficLogger) TraceStream(stream quic.Stream, stats *server.StreamStats) { + _m.Called(stream, stats) +} + +// MockTrafficLogger_TraceStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'TraceStream' +type MockTrafficLogger_TraceStream_Call struct { + *mock.Call +} + +// TraceStream is a helper method to define mock.On call +// - stream quic.Stream +// - stats *server.StreamStats +func (_e *MockTrafficLogger_Expecter) TraceStream(stream interface{}, stats interface{}) *MockTrafficLogger_TraceStream_Call { + return &MockTrafficLogger_TraceStream_Call{Call: _e.mock.On("TraceStream", stream, stats)} +} + +func (_c *MockTrafficLogger_TraceStream_Call) Run(run func(stream quic.Stream, stats *server.StreamStats)) *MockTrafficLogger_TraceStream_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(quic.Stream), args[1].(*server.StreamStats)) + }) + return _c +} + +func (_c *MockTrafficLogger_TraceStream_Call) Return() *MockTrafficLogger_TraceStream_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTrafficLogger_TraceStream_Call) RunAndReturn(run func(quic.Stream, *server.StreamStats)) *MockTrafficLogger_TraceStream_Call { + _c.Call.Return(run) + return _c +} + +// UntraceStream provides a mock function with given fields: stream +func (_m *MockTrafficLogger) UntraceStream(stream quic.Stream) { + _m.Called(stream) +} + +// MockTrafficLogger_UntraceStream_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UntraceStream' +type MockTrafficLogger_UntraceStream_Call struct { + *mock.Call +} + +// UntraceStream is a helper method to define mock.On call +// - stream quic.Stream +func (_e *MockTrafficLogger_Expecter) UntraceStream(stream interface{}) *MockTrafficLogger_UntraceStream_Call { + return &MockTrafficLogger_UntraceStream_Call{Call: _e.mock.On("UntraceStream", stream)} +} + +func (_c *MockTrafficLogger_UntraceStream_Call) Run(run func(stream quic.Stream)) *MockTrafficLogger_UntraceStream_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(quic.Stream)) + }) + return _c +} + +func (_c *MockTrafficLogger_UntraceStream_Call) Return() *MockTrafficLogger_UntraceStream_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTrafficLogger_UntraceStream_Call) RunAndReturn(run func(quic.Stream)) *MockTrafficLogger_UntraceStream_Call { + _c.Call.Return(run) + return _c +} + // NewMockTrafficLogger creates a new instance of MockTrafficLogger. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockTrafficLogger(t interface { diff --git a/core/internal/integration_tests/trafficlogger_test.go b/core/internal/integration_tests/trafficlogger_test.go index b5355ff694..841f4ffa8f 100644 --- a/core/internal/integration_tests/trafficlogger_test.go +++ b/core/internal/integration_tests/trafficlogger_test.go @@ -62,6 +62,7 @@ func TestClientServerTrafficLoggerTCP(t *testing.T) { return nil }) serverOb.EXPECT().TCP(addr).Return(sobConn, nil).Once() + trafficLogger.EXPECT().TraceStream(mock.Anything, mock.Anything).Return().Once() conn, err := c.TCP(addr) assert.NoError(t, err) @@ -84,6 +85,7 @@ func TestClientServerTrafficLoggerTCP(t *testing.T) { time.Sleep(1 * time.Second) // Need some time for the server to receive the data // Client reads from server again but blocked + trafficLogger.EXPECT().UntraceStream(mock.Anything).Return().Once() trafficLogger.EXPECT().LogTraffic("nobody", uint64(0), uint64(4)).Return(false).Once() trafficLogger.EXPECT().LogOnlineState("nobody", false).Return().Once() sobConnCh <- []byte("nope") diff --git a/core/internal/utils/atomic.go b/core/internal/utils/atomic.go index e3c3d97782..7739013ec0 100644 --- a/core/internal/utils/atomic.go +++ b/core/internal/utils/atomic.go @@ -22,3 +22,33 @@ func (t *AtomicTime) Set(new time.Time) { func (t *AtomicTime) Get() time.Time { return t.v.Load().(time.Time) } + +type Atomic[T any] struct { + v atomic.Value +} + +func (a *Atomic[T]) Load() T { + value := a.v.Load() + if value == nil { + var zero T + return zero + } + return value.(T) +} + +func (a *Atomic[T]) Store(value T) { + a.v.Store(value) +} + +func (a *Atomic[T]) Swap(new T) T { + old := a.v.Swap(new) + if old == nil { + var zero T + return zero + } + return old.(T) +} + +func (a *Atomic[T]) CompareAndSwap(old, new T) bool { + return a.v.CompareAndSwap(old, new) +} diff --git a/core/server/config.go b/core/server/config.go index f90c820557..a01f478f4d 100644 --- a/core/server/config.go +++ b/core/server/config.go @@ -4,10 +4,12 @@ import ( "crypto/tls" "net" "net/http" + "sync/atomic" "time" "github.com/apernet/hysteria/core/v2/errors" "github.com/apernet/hysteria/core/v2/internal/pmtud" + "github.com/apernet/hysteria/core/v2/internal/utils" "github.com/apernet/quic-go" ) @@ -212,4 +214,66 @@ type EventLogger interface { type TrafficLogger interface { LogTraffic(id string, tx, rx uint64) (ok bool) LogOnlineState(id string, online bool) + TraceStream(stream quic.Stream, stats *StreamStats) + UntraceStream(stream quic.Stream) +} + +type StreamState int + +const ( + // StreamStateInitial indicates the initial state of a stream. + // Client has opened the stream, but we have not received the proxy request yet. + StreamStateInitial StreamState = iota + + // StreamStateHooking indicates that the hook (usually sniff) is processing. + // Client has sent the proxy request, but sniff requires more data to complete. + StreamStateHooking + + // StreamStateConnecting indicates that we are connecting to the proxy target. + StreamStateConnecting + + // StreamStateEstablished indicates the proxy is established. + StreamStateEstablished + + // StreamStateClosed indicates the stream is closed. + StreamStateClosed +) + +func (s StreamState) String() string { + switch s { + case StreamStateInitial: + return "init" + case StreamStateHooking: + return "hook" + case StreamStateConnecting: + return "connect" + case StreamStateEstablished: + return "estab" + case StreamStateClosed: + return "closed" + default: + return "unknown" + } +} + +type StreamStats struct { + State utils.Atomic[StreamState] + + AuthID string + ConnID uint32 + InitialTime time.Time + + ReqAddr utils.Atomic[string] + HookedReqAddr utils.Atomic[string] + + Tx atomic.Uint64 + Rx atomic.Uint64 + + LastActiveTime utils.Atomic[time.Time] +} + +func (s *StreamStats) setHookedReqAddr(addr string) { + if addr != s.ReqAddr.Load() { + s.HookedReqAddr.Store(addr) + } } diff --git a/core/server/copy.go b/core/server/copy.go index d55dcefe3f..7123fc89ea 100644 --- a/core/server/copy.go +++ b/core/server/copy.go @@ -3,6 +3,7 @@ package server import ( "errors" "io" + "time" ) var errDisconnect = errors.New("traffic logger requested disconnect") @@ -31,15 +32,19 @@ func copyBufferLog(dst io.Writer, src io.Reader, log func(n uint64) bool) error } } -func copyTwoWayWithLogger(id string, serverRw, remoteRw io.ReadWriter, l TrafficLogger) error { +func copyTwoWayEx(id string, serverRw, remoteRw io.ReadWriter, l TrafficLogger, stats *StreamStats) error { errChan := make(chan error, 2) go func() { errChan <- copyBufferLog(serverRw, remoteRw, func(n uint64) bool { + stats.LastActiveTime.Store(time.Now()) + stats.Rx.Add(n) return l.LogTraffic(id, 0, n) }) }() go func() { errChan <- copyBufferLog(remoteRw, serverRw, func(n uint64) bool { + stats.LastActiveTime.Store(time.Now()) + stats.Tx.Add(n) return l.LogTraffic(id, n, 0) }) }() @@ -47,7 +52,7 @@ func copyTwoWayWithLogger(id string, serverRw, remoteRw io.ReadWriter, l Traffic return <-errChan } -// copyTwoWay is the "fast-path" version of copyTwoWayWithLogger that does not log traffic. +// copyTwoWay is the "fast-path" version of copyTwoWayEx that does not log traffic or update stream stats. // It uses the built-in io.Copy instead of our own copyBufferLog. func copyTwoWay(serverRw, remoteRw io.ReadWriter) error { errChan := make(chan error, 2) diff --git a/core/server/server.go b/core/server/server.go index ba55b315b6..696f1d0956 100644 --- a/core/server/server.go +++ b/core/server/server.go @@ -3,8 +3,10 @@ package server import ( "context" "crypto/tls" + "math/rand" "net/http" "sync" + "time" "github.com/apernet/quic-go" "github.com/apernet/quic-go/http3" @@ -100,6 +102,7 @@ type h3sHandler struct { authenticated bool authMutex sync.Mutex authID string + connID uint32 // a random id for dump streams udpSM *udpSessionManager // Only set after authentication } @@ -108,6 +111,7 @@ func newH3sHandler(config *Config, conn quic.Connection) *h3sHandler { return &h3sHandler{ config: config, conn: conn, + connID: rand.Uint32(), } } @@ -205,12 +209,29 @@ func (h *h3sHandler) ProxyStreamHijacker(ft http3.FrameType, id quic.ConnectionT } func (h *h3sHandler) handleTCPRequest(stream quic.Stream) { + trafficLogger := h.config.TrafficLogger + streamStats := &StreamStats{ + AuthID: h.authID, + ConnID: h.connID, + InitialTime: time.Now(), + } + streamStats.State.Store(StreamStateInitial) + streamStats.LastActiveTime.Store(time.Now()) + defer func() { + streamStats.State.Store(StreamStateClosed) + }() + if trafficLogger != nil { + trafficLogger.TraceStream(stream, streamStats) + defer trafficLogger.UntraceStream(stream) + } + // Read request reqAddr, err := protocol.ReadTCPRequest(stream) if err != nil { _ = stream.Close() return } + streamStats.ReqAddr.Store(reqAddr) // Call the hook if set var putback []byte var hooked bool @@ -220,12 +241,14 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) { // so that the client will send whatever request the hook wants to see. // This is essentially a server-side fast-open. if hooked { + streamStats.State.Store(StreamStateHooking) _ = protocol.WriteTCPResponse(stream, true, "RequestHook enabled") putback, err = h.config.RequestHook.TCP(stream, &reqAddr) if err != nil { _ = stream.Close() return } + streamStats.setHookedReqAddr(reqAddr) } } // Log the event @@ -233,6 +256,7 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) { h.config.EventLogger.TCPRequest(h.conn.RemoteAddr(), h.authID, reqAddr) } // Dial target + streamStats.State.Store(StreamStateConnecting) tConn, err := h.config.Outbound.TCP(reqAddr) if err != nil { if !hooked { @@ -248,13 +272,15 @@ func (h *h3sHandler) handleTCPRequest(stream quic.Stream) { if !hooked { _ = protocol.WriteTCPResponse(stream, true, "Connected") } + streamStats.State.Store(StreamStateEstablished) // Put back the data if the hook requested if len(putback) > 0 { - _, _ = tConn.Write(putback) + n, _ := tConn.Write(putback) + streamStats.Tx.Add(uint64(n)) } // Start proxying - if h.config.TrafficLogger != nil { - err = copyTwoWayWithLogger(h.authID, stream, tConn, h.config.TrafficLogger) + if trafficLogger != nil { + err = copyTwoWayEx(h.authID, stream, tConn, trafficLogger, streamStats) } else { // Use the fast path if no traffic logger is set err = copyTwoWay(stream, tConn) diff --git a/extras/go.mod b/extras/go.mod index be48ce6a76..3da331af60 100644 --- a/extras/go.mod +++ b/extras/go.mod @@ -6,7 +6,7 @@ toolchain go1.23.2 require ( github.com/apernet/hysteria/core/v2 v2.0.0-00010101000000-000000000000 - github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d + github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7 github.com/babolivier/go-doh-client v0.0.0-20201028162107-a76cff4cb8b6 github.com/hashicorp/golang-lru/v2 v2.0.5 github.com/miekg/dns v1.1.59 diff --git a/extras/go.sum b/extras/go.sum index d1f42eaed1..d07ba7cfce 100644 --- a/extras/go.sum +++ b/extras/go.sum @@ -1,7 +1,7 @@ github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= -github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d h1:KWRCWISqJOgY9/0hhH8Bevjw/k4tCQ7oJlXLyFv8u9s= -github.com/apernet/quic-go v0.47.1-0.20241004180137-a80d14e2080d/go.mod h1:x0paLlmCzNOUDDQIgmgFWmnpWQIEuH1GNfA6NdgSTuM= +github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7 h1:zO38yBOvQ1dLHbSuaU5BFZ8zalnSDQslj+i/9AGOk9s= +github.com/apernet/quic-go v0.48.2-0.20241104191913-cb103fcecfe7/go.mod h1:LoSUY2chVqNQCDyi4IZGqPpXLy1FuCkE37PKwtJvNGg= github.com/babolivier/go-doh-client v0.0.0-20201028162107-a76cff4cb8b6 h1:4NNbNM2Iq/k57qEu7WfL67UrbPq1uFWxW4qODCohi+0= github.com/babolivier/go-doh-client v0.0.0-20201028162107-a76cff4cb8b6/go.mod h1:J29hk+f9lJrblVIfiJOtTFk+OblBawmib4uz/VdKzlg= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= diff --git a/extras/trafficlogger/http.go b/extras/trafficlogger/http.go index 9ab943af17..d8e6ebd4f1 100644 --- a/extras/trafficlogger/http.go +++ b/extras/trafficlogger/http.go @@ -1,12 +1,18 @@ package trafficlogger import ( + "cmp" "encoding/json" + "fmt" "net/http" + "slices" "strconv" + "strings" "sync" + "time" "github.com/apernet/hysteria/core/v2/server" + "github.com/apernet/quic-go" ) const ( @@ -25,6 +31,7 @@ func NewTrafficStatsServer(secret string) TrafficStatsServer { StatsMap: make(map[string]*trafficStatsEntry), KickMap: make(map[string]struct{}), OnlineMap: make(map[string]int), + StreamMap: make(map[quic.Stream]*server.StreamStats), Secret: secret, } } @@ -33,6 +40,7 @@ type trafficStatsServerImpl struct { Mutex sync.RWMutex StatsMap map[string]*trafficStatsEntry OnlineMap map[string]int + StreamMap map[quic.Stream]*server.StreamStats KickMap map[string]struct{} Secret string } @@ -78,6 +86,20 @@ func (s *trafficStatsServerImpl) LogOnlineState(id string, online bool) { } } +func (s *trafficStatsServerImpl) TraceStream(stream quic.Stream, stats *server.StreamStats) { + s.Mutex.Lock() + defer s.Mutex.Unlock() + + s.StreamMap[stream] = stats +} + +func (s *trafficStatsServerImpl) UntraceStream(stream quic.Stream) { + s.Mutex.Lock() + defer s.Mutex.Unlock() + + delete(s.StreamMap, stream) +} + func (s *trafficStatsServerImpl) ServeHTTP(w http.ResponseWriter, r *http.Request) { if s.Secret != "" && r.Header.Get("Authorization") != s.Secret { http.Error(w, "unauthorized", http.StatusUnauthorized) @@ -99,6 +121,10 @@ func (s *trafficStatsServerImpl) ServeHTTP(w http.ResponseWriter, r *http.Reques s.getOnline(w, r) return } + if r.Method == http.MethodGet && r.URL.Path == "/dump/streams" { + s.getDumpStreams(w, r) + return + } http.NotFound(w, r) } @@ -137,6 +163,126 @@ func (s *trafficStatsServerImpl) getOnline(w http.ResponseWriter, r *http.Reques _, _ = w.Write(jb) } +type dumpStreamEntry struct { + State string `json:"state"` + + Auth string `json:"auth"` + Connection uint32 `json:"connection"` + Stream uint64 `json:"stream"` + + ReqAddr string `json:"req_addr"` + HookedReqAddr string `json:"hooked_req_addr"` + + Tx uint64 `json:"tx"` + Rx uint64 `json:"rx"` + + InitialAt string `json:"initial_at"` + LastActiveAt string `json:"last_active_at"` + + // for text/plain output + initialTime time.Time + lastActiveTime time.Time +} + +func (e *dumpStreamEntry) fromStreamStats(stream quic.Stream, s *server.StreamStats) { + e.State = s.State.Load().String() + e.Auth = s.AuthID + e.Connection = s.ConnID + e.Stream = uint64(stream.StreamID()) + e.ReqAddr = s.ReqAddr.Load() + e.HookedReqAddr = s.HookedReqAddr.Load() + e.Tx = s.Tx.Load() + e.Rx = s.Rx.Load() + e.initialTime = s.InitialTime + e.lastActiveTime = s.LastActiveTime.Load() + e.InitialAt = e.initialTime.Format(time.RFC3339Nano) + e.LastActiveAt = e.lastActiveTime.Format(time.RFC3339Nano) +} + +func formatDumpStreamLine(state, auth, connection, stream, reqAddr, hookedReqAddr, tx, rx, lifetime, lastActive string) string { + return fmt.Sprintf("%-8s %-12s %12s %8s %12s %12s %12s %12s %-16s %s", state, auth, connection, stream, tx, rx, lifetime, lastActive, reqAddr, hookedReqAddr) +} + +func (e *dumpStreamEntry) String() string { + stateText := strings.ToUpper(e.State) + connectionText := fmt.Sprintf("%08X", e.Connection) + streamText := strconv.FormatUint(e.Stream, 10) + reqAddrText := e.ReqAddr + if reqAddrText == "" { + reqAddrText = "-" + } + hookedReqAddrText := e.HookedReqAddr + if hookedReqAddrText == "" { + hookedReqAddrText = "-" + } + txText := strconv.FormatUint(e.Tx, 10) + rxText := strconv.FormatUint(e.Rx, 10) + lifetime := time.Now().Sub(e.initialTime) + if lifetime < 10*time.Minute { + lifetime = lifetime.Round(time.Millisecond) + } else { + lifetime = lifetime.Round(time.Second) + } + lastActive := time.Now().Sub(e.lastActiveTime) + if lastActive < 10*time.Minute { + lastActive = lastActive.Round(time.Millisecond) + } else { + lastActive = lastActive.Round(time.Second) + } + + return formatDumpStreamLine(stateText, e.Auth, connectionText, streamText, reqAddrText, hookedReqAddrText, txText, rxText, lifetime.String(), lastActive.String()) +} + +func (s *trafficStatsServerImpl) getDumpStreams(w http.ResponseWriter, r *http.Request) { + var entries []dumpStreamEntry + + s.Mutex.RLock() + entries = make([]dumpStreamEntry, len(s.StreamMap)) + index := 0 + for stream, stats := range s.StreamMap { + entries[index].fromStreamStats(stream, stats) + index++ + } + s.Mutex.RUnlock() + + slices.SortFunc(entries, func(lhs, rhs dumpStreamEntry) int { + if ret := cmp.Compare(lhs.Auth, rhs.Auth); ret != 0 { + return ret + } + if ret := cmp.Compare(lhs.Connection, rhs.Connection); ret != 0 { + return ret + } + if ret := cmp.Compare(lhs.Stream, rhs.Stream); ret != 0 { + return ret + } + return 0 + }) + + accept := r.Header.Get("Accept") + + if strings.Contains(accept, "text/plain") { + // Generate netstat-like output for humans + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + + // Print table header + _, _ = fmt.Fprintln(w, formatDumpStreamLine("State", "Auth", "Connection", "Stream", "Req-Addr", "Hooked-Req-Addr", "TX-Bytes", "RX-Bytes", "Lifetime", "Last-Active")) + for _, entry := range entries { + _, _ = fmt.Fprintln(w, entry.String()) + } + return + } + + // Response with json by default + wrapper := struct { + Streams []dumpStreamEntry `json:"streams"` + }{entries} + w.Header().Set("Content-Type", "application/json; charset=utf-8") + err := json.NewEncoder(w).Encode(&wrapper) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + func (s *trafficStatsServerImpl) kick(w http.ResponseWriter, r *http.Request) { var ids []string err := json.NewDecoder(r.Body).Decode(&ids) diff --git a/extras/utils/portunion.go b/extras/utils/portunion.go index 20a31d0f1b..f76a6fd0a6 100644 --- a/extras/utils/portunion.go +++ b/extras/utils/portunion.go @@ -74,7 +74,7 @@ func (u PortUnion) Normalize() PortUnion { normalized := PortUnion{u[0]} for _, current := range u[1:] { last := &normalized[len(normalized)-1] - if current.Start <= last.End+1 { + if uint32(current.Start) <= uint32(last.End)+1 { if current.End > last.End { last.End = current.End } @@ -89,8 +89,8 @@ func (u PortUnion) Normalize() PortUnion { func (u PortUnion) Ports() []uint16 { var ports []uint16 for _, r := range u { - for i := r.Start; i <= r.End; i++ { - ports = append(ports, i) + for i := uint32(r.Start); i <= uint32(r.End); i++ { + ports = append(ports, uint16(i)) } } return ports diff --git a/extras/utils/portunion_test.go b/extras/utils/portunion_test.go index 551bae137a..ba056a3741 100644 --- a/extras/utils/portunion_test.go +++ b/extras/utils/portunion_test.go @@ -2,6 +2,7 @@ package utils import ( "reflect" + "slices" "testing" ) @@ -51,6 +52,16 @@ func TestParsePortUnion(t *testing.T) { s: "5678,1200-1236,9100-9012,1234-1240", want: PortUnion{{1200, 1240}, {5678, 5678}, {9012, 9100}}, }, + { + name: "multiple ports and ranges with 65535 (reversed, unsorted, overlapping)", + s: "5678,1200-1236,65531-65535,65532-65534,9100-9012,1234-1240", + want: PortUnion{{1200, 1240}, {5678, 5678}, {9012, 9100}, {65531, 65535}}, + }, + { + name: "multiple ports and ranges with 65535 (reversed, unsorted, overlapping) 2", + s: "5678,1200-1236,65532-65535,65531-65534,9100-9012,1234-1240", + want: PortUnion{{1200, 1240}, {5678, 5678}, {9012, 9100}, {65531, 65535}}, + }, { name: "invalid 1", s: "1234-", @@ -90,3 +101,50 @@ func TestParsePortUnion(t *testing.T) { }) } } + +func TestPortUnion_Ports(t *testing.T) { + tests := []struct { + name string + pu PortUnion + want []uint16 + }{ + { + name: "single port", + pu: PortUnion{{1234, 1234}}, + want: []uint16{1234}, + }, + { + name: "multiple ports", + pu: PortUnion{{1234, 1236}}, + want: []uint16{1234, 1235, 1236}, + }, + { + name: "multiple ports and ranges", + pu: PortUnion{{1234, 1236}, {5678, 5680}, {9000, 9002}}, + want: []uint16{1234, 1235, 1236, 5678, 5679, 5680, 9000, 9001, 9002}, + }, + { + name: "single port 65535", + pu: PortUnion{{65535, 65535}}, + want: []uint16{65535}, + }, + { + name: "port range with 65535", + pu: PortUnion{{65530, 65535}}, + want: []uint16{65530, 65531, 65532, 65533, 65534, 65535}, + }, + { + name: "multiple ports and ranges with 65535", + pu: PortUnion{{65530, 65535}, {1234, 1236}}, + want: []uint16{65530, 65531, 65532, 65533, 65534, 65535, 1234, 1235, 1236}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.pu.Ports(); !slices.Equal(got, tt.want) { + t.Errorf("PortUnion.Ports() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/go.work.sum b/go.work.sum index b00f726c50..79da3faf24 100644 --- a/go.work.sum +++ b/go.work.sum @@ -290,6 +290,7 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.12.0/go.mod h1:zEVYFnQC7m/vmpQFELhcD1EWkZlX69l4oqgmer6hfKA= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20221014153046-6fdb5e3db783 h1:nt+Q6cXKz4MosCSpnbMtqiQ8Oz0pxTef2B4Vca2lvfk= @@ -298,6 +299,7 @@ golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852 h1:xYq6+9AtI+xP3M4r0N1hCkHr golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181029174526-d69651ed3497/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190316082340-a2f829d7f35f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/hyperbole.py b/hyperbole.py index fc38eba9f0..ecc248dbf3 100755 --- a/hyperbole.py +++ b/hyperbole.py @@ -145,12 +145,33 @@ def get_app_commit(): return app_commit +def get_toolchain(): + try: + output = subprocess.check_output(["go", "version"]).decode().strip() + if output.startswith("go version "): + output = output[11:] + return output + except Exception: + return "Unknown" + + def get_current_os_arch(): d_os = subprocess.check_output(["go", "env", "GOOS"]).decode().strip() d_arch = subprocess.check_output(["go", "env", "GOARCH"]).decode().strip() return (d_os, d_arch) +def get_lib_version(): + try: + with open(CORE_SRC_DIR + "/go.mod") as f: + for line in f: + line = line.strip() + if line.startswith("github.com/apernet/quic-go"): + return line.split(" ")[1].strip() + except Exception: + return "Unknown" + + def get_app_platforms(): platforms = os.environ.get("HY_APP_PLATFORMS") if not platforms: @@ -176,8 +197,12 @@ def cmd_build(pprof=False, release=False, race=False): os.makedirs(BUILD_DIR, exist_ok=True) app_version = get_app_version() - app_date = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + app_date = datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + app_toolchain = get_toolchain() app_commit = get_app_commit() + lib_version = get_lib_version() ldflags = [ "-X", @@ -190,7 +215,11 @@ def cmd_build(pprof=False, release=False, race=False): + ("release" if release else "dev") + ("-pprof" if pprof else ""), "-X", + '"' + APP_SRC_CMD_PKG + ".appToolchain=" + app_toolchain + '"', + "-X", APP_SRC_CMD_PKG + ".appCommit=" + app_commit, + "-X", + APP_SRC_CMD_PKG + ".libVersion=" + lib_version, ] if release: ldflags.append("-s") @@ -267,8 +296,12 @@ def cmd_run(args, pprof=False, race=False): return app_version = get_app_version() - app_date = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + app_date = datetime.datetime.now(datetime.timezone.utc).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + app_toolchain = get_toolchain() app_commit = get_app_commit() + lib_version = get_lib_version() current_os, current_arch = get_current_os_arch() @@ -280,11 +313,15 @@ def cmd_run(args, pprof=False, race=False): "-X", APP_SRC_CMD_PKG + ".appType=dev-run", "-X", + '"' + APP_SRC_CMD_PKG + ".appToolchain=" + app_toolchain + '"', + "-X", APP_SRC_CMD_PKG + ".appCommit=" + app_commit, "-X", APP_SRC_CMD_PKG + ".appPlatform=" + current_os, "-X", APP_SRC_CMD_PKG + ".appArch=" + current_arch, + "-X", + APP_SRC_CMD_PKG + ".libVersion=" + lib_version, ] cmd = ["go", "run", "-ldflags", " ".join(ldflags)] diff --git a/scripts/install_server.sh b/scripts/install_server.sh index 0c85dc7f22..4277ab946b 100644 --- a/scripts/install_server.sh +++ b/scripts/install_server.sh @@ -872,7 +872,7 @@ is_hysteria1_version() { get_installed_version() { if is_hysteria_installed; then if "$EXECUTABLE_INSTALL_PATH" version > /dev/null 2>&1; then - "$EXECUTABLE_INSTALL_PATH" version | grep Version | grep -o 'v[.0-9]*' + "$EXECUTABLE_INSTALL_PATH" version | grep '^Version' | grep -o 'v[.0-9]*' elif "$EXECUTABLE_INSTALL_PATH" -v > /dev/null 2>&1; then # hysteria 1 "$EXECUTABLE_INSTALL_PATH" -v | cut -d ' ' -f 3