Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Add support for proxying UDP over HTTP #1005

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions e2e/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ all: update-helper-images update-test-image run-e2e
update-helper-images:
@$(MAKE) -C dns update-image
@$(MAKE) -C sc-2450 update-image
@$(MAKE) -C udp update-image

.PHONY: update-test-image
update-test-image:
Expand Down
1 change: 1 addition & 0 deletions e2e/udp/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/server
9 changes: 9 additions & 0 deletions e2e/udp/Containerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
FROM alpine

RUN apk add --no-cache netcat-openbsd

COPY server /usr/bin/server
ENTRYPOINT ["/usr/bin/server"]

HEALTHCHECK --interval=1s --timeout=3s \
CMD nc -z localhost 5005 || exit 1
15 changes: 15 additions & 0 deletions e2e/udp/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
CONTAINER_RUNTIME ?= docker

.PHONY: server
server:
@CGO_ENABLED=0 GOOS=linux go build ./cmd/server

export BUILDAH_FORMAT=docker

.PHONY: update-image
update-image: server
@$(CONTAINER_RUNTIME) buildx build --network host -t e2e-udp -f Containerfile .

.PHONY: run
run:
@$(CONTAINER_RUNTIME) run --rm -p 5005:5005/udp e2e-udp
45 changes: 45 additions & 0 deletions e2e/udp/cmd/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022-2024 Sauce Labs Inc., all rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

package main

import (
"flag"
"log"
"net"
)

var (
address = flag.String("address", "0.0.0.0:5005", "UDP address to listen on")
bufSize = flag.Int("bufsize", 1024, "Size of the buffer to read data into")
)

func main() {
log.Println("Listening on UDP port", *address)

conn, err := net.ListenPacket("udp", *address)
if err != nil {
log.Fatalf("Error starting UDP listener: %s", err)
}
defer conn.Close()

msgCnt := uint64(0)
buffer := make([]byte, *bufSize)
for {
msgCnt++

n, caddr, err := conn.ReadFrom(buffer)
if err != nil {
log.Printf("Error reading from connection: %s", err)
continue
}
log.Printf("Recv(%d) %s: %s\n", msgCnt, caddr, string(buffer[:n]))

if _, err := conn.WriteTo(buffer[:n], caddr); err != nil {
log.Printf("Error writing to connection %s: %s", caddr, err)
}
}
}
49 changes: 49 additions & 0 deletions e2e/udp/cmd/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2022-2024 Sauce Labs Inc., all rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

package main

import (
"io"
"net/http"
"testing"
"time"
)

func TestUDPOverHTTP(t *testing.T) {
pr, pw := io.Pipe()
defer pr.Close()
defer pw.Close()

req, err := http.NewRequest(http.MethodGet, "http://localhost:3128/.well-known/masque/udp/localhost/5005", pr)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", "connect-udp")
req.ContentLength = -1

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatal(err)
}

go func() {
for i := 0; i < 10; i++ {
pw.Write([]byte("hello"))
time.Sleep(100 * time.Millisecond)
}
}()

buf := make([]byte, 1000)
for i := 0; i < 10; i++ {
n, err := resp.Body.Read(buf)
if err != nil {
t.Fatal(err)
}
t.Log(string(buf[:n]))
}
}
28 changes: 28 additions & 0 deletions e2e/udp/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright Sauce Labs Inc., all rights reserved.

package packetdrop

import (
"github.com/saucelabs/forwarder/utils/compose"
)

type service compose.Service

const (
Image = "e2e-udp"
ServiceName = "udp"
)

func Service() *service {
return &service{
Name: ServiceName,
Image: Image,
Environment: map[string]string{},
Privileged: true,
Network: map[string]compose.ServiceNetwork{},
}
}

func (s *service) Service() *compose.Service {
return (*compose.Service)(s)
}
3 changes: 3 additions & 0 deletions internal/martian/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bufio"
"context"
"io"
"net"
"sync"

