Skip to content

Commit

Permalink
Wrap client broadcasts in a mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
bbedward committed Sep 3, 2022
1 parent 79cf6a9 commit 714fc39
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 47 deletions.
68 changes: 34 additions & 34 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,40 +76,40 @@ jobs:
file: ./Dockerfile
tags: appditto/natrium-server-rewrite:${{ env.BRANCH_NAME }}-${{ env.GITHUB_RUN_ID }}

deploy:
name: 🥳 Deploy natrium server
needs: build_and_publish
runs-on: ubuntu-latest
env:
GITHUB_RUN_ID: ${{ github.run_id }}
steps:
- uses: actions/checkout@master
- uses: imranismail/setup-kustomize@v1
with:
kustomize-version: "3.5.4"

- name: Get branch name (merge)
if: github.event_name != 'pull_request'
shell: bash
run: echo "BRANCH_NAME=$(echo ${GITHUB_REF#refs/heads/} | tr / -)" >> $GITHUB_ENV

- name: Get branch name (pull request)
if: github.event_name == 'pull_request'
shell: bash
run: echo "BRANCH_NAME=$(echo ${GITHUB_HEAD_REF} | tr / -)" >> $GITHUB_ENV

- name: Set image
working-directory: ./kubernetes/natrium
run: |
kustomize edit set image replaceme=appditto/natrium-server-rewrite:${{ env.BRANCH_NAME }}-${{ env.GITHUB_RUN_ID }}
kustomize build . > deployment-k.yaml
- name: Deploy image to k8s cluster
uses: bbedward/kubectl@master
env:
KUBE_CONFIG_DATA: ${{ secrets.KUBE_CONFIG_DATA }}
with:
args: apply -f ./kubernetes/natrium/deployment-k.yaml
# deploy:
# name: 🥳 Deploy natrium server
# needs: build_and_publish
# runs-on: ubuntu-latest
# env:
# GITHUB_RUN_ID: ${{ github.run_id }}
# steps:
# - uses: actions/checkout@master
# - uses: imranismail/setup-kustomize@v1
# with:
# kustomize-version: "3.5.4"

# - name: Get branch name (merge)
# if: github.event_name != 'pull_request'
# shell: bash
# run: echo "BRANCH_NAME=$(echo ${GITHUB_REF#refs/heads/} | tr / -)" >> $GITHUB_ENV

# - name: Get branch name (pull request)
# if: github.event_name == 'pull_request'
# shell: bash
# run: echo "BRANCH_NAME=$(echo ${GITHUB_HEAD_REF} | tr / -)" >> $GITHUB_ENV

# - name: Set image
# working-directory: ./kubernetes/natrium
# run: |
# kustomize edit set image replaceme=appditto/natrium-server-rewrite:${{ env.BRANCH_NAME }}-${{ env.GITHUB_RUN_ID }}
# kustomize build . > deployment-k.yaml

# - name: Deploy image to k8s cluster
# uses: bbedward/kubectl@master
# env:
# KUBE_CONFIG_DATA: ${{ secrets.KUBE_CONFIG_DATA }}
# with:
# args: apply -f ./kubernetes/natrium/deployment-k.yaml

deploy_kal:
name: 🙉 Deploy kalium server
Expand Down
29 changes: 19 additions & 10 deletions controller/ws_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/appditto/natrium-wallet-server/database"
Expand Down Expand Up @@ -49,6 +50,8 @@ type Client struct {
ID uuid.UUID
Accounts []string // Subscribed accounts
Currency string

mutex sync.Mutex
}

var Upgrader = websocket.Upgrader{}
Expand Down Expand Up @@ -117,6 +120,12 @@ func (h *Hub) Run() {
}
}

func (h *Hub) BroadcastToClient(client *Client, message []byte) {
client.mutex.Lock()
defer client.mutex.Unlock()
client.Send <- message
}

