Skip to content

Commit

Permalink
1. fix jsclient seq bug
Browse files Browse the repository at this point in the history
2. update websocket examples based on gin
  • Loading branch information
lesismal committed Apr 19, 2024
1 parent cf207fc commit 0fc7a14
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 123 deletions.
76 changes: 38 additions & 38 deletions examples/protocols/websocket/jsclient/arpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var _ErrDisconnected = "[error disconnected]";
var _ErrReconnecting = "[error reconnecting]";

function Codec() {
this.Marshal = function(obj) {
if (typeof(obj) == 'string') {
this.Marshal = function (obj) {
if (typeof (obj) == 'string') {
return new TextEncoder("utf-8").encode(obj);
}
return new TextEncoder("utf-8").encode(JSON.stringify(obj));
}
this.Unmarshal = function(data) {
this.Unmarshal = function (data) {
try {
data = JSON.parse(new TextDecoder("utf-8").decode(data));
return data;
Expand Down Expand Up @@ -69,90 +69,90 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {

this.state = _SOCK_STATE_CONNECTING;

this.handle = function(method, h) {
this.handle = function (method, h) {
if (this.handlers[method]) {
throw ("handler for [${method}] exists");
}
this.handlers[method] = { h: h };
}

this.callHttp = function(method, request, timeout, cb) {
this.callHttp = function (method, request, timeout, cb) {
this.call(method, request, timeout, cb, true);
}
this.call = function(method, request, timeout, cb, isHttp) {
this.call = function (method, request, timeout, cb, isHttp) {
if (this.state == _SOCK_STATE_CLOSED) {
if (typeof(cb) == 'function') {
if (typeof (cb) == 'function') {
cb({ data: null, err: _ErrClosed });
}
return new Promise(function(resolve, reject) {
return new Promise(function (resolve, reject) {
resolve({ data: null, err: _ErrClosed });
});
}
if (this.state == _SOCK_STATE_CONNECTING) {
if (typeof(cb) == 'function') {
if (typeof (cb) == 'function') {
cb({ data: null, err: _ErrReconnecting });
}
return new Promise(function(resolve, reject) {
return new Promise(function (resolve, reject) {
resolve({ data: null, err: _ErrReconnecting });
});
}
var session = {};
var p = new Promise(function(resolve, reject) {
var p = new Promise(function (resolve, reject) {
session.resolve = resolve;
});
if (typeof(cb) == 'function') {
if (typeof (cb) == 'function') {
session.resolve = cb;
}
this.sessionMap[seq] = session;
this.sessionMap[this.seqNum] = session;

if (timeout > 0) {
session.timer = setTimeout(function() {
delete(client.sessionMap[seq]);
session.timer = setTimeout(function () {
delete (client.sessionMap[seq]);
session.resolve({ data: null, err: "timeout" });
}, timeout);
}
this.write(_CmdRequest, method, request, this.seqNum, this._onMessage, isHttp);
this.write(_CmdRequest, method, request, this._onMessage, isHttp);
return p;
}

this.notifyHttp = function(method, notify) {
this.notifyHttp = function (method, notify) {
this.notify(method, notify, true);
}
this.notify = function(method, notify, isHttp) {
this.notify = function (method, notify, isHttp) {
if (this.state == _SOCK_STATE_CLOSED) {
return _ErrClosed;
}
if (this.state == _SOCK_STATE_CONNECTING) {
return _ErrReconnecting;
}
this.write(_CmdNotify, method, notify, function() {}, isHttp);
this.write(_CmdNotify, method, notify, function () { }, isHttp);
}
this.ping = function() {
this.ping = function () {
if (client.state == _SOCK_STATE_CLOSED) {
return _ErrClosed;
}
if (client.state == _SOCK_STATE_CONNECTING) {
return _ErrReconnecting;
}
client.write(_CmdPing, "", null, function() {});
client.write(_CmdPing, "", null, function () { });
}
this.pong = function() {
this.pong = function () {
if (client.state == _SOCK_STATE_CLOSED) {
return _ErrClosed;
}
if (client.state == _SOCK_STATE_CONNECTING) {
return _ErrReconnecting;
}
client.write(_CmdPong, "", null, function() {});
client.write(_CmdPong, "", null, function () { });
}
this.keepalive = function(timeout) {
this.keepalive = function (timeout) {
if (this._keepaliveInited) return;
this._keepaliveInited = true;
if (!timeout) timeout = 1000 * 30;
this.keepaliveIntervalID = setInterval(this.ping, timeout);
}

this.write = function(cmd, method, arg, cb, isHttp) {
this.write = function (cmd, method, arg, cb, isHttp) {
var buffer;
if (arg) {
var data = this.codec.Marshal(arg);
Expand All @@ -172,10 +172,10 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
}
buffer[_HeaderIndexCmd] = cmd & 0xFF;
buffer[_HeaderIndexMethodLen] = method.length & 0xFF;
this.seqNum++;
for (var i = _HeaderIndexSeqBegin; i < _HeaderIndexSeqBegin + 4; i++) {
buffer[i] = (this.seqNum >> ((i - _HeaderIndexSeqBegin) * 8)) & 0xFF;
}
this.seqNum++;
var methodBuffer = new TextEncoder("utf-8").encode(method);
for (var i = 0; i < methodBuffer.length; i++) {
buffer[16 + i] = methodBuffer[i];
Expand All @@ -187,27 +187,27 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
}
}

this.shutdown = function() {
this.shutdown = function () {
this.ws.close();
this.state = _SOCK_STATE_CLOSED;
if (!!this.keepaliveIntervalID) {
clearInterval(this.keepaliveIntervalID);
}
}

this.request = function(data, cb) {
this.request = function (data, cb) {
let resolve;
let p = new Promise(function(res) {
let p = new Promise(function (res) {
resolve = res;
if (typeof(cb) == 'function') {
resolve = function(ret) {
if (typeof (cb) == 'function') {
resolve = function (ret) {
res(ret);
cb(ret);
}
}
let r = new XMLHttpRequest();
r.open(this.httpMethod, this.httpUrl, true);
r.onreadystatechange = function() {
r.onreadystatechange = function () {
if (r.readyState != 4) {
return;
}
Expand All @@ -223,7 +223,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
return p;
}

this._onMessage = function(event) {
this._onMessage = function (event) {
try {
var offset = 0;
while (offset < event.data.byteLength) {
Expand Down Expand Up @@ -277,7 +277,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
if (session.timer) {
clearTimeout(session.timer);
}
delete(client.sessionMap[seq]);
delete (client.sessionMap[seq]);
var data = client.codec.Unmarshal(bodyArr);
if (isError) {
session.resolve({ data: null, err: data });
Expand All @@ -299,7 +299,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
}
}

this.init = function() {
this.init = function () {
console.log("[ArpcClient] init...");
if ('WebSocket' in window) {
client.ws = new WebSocket(this.url);
Expand All @@ -314,14 +314,14 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {

client.state = _SOCK_STATE_CONNECTING;

client.ws.onopen = function(event) {
client.ws.onopen = function (event) {
client.state = _SOCK_STATE_CONNECTED;
console.log("[ArpcClient] websocket onopen");
if (client.onOpen) {
client.onOpen(client);
}
};
client.ws.onclose = function(event) {
client.ws.onclose = function (event) {
console.log("[ArpcClient] websocket onclose");
if (client.onClose) {
client.onClose(client);
Expand All @@ -344,7 +344,7 @@ function ArpcClient(url, codec, httpUrl, httpMethod) {
client.state = _SOCK_STATE_CONNECTING;
client.init();
};
client.ws.onerror = function(event) {
client.ws.onerror = function (event) {
console.log("[ArpcClient] websocket onerror");
if (client.onError) {
client.onError(client);
Expand Down
51 changes: 43 additions & 8 deletions examples/protocols/websocket/server/server.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package main

import (
"flag"
"log"
"net"
"net/http"
"time"

"github.com/gin-gonic/gin"
"github.com/lesismal/arpc"
"github.com/lesismal/arpc/extension/protocol/websocket"
)

var useGinListener = flag.Bool("gin", true, "use gin listener")

func main() {
ln, _ := websocket.Listen("localhost:8888", nil)
http.HandleFunc("/ws", ln.(*websocket.Listener).Handler)
go func() {
err := http.ListenAndServe("localhost:8888", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}()
flag.Parse()

var ln net.Listener

if *useGinListener {
ln = ginListener()
} else {
ln = stdListener()
}

svr := arpc.NewServer()
// register router
Expand All @@ -41,3 +47,32 @@ func main() {

svr.Serve(ln)
}

func ginListener() net.Listener {
router := gin.New()
ln, _ := websocket.Listen("localhost:8888", nil)
router.GET("/ws", func(c *gin.Context) {
w := c.Writer
r := c.Request
ln.(*websocket.Listener).Handler(w, r)
})
go func() {
err := router.Run("localhost:8888")
if err != nil {
log.Fatalf("router.Run failed: %v", err)
}
}()
return ln
}

func stdListener() net.Listener {
ln, _ := websocket.Listen("localhost:8888", nil)
http.HandleFunc("/ws", ln.(*websocket.Listener).Handler)
go func() {
err := http.ListenAndServe("localhost:8888", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}()
return ln
}
Loading

0 comments on commit 0fc7a14

Please sign in to comment.