Skip to content

Commit

Permalink
feat: initial introduction of support for TCP
Browse files Browse the repository at this point in the history
  • Loading branch information
r3inbowari committed Sep 8, 2024
1 parent a052d7f commit 508298f
Show file tree
Hide file tree
Showing 27 changed files with 1,244 additions and 809 deletions.
File renamed without changes.
46 changes: 46 additions & 0 deletions docs/speedtest_protocol_specifications.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# SpeedTest Specifications
This document records some of the interfaces defined in speedtest for reference only.

## Native Socket Interfaces

The protocol uses a plain text data stream and ends each message with '\n'.
And '\n' and the operators are included in the total bytes.

| Method | Protocol | Describe |
|--------|----------|---------------------------------------------------|
| Greet | TCP | Say Hello and get the server version information. |
| PING | TCP | Echo with the server. |
| Loss | TCP+UDP | Conduct UDP packet loss test. |
| Down | TCP | Sending data to the server. |
| Up | TCP | Receive data from the server. |

### Great
```shell
Clinet: HI\n
Server: HELLO [Major].[Minor] ([Major].[Minor].[Patch]) [YYYY]-[MM]-[DD].[LTSCCode].[GitHash]\n
```

### PING
```shell
Clinet: PING [Local Timestamp]\n
Server: PONG [Remote Timestamp]\n
```

### Loss
Please see https://github.com/showwin/speedtest-go/issues/169

### Down
```shell
Clinet: DOWNLOAD [Size]\n
Server: DOWNLOAD [Random Data]\n
```

### Up
```shell
Clinet: UPLOAD [Size]\n
Clinet: [Random Data]
Server: OK [Size] [Timestamp]
```

## References
[1] Reverse Engineering the Speedtest.net Protocol, Gökberk Yaltıraklı https://gist.github.com/sdstrowes/411fca9d900a846a704f68547941eb97
47 changes: 19 additions & 28 deletions speedtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/showwin/speedtest-go/speedtest/control"
"github.com/showwin/speedtest-go/speedtest/transport"
"gopkg.in/alecthomas/kingpin.v2"
"io"
Expand Down Expand Up @@ -37,7 +38,8 @@ var (
userAgent = kingpin.Flag("ua", "Set the user-agent header for the speedtest.").String()
noDownload = kingpin.Flag("no-download", "Disable download test.").Bool()
noUpload = kingpin.Flag("no-upload", "Disable upload test.").Bool()
pingMode = kingpin.Flag("ping-mode", "Select a method for Ping (support icmp/tcp/http).").Default("http").String()
pingMode = kingpin.Flag("ping-mode", "Select a method for Ping (options: icmp/tcp/http).").Default("http").String()
protocol = kingpin.Flag("protocol", "Select a protocol (default: http) for speedtest (options: tcp/http).").Default("http").Short('p').String()
unit = kingpin.Flag("unit", "Set human-readable and auto-scaled rate units for output (options: decimal-bits/decimal-bytes/binary-bits/binary-bytes).").Short('u').String()
debug = kingpin.Flag("debug", "Enable debug mode.").Short('d').Bool()
)
Expand Down Expand Up @@ -70,12 +72,13 @@ func main() {
Source: *source,
DnsBindSource: *dnsBindSource,
Debug: *debug,
PingMode: parseProto(*pingMode), // TCP as default
PingMode: control.ParseProto(*pingMode), // TCP as default
SavingMode: *savingMode,
MaxConnections: *thread,
CityFlag: *city,
LocationFlag: *location,
Keyword: *search,
Protocol: control.ParseProto(*protocol),
}))

