From 8bedd65c9bcb1e770db8b702b3f796348cc04fc2 Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Thu, 16 May 2024 17:33:55 +0800 Subject: [PATCH 1/2] fix(api): hook all traffic after write for upload --- speedtest/data_manager.go | 44 ++++++++++++++++++++++++++------------- speedtest/request.go | 4 +--- 2 files changed, 31 insertions(+), 17 deletions(-) diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index 5d80720..98031a2 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -55,7 +55,7 @@ type Chunk interface { Read(b []byte) (n int, err error) } -const readChunkSize = 1024 * 32 // 32 KBytes +const readChunkSize = 1024 // 1 KBytes with higher frequency rate feedback type DataType int32 @@ -448,22 +448,38 @@ func (dc *DataChunk) GetParent() Manager { return dc.manager } -func (dc *DataChunk) Read(b []byte) (n int, err error) { - if !dc.manager.running { - return n, io.EOF - } - if dc.remainOrDiscardSize < readChunkSize { - if dc.remainOrDiscardSize <= 0 { +// WriteTo Used to hook all traffic. +func (dc *DataChunk) WriteTo(w io.Writer) (written int64, err error) { + nw := 0 + nr := readChunkSize + for { + if !dc.manager.running || dc.remainOrDiscardSize <= 0 { dc.endTime = time.Now() - return n, io.EOF + return written, io.EOF + } + if dc.remainOrDiscardSize < readChunkSize { + nr = int(dc.remainOrDiscardSize) + nw, err = w.Write((*dc.manager.repeatByte)[:nr]) + } else { + nw, err = w.Write(*dc.manager.repeatByte) + } + if err != nil { + return + } + n64 := int64(nw) + written += n64 + dc.remainOrDiscardSize -= n64 + dc.manager.AddTotalUpload(n64) + if nr != nw { + return written, io.ErrShortWrite } - n = copy(b, (*dc.manager.repeatByte)[:dc.remainOrDiscardSize]) - } else { - n = copy(b, *dc.manager.repeatByte) } - n64 := int64(n) - dc.remainOrDiscardSize -= n64 - atomic.AddInt64(&dc.manager.upload.totalDataVolume, n64) +} + +// Please don't call it, only used to wrapped by [io.NopCloser] +// We use [DataChunk.WriteTo] that implements [io.WriterTo] to bypass this function. +func (dc *DataChunk) Read(b []byte) (n int, err error) { + panic("unexpected call: only used to implement the io.Reader") return } diff --git a/speedtest/request.go b/speedtest/request.go index 54acd6b..f188d56 100644 --- a/speedtest/request.go +++ b/speedtest/request.go @@ -186,13 +186,11 @@ func downloadRequest(ctx context.Context, s *Server, w int) error { func uploadRequest(ctx context.Context, s *Server, w int) error { size := ulSizes[w] dc := s.Context.NewChunk().UploadHandler(int64(size*100-51) * 10) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.URL, dc) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.URL, io.NopCloser(dc)) if err != nil { return err } - req.ContentLength = dc.(*DataChunk).ContentLength dbg.Printf("Len=%d, XulURL: %s\n", req.ContentLength, s.URL) - req.Header.Set("Content-Type", "application/octet-stream") resp, err := s.Context.doer.Do(req) if err != nil { From 820287f493bb317ff95e25793383a23b33c8861b Mon Sep 17 00:00:00 2001 From: r3inbowari Date: Thu, 16 May 2024 17:44:53 +0800 Subject: [PATCH 2/2] chore: lint --- speedtest/data_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/speedtest/data_manager.go b/speedtest/data_manager.go index 98031a2..04546ac 100644 --- a/speedtest/data_manager.go +++ b/speedtest/data_manager.go @@ -480,7 +480,6 @@ func (dc *DataChunk) WriteTo(w io.Writer) (written int64, err error) { // We use [DataChunk.WriteTo] that implements [io.WriterTo] to bypass this function. func (dc *DataChunk) Read(b []byte) (n int, err error) { panic("unexpected call: only used to implement the io.Reader") - return } // calcMAFilter Median-Averaging Filter