Skip to content

Commit

Permalink
Enable hash routing for copy protocol (#837)
Browse files Browse the repository at this point in the history
* Enable hash routing for copy protocol

* Fix linter

* Add tests
  • Loading branch information
reshke authored Nov 22, 2024
1 parent fcc94fd commit 4831d30
Show file tree
Hide file tree
Showing 11 changed files with 471 additions and 248 deletions.
47 changes: 44 additions & 3 deletions pkg/models/hashfunction/hashfunction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hashfunction
import (
"encoding/binary"
"fmt"
"strconv"

"github.com/go-faster/city"
"github.com/pg-sharding/spqr/qdb"
Expand All @@ -13,9 +14,9 @@ type HashFunctionType int

/* Pre-defined hash functions */
const (
HashFunctionIdent = 0
HashFunctionMurmur = 1
HashFunctionCity = 2
HashFunctionIdent = HashFunctionType(0)
HashFunctionMurmur = HashFunctionType(1)
HashFunctionCity = HashFunctionType(2)
)

var (
Expand Down Expand Up @@ -99,6 +100,46 @@ func ApplyHashFunction(inp interface{}, ctype string, hf HashFunctionType) (inte
}
}

// ApplyHashFunctionOnStringRepr applies the routing hash function hf to a
// column value given in its textual representation (as received from COPY).
//
// For HashFunctionMurmur and HashFunctionCity the 32-bit hash of the raw
// input bytes is returned, widened to uint64. ctype is ignored in this case.
//
// For HashFunctionIdent no hashing is performed; the bytes are converted
// according to ctype so that the caller receives a value in a form compatible
// with CompareKeyRange:
//   - qdb.ColumnTypeInteger  -> int64 (base-10 parse)
//   - qdb.ColumnTypeUinteger -> uint64 (base-10 parse)
//   - qdb.ColumnTypeVarchar / qdb.ColumnTypeVarcharDeprecated -> string
//   - any other column type  -> the input []byte, unchanged
//
// A parse failure is returned as-is from strconv. An unrecognized hf yields
// errNoSuchHashFunction.
//
// NOTE(review): the hash is computed over the textual bytes of the value.
// Whether this matches the hash ApplyHashFunction produces for the same
// logical value (e.g. an integer key arriving via INSERT) is not visible
// here; the copy_multishard regress output suggests the two can route the
// same key to different shards. TODO confirm.
func ApplyHashFunctionOnStringRepr(inp []byte, ctype string, hf HashFunctionType) (interface{}, error) {
	// Real hash functions work on the raw bytes and do not care about ctype.
	switch hf {
	case HashFunctionMurmur:
		return uint64(murmur3.Sum32(inp)), nil
	case HashFunctionCity:
		return uint64(city.Hash32(inp)), nil
	case HashFunctionIdent:
		// Identity: fall out of the switch and convert by column type below.
	default:
		return nil, errNoSuchHashFunction
	}

	text := string(inp)

	switch ctype {
	case qdb.ColumnTypeInteger:
		parsed, err := strconv.ParseInt(text, 10, 64)
		if err != nil {
			return nil, err
		}
		return parsed, nil
	case qdb.ColumnTypeUinteger:
		parsed, err := strconv.ParseUint(text, 10, 64)
		if err != nil {
			return nil, err
		}
		return parsed, nil
	case qdb.ColumnTypeVarchar, qdb.ColumnTypeVarcharDeprecated:
		return text, nil
	default:
		// Unknown column type: hand the raw bytes back untouched.
		return inp, nil
	}
}

// HashFunctionByName returns the corresponding HashFunctionType based on the given hash function name.
// It accepts a string parameter `hfn` representing the hash function name.
// It returns the corresponding HashFunctionType and an error if the hash function name is not recognized.
Expand Down
2 changes: 2 additions & 0 deletions router/pgcopy/state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pgcopy

import (
"github.com/pg-sharding/spqr/pkg/models/hashfunction"
"github.com/pg-sharding/spqr/pkg/models/kr"
"github.com/pg-sharding/spqr/router/routingstate"
)
Expand All @@ -12,4 +13,5 @@ type CopyState struct {
ColumnOffset int
AllowMultishard bool
Krs []*kr.KeyRange
HashFunc hashfunction.HashFunctionType
}
7 changes: 5 additions & 2 deletions router/qrouter/proxy_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ func (qr *ProxyQrouter) routeWithRules(ctx context.Context, stmt lyx.Node, sph s
return nil, err
}
case *lyx.Copy:
return routingstate.MultiMatchState{}, nil
return routingstate.CopyState{}, nil
default:
spqrlog.Zero.Debug().Interface("statement", stmt).Msg("proxy-routing message to all shards")
}
Expand Down Expand Up @@ -1196,12 +1196,15 @@ func (qr *ProxyQrouter) Route(ctx context.Context, stmt lyx.Node, sph session.Se
return v, nil
case routingstate.DDLState:
return v, nil
case routingstate.CopyState:
/* temporary */
return routingstate.MultiMatchState{}, nil
case routingstate.MultiMatchState:
switch sph.DefaultRouteBehaviour() {
case "BLOCK":
return routingstate.SkipRoutingState{}, spqrerror.NewByCode(spqrerror.SPQR_NO_DATASHARD)
default:
return routingstate.MultiMatchState{}, nil
return v, nil
}
}
return routingstate.SkipRoutingState{}, nil
Expand Down
26 changes: 10 additions & 16 deletions router/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -722,10 +721,12 @@ func (rst *RelayStateImpl) ProcCopyPrepare(ctx context.Context, stmt *lyx.Copy)
return nil, fmt.Errorf("multi-column copy processing is not yet supported")
}

