-
-
Notifications
You must be signed in to change notification settings - Fork 181
/
Copy pathput.go
133 lines (118 loc) · 3.38 KB
/
put.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package dynamo
import (
"context"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// Put is a request to create or replace an item.
// See: http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
type Put struct {
table Table
returnType string
item Item
subber
condition string
err error
cc *ConsumedCapacity
}
// Put creates a new request to create or replace an item.
func (table Table) Put(item interface{}) *Put {
encoded, err := marshalItem(item)
return &Put{
table: table,
item: encoded,
err: err,
}
}
// If specifies a conditional expression for this put to succeed.
// Use single quotes to specificy reserved names inline (like 'Count').
// Use the placeholder ? within the expression to substitute values, and use $ for names.
// You need to use quoted or placeholder names when the name is a reserved word in DynamoDB.
// Multiple calls to If will be combined with AND.
func (p *Put) If(expr string, args ...interface{}) *Put {
expr, err := p.subExprN(expr, args...)
p.setError(err)
if p.condition != "" {
p.condition += " AND "
}
p.condition += wrapExpr(expr)
return p
}
// ConsumedCapacity will measure the throughput capacity consumed by this operation and add it to cc.
func (p *Put) ConsumedCapacity(cc *ConsumedCapacity) *Put {
p.cc = cc
return p
}
// Run executes this put.
func (p *Put) Run(ctx context.Context) error {
p.returnType = "NONE"
_, err := p.run(ctx)
return err
}
// OldValue executes this put, unmarshaling the previous value into out.
// Returns ErrNotFound is there was no previous value.
func (p *Put) OldValue(ctx context.Context, out interface{}) error {
p.returnType = "ALL_OLD"
output, err := p.run(ctx)
switch {
case err != nil:
return err
case output.Attributes == nil:
return ErrNotFound
}
return unmarshalItem(output.Attributes, out)
}
func (p *Put) run(ctx context.Context) (output *dynamodb.PutItemOutput, err error) {
if p.err != nil {
return nil, p.err
}
req := p.input()
p.table.db.retry(ctx, func() error {
output, err = p.table.db.client.PutItem(ctx, req)
p.cc.incRequests()
return err
})
if output != nil {
p.cc.add(output.ConsumedCapacity)
}
return
}
func (p *Put) input() *dynamodb.PutItemInput {
input := &dynamodb.PutItemInput{
TableName: &p.table.name,
Item: p.item,
ReturnValues: types.ReturnValue(p.returnType),
ExpressionAttributeNames: p.nameExpr,
ExpressionAttributeValues: p.valueExpr,
}
if p.condition != "" {
input.ConditionExpression = &p.condition
}
if p.cc != nil {
input.ReturnConsumedCapacity = types.ReturnConsumedCapacityIndexes
}
return input
}
func (p *Put) writeTxItem() (*types.TransactWriteItem, error) {
if p.err != nil {
return nil, p.err
}
input := p.input()
item := &types.TransactWriteItem{
Put: &types.Put{
TableName: input.TableName,
Item: input.Item,
ExpressionAttributeNames: input.ExpressionAttributeNames,
ExpressionAttributeValues: input.ExpressionAttributeValues,
ConditionExpression: input.ConditionExpression,
// TODO: add support when aws-sdk-go updates
// ReturnValuesOnConditionCheckFailure: aws.String(dynamodb.ReturnValuesOnConditionCheckFailureAllOld),
},
}
return item, nil
}
func (p *Put) setError(err error) {
if p.err == nil {
p.err = err
}
}