Directory Structure or Architecture for pub/sub for handling min 200k concurrent connections #830
Unanswered
vinayak-ventura
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Hi Everyone,
I am not able to handle even 200 concurrent clients, my CPU usage reaches around 100% with only 200 clients being connected.
I wanted to ask if anyone having a good prod-level directory/file structure or architecture or any best practices while working with uWebsocketJs, please let me know.
Any help appreciated.
Thanks
`import { createSocket } from "dgram";
const server = createSocket("udp4");
import { StringDecoder } from "string_decoder";
const decoder = new StringDecoder("utf-8");
import { App } from "uWebSockets.js";
import { Buffer } from "node:buffer";
const MEMBERSHIP_IP = "123.123.123.123";
const MEMBERSHIP_PORT = 8080;
const app = App()
.ws("/", {
message: (ws, message, binary) => {
/ Parse JSON and perform the action /
let json = JSON.parse(decoder.write(Buffer.from(message)));
switch (json.action) {
case "sub": {
/ Subscribe to the share's value stream /
ws.subscribe("shares/" + json.share + "/value");
}
case "unsub": {
/ UnSubscribe to the share's value stream */
ws.unsubscribe("shares/" + json.share + "/value");
break;
}
}
server.on("message", (msg, rinfo) => {
// Check if packet is coming
if (msg) {
// some decoding and business logic
let decodedValue = 10;
app.publish("shares/" + msg.shareName + "/value", decodedValue);
}
});
},
open: (ws) => {},
close: (ws) => {},
})
.listen(9001, (listenSocket) => {
if (listenSocket) {
console.log("Socket Listening to port 9001");
}
});
server.on("error", (err) => {
server.close();
});
server.on("listening", () => {
const address = server.address();
});
server.bind(MEMBERSHIP_PORT, () => {
server.addMembership(MEMBERSHIP_IP);
});`
Below code is the code I used to test the concurrent connections
`import asyncio
import websockets
import json
SOCKET_URL = "ws://127.122.123.123:9001/socket"
SOCKET_MSG = {
"action": "sub",
"share": "PAYTM"
}
async def async_processing():
async with websockets.connect(SOCKET_URL) as websocket:
try:
await websocket.send(json.dumps(SOCKET_MSG))
message = await websocket.recv()
print(message)
except:
print('ConnectionClosed')
is_alive = False
# break
tasks = []
for i in range(20):
tasks.insert(i,async_processing())
asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))
`
Beta Was this translation helpful? Give feedback.
All reactions