-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support hooks on websocket #486
Open
guox191
wants to merge
2
commits into
alibaba:master
Choose a base branch
from
guox191:master
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,236 @@ | ||
'use strict'; | ||
|
||
const co = require('co'); | ||
const WebSocket = require('ws'); | ||
const logUtil = require('../log'); | ||
|
||
/** | ||
* construct the request headers based on original connection, | ||
* but delete the `sec-websocket-*` headers as they are already consumed by AnyProxy | ||
*/ | ||
function getNoWsHeaders(headers) { | ||
const originHeaders = Object.assign({}, headers); | ||
|
||
Object.keys(originHeaders).forEach((key) => { | ||
// if the key matchs 'sec-websocket', delete it | ||
if (/sec-websocket/ig.test(key)) { | ||
delete originHeaders[key]; | ||
} | ||
}); | ||
|
||
delete originHeaders.connection; | ||
delete originHeaders.upgrade; | ||
return originHeaders; | ||
} | ||
|
||
/** | ||
* get request info from the ws client | ||
* @param @required wsClient the ws client of WebSocket | ||
*/ | ||
function getWsReqInfo(wsReq) { | ||
const headers = wsReq.headers || {}; | ||
const host = headers.host; | ||
const hostname = host.split(':')[0]; | ||
const port = host.split(':')[1]; | ||
// TODO 如果是windows机器,url是不是全路径?需要对其过滤,取出 | ||
const path = wsReq.url || '/'; | ||
const isEncript = wsReq.connection && wsReq.connection.encrypted; | ||
|
||
return { | ||
url: `${isEncript ? 'wss' : 'ws'}://${hostname}:${port}${path}`, | ||
headers: headers, // the full headers of origin ws connection | ||
noWsHeaders: getNoWsHeaders(headers), | ||
secure: Boolean(isEncript), | ||
hostname: hostname, | ||
port: port, | ||
path: path | ||
}; | ||
} | ||
|
||
/** | ||
* When the source ws is closed, we need to close the target websocket. | ||
* If the source ws is normally closed, that is, the code is reserved, we need to transfrom them | ||
* @param {object} event CloseEvent | ||
*/ | ||
const getCloseFromOriginEvent = (closeEvent) => { | ||
const code = closeEvent.code || ''; | ||
const reason = closeEvent.reason || ''; | ||
let targetCode = ''; | ||
let targetReason = ''; | ||
if (code >= 1004 && code <= 1006) { | ||
targetCode = 1000; // normal closure | ||
targetReason = `Normally closed. The origin ws is closed at code: ${code} and reason: ${reason}`; | ||
} else { | ||
targetCode = code; | ||
targetReason = reason; | ||
} | ||
|
||
return { | ||
code: targetCode, | ||
reason: targetReason | ||
}; | ||
} | ||
|
||
/** | ||
* get a websocket event handler | ||
* @param @required {object} wsClient | ||
*/ | ||
function handleWs(userRule, recorder, wsClient, wsReq) { | ||
const self = this; | ||
let resourceInfoId = -1; | ||
const resourceInfo = { | ||
wsMessages: [] // all ws messages go through AnyProxy | ||
}; | ||
const clientMsgQueue = []; | ||
const serverInfo = getWsReqInfo(wsReq); | ||
// proxy-layer websocket client | ||
const proxyWs = new WebSocket(serverInfo.url, '', { | ||
rejectUnauthorized: !self.dangerouslyIgnoreUnauthorized, | ||
headers: serverInfo.noWsHeaders | ||
}); | ||
|
||
if (recorder) { | ||
Object.assign(resourceInfo, { | ||
host: serverInfo.hostname, | ||
method: 'WebSocket', | ||
path: serverInfo.path, | ||
url: serverInfo.url, | ||
req: wsReq, | ||
startTime: new Date().getTime() | ||
}); | ||
resourceInfoId = recorder.appendRecord(resourceInfo); | ||
} | ||
|
||
/** | ||
* store the messages before the proxy ws is ready | ||
*/ | ||
const sendProxyMessage = (finalMsg) => { | ||
const message = finalMsg.data; | ||
if (proxyWs.readyState === 1) { | ||
// if there still are msg queue consuming, keep it going | ||
if (clientMsgQueue.length > 0) { | ||
clientMsgQueue.push(message); | ||
} else { | ||
proxyWs.send(message); | ||
} | ||
} else { | ||
clientMsgQueue.push(message); | ||
} | ||
}; | ||
|
||
/** | ||
* consume the message in queue when the proxy ws is not ready yet | ||
* will handle them from the first one-by-one | ||
*/ | ||
const consumeMsgQueue = () => { | ||
while (clientMsgQueue.length > 0) { | ||
const message = clientMsgQueue.shift(); | ||
proxyWs.send(message); | ||
} | ||
}; | ||
|
||
/** | ||
* consruct a message Record from message event | ||
* @param @required {object} finalMsg based on the MessageEvent from websockt.onmessage | ||
* @param @required {boolean} isToServer whether the message is to or from server | ||
*/ | ||
const recordMessage = (finalMsg, isToServer) => { | ||
const message = { | ||
time: Date.now(), | ||
message: finalMsg.data, | ||
isToServer: isToServer | ||
}; | ||
|
||
// resourceInfo.wsMessages.push(message); | ||
recorder && recorder.updateRecordWsMessage(resourceInfoId, message); | ||
}; | ||
|
||
/** | ||
* prepare messageDetail object for intercept hooks | ||
* @param {object} messageEvent | ||
* @returns {object} | ||
*/ | ||
const prepareMessageDetail = (messageEvent) => { | ||
return { | ||
requestOptions: { | ||
port: serverInfo.port, | ||
hostname: serverInfo.hostname, | ||
path: serverInfo.path, | ||
secure: serverInfo.secure, | ||
}, | ||
url: serverInfo.url, | ||
data: messageEvent.data, | ||
}; | ||
}; | ||
|
||
proxyWs.onopen = () => { | ||
consumeMsgQueue(); | ||
}; | ||
|
||
// this event is fired when the connection is build and headers is returned | ||
proxyWs.on('upgrade', (response) => { | ||
resourceInfo.endTime = new Date().getTime(); | ||
const headers = response.headers; | ||
resourceInfo.res = { //construct a self-defined res object | ||
statusCode: response.statusCode, | ||
headers: headers, | ||
}; | ||
|
||
resourceInfo.statusCode = response.statusCode; | ||
resourceInfo.resHeader = headers; | ||
resourceInfo.resBody = ''; | ||
resourceInfo.length = resourceInfo.resBody.length; | ||
|
||
recorder && recorder.updateRecord(resourceInfoId, resourceInfo); | ||
}); | ||
|
||
proxyWs.onerror = (e) => { | ||
// https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent#Status_codes | ||
wsClient.close(1001, e.message); | ||
proxyWs.close(1001); | ||
}; | ||
|
||
proxyWs.onmessage = (event) => { | ||
co(function *() { | ||
const modifiedMsg = (yield userRule.beforeSendWsMessageToClient(prepareMessageDetail(event))) || {}; | ||
const finalMsg = { | ||
data: modifiedMsg.data || event.data, | ||
}; | ||
recordMessage(finalMsg, false); | ||
wsClient.readyState === 1 && wsClient.send(finalMsg.data); | ||
}); | ||
}; | ||
|
||
proxyWs.onclose = (event) => { | ||
logUtil.debug(`proxy ws closed with code: ${event.code} and reason: ${event.reason}`); | ||
const targetCloseInfo = getCloseFromOriginEvent(event); | ||
wsClient.readyState !== 3 && wsClient.close(targetCloseInfo.code, targetCloseInfo.reason); | ||
}; | ||
|
||
wsClient.onmessage = (event) => { | ||
co(function *() { | ||
const modifiedMsg = (yield userRule.beforeSendWsMessageToServer(prepareMessageDetail(event))) || {}; | ||
const finalMsg = { | ||
data: modifiedMsg.data || event.data, | ||
}; | ||
recordMessage(finalMsg, true); | ||
sendProxyMessage(finalMsg); | ||
}); | ||
}; | ||
|
||
wsClient.onclose = (event) => { | ||
logUtil.debug(`original ws closed with code: ${event.code} and reason: ${event.reason}`); | ||
const targetCloseInfo = getCloseFromOriginEvent(event); | ||
proxyWs.readyState !== 3 && proxyWs.close(targetCloseInfo.code, targetCloseInfo.reason); | ||
}; | ||
} | ||
|
||
module.exports = function getWsHandler(userRule, recorder, wsClient, wsReq) { | ||
try { | ||
handleWs.call(this, userRule, recorder, wsClient, wsReq); | ||
} catch (e) { | ||
logUtil.debug('WebSocket Proxy Error:' + e.message); | ||
logUtil.debug(e.stack); | ||
console.error(e); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I noticed that some weboscet url has not
:{port}
.So, you need to normalize
port
before buildingurl
.ref #450
This changes is patched and this ws proxy work on https://www.websocket.org/echo.html.