"github.com/saucelabs/forwarder/internal/martian/log"
Expand Down Expand Up @@ -72,6 +73,8 @@ func (c copier) copy(ctx context.Context, donec chan<- struct{}) {
closeErr = cw.CloseWrite()
} else if pw, ok := c.dst.(*io.PipeWriter); ok {
closeErr = pw.Close()
} else if uc, ok := c.dst.(*net.UDPConn); ok {
closeErr = uc.Close()
} else {
log.Errorf(ctx, "cannot close write side of %s tunnel (%T)", c.name, c.dst)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/martian/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,10 @@ func (p *Proxy) roundTrip(req *http.Request) (*http.Response, error) {
return proxyutil.NewResponse(200, http.NoBody, req), nil
}

if isUDPMasque(req) {
return p.roundTripUDPMasque(req)
}

res, err := p.rt.RoundTrip(req)
if err != nil {
return nil, err
Expand Down
9 changes: 6 additions & 3 deletions internal/martian/proxy_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,13 @@ func (p *Proxy) connectSOCKS5(req *http.Request, proxyURL *url.URL) (*http.Respo
}

func newConnectResponse(req *http.Request) *http.Response {
ok := http.StatusOK
return newConnectResponseStatus(req, http.StatusOK)
}

func newConnectResponseStatus(req *http.Request, statusCode int) *http.Response {
return &http.Response{
Status: fmt.Sprintf("%d %s", ok, http.StatusText(ok)),
StatusCode: ok,
Status: fmt.Sprintf("%d %s", statusCode, http.StatusText(statusCode)),
StatusCode: statusCode,
Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
Expand Down
95 changes: 95 additions & 0 deletions internal/martian/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/http"
"net/url"
"os"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1543,6 +1544,100 @@ func TestIntegrationTransparentMITM(t *testing.T) {
}
}

func TestIntegrationUDPMasque(t *testing.T) {
const bufSize = 64

// UDP echo server.
udpEchoServer := func(conn net.PacketConn) error {
buf := make([]byte, bufSize)
for {
n, caddr, err := conn.ReadFrom(buf)
if err != nil {
if errors.Is(err, net.ErrClosed) {
err = nil
}
return fmt.Errorf("conn.ReadFrom(): got %v, want no error", err)
}

t.Logf("Recv %s: %s\n", caddr, string(buf[:n]))

if _, err := conn.WriteTo(buf[:n], caddr); err != nil {
return fmt.Errorf("conn.WriteTo(): got %v, want no error", err)
}
}
}

uc, err := net.ListenPacket("udp", "localhost:0")
if err != nil {
t.Fatalf("net.ListenPacket(): got %v, want no error", err)
}
defer uc.Close()

go udpEchoServer(uc)

h := testHelper{
Proxy: func(p *Proxy) {
p.AllowHTTP = true
},
}

conn, cancel := h.proxyConn(t)
defer cancel()

// GET /.well-known/masque/udp/<host>/<port> HTTP/1.1
udpMasqueURL := "/.well-known/masque/udp/127.0.0.1/" + strconv.Itoa(uc.LocalAddr().(*net.UDPAddr).Port)
req, err := http.NewRequest(http.MethodGet, udpMasqueURL, http.NoBody)
if err != nil {
t.Fatal(err)
}
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", "connect-udp")

if err := req.Write(conn); err != nil {
t.Fatalf("req.Write(): got %v, want no error", err)
}

br := bufio.NewReader(conn)

res, err := http.ReadResponse(br, req)
if err != nil {
t.Fatalf("http.ReadResponse(): got %v, want no error", err)
}
defer res.Body.Close()

if res.StatusCode != 101 {
t.Fatalf("res.StatusCode: got %d, want 101", res.StatusCode)
}
if got, want := res.Header.Get("Connection"), "Upgrade"; got != want {
t.Errorf("res.Header.Get(%q): got %q, want %q", "Connection", got, want)
}
if got, want := res.Header.Get("Upgrade"), "connect-udp"; got != want {
t.Errorf("res.Header.Get(%q): got %q, want %q", "Upgrade", got, want)
}

assertUDPEcho := func(msg string) {
if _, err := conn.Write([]byte(msg)); err != nil {
t.Fatalf("conn.Write(): got %v, want no error", err)
}

buf := make([]byte, bufSize)
n, err := br.Read(buf)
if err != nil {
t.Fatalf("conn.Read(): got %v, want no error", err)
}
t.Logf("Recv: %s\n", string(buf[:n]))

if string(buf[:n]) != msg {
t.Errorf("conn.Read(): got %q, want %q", buf[:n], msg)
}
}

assertUDPEcho("hello")
assertUDPEcho("world")
conn.(*net.TCPConn).CloseWrite()

}

func TestIntegrationFailedRoundTrip(t *testing.T) {
t.Parallel()

Expand Down
Loading
Loading