Skip to content

Commit

Permalink
Gateway rate limiting (#1942)
Browse files Browse the repository at this point in the history
  • Loading branch information
zkokelj authored Jul 24, 2024
1 parent 9bd08a2 commit ad61bdb
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 39 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/manual-deploy-dexynth-gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,5 @@ jobs:
--log-opt max-file=3 --log-opt max-size=10m \
${{ vars.DOCKER_BUILD_TAG_GATEWAY_DEXYNTH }} \
-host=0.0.0.0 -port=8080 -portWS=81 -nodeHost=${{ vars.L2_RPC_URL_VALIDATOR_DEXYNTH }} -verbose=true \
-logPath=sys_out -dbType=mariaDB -dbConnectionURL="obscurouser:${{ secrets.OBSCURO_GATEWAY_MARIADB_USER_PWD }}@tcp(obscurogateway-mariadb-${{ github.event.inputs.testnet_type }}.uksouth.cloudapp.azure.com:3306)/ogdb"'
-logPath=sys_out -dbType=mariaDB -dbConnectionURL="obscurouser:${{ secrets.OBSCURO_GATEWAY_MARIADB_USER_PWD }}@tcp(obscurogateway-mariadb-${{ github.event.inputs.testnet_type }}.uksouth.cloudapp.azure.com:3306)/ogdb" \
-rateLimitUserComputeTime=${{ vars.GATEWAY_RATE_LIMIT_USER_COMPUTE_TIME }} -rateLimitWindow=${{ vars.GATEWAY_RATE_LIMIT_WINDOW }} -maxConcurrentRequestsPerUser=${{ vars.GATEWAY_MAX_CONCURRENT_REQUESTS_PER_USER }} '
9 changes: 5 additions & 4 deletions .github/workflows/manual-deploy-obscuro-gateway.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ jobs:
&& cd /home/obscuro/go-obscuro/ \
&& docker run -d -p 80:80 -p 81:81 --name ${{ github.event.inputs.testnet_type }}-OG-${{ GITHUB.RUN_NUMBER }} \
-e OBSCURO_GATEWAY_VERSION="${{ GITHUB.RUN_NUMBER }}-${{ GITHUB.SHA }}" \
--log-opt max-file=3 --log-opt max-size=10m \
${{ vars.DOCKER_BUILD_TAG_GATEWAY }} \
-host=0.0.0.0 -port=8080 -portWS=81 -nodeHost=${{ vars.L2_RPC_URL_VALIDATOR }} -verbose=true \
-logPath=sys_out -dbType=mariaDB -dbConnectionURL="obscurouser:${{ secrets.OBSCURO_GATEWAY_MARIADB_USER_PWD }}@tcp(obscurogateway-mariadb-${{ github.event.inputs.testnet_type }}.uksouth.cloudapp.azure.com:3306)/ogdb"'
--log-opt max-file=3 --log-opt max-size=10m \
${{ vars.DOCKER_BUILD_TAG_GATEWAY }} \
-host=0.0.0.0 -port=8080 -portWS=81 -nodeHost=${{ vars.L2_RPC_URL_VALIDATOR }} -verbose=true \
-logPath=sys_out -dbType=mariaDB -dbConnectionURL="obscurouser:${{ secrets.OBSCURO_GATEWAY_MARIADB_USER_PWD }}@tcp(obscurogateway-mariadb-${{ github.event.inputs.testnet_type }}.uksouth.cloudapp.azure.com:3306)/ogdb" \
-rateLimitUserComputeTime=${{ vars.GATEWAY_RATE_LIMIT_USER_COMPUTE_TIME }} -rateLimitWindow=${{ vars.GATEWAY_RATE_LIMIT_WINDOW }} -maxConcurrentRequestsPerUser=${{ vars.GATEWAY_MAX_CONCURRENT_REQUESTS_PER_USER }} '
63 changes: 53 additions & 10 deletions integration/obscurogateway/tengateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,19 @@ func TestTenGateway(t *testing.T) {
createTenNetwork(t, startPort)

tenGatewayConf := wecommon.Config{
WalletExtensionHost: "127.0.0.1",
WalletExtensionPortHTTP: startPort + integration.DefaultTenGatewayHTTPPortOffset,
WalletExtensionPortWS: startPort + integration.DefaultTenGatewayWSPortOffset,
NodeRPCHTTPAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCHTTPOffset),
NodeRPCWebsocketAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCWSOffset),
LogPath: "sys_out",
VerboseFlag: false,
DBType: "sqlite",
TenChainID: 443,
StoreIncomingTxs: true,
WalletExtensionHost: "127.0.0.1",
WalletExtensionPortHTTP: startPort + integration.DefaultTenGatewayHTTPPortOffset,
WalletExtensionPortWS: startPort + integration.DefaultTenGatewayWSPortOffset,
NodeRPCHTTPAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCHTTPOffset),
NodeRPCWebsocketAddress: fmt.Sprintf("127.0.0.1:%d", startPort+integration.DefaultHostRPCWSOffset),
LogPath: "sys_out",
VerboseFlag: false,
DBType: "sqlite",
TenChainID: 443,
StoreIncomingTxs: true,
RateLimitUserComputeTime: 200 * time.Millisecond,
RateLimitWindow: 1 * time.Second,
RateLimitMaxConcurrentRequests: 3,
}