var (
newline = []byte{'\n'}
space = []byte{' '}
Expand Down Expand Up @@ -151,13 +160,13 @@ func (c *Client) readPump() {
if err = json.Unmarshal(msg, &baseRequest); err != nil {
klog.Errorf("Error unmarshalling websocket base request %s", err)
errJson, _ := json.Marshal(InvalidRequestError)
c.Send <- errJson
c.Hub.BroadcastToClient(c, errJson)
continue
}

if _, ok := baseRequest["action"]; !ok {
errJson, _ := json.Marshal(InvalidRequestError)
c.Send <- errJson
c.Hub.BroadcastToClient(c, errJson)
continue
}

Expand All @@ -166,13 +175,13 @@ func (c *Client) readPump() {
if err = mapstructure.Decode(baseRequest, &subscribeRequest); err != nil {
klog.Errorf("Error unmarshalling websocket subscribe request %s", err)
errJson, _ := json.Marshal(InvalidRequestError)
c.Send <- errJson
c.Hub.BroadcastToClient(c, errJson)
continue
}
// Check if account is valid
if !utils.ValidateAddress(subscribeRequest.Account, c.Hub.BananoMode) {
klog.Errorf("Invalid account %s , %v", subscribeRequest.Account, c.Hub.BananoMode)
c.Send <- []byte("{\"error\":\"Invalid account\"}")
c.Hub.BroadcastToClient(c, []byte("{\"error\":\"Invalid account\"}"))
continue
}

Expand Down Expand Up @@ -209,7 +218,7 @@ func (c *Client) readPump() {
accountInfo, err := c.Hub.RPCClient.MakeAccountInfoRequest(subscribeRequest.Account)
if err != nil || accountInfo == nil {
klog.Errorf("Error getting account info %v", err)
c.Send <- []byte("{\"error\":\"subscribe error\"}")
c.Hub.BroadcastToClient(c, []byte("{\"error\":\"subscribe error\"}"))
continue
}

Expand Down Expand Up @@ -252,10 +261,10 @@ func (c *Client) readPump() {
response, err := json.Marshal(accountInfo)
if err != nil {
klog.Errorf("Error marshalling account info %v", err)
c.Send <- []byte("{\"error\":\"subscribe error\"}")
c.Hub.BroadcastToClient(c, []byte("{\"error\":\"subscribe error\"}"))
continue
}
c.Send <- response
c.Hub.BroadcastToClient(c, response)

// The user may have a different UUID every time, 1 token, and multiple accounts
// We store account/token in postgres since that's what we care about
Expand All @@ -273,12 +282,12 @@ func (c *Client) readPump() {
if err = mapstructure.Decode(baseRequest, &fcmUpdateRequest); err != nil {
klog.Errorf("Error unmarshalling websocket fcm_update request %s", err)
errJson, _ := json.Marshal(InvalidRequestError)
c.Send <- errJson
c.Hub.BroadcastToClient(c, errJson)
continue
}
// Check if account is valid
if !utils.ValidateAddress(fcmUpdateRequest.Account, c.Hub.BananoMode) {
c.Send <- []byte("{\"error\":\"Invalid account\"}")
c.Hub.BroadcastToClient(c, []byte("{\"error\":\"Invalid account\"}"))
continue
}
// Do the updoot
Expand All @@ -292,7 +301,7 @@ func (c *Client) readPump() {
} else {
klog.Errorf("Unknown websocket request %s", msg)
errJson, _ := json.Marshal(InvalidRequestError)
c.Send <- errJson
c.Hub.BroadcastToClient(c, errJson)
continue
}
}
Expand Down
5 changes: 2 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func main() {
for client, _ := range wsHub.Clients {
for _, account := range client.Accounts {
if account == msg.Block.LinkAsAccount {
client.Send <- serialized
client.Hub.BroadcastToClient(client, serialized)
}
}
}
Expand Down Expand Up @@ -365,8 +365,7 @@ func main() {
klog.Errorf("Error serializing price message: %v", err)
continue
}
client.Send <- serialized

client.Hub.BroadcastToClient(client, serialized)
}
})
s.StartAsync()
Expand Down

0 comments on commit 714fc39

Please sign in to comment.