-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrecords.go
161 lines (140 loc) · 4.35 KB
/
records.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main
import (
"errors"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodbstreams"
logging "github.com/sirupsen/logrus"
)
func (listener *streamListener) processSelfRecords(records []*dynamodbstreams.Record, isBidirectional bool) {
for _, r := range records {
isDeleteItem := *r.EventName == "REMOVE"
isFreshItem := r.Dynamodb.NewImage[replicationSource] == nil || r.Dynamodb.NewImage[replicationTimestamp] == nil
isUpdatedFromClient := r.Dynamodb.NewImage[replicationTimestamp] != nil &&
r.Dynamodb.OldImage[replicationTimestamp] != nil &&
*r.Dynamodb.NewImage[replicationTimestamp].N == *r.Dynamodb.OldImage[replicationTimestamp].N
if isDeleteItem && r.Dynamodb.NewImage == nil {
// no need to update table; just send it to channel
newRec := listener.getNewRecord(r)
listener.recordQueue <- newRec
continue
}
if isFreshItem || isUpdatedFromClient {
// Update metadata & add to channel
newRec := listener.getNewRecord(r)
if isBidirectional {
err := listener.updateTable(newRec)
if err != nil {
logger.WithFields(logging.Fields{
"Table": listener.streamOwner,
"Record Old Image": r.Dynamodb.OldImage,
"Record New Image": r.Dynamodb.NewImage,
"Error": err,
}).Error("Failed to update table with Pendulum fields")
// To continue or not to continue?
continue
}
}
listener.recordQueue <- newRec
} else {
// Discard record if it is an item updated by sync
// nothing to do
logger.WithFields(logging.Fields{
"Table": listener.streamOwner,
"Record Key": r.Dynamodb.Keys,
"Record Old Image": r.Dynamodb.OldImage,
"Record New Image": r.Dynamodb.NewImage,
}).Debug("Ignoring record")
}
}
}
func (listener *streamListener) getNewRecord(record *dynamodbstreams.Record) (*dynamodbstreams.Record) {
item := record.Dynamodb.NewImage
// item can be nil if it is a record from "REMOVE" action
if item == nil {
item = map[string]*dynamodb.AttributeValue{}
}
ts := strconv.FormatInt(record.Dynamodb.ApproximateCreationDateTime.Unix(), 10)
updateSource := dynamodb.AttributeValue{S: aws.String(listener.streamOwner)}
updateTimestamp := dynamodb.AttributeValue{N: aws.String(ts)}
item[replicationSource] = &updateSource
item[replicationTimestamp] = &updateTimestamp
cur := record.Dynamodb
cur.NewImage = item
newRec := record.SetDynamodb(cur)
return newRec
}
func (listener *streamListener) updateTable(record *dynamodbstreams.Record) (error){
var err error = nil
switch *record.EventName {
case "MODIFY":
// same as INSERT
fallthrough
case "INSERT":
err = listener.insertRecord(record.Dynamodb.NewImage)
case "REMOVE":
err = listener.removeRecord(record.Dynamodb.Keys)
default:
// Should not reach here unless AWS Dynamodb change Event Names or
// add new one(s)
err = errors.New("unknown event on record")
}
if err != nil {
logger.WithFields(logging.Fields{
"Table": listener.streamOwner,
"Record": *record.Dynamodb.SequenceNumber,
"Key": record.Dynamodb.Keys,
"Error": err,
}).Error("Failed to handle record")
return err
}
logger.WithFields(logging.Fields{
"Table": listener.streamOwner,
"Record": *record.Dynamodb.SequenceNumber,
"Key": record.Dynamodb.Keys,
"New Record": record.Dynamodb.NewImage,
}).Debug("Handled record successfully")
return nil
}
func (listener *streamListener) extractTableFromOwner() string {
return strings.Split(listener.streamOwner, ".account.")[0]
}
func (listener *streamListener) insertRecord(item map[string]*dynamodb.AttributeValue) (error) {
input := &dynamodb.PutItemInput{
Item: item,
TableName: aws.String(listener.extractTableFromOwner()),
}
for i := 1; i <= maxRetries; i++ {
_, err := listener.dynamo.PutItem(input)
if err != nil {
if i == maxRetries {
return err
}
backoff(i, "PutItem")
} else {
return nil
}
}
return nil
}
func (listener *streamListener) removeRecord(item map[string]*dynamodb.AttributeValue) (error) {
var err error = nil
for i := 1; i <= maxRetries; i++ {
_, err = listener.dynamo.DeleteItem(&dynamodb.DeleteItemInput{
TableName: aws.String(listener.extractTableFromOwner()),
Key: item,
})
if err != nil {
if i == maxRetries {
break
} else {
backoff(i, "DeleteItem")
}
} else {
break
}
}
return err
}