Skip to content

Commit

Permalink
Disable move key range in LocalCoordinator + refactoring (#780)
Browse files Browse the repository at this point in the history
  • Loading branch information
EinKrebs authored Oct 1, 2024
1 parent ac7003f commit bb197ff
Showing 1 changed file with 47 additions and 50 deletions.
97 changes: 47 additions & 50 deletions pkg/coord/local/clocal.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package local
import (
"context"
"fmt"
"github.com/pg-sharding/spqr/pkg/models/spqrerror"
"math/rand"
"sync"

Expand Down Expand Up @@ -439,21 +440,21 @@ func (lc *LocalCoordinator) WorldShards() []string {
//
// Returns:
// - error: an error if the move operation encounters any issues.
func (qr *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error {
func (lc *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) error {
var krmv *qdb.KeyRange
var err error
if krmv, err = qr.qdb.CheckLockedKeyRange(ctx, req.Krid); err != nil {
if krmv, err = lc.qdb.CheckLockedKeyRange(ctx, req.Krid); err != nil {
return err
}

ds, err := qr.qdb.GetDistribution(ctx, krmv.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, krmv.DistributionId)
if err != nil {
return err
}

var reqKr = kr.KeyRangeFromDB(krmv, ds.ColTypes)
reqKr.ShardID = req.ShardId
return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, reqKr)
return ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, reqKr)
}

// TODO : unit tests
Expand All @@ -466,12 +467,12 @@ func (qr *LocalCoordinator) Move(ctx context.Context, req *kr.MoveKeyRange) erro
//
// Returns:
// - error: an error if the unite operation encounters any issues.
func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) error {
func (lc *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) error {
var krBase *qdb.KeyRange
var krAppendage *qdb.KeyRange
var err error

if krBase, err = qr.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO
if krBase, err = lc.qdb.LockKeyRange(ctx, req.BaseKeyRangeId); err != nil { //nolint:all TODO
return err
}

Expand All @@ -481,19 +482,19 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
spqrlog.Zero.Error().Err(err).Msg("")
return
}
}(qr.qdb, ctx, req.BaseKeyRangeId)
}(lc.qdb, ctx, req.BaseKeyRangeId)

ds, err := qr.qdb.GetDistribution(ctx, krBase.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, krBase.DistributionId)
if err != nil {
return err
}

// TODO: krRight seems to be empty.
if krAppendage, err = qr.qdb.GetKeyRange(ctx, req.AppendageKeyRangeId); err != nil {
if krAppendage, err = lc.qdb.GetKeyRange(ctx, req.AppendageKeyRangeId); err != nil {
return err
}

if err = qr.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil {
if err = lc.qdb.DropKeyRange(ctx, krAppendage.KeyRangeID); err != nil {
return err
}

Expand All @@ -506,7 +507,7 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
krBaseCopy.LowerBound = newBound
united := kr.KeyRangeFromDB(krBaseCopy, ds.ColTypes)

return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, united)
return ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, united)
}

// Caller should lock key range
Expand All @@ -520,7 +521,7 @@ func (qr *LocalCoordinator) Unite(ctx context.Context, req *kr.UniteKeyRange) er
//
// Returns:
// - error: an error if the split operation encounters any issues.
func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error {
func (lc *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) error {
var krOld *qdb.KeyRange
var err error

Expand All @@ -530,17 +531,17 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er
Str("source-id", req.SourceID).
Msg("split request is")

if krOld, err = qr.qdb.LockKeyRange(ctx, req.SourceID); err != nil {
if krOld, err = lc.qdb.LockKeyRange(ctx, req.SourceID); err != nil {
return err
}

defer func() {
if err := qr.qdb.UnlockKeyRange(ctx, req.SourceID); err != nil {
if err := lc.qdb.UnlockKeyRange(ctx, req.SourceID); err != nil {
spqrlog.Zero.Error().Err(err).Msg("")
}
}()

ds, err := qr.qdb.GetDistribution(ctx, krOld.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, krOld.DistributionId)
if err != nil {
return err
}
Expand Down Expand Up @@ -570,11 +571,11 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er
krOld.LowerBound = req.Bound // TODO: fix
}

if err := ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, kr.KeyRangeFromDB(krOld, ds.ColTypes)); err != nil {
if err := ops.ModifyKeyRangeWithChecks(ctx, lc.qdb, kr.KeyRangeFromDB(krOld, ds.ColTypes)); err != nil {
return err
}

if err := ops.CreateKeyRangeWithChecks(ctx, qr.qdb, krNew); err != nil {
if err := ops.CreateKeyRangeWithChecks(ctx, lc.qdb, krNew); err != nil {
return fmt.Errorf("failed to add a new key range: %w", err)
}

Expand All @@ -592,13 +593,13 @@ func (qr *LocalCoordinator) Split(ctx context.Context, req *kr.SplitKeyRange) er
// Returns:
// - *kr.KeyRange: the locked KeyRange object.
// - error: an error if the lock operation encounters any issues.
func (qr *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.KeyRange, error) {
keyRangeDB, err := qr.qdb.LockKeyRange(ctx, krid)
func (lc *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.KeyRange, error) {
keyRangeDB, err := lc.qdb.LockKeyRange(ctx, krid)
if err != nil {
return nil, err
}

ds, err := qr.qdb.GetDistribution(ctx, keyRangeDB.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, keyRangeDB.DistributionId)
if err != nil {
return nil, err
}
Expand All @@ -616,8 +617,8 @@ func (qr *LocalCoordinator) LockKeyRange(ctx context.Context, krid string) (*kr.
//
// Returns:
// - error: an error if the unlock operation encounters any issues.
func (qr *LocalCoordinator) UnlockKeyRange(ctx context.Context, krid string) error {
return qr.qdb.UnlockKeyRange(ctx, krid)
func (lc *LocalCoordinator) UnlockKeyRange(ctx context.Context, krid string) error {
return lc.qdb.UnlockKeyRange(ctx, krid)
}

// TODO : unit tests
Expand Down Expand Up @@ -652,10 +653,10 @@ func (lc *LocalCoordinator) AddDataShard(ctx context.Context, ds *datashards.Dat
//
// Returns:
// - []string: a slice of strings containing the names of the data shards.
func (qr *LocalCoordinator) Shards() []string {
func (lc *LocalCoordinator) Shards() []string {
var ret []string

for name := range qr.DataShardCfgs {
for name := range lc.DataShardCfgs {
ret = append(ret, name)
}

Expand Down Expand Up @@ -697,13 +698,13 @@ func (lc *LocalCoordinator) GetKeyRange(ctx context.Context, krId string) (*kr.K
// Returns:
// - []*kr.KeyRange: a slice of KeyRange objects retrieved.
// - error: an error if the retrieval encounters any issues.
func (qr *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) {
func (lc *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution string) ([]*kr.KeyRange, error) {
var ret []*kr.KeyRange
if krs, err := qr.qdb.ListKeyRanges(ctx, distribution); err != nil {
if krs, err := lc.qdb.ListKeyRanges(ctx, distribution); err != nil {
return nil, err
} else {
for _, keyRange := range krs {
ds, err := qr.qdb.GetDistribution(ctx, keyRange.DistributionId)
ds, err := lc.qdb.GetDistribution(ctx, keyRange.DistributionId)

if err != nil {
return nil, err
Expand All @@ -726,8 +727,8 @@ func (qr *LocalCoordinator) ListKeyRanges(ctx context.Context, distribution stri
// Returns:
// - []*kr.KeyRange: a slice of KeyRange objects representing all key ranges.
// - error: an error if the retrieval encounters any issues.
func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) {
if krs, err := qr.qdb.ListAllKeyRanges(ctx); err != nil {
func (lc *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRange, error) {
if krs, err := lc.qdb.ListAllKeyRanges(ctx); err != nil {
return nil, err
} else {
var ret []*kr.KeyRange
Expand All @@ -738,7 +739,7 @@ func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRang
var err error
var ok bool
if ds, ok = cache[keyRange.DistributionId]; !ok {
ds, err = qr.qdb.GetDistribution(ctx, keyRange.DistributionId)
ds, err = lc.qdb.GetDistribution(ctx, keyRange.DistributionId)
if err != nil {
return nil, err
}
Expand All @@ -761,7 +762,7 @@ func (qr *LocalCoordinator) ListAllKeyRanges(ctx context.Context) ([]*kr.KeyRang
// Returns:
// - []*topology.Router: a slice of Router objects representing all routers.
// - error: an error if the retrieval encounters any issues.
func (qr *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router, error) {
func (lc *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router, error) {
return []*topology.Router{{
ID: "local",
}}, nil
Expand All @@ -775,20 +776,16 @@ func (qr *LocalCoordinator) ListRouters(ctx context.Context) ([]*topology.Router
//
// Returns:
// - error: An error if the creation encounters any issues.
func (qr *LocalCoordinator) CreateKeyRange(ctx context.Context, kr *kr.KeyRange) error {
return ops.CreateKeyRangeWithChecks(ctx, qr.qdb, kr)
func (lc *LocalCoordinator) CreateKeyRange(ctx context.Context, kr *kr.KeyRange) error {
return ops.CreateKeyRangeWithChecks(ctx, lc.qdb, kr)
}

// MoveKeyRange moves a key range in the LocalCoordinator.
//
// Parameters:
// - ctx (context.Context): The context of the operation.
// - kr (*kr.KeyRange): The key range object to be moved.
// MoveKeyRange is disabled in LocalCoordinator
//
// Returns:
// - error: An error if the move encounters any issues.
func (qr *LocalCoordinator) MoveKeyRange(ctx context.Context, kr *kr.KeyRange) error {
return ops.ModifyKeyRangeWithChecks(ctx, qr.qdb, kr)
// - error: SPQR_INVALID_REQUEST error
func (lc *LocalCoordinator) MoveKeyRange(_ context.Context, _ *kr.KeyRange) error {
return spqrerror.New(spqrerror.SPQR_INVALID_REQUEST, "MoveKeyRange is not available in local coordinator")
}

var ErrNotCoordinator = fmt.Errorf("request is unprocessable in router")
Expand All @@ -801,7 +798,7 @@ var ErrNotCoordinator = fmt.Errorf("request is unprocessable in router")
//
// Returns:
// - error: An error indicating the registration status.
func (qr *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Router) error {
func (lc *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Router) error {
return ErrNotCoordinator
}

Expand All @@ -813,7 +810,7 @@ func (qr *LocalCoordinator) RegisterRouter(ctx context.Context, r *topology.Rout
//
// Returns:
// - error: An error indicating the unregistration status.
func (qr *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) error {
func (lc *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) error {
return ErrNotCoordinator
}

Expand All @@ -825,7 +822,7 @@ func (qr *LocalCoordinator) UnregisterRouter(ctx context.Context, id string) err
//
// Returns:
// - error: An error indicating the synchronization status. In this case, it returns ErrNotCoordinator.
func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topology.Router) error {
func (lc *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

Expand All @@ -837,7 +834,7 @@ func (qr *LocalCoordinator) SyncRouterMetadata(ctx context.Context, router *topo
//
// Returns:
// - error: An error indicating the update status.
func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
func (lc *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, router *topology.Router) error {
return ErrNotCoordinator
}

Expand All @@ -849,8 +846,8 @@ func (qr *LocalCoordinator) SyncRouterCoordinatorAddress(ctx context.Context, ro
//
// Returns:
// - error: An error indicating the update status.
func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return qr.qdb.UpdateCoordinator(ctx, addr)
func (lc *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string) error {
return lc.qdb.UpdateCoordinator(ctx, addr)
}

// GetCoordinator retrieves the coordinator address from the local coordinator.
Expand All @@ -861,8 +858,8 @@ func (qr *LocalCoordinator) UpdateCoordinator(ctx context.Context, addr string)
// Returns:
// - string: The address of the coordinator.
// - error: An error indicating the retrieval status.
func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) {
addr, err := qr.qdb.GetCoordinator(ctx)
func (lc *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) {
addr, err := lc.qdb.GetCoordinator(ctx)
spqrlog.Zero.Debug().Str("address", addr).Msg("resp local coordiantor: get coordinator")
return addr, err
}
Expand All @@ -876,7 +873,7 @@ func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error)
// Returns:
// - *datashards.DataShard: The retrieved DataShard, or nil if it doesn't exist.
// - error: An error indicating the retrieval status, or ErrNotCoordinator if the operation is not supported by the LocalCoordinator.
func (qr *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) {
func (lc *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) {
return nil, ErrNotCoordinator
}

Expand Down

0 comments on commit bb197ff

Please sign in to comment.