Skip to content

Commit

Permalink
Merge pull request #11 from aseerkt/feat/dm
Browse files Browse the repository at this point in the history
feat: direct message with redis streams adapter
  • Loading branch information
aseerkt authored Jul 18, 2024
2 parents 0539e87 + 619df55 commit ae76155
Show file tree
Hide file tree
Showing 51 changed files with 1,181 additions and 644 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pnpm --filter web test
- [x] leave group, transfer ownership, delete group
- [x] delete group
- [x] alert component
- [x] direct message
- [ ] confirm dialog
- [ ] read receipts
- [ ] e2e encryption
Expand Down
45 changes: 28 additions & 17 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
"author": "Aseer KT",
"license": "ISC",
"dependencies": {
"@socket.io/cluster-adapter": "^0.2.2",
"@socket.io/sticky": "^1.0.4",
"@socket.io/redis-streams-adapter": "^0.2.2",
"argon2": "^0.40.3",
"colors": "^1.4.0",
"cors": "^2.8.5",
Expand Down
36 changes: 35 additions & 1 deletion server/src/database/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
/* eslint-disable @typescript-eslint/ban-types */
import { isValidDate } from '@/utils/validations'
import { ColumnBaseConfig, ColumnDataType, SQL } from 'drizzle-orm'
import {
AnyColumn,
ColumnBaseConfig,
ColumnDataType,
sql,
SQL,
SQLWrapper,
} from 'drizzle-orm'
import { PgColumn, PgSelect } from 'drizzle-orm/pg-core'
import get from 'lodash/get'
import { defaultLimit } from './constants'
Expand Down Expand Up @@ -65,3 +72,30 @@ export const withPagination = async <T extends PgSelect>(
: null,
}
}

export const rowNumber = () => {
return {
over: <TReturnType>({
partitionBy,
orderBy,
as,
}: {
partitionBy: AnyColumn | SQLWrapper
orderBy: SQL
as: string
}) =>
sql<TReturnType>`ROW_NUMBER() OVER (PARTITION BY ${partitionBy} ORDER BY ${orderBy})`.as(
as,
),
}
}

export const coalesce = <T>(
value: SQL.Aliased<T> | SQL<T> | AnyColumn,
defaultValue: SQL.Aliased<T> | SQL<T> | AnyColumn | number,
) => sql<T>`COALESCE (${value}, ${defaultValue})`

export const nullAs = (as: string) => sql`null`.as(as)

export const columnAs = <T>(column: AnyColumn, as: string) =>
sql<T>`${column}`.as(as)
58 changes: 7 additions & 51 deletions server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
import {
createAdapter as createClusterAdapter,
setupPrimary,
} from '@socket.io/cluster-adapter'
import { setupMaster, setupWorker } from '@socket.io/sticky'
import { createAdapter } from '@socket.io/redis-streams-adapter'
import 'colors'
import cors from 'cors'
import express from 'express'
import helmet from 'helmet'
import morgan from 'morgan'
import cluster from 'node:cluster'
import { createServer } from 'node:http'
import { availableParallelism } from 'node:os'
import { Server } from 'socket.io'
import swaggerUi from 'swagger-ui-express'
import { config } from './config'
import { connectDB } from './database'
import { errorHandler } from './middlewares'
import { getRedisClient } from './redis'
import rootRouter from './routes'
import { registerSocketEvents } from './socket/events'
import { socketAuthMiddleware } from './socket/middlewares'
Expand All @@ -28,40 +23,10 @@ import {
import swaggerDocument from './swagger-output.json'

const createApp = async () => {
if (cluster.isPrimary && config.isProd) {
console.log(`Primary ${process.pid} is running`)

const numCPUs = availableParallelism()

const httpServer = createServer()

// setup sticky sessions
setupMaster(httpServer, { loadBalancingMethod: 'least-connection' })

// setup connection between the workers
setupPrimary()

httpServer.listen(config.port, () => {
console.log(`Server running at http://localhost:${config.port}`.blue.bold)
})

for (let i = 0; i < numCPUs; i++) {
// Spawn a new worker process.
// This can only be called from the primary process.
cluster.fork()
}

cluster.on('exit', worker => {
console.log(`worker ${worker.process.pid} died`)
cluster.fork()
})

return
}

console.log(`Worker ${process.pid} started`)

await connectDB()
const redisClient = getRedisClient()

const app = express()

Expand All @@ -81,16 +46,9 @@ const createApp = async () => {
SocketData
>(server, {
cors: { origin: config.corsOrigin },
adapter: createAdapter(redisClient),
})

if (config.isProd) {
// use cluster adapter
io.adapter(createClusterAdapter())

// setup connection with primary process
setupWorker(io)
}

io.use(socketAuthMiddleware)

registerSocketEvents(io)
Expand All @@ -108,11 +66,9 @@ const createApp = async () => {

app.use(errorHandler)

if (!config.isProd) {
server.listen(config.port, () => {
console.log(`Server running at http://localhost:${config.port}`.blue.bold)
})
}
server.listen(config.port, () => {
console.log(`Server running at http://localhost:${config.port}`.blue.bold)
})

return { server }
}
Expand Down
36 changes: 21 additions & 15 deletions server/src/middlewares.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { ErrorRequestHandler, RequestHandler } from 'express'
import { config } from './config'
import { MemberRole } from './modules/members/members.schema'
import { checkPermission } from './modules/members/members.service'
import { notAuthenticated, notAuthorized } from './utils/api'
import { badRequest, notAuthenticated, notAuthorized } from './utils/api'
import { verifyToken } from './utils/jwt'

// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand Down Expand Up @@ -32,31 +32,37 @@ export const auth: RequestHandler = (req, res, next) => {
}
}

export const hasGroupPermission =
export const hasChatPermission =
(role: MemberRole): RequestHandler =>
async (req, res, next) => {
try {
const groupId = Number(
req.params.groupId || req.query.groupId || req.body.groupId,
)

if (Number.isNaN(groupId)) {
return notAuthorized(res)
}

const { isAllowed, memberRole } = await checkPermission(
groupId,
req.user!.id,
role,
const partnerId = Number(
req.params.partnerId || req.query.partnerId || req.body.partnerId,
)

if (!isAllowed) {
return notAuthorized(res)
if (!groupId && !partnerId) {
return badRequest(res)
}

req.member = {
groupId: groupId,
role: memberRole!,
if (groupId) {
const { isAllowed, memberRole } = await checkPermission(
groupId,
req.user!.id,
role,
)

if (!isAllowed) {
return notAuthorized(res)
}

req.member = {
groupId: groupId,
role: memberRole!,
}
}

next()
Expand Down
Loading

0 comments on commit ae76155

Please sign in to comment.