tenGwContainer := walletextension.NewContainerFromConfig(tenGatewayConf, testlog.Logger())
Expand Down Expand Up @@ -111,6 +114,7 @@ func TestTenGateway(t *testing.T) {
"testDifferentMessagesOnRegister": testDifferentMessagesOnRegister,
"testInvokeNonSensitiveMethod": testInvokeNonSensitiveMethod,
"testGetStorageAtForReturningUserID": testGetStorageAtForReturningUserID,
"testRateLimiter": testRateLimiter,
} {
t.Run(name, func(t *testing.T) {
test(t, httpURL, wsURL, w)
Expand All @@ -124,6 +128,45 @@ func TestTenGateway(t *testing.T) {
assert.NoError(t, err)
}

func testRateLimiter(t *testing.T, httpURL, wsURL string, w wallet.Wallet) {
user0, err := NewGatewayUser([]wallet.Wallet{w, datagenerator.RandomWallet(integration.TenChainID)}, httpURL, wsURL)
require.NoError(t, err)
testlog.Logger().Info("Created user with encryption token", "t", user0.tgClient.UserID())
// register the user so we can call the endpoints that require authentication
err = user0.RegisterAccounts()
require.NoError(t, err)

// call BalanceAt - fist call should be successful
_, err = user0.HTTPClient.BalanceAt(context.Background(), user0.Wallets[0].Address(), nil)
require.NoError(t, err)

// sleep for a period of time to allow the rate limiter to reset
time.Sleep(1 * time.Second)

// first call after the rate limiter reset should be successful
_, err = user0.HTTPClient.BalanceAt(context.Background(), user0.Wallets[0].Address(), nil)
require.NoError(t, err)

address := user0.Wallets[0].Address()

// make 1000 requests with the same user to "spam" the gateway
for i := 0; i < 1000; i++ {
msg := ethereum.CallMsg{
From: address,
To: &address, // Example: self-call to the user's address
Gas: uint64(i),
Data: nil,
}

user0.HTTPClient.EstimateGas(context.Background(), msg)
}

// after 1000 requests, the rate limiter should block the user
_, err = user0.HTTPClient.BalanceAt(context.Background(), user0.Wallets[0].Address(), nil)
require.Error(t, err)
require.Equal(t, "rate limit exceeded", err.Error())
}

func testNewHeadsSubscription(t *testing.T, httpURL, wsURL string, w wallet.Wallet) {
user0, err := NewGatewayUser([]wallet.Wallet{w, datagenerator.RandomWallet(integration.TenChainID)}, httpURL, wsURL)
require.NoError(t, err)
Expand Down
29 changes: 17 additions & 12 deletions tools/walletextension/common/config.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
package common

import "time"

// Config contains the configuration required by the WalletExtension.
type Config struct {
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int
NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string
LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool
DBType string
DBConnectionURL string
TenChainID int
StoreIncomingTxs bool
WalletExtensionHost string
WalletExtensionPortHTTP int
WalletExtensionPortWS int
NodeRPCHTTPAddress string
NodeRPCWebsocketAddress string
LogPath string
DBPathOverride string // Overrides the database file location. Used in tests.
VerboseFlag bool
DBType string
DBConnectionURL string
TenChainID int
StoreIncomingTxs bool
RateLimitUserComputeTime time.Duration
RateLimitWindow time.Duration
RateLimitMaxConcurrentRequests int
}
43 changes: 31 additions & 12 deletions tools/walletextension/main/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"time"

wecommon "github.com/ten-protocol/go-ten/tools/walletextension/common"
)
Expand Down Expand Up @@ -59,6 +60,18 @@ const (
storeIncomingTxs = "storeIncomingTxs"
storeIncomingTxsDefault = true
storeIncomingTxsUsage = "Flag to enable storing incoming transactions in the database for debugging purposes. Default: true"

rateLimitUserComputeTimeName = "rateLimitUserComputeTime"
rateLimitUserComputeTimeDefault = 10 * time.Second
rateLimitUserComputeTimeUsage = "rateLimitUserComputeTime represents how much compute time is user allowed to used in rateLimitWindow time. If rateLimitUserComputeTime is set to 0, rate limiting is turned off. Default: 10s."

rateLimitWindowName = "rateLimitWindow"
rateLimitWindowDefault = 1 * time.Minute
rateLimitWindowUsage = "rateLimitWindow represents time window in which we allow one user to use compute time defined with rateLimitUserComputeTimeMs Default: 1m"

rateLimitMaxConcurrentRequestsName = "maxConcurrentRequestsPerUser"
rateLimitMaxConcurrentRequestsDefault = 3
rateLimitMaxConcurrentRequestsUsage = "Number of concurrent requests allowed per user. Default: 3"
)

func parseCLIArgs() wecommon.Config {
Expand All @@ -75,20 +88,26 @@ func parseCLIArgs() wecommon.Config {
dbConnectionURL := flag.String(dbConnectionURLFlagName, dbConnectionURLFlagDefault, dbConnectionURLFlagUsage)
tenChainID := flag.Int(tenChainIDName, tenChainIDDefault, tenChainIDFlagUsage)
storeIncomingTransactions := flag.Bool(storeIncomingTxs, storeIncomingTxsDefault, storeIncomingTxsUsage)
rateLimitUserComputeTime := flag.Duration(rateLimitUserComputeTimeName, rateLimitUserComputeTimeDefault, rateLimitUserComputeTimeUsage)
rateLimitWindow := flag.Duration(rateLimitWindowName, rateLimitWindowDefault, rateLimitWindowUsage)
rateLimitMaxConcurrentRequests := flag.Int(rateLimitMaxConcurrentRequestsName, rateLimitMaxConcurrentRequestsDefault, rateLimitMaxConcurrentRequestsUsage)
flag.Parse()

return wecommon.Config{
WalletExtensionHost: *walletExtensionHost,
WalletExtensionPortHTTP: *walletExtensionPort,
WalletExtensionPortWS: *walletExtensionPortWS,
NodeRPCHTTPAddress: fmt.Sprintf("%s:%d", *nodeHost, *nodeHTTPPort),
NodeRPCWebsocketAddress: fmt.Sprintf("%s:%d", *nodeHost, *nodeWebsocketPort),
LogPath: *logPath,
DBPathOverride: *databasePath,
VerboseFlag: *verboseFlag,
DBType: *dbType,
DBConnectionURL: *dbConnectionURL,
TenChainID: *tenChainID,
StoreIncomingTxs: *storeIncomingTransactions,
WalletExtensionHost: *walletExtensionHost,
WalletExtensionPortHTTP: *walletExtensionPort,
WalletExtensionPortWS: *walletExtensionPortWS,
NodeRPCHTTPAddress: fmt.Sprintf("%s:%d", *nodeHost, *nodeHTTPPort),
NodeRPCWebsocketAddress: fmt.Sprintf("%s:%d", *nodeHost, *nodeWebsocketPort),
LogPath: *logPath,
DBPathOverride: *databasePath,
VerboseFlag: *verboseFlag,
DBType: *dbType,
DBConnectionURL: *dbConnectionURL,
TenChainID: *tenChainID,
StoreIncomingTxs: *storeIncomingTransactions,
RateLimitUserComputeTime: *rateLimitUserComputeTime,
RateLimitWindow: *rateLimitWindow,
RateLimitMaxConcurrentRequests: *rateLimitMaxConcurrentRequests,
}
}
Loading

0 comments on commit ad61bdb

Please sign in to comment.