Skip to content

Commit

Permalink
Merge branch 'history-type-support' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
DJAndries committed Oct 12, 2023
2 parents b05d499 + cb0d54b commit 4ee5565
Show file tree
Hide file tree
Showing 12 changed files with 260 additions and 35 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ jobs:
name: ci
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@8ade135a41bc03ea155e62e844d188df1ea18608 # v4.1.0

- uses: actions/setup-go@v4
- uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v4.1.0
with:
go-version: 1.18

- name: lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@3a919529898de77ec3da873e3063ca4b10e7f5cc # v3.7.0

- name: test
run: make docker && make docker-test
8 changes: 4 additions & 4 deletions cache/instrumented_redis.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package cache

// Code generated by gowrap. DO NOT EDIT.
// template: ../.prom-gowrap.tmpl
// gowrap: http://github.com/hexdigest/gowrap
// DO NOT EDIT!
// This code is generated with http://github.com/hexdigest/gowrap tool
// using ../.prom-gowrap.tmpl template

//go:generate gowrap gen -p github.com/brave/go-sync/cache -i RedisClient -t ../.prom-gowrap.tmpl -o instrumented_redis.go -l ""
//go:generate gowrap gen -p github.com/brave/go-sync/cache -i RedisClient -t ../.prom-gowrap.tmpl -o instrumented_redis.go

import (
"context"
Expand Down
16 changes: 15 additions & 1 deletion command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
setSyncPollInterval int32 = 30
nigoriTypeID int32 = 47745
deviceInfoTypeID int = 154522
historyTypeID int = 963985
maxActiveDevices int = 50
)

Expand Down Expand Up @@ -241,8 +242,21 @@ func handleCommitRequest(cache *cache.Cache, commitMsg *sync_pb.CommitMessage, c
}