var hashFunc hashfunction.HashFunctionType

if v, err := hashfunction.HashFunctionByName(ds.Relations[relname].DistributionKey[0].HashFunction); err != nil {
return nil, err
} else if v != hashfunction.HashFunctionIdent {
return nil, fmt.Errorf("multi-column copy HASH based processing is not yet supported")
} else {
hashFunc = v
}

krs, err := rst.Qr.Mgr().ListKeyRanges(ctx, ds.Id)
Expand All @@ -750,6 +751,7 @@ func (rst *RelayStateImpl) ProcCopyPrepare(ctx context.Context, stmt *lyx.Copy)
AllowMultishard: allow_multishard,
Krs: krs,
TargetType: TargetType,
HashFunc: hashFunc,
}, nil
}

Expand Down Expand Up @@ -788,19 +790,11 @@ func (rst *RelayStateImpl) ProcCopy(ctx context.Context, data *pgproto3.CopyData
if b == '\n' || b == cps.Delimiter {

if currentAttr == cps.ColumnOffset {
switch cps.TargetType {
case "varchar":
values = append(values, string(data.Data[prevDelimiter:i]))
case "integer":
n, err := strconv.ParseInt(string(data.Data[prevDelimiter:i]), 10, 64)
if err != nil {
return nil, err
}
values = append(values, n)

default:
return nil, fmt.Errorf("copy type %v not supported", cps.TargetType)
tmp, err := hashfunction.ApplyHashFunctionOnStringRepr(data.Data[prevDelimiter:i], cps.TargetType, cps.HashFunc)
if err != nil {
return nil, err
}
values = append(values, tmp)
}

currentAttr++
Expand All @@ -811,7 +805,7 @@ func (rst *RelayStateImpl) ProcCopy(ctx context.Context, data *pgproto3.CopyData
}

// check where this tuple should go
currroute, err := rst.Qr.DeparseKeyWithRangesInternal(ctx, []interface{}{values[cps.ColumnOffset]}, cps.Krs)
currroute, err := rst.Qr.DeparseKeyWithRangesInternal(ctx, values, cps.Krs)
if err != nil {
return nil, err
}
Expand Down
4 changes: 4 additions & 0 deletions router/routingstate/routingstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ type RandomMatchState struct {
RoutingState
}

// CopyState is the routing state produced for COPY statements.
type CopyState struct {
RoutingState
}

type WorldRouteState struct {
RoutingState
}
Expand Down
1 change: 1 addition & 0 deletions test/regress/schedule/router
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ test: routing_hint
test: begin
test: alter_distribution
test: show_processing
test: copy_multishard
114 changes: 114 additions & 0 deletions test/regress/tests/router/expected/copy_multishard.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
\c spqr-console

SPQR router admin console
Here you can configure your routing rules
------------------------------------------------
You can find documentation here
https://github.com/pg-sharding/spqr/tree/master/docs

-- SETUP
CREATE DISTRIBUTION ds1 COLUMN TYPES int hash;
add distribution
------------------------
distribution id -> ds1
(1 row)

CREATE KEY RANGE krid1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1;
add key range
---------------
bound -> 0
(1 row)

CREATE KEY RANGE krid2 FROM 2147483648 ROUTE TO sh2 FOR DISTRIBUTION ds1;
add key range
---------------------
bound -> 2147483648
(1 row)

-- the set of all unsigned 32-bit integers (0 to 4294967295)
ALTER DISTRIBUTION ds1 ATTACH RELATION xx DISTRIBUTION KEY i HASH FUNCTION MURMUR;
attach table
------------------------
relation name -> xx
distribution id -> ds1
(2 rows)

-- TEST
\c regress
CREATE TABLE xx (i int, j int);
NOTICE: send query to shard(s) : sh1,sh2
COPY xx (i, j) FROM STDIN WITH DELIMITER '|' /* __spqr__allow_multishard: true */;
NOTICE: send query to shard(s) : sh1,sh2
INSERT INTO xx (i, j) VALUES(1,1);
NOTICE: send query to shard(s) : sh1
INSERT INTO xx (i, j) VALUES(2,2);
NOTICE: send query to shard(s) : sh2
INSERT INTO xx (i, j) VALUES(3,3);
NOTICE: send query to shard(s) : sh2
INSERT INTO xx (i, j) VALUES(4,4);
NOTICE: send query to shard(s) : sh2
INSERT INTO xx (i, j) VALUES(5,5);
NOTICE: send query to shard(s) : sh1
INSERT INTO xx (i, j) VALUES(6,6);
NOTICE: send query to shard(s) : sh1
INSERT INTO xx (i, j) VALUES(7,7);
NOTICE: send query to shard(s) : sh2
INSERT INTO xx (i, j) VALUES(8,8);
NOTICE: send query to shard(s) : sh2
INSERT INTO xx (i, j) VALUES(9,9);
NOTICE: send query to shard(s) : sh1
INSERT INTO xx (i, j) VALUES(10,10);
NOTICE: send query to shard(s) : sh2
TABLE xx /* __spqr__execute_on: sh1 */;
NOTICE: send query to shard(s) : sh1
i | j
---+---
2 | 2
3 | 3
5 | 5
6 | 6
7 | 7
9 | 9
1 | 1
5 | 5
6 | 6
9 | 9
(10 rows)

TABLE xx /* __spqr__execute_on: sh2 */;
NOTICE: send query to shard(s) : sh2
i | j
----+----
1 | 1
4 | 4
8 | 8
10 | 10
2 | 2
3 | 3
4 | 4
7 | 7
8 | 8
10 | 10
(10 rows)

DROP TABLE xx;
NOTICE: send query to shard(s) : sh1,sh2
\c spqr-console

SPQR router admin console
Here you can configure your routing rules
------------------------------------------------
You can find documentation here
https://github.com/pg-sharding/spqr/tree/master/docs

DROP DISTRIBUTION ALL CASCADE;
drop distribution
------------------------
distribution id -> ds1
(1 row)

DROP KEY RANGE ALL;
drop key range
----------------
(0 rows)

48 changes: 48 additions & 0 deletions test/regress/tests/router/sql/copy_multishard.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
\c spqr-console
-- SETUP
CREATE DISTRIBUTION ds1 COLUMN TYPES int hash;

CREATE KEY RANGE krid1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1;
CREATE KEY RANGE krid2 FROM 2147483648 ROUTE TO sh2 FOR DISTRIBUTION ds1;

-- the set of all unsigned 32-bit integers (0 to 4294967295)
ALTER DISTRIBUTION ds1 ATTACH RELATION xx DISTRIBUTION KEY i HASH FUNCTION MURMUR;

-- TEST
\c regress
CREATE TABLE xx (i int, j int);

COPY xx (i, j) FROM STDIN WITH DELIMITER '|' /* __spqr__allow_multishard: true */;
1|1
2|2
3|3
4|4
5|5
6|6
7|7
8|8
9|9
10|10
\.

INSERT INTO xx (i, j) VALUES(1,1);
INSERT INTO xx (i, j) VALUES(2,2);
INSERT INTO xx (i, j) VALUES(3,3);
INSERT INTO xx (i, j) VALUES(4,4);
INSERT INTO xx (i, j) VALUES(5,5);
INSERT INTO xx (i, j) VALUES(6,6);
INSERT INTO xx (i, j) VALUES(7,7);
INSERT INTO xx (i, j) VALUES(8,8);
INSERT INTO xx (i, j) VALUES(9,9);
INSERT INTO xx (i, j) VALUES(10,10);

TABLE xx /* __spqr__execute_on: sh1 */;
TABLE xx /* __spqr__execute_on: sh2 */;

DROP TABLE xx;

\c spqr-console
DROP DISTRIBUTION ALL CASCADE;
DROP KEY RANGE ALL;


Loading

0 comments on commit 4831d30

Please sign in to comment.