if *showCityList {
Expand Down Expand Up @@ -171,43 +174,43 @@ func main() {
accEcho := newAccompanyEcho(server, time.Millisecond*500)
taskManager.RunWithTrigger(!*noDownload, "Download", func(task *Task) {
accEcho.Run()
speedtestClient.SetCallbackDownload(func(downRate speedtest.ByteRate) {
callback := func(downRate float64) {
lc := accEcho.CurrentLatency()
if lc == 0 {
task.Updatef("Download: %s (Latency: --)", downRate)
task.Updatef("Download: %s (Latency: --)", speedtest.ByteRate(downRate))
} else {
task.Updatef("Download: %s (Latency: %dms)", downRate, lc/1000000)
task.Updatef("Download: %s (Latency: %dms)", speedtest.ByteRate(downRate), lc/1000000)
}
})
}
if *multi {
task.CheckError(server.MultiDownloadTestContext(context.Background(), servers))
task.CheckError(server.MultiDownloadTestContext(context.Background(), servers, callback))
} else {
task.CheckError(server.DownloadTest())
task.CheckError(server.DownloadTest(callback))
}
accEcho.Stop()
mean, _, std, minL, maxL := speedtest.StandardDeviation(accEcho.Latencies())
task.Printf("Download: %s (Used: %.2fMB) (Latency: %dms Jitter: %dms Min: %dms Max: %dms)", server.DLSpeed, float64(server.Context.Manager.GetTotalDownload())/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000)
task.Printf("Download: %s (Used: %.2fMB) (Latency: %dms Jitter: %dms Min: %dms Max: %dms)", server.DLSpeed, float64(server.Received)/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000)
task.Complete()
})

taskManager.RunWithTrigger(!*noUpload, "Upload", func(task *Task) {
accEcho.Run()
speedtestClient.SetCallbackUpload(func(upRate speedtest.ByteRate) {
callback := func(upRate float64) {
lc := accEcho.CurrentLatency()
if lc == 0 {
task.Updatef("Upload: %s (Latency: --)", upRate)
task.Updatef("Upload: %s (Latency: --)", speedtest.ByteRate(upRate))
} else {
task.Updatef("Upload: %s (Latency: %dms)", upRate, lc/1000000)
task.Updatef("Upload: %s (Latency: %dms)", speedtest.ByteRate(upRate), lc/1000000)
}
})
}
if *multi {
task.CheckError(server.MultiUploadTestContext(context.Background(), servers))
task.CheckError(server.MultiUploadTestContext(context.Background(), servers, callback))
} else {
task.CheckError(server.UploadTest())
task.CheckError(server.UploadTest(callback))
}
accEcho.Stop()
mean, _, std, minL, maxL := speedtest.StandardDeviation(accEcho.Latencies())
task.Printf("Upload: %s (Used: %.2fMB) (Latency: %dms Jitter: %dms Min: %dms Max: %dms)", server.ULSpeed, float64(server.Context.Manager.GetTotalUpload())/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000)
task.Printf("Upload: %s (Used: %.2fMB) (Latency: %dms Jitter: %dms Min: %dms Max: %dms)", server.ULSpeed, float64(server.Sent)/1000/1000, mean/1000000, std/1000000, minL/1000000, maxL/1000000)
task.Complete()
})

Expand All @@ -220,7 +223,6 @@ func main() {
taskManager.Println(server.PacketLoss.String())
}
taskManager.Reset()
speedtestClient.Manager.Reset()
}
taskManager.Stop()

Expand Down Expand Up @@ -309,17 +311,6 @@ func parseUnit(str string) speedtest.UnitType {
}
}

func parseProto(str string) speedtest.Proto {
str = strings.ToLower(str)
if str == "icmp" {
return speedtest.ICMP
} else if str == "tcp" {
return speedtest.TCP
} else {
return speedtest.HTTP
}
}