oldVersion := *entityToCommit.Version
isUpdateOp := oldVersion != 0
*entityToCommit.Version = *entityToCommit.Mtime
if oldVersion == 0 { // Create
if *entityToCommit.DataType == historyTypeID {
// Check if item exists using client_unique_tag
isUpdateOp, err = db.HasItem(clientID, *entityToCommit.ClientDefinedUniqueTag)
if err != nil {
log.Error().Err(err).Msg("Insert history sync entity failed")
rspType := sync_pb.CommitResponse_TRANSIENT_ERROR
entryRsp.ResponseType = &rspType
entryRsp.ErrorMessage = aws.String(fmt.Sprintf("Insert history sync entity failed: %v", err.Error()))
continue
}
}

if !isUpdateOp { // Create
if itemCount+count >= maxClientObjectQuota {
rspType := sync_pb.CommitResponse_OVER_QUOTA
entryRsp.ResponseType = &rspType
Expand Down
2 changes: 2 additions & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ type Datastore interface {
DisableSyncChain(clientID string) error
// IsSyncChainDisabled checks whether a given sync chain is deleted
IsSyncChainDisabled(clientID string) (bool, error)
// Checks if sync item exists for a client
HasItem(clientID string, ID string) (bool, error)
}
5 changes: 5 additions & 0 deletions datastore/datastoretest/mock_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ func (m *MockDatastore) HasServerDefinedUniqueTag(clientID string, tag string) (
return args.Bool(0), args.Error(1)
}

func (m *MockDatastore) HasItem(clientID string, ID string) (bool, error) {
args := m.Called(clientID, ID)
return args.Bool(0), args.Error(1)
}

// GetClientItemCount mocks calls to GetClientItemCount
func (m *MockDatastore) GetClientItemCount(clientID string) (int, error) {
args := m.Called(clientID)
Expand Down
22 changes: 18 additions & 4 deletions datastore/instrumented_datastore.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package datastore

// Code generated by gowrap. DO NOT EDIT.
// template: ../.prom-gowrap.tmpl
// gowrap: http://github.com/hexdigest/gowrap
// DO NOT EDIT!
// This code is generated with http://github.com/hexdigest/gowrap tool
// using ../.prom-gowrap.tmpl template

//go:generate gowrap gen -p github.com/brave/go-sync/datastore -i Datastore -t ../.prom-gowrap.tmpl -o instrumented_datastore.go -l ""
//go:generate gowrap gen -p github.com/brave/go-sync/datastore -i Datastore -t ../.prom-gowrap.tmpl -o instrumented_datastore.go

import (
"time"
Expand Down Expand Up @@ -93,6 +93,20 @@ func (_d DatastoreWithPrometheus) GetUpdatesForType(dataType int, clientToken in
return _d.base.GetUpdatesForType(dataType, clientToken, fetchFolders, clientID, maxSize)
}

// HasItem implements Datastore
func (_d DatastoreWithPrometheus) HasItem(clientID string, ID string) (b1 bool, err error) {
_since := time.Now()
defer func() {
result := "ok"
if err != nil {
result = "error"
}

datastoreDurationSummaryVec.WithLabelValues(_d.instanceName, "HasItem", result).Observe(time.Since(_since).Seconds())
}()
return _d.base.HasItem(clientID, ID)
}

// HasServerDefinedUniqueTag implements Datastore
func (_d DatastoreWithPrometheus) HasServerDefinedUniqueTag(clientID string, tag string) (b1 bool, err error) {
_since := time.Now()
Expand Down
99 changes: 80 additions & 19 deletions datastore/sync_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
)

const (
maxBatchGetItemSize = 100 // Limited by AWS.
maxTransactDeleteItemSize = 10 // Limited by AWS.
clientTagItemPrefix = "Client#"
serverTagItemPrefix = "Server#"
conditionalCheckFailed = "ConditionalCheckFailed"
disabledChainID = "disabled_chain"
reasonDeleted = "deleted"
maxBatchGetItemSize = 100 // Limited by AWS.
maxTransactDeleteItemSize = 10 // Limited by AWS.
clientTagItemPrefix = "Client#"
serverTagItemPrefix = "Server#"
conditionalCheckFailed = "ConditionalCheckFailed"
disabledChainID = "disabled_chain"
reasonDeleted = "deleted"
historyTypeID int = 963985
historyDeleteDirectiveTypeID int = 150251
// Expiration time for history and history delete directive
// entities in seconds
HistoryExpirationIntervalSecs = 60 * 60 * 24 * 90 // 90 days
)

// SyncEntity is used to marshal and unmarshal sync items in dynamoDB.
Expand All @@ -50,6 +55,7 @@ type SyncEntity struct {
ClientDefinedUniqueTag *string `dynamodbav:",omitempty"`
UniquePosition []byte `dynamodbav:",omitempty"`
DataTypeMtime *string
ExpirationTime *int64
}

// SyncEntityByClientIDID implements sort.Interface for []SyncEntity based on
Expand Down Expand Up @@ -159,7 +165,9 @@ func (dynamo *Dynamo) InsertSyncEntity(entity *SyncEntity) (bool, error) {
return false, fmt.Errorf("error building expression to insert sync entity: %w", err)
}

if entity.ClientDefinedUniqueTag != nil {
// Write tag item for all data types, except for
// the history type, which does not use tag items.
if entity.ClientDefinedUniqueTag != nil && *entity.DataType != historyTypeID {
items := []*dynamodb.TransactWriteItem{}
// Additional item for ensuring tag's uniqueness for a specific client.
item := NewServerClientUniqueTagItem(entity.ClientID, *entity.ClientDefinedUniqueTag, false)
Expand Down Expand Up @@ -253,6 +261,28 @@ func (dynamo *Dynamo) HasServerDefinedUniqueTag(clientID string, tag string) (bo
return out.Item != nil, nil
}

func (dynamo *Dynamo) HasItem(clientID string, ID string) (bool, error) {
primaryKey := PrimaryKey{ClientID: clientID, ID: ID}
key, err := dynamodbattribute.MarshalMap(primaryKey)

if err != nil {
return false, fmt.Errorf("error marshalling key to check if item existed: %w", err)
}

input := &dynamodb.GetItemInput{
Key: key,
ProjectionExpression: aws.String(projPk),
TableName: aws.String(Table),
}

out, err := dynamo.GetItem(input)
if err != nil {
return false, fmt.Errorf("error calling GetItem to check if item existed: %w", err)
}

return out.Item != nil, nil
}

// InsertSyncEntitiesWithServerTags is used to insert sync entities with
// server-defined unique tags. To ensure the uniqueness, for each sync entity,
// we will write a tag item and a sync item. Items for all the entities in the
Expand Down Expand Up @@ -470,10 +500,12 @@ func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (bo
return false, false, fmt.Errorf("error marshalling key to update sync entity: %w", err)
}

// condition to ensure to be update only and the version is matched.
cond := expression.And(
expression.AttributeExists(expression.Name(pk)),
expression.Name("Version").Equal(expression.Value(oldVersion)))
// condition to ensure the request is update only...
cond := expression.AttributeExists(expression.Name(pk))
// ...and the version matches, if applicable
if *entity.DataType != historyTypeID {
cond = expression.And(cond, expression.Name("Version").Equal(expression.Value(oldVersion)))
}

update := expression.Set(expression.Name("Version"), expression.Value(entity.Version))
update = update.Set(expression.Name("Mtime"), expression.Value(entity.Mtime))
Expand Down Expand Up @@ -507,7 +539,7 @@ func (dynamo *Dynamo) UpdateSyncEntity(entity *SyncEntity, oldVersion int64) (bo

// Soft-delete a sync item with a client tag, use a transaction to delete its
// tag item too.
if entity.Deleted != nil && entity.ClientDefinedUniqueTag != nil && *entity.Deleted {
if entity.Deleted != nil && entity.ClientDefinedUniqueTag != nil && *entity.Deleted && *entity.DataType != historyTypeID {
pk := PrimaryKey{
ClientID: entity.ClientID, ID: clientTagItemPrefix + *entity.ClientDefinedUniqueTag}
tagItemKey, err := dynamodbattribute.MarshalMap(pk)
Expand Down Expand Up @@ -614,10 +646,12 @@ func (dynamo *Dynamo) GetUpdatesForType(dataType int, clientToken int64, fetchFo
expression.Value(dataTypeMtimeUpperBound))
keyCond := expression.KeyAnd(pkCond, skCond)
exprs := expression.NewBuilder().WithKeyCondition(keyCond)

if !fetchFolders { // Filter folder entities out if fetchFolder is false.
exprs = exprs.WithFilter(
expression.Equal(expression.Name("Folder"), expression.Value(false)))
}

expr, err := exprs.Build()
if err != nil {
return false, syncEntities, fmt.Errorf("error building expression to get updates: %w", err)
Expand Down Expand Up @@ -680,8 +714,21 @@ func (dynamo *Dynamo) GetUpdatesForType(dataType int, clientToken int64, fetchFo
if err != nil {
return false, syncEntities, fmt.Errorf("error unmarshalling updated sync entities: %w", err)
}
sort.Sort(SyncEntityByMtime(syncEntities))
return hasChangesRemaining, syncEntities, nil

// filter out any expired items, i.e. history sync entities over 90 days old
nowUnix := time.Now().Unix()
var filteredSyncEntities []SyncEntity
for _, syncEntity := range syncEntities {
if syncEntity.ExpirationTime != nil && *syncEntity.ExpirationTime > 0 {
if *syncEntity.ExpirationTime < nowUnix {
continue
}
}
filteredSyncEntities = append(filteredSyncEntities, syncEntity)
}

sort.Sort(SyncEntityByMtime(filteredSyncEntities))
return hasChangesRemaining, filteredSyncEntities, nil
}

func validatePBEntity(entity *sync_pb.SyncEntity) error {
Expand Down Expand Up @@ -745,16 +792,29 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
originatorClientItemID = entity.IdString
}

now := aws.Int64(utils.UnixMilli(time.Now()))
// The client tag hash must be used as the primary key
// for the history type.
if dataType == historyTypeID {
id = *entity.ClientTagHash
}

now := time.Now()

var expirationTime *int64
if dataType == historyTypeID || dataType == historyDeleteDirectiveTypeID {
expirationTime = aws.Int64(now.Unix() + HistoryExpirationIntervalSecs)
}

nowMillis := aws.Int64(now.UnixMilli())
// ctime is only used when inserting a new entity, here we use client passed
// ctime if it is passed, otherwise, use current server time as the creation
// time. When updating, ctime will be ignored later in the query statement.
cTime := now
cTime := nowMillis
if entity.Ctime != nil {
cTime = entity.Ctime
}

dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*now, 10)
dataTypeMtime := strconv.Itoa(dataType) + "#" + strconv.FormatInt(*nowMillis, 10)

// Set default values on Deleted and Folder attributes for new entities, the
// default values are specified by sync.proto protocol.
Expand All @@ -775,7 +835,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
ParentID: entity.ParentIdString,
Version: entity.Version,
Ctime: cTime,
Mtime: now,
Mtime: nowMillis,
Name: entity.Name,
NonUniqueName: entity.NonUniqueName,
ServerDefinedUniqueTag: entity.ServerDefinedUniqueTag,
Expand All @@ -788,6 +848,7 @@ func CreateDBSyncEntity(entity *sync_pb.SyncEntity, cacheGUID *string, clientID
UniquePosition: uniquePosition,
DataType: aws.Int(dataType),
DataTypeMtime: aws.String(dataTypeMtime),
ExpirationTime: expirationTime,
}, nil
}

Expand Down
Loading

0 comments on commit 4ee5565

Please sign in to comment.