Skip to content

Commit

Permalink
refactor: introduce pump()
Browse files Browse the repository at this point in the history
  • Loading branch information
nobody committed Jan 8, 2025
1 parent 1cd0d97 commit b436f01
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 92 deletions.
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
获取 WebSocket 协议的客户端 `config.json`。把 `fragment` 设置为 `false` 获取关闭分片功能的配置。

#### 在 workers 中部署 ws 代理
1.[src/index.js](./src/index.js) 里面的代码复制到 workers 的代码编辑器中
2. 在设置面板中添加环境变量: `UUID``WS_PATH`
1. 下载 [releases](https://github.com/vrnobody/cfxhttp/releases) 中的 cfxhttp.zip,解压得到 _worker.js
2. 复制 _worker.js 里面的代码到 Cloudflare workers 的代码编辑器中
3. 在设置面板中添加环境变量: `UUID``WS_PATH`

其他和 pages 类似。

#### 在 workers 中部署 xhttp 代理
1. 前置要求,拥有一个由 CF 托管的一级域名
1. 在 CF 控制面板的 `网络` 配置中启用 `gRPC` 功能
1. 在 DNS 配置中添加一个二级域名的 A 记录,随便填个 IPv4 地址,开启小黄云
1. 新建一个 workers 把 [src/index.js](./src/index.js) 里面的代码复制进去
1. 下载 [releases](https://github.com/vrnobody/cfxhttp/releases) 中的 cfxhttp.zip,解压得到 _worker.js
1. 新建一个 workers 把 _worker.js 里面的代码复制进去
1. 在 workers 的配置页面添加路由,指向上面新加的二级域名,例如: `sub-domain.your-website.com/*`
1. 在设置面板中添加环境变量: `UUID``XHTTP_PATH`

Expand All @@ -40,6 +42,7 @@
* `TIME_ZONE` 日志时间戳的时区,中国填 `8`

#### 注意事项
* src/index.js 是开发中的代码会有 bug,请到 releases 里面下载 Source code (zip)
* 网站测速结果是错的,这个脚本很慢,不要有太高的期望
* workers / pages 不支持 UDP,需要 UDP 功能的应用无法使用,例如:DNS
* workers / pages 有 CPU 时间限制,需要长时间链接的应用会随机断线,例如:下载大文件
Expand Down
21 changes: 12 additions & 9 deletions docs/en.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,28 @@ Please help me improve this document.
This script is used to deploy vless proxy to Cloudflare workers or pages.

#### Deploy WebSocket proxy to pages
1. Download cfxhttp.zip from [releases](https://github.com/vrnobody/cfxhttp/releases), and upload to pages
2. Add enviroment variables `UUID` and `WS_PATH`
1. Download `cfxhttp.zip` from [releases](https://github.com/vrnobody/cfxhttp/releases), and upload to pages
2. Add `UUID` and `WS_PATH` enviroment variables

If every thing goes right, you would see a `Hello world!` when accessing `https://your-project-name.pages.dev`.
Visit `https://your-project-name.pages.dev/(WS_PATH)/?fragment=true&uuid=(UUID)` to get a client `config.json` with WebSocket transport.
Set `fragemnt` to `false` to get a config without fragment settings.

#### Deploy WebSocket proxy to workers
1. Copy source code from [src/index.js](./src/index.js) into workers' code editor
2. Add enviroment variables `UUID` and `WS_PATH`
1. Download `cfxhttp.zip` from [releases](https://github.com/vrnobody/cfxhttp/releases), and extract `_worker.js`
2. Copy the source code of `_worker.js` into Cloudflare workers code editor
3. Add `UUID` and `WS_PATH` enviroment variables

The rest is similar to pages.

#### Deploy xhttp proxy to workers
1. Pre-requirment: have a domain managed by Cloudflare.
2. Enable `gRPC` feature in `network` settings in Cloudflare dashboard.
3. Create a DNS `A record` for a new sub-domain with a random IPv4 address. Enable `proxy` option.
4. Create a worker and copy-and-paste the source code from [src/index.js](../src/index.js).
5. Goto worker's config panel, add a routing rule to your new sub-domain. e.g. `sub-domain.your-website.com/*`.
6. Add enviroment variables `UUID` and `WS_PATH`
1. Enable `gRPC` feature in `network` settings in Cloudflare dashboard.
1. Create a DNS `A record` for a new sub-domain with a random IPv4 address. Enable `proxy` option.
1. Download `cfxhttp.zip` from [releases](https://github.com/vrnobody/cfxhttp/releases), and extract `_worker.js`
1. Create a worker and copy-and-paste the source code of `_worker.js`.
1. Goto worker's config panel, add a routing rule to your new sub-domain. e.g. `sub-domain.your-website.com/*`.
1. Add `UUID` and `WS_PATH` enviroment variables

Visit `https://sub-domain.your-website.com/(XHTTP_PATH)/?fragment=true&uuid=(UUID)` to get a client `config.json` with xhttp transport.
*The xhttp transport can not deploy to Cloudflare pages. [Issue #2](https://github.com/vrnobody/cfxhttp/issues/2)*
Expand All @@ -42,6 +44,7 @@ Visit `https://sub-domain.your-website.com/(XHTTP_PATH)/?fragment=true&uuid=(UUI
* `TIME_ZONE` Timestamp time zone of logs. e.g. Argentina is `-3`

#### Notice
* `src/index.js` is under developing, could have bugs, please download `Source code (zip)` from releases.
* This script is slow, do not expect too much.
* Workers and pages do not support UDP. Applications require UDP feature will not work. Such as DNS.
* Workers and pages have CPU executing-time limit. Applications require long-term connection would disconnect randomly. Such as downloading a big file.
Expand Down
119 changes: 39 additions & 80 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ async function read_vless_header(reader, cfg_uuid_str) {
let readed_len = buff.value.length
let header = buff.value

async function read_until(offset) {
async function inner_read_until(offset) {
if (buff.done) {
throw new Error('header length too short')
}
Expand All @@ -186,7 +186,7 @@ async function read_vless_header(reader, cfg_uuid_str) {
}
const pb_len = header[1 + 16]
const addr_plus1 = 1 + 16 + 1 + pb_len + 1 + 2 + 1
await read_until(addr_plus1 + 1)
await inner_read_until(addr_plus1 + 1)

const cmd = header[1 + 16 + 1 + pb_len]
const COMMAND_TYPE_TCP = 1
Expand All @@ -211,7 +211,7 @@ async function read_vless_header(reader, cfg_uuid_str) {
if (header_len < 0) {
throw new Error('read address type failed')
}
await read_until(header_len)
await inner_read_until(header_len)

const idx = addr_plus1
let hostname = ''
Expand Down Expand Up @@ -240,54 +240,16 @@ async function read_vless_header(reader, cfg_uuid_str) {
port,
data: header.slice(header_len),
resp: new Uint8Array([version, 0]),
reader,
more: !buff.done,
}
}

async function upload_to_remote(remote_writer, vless) {
if (get_length(vless.data) > 0) {
await remote_writer.write(vless.data)
}
while (vless.more) {
const r = await vless.reader.read()
if (r.value) {
await remote_writer.write(r.value)
}
if (r.done) {
break
}
}
await remote_writer.close()
}

function create_uploader(vless, remote_writable) {
const remote_writer = remote_writable.getWriter()
const done = new Promise((resolve, reject) => {
upload_to_remote(remote_writer, vless).catch(reject).finally(resolve)
})
return {
done,
}
}

function create_downloader(vless, client_writable, remote_readable) {
const client_writer = client_writable.getWriter()

const done = new Promise((resolve, reject) => {
client_writer
.write(vless.resp)
.then(() => {
client_writer.releaseLock()
return remote_readable.pipeTo(client_writable)
})
.then(resolve)
.catch(reject)
})

return {
done,
async function pump(readable, writable, first_packet) {
if (get_length(first_packet) > 0) {
const writer = writable.getWriter()
await writer.write(first_packet)
writer.releaseLock()
}
await readable.pipeTo(writable)
}

function pick_random_proxy(cfg_proxy) {
Expand All @@ -300,23 +262,24 @@ function pick_random_proxy(cfg_proxy) {
}

async function connect_remote(log, hostname, port, cfg_proxy) {
try {
log.info(`direct connect [${hostname}]:${port}`)
const conn = connect({ hostname, port })
async function inner_connect(remote) {
const conn = connect({ hostname: remote, port })
const info = await conn.opened
log.debug(`connection opened: ${info.remoteAddress}`)
return conn
}

try {
log.info(`direct connect [${hostname}]:${port}`)
return await inner_connect(hostname)
} catch (err) {
log.debug(`direct connect failed: ${err}`)
}

const proxy = pick_random_proxy(cfg_proxy)
if (proxy) {
log.info(`proxy [${hostname}]:${port} through [${proxy}]`)
const conn = connect({ hostname: proxy, port })
const info = await conn.opened
log.debug(`connection opened:`, info.remoteAddress)
return conn
return await inner_connect(proxy)
}

throw new Error('all attempts failed')
Expand All @@ -325,7 +288,9 @@ async function connect_remote(log, hostname, port, cfg_proxy) {
async function parse_header(log, uuid_str, client_readable) {
const reader = client_readable.getReader()
try {
return await read_vless_header(reader, uuid_str)
const vless = await read_vless_header(reader, uuid_str)
reader.releaseLock()
return vless
} catch (err) {
drain_connection(log, reader).catch((err) =>
log.info(`drain error: ${err}`),
Expand Down Expand Up @@ -356,15 +321,12 @@ async function read_atleast(reader, n) {
}
}

function create_xhttp_client(log, cfg, client_readable) {
function create_xhttp_client(cfg, client_readable) {
const buff_stream = new TransformStream(
{
transform(chunk, controller) {
controller.enqueue(chunk)
},
cancel(reason) {
log.error(`write error: ${reason}`)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: BUFFER_SIZE }),
new ByteLengthQueuingStrategy({ highWaterMark: BUFFER_SIZE }),
Expand Down Expand Up @@ -392,7 +354,7 @@ function create_xhttp_client(log, cfg, client_readable) {
}
}

function create_ws_client(log) {
function create_ws_client() {
const [ws_client, ws_server] = Object.values(new WebSocketPair())
ws_server.accept()

Expand All @@ -409,9 +371,6 @@ function create_ws_client(log) {
controller.close()
})
},
cancel(reason) {
log.error(`read error: ${reason}`)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: BUFFER_SIZE }),
)
Expand All @@ -421,16 +380,16 @@ function create_ws_client(log) {
write(chunk) {
ws_server.send(chunk)
},
close() {
ws_server.close()
},
abort(reason) {
log.error(`write error: ${reason}`)
},
},
new ByteLengthQueuingStrategy({ highWaterMark: BUFFER_SIZE }),
)

function on_closed() {
try {
ws_server.close()
} catch {}
}

const resp = new Response(null, {
status: 101,
webSocket: ws_client,
Expand All @@ -439,6 +398,7 @@ function create_ws_client(log) {
return {
readable,
writable,
on_closed,
resp,
}
}
Expand All @@ -453,18 +413,17 @@ async function handle_client(cfg, log, client) {
cfg.PROXY,
)

const uploader = create_uploader(vless, remote.writable)
const downloader = create_downloader(
vless,
client.writable,
remote.readable,
)
const upload_done = pump(client.readable, remote.writable, vless.data)
const download_done = pump(remote.readable, client.writable, vless.resp)

downloader.done
download_done
.catch((err) => log.error(`download error: ${err}`))
.finally(() => uploader.done)
.finally(() => upload_done)
.catch((err) => log.debug(`upload error: ${err}`))
.finally(() => log.info('connection closed'))
.finally(() => {
client.on_closed && client.on_closed()
log.info('connection closed')
})
}

function append_slash(path) {
Expand Down Expand Up @@ -720,7 +679,7 @@ async function main(request, env) {
path.endsWith(cfg.WS_PATH)
) {
log.info('handle ws client')
const client = create_ws_client(log)
const client = create_ws_client()
// Do not block here. Client is waiting for upgrade-response.
handle_client(cfg, log, client).catch((err) =>
log.error(`handle ws client error: ${err}`),
Expand All @@ -735,7 +694,7 @@ async function main(request, env) {
) {
log.info('handle xhttp client')
try {
const client = create_xhttp_client(log, cfg, request.body)
const client = create_xhttp_client(cfg, request.body)
await handle_client(cfg, log, client)
return client.resp
} catch (err) {
Expand Down

0 comments on commit b436f01

Please sign in to comment.