func AppInfo() {
if !*jsonOutput {
fmt.Println()
Expand Down
35 changes: 35 additions & 0 deletions speedtest/control/chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package control

import (
"errors"
"io"
"sync"
"time"
)

const DefaultReadChunkSize = 1024 // 1 KBytes with higher frequency rate feedback

var (
ErrDuplicateCall = errors.New("multiple calls to the same chunk handler are not allowed")
)

type Chunk interface {
UploadHandler(size int64) Chunk
DownloadHandler(r io.Reader) error

Rate() float64
Duration() time.Duration

Type() Proto

Len() int64

Read(b []byte) (n int, err error)
}

var BlackHole = sync.Pool{
New: func() any {
b := make([]byte, 8192)
return &b
},
}
12 changes: 12 additions & 0 deletions speedtest/control/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package control

type Controller interface {
// Get Reference counter volume
Get() int64
// Add Reference counter increment
Add(delta int64)
// Repeat Pointing to duplicate memory space
Repeat() []byte
// Done Notification processing completed
Done() <-chan struct{}
}
61 changes: 61 additions & 0 deletions speedtest/control/lb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package control

import (
"math"
"sync"
)

type Task func() error

type TaskItem struct {
fn func() error
SlothIndex int64
Currents int64
}

// LoadBalancer The implementation of Least-Connections Load Balancer with Failure Drop.
type LoadBalancer struct {
TaskQueue []*TaskItem
sync.Mutex
}

func NewLoadBalancer() *LoadBalancer {
return &LoadBalancer{}
}

func (lb *LoadBalancer) Len() int {
return len(lb.TaskQueue)
}

// Add a new task to the [LoadBalancer]
// @param priority The smaller the value, the higher the priority.
func (lb *LoadBalancer) Add(task Task, priority int64) {
if task == nil {
panic("empty task is not allowed")
}
lb.TaskQueue = append(lb.TaskQueue, &TaskItem{fn: task, SlothIndex: priority, Currents: 0})
}

func (lb *LoadBalancer) Dispatch() {
var candidate *TaskItem
lb.Lock()
var minWeighted int64 = math.MaxInt64
for i := 0; i < lb.Len(); i++ {
weighted := lb.TaskQueue[i].Currents * lb.TaskQueue[i].SlothIndex
if weighted < minWeighted {
minWeighted = weighted
candidate = lb.TaskQueue[i]
}
}
if candidate == nil || candidate.fn == nil {
return
}
candidate.Currents++
lb.Unlock()
err := candidate.fn()
lb.Lock()
defer lb.Unlock()
if err == nil {
candidate.Currents--
}
}
117 changes: 117 additions & 0 deletions speedtest/control/lb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package control

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"
)

func TestSLB(t *testing.T) {
lb := NewLoadBalancer()
var a int64 = 0

lb.Add(func() error {
atomic.AddInt64(&a, 1)
time.Sleep(time.Second * 2)
return errors.New("error")
}, 2)

go func() {
for {
fmt.Printf("a:%d\n", a)
time.Sleep(time.Second)
}
}()

wg := sync.WaitGroup{}

for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
for {
lb.Dispatch()
}
}()
}

wg.Wait()
}

func TestLB(t *testing.T) {
lb := NewLoadBalancer()
var a int64 = 0
var b int64 = 0
var c int64 = 0
var d int64 = 0

lb.Add(func() error {
atomic.AddInt64(&a, 1)
time.Sleep(time.Second * 2)
return nil
}, 2)

lb.Add(func() error {
atomic.AddInt64(&b, 1)
time.Sleep(time.Second * 2)
return nil
}, 1)

lb.Add(func() error {
atomic.AddInt64(&c, 1)
time.Sleep(time.Second * 2)
fmt.Println("error")
return errors.New("error")
}, 1)

lb.Add(func() error {
atomic.AddInt64(&d, 1)
time.Sleep(time.Second * 2)
return nil
}, 5)

wg := sync.WaitGroup{}

go func() {
for {
fmt.Printf("a:%d, b:%d, c:%d, d:%d\n", a, b, c, d)
time.Sleep(time.Second)
}
}()

for i := 0; i < 8; i++ {
wg.Add(1)
go func() {
for {
lb.Dispatch()
}
}()
}

wg.Wait()
}

func BenchmarkDP(b *testing.B) {
lb := NewLoadBalancer()
lb.Add(func() error {
return nil
}, 1)
lb.Add(func() error {
return nil
}, 1)
lb.Add(func() error {
return nil
}, 1)
lb.Add(func() error {
return nil
}, 1)
lb.Add(func() error {
return nil
}, 1)

for i := 0; i < b.N; i++ {
lb.Dispatch()
}
}
Loading

0 comments on commit 508298f

Please sign in to comment.