-
Notifications
You must be signed in to change notification settings - Fork 55
/
transaction.go
157 lines (147 loc) · 3.5 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright (c) 2012-2013 Jason McVetta. This is Free Software, released under
// the terms of the GPL v3. See http://www.gnu.org/copyleft/gpl.html for details.
// Resist intellectual serfdom - the ownership of ideas is akin to slavery.
package neoism
import (
"encoding/json"
"errors"
)
// A Tx is an in-progress database transaction. It is created by
// Database.Begin and is the receiver for Query, Commit and Rollback.
type Tx struct {
// db is the database the transaction was opened on; its Session issues
// every HTTP request made by Commit, Query and Rollback.
db *Database
// hrefCommit is the URL Commit posts to, taken from the "Commit" field of
// the response received by Begin.
hrefCommit string
// Location is the value of the "Location" response header received by
// Begin. Query posts to it and Rollback sends a DELETE to it.
Location string
// Errors collects the errors reported by the server in Begin and in every
// subsequent Query. While it is non-empty, Commit returns TxQueryError.
Errors []TxError
// Expires is the expiry reported by the server, refreshed by each Query.
Expires string // Kept as a string: cannot unmarshal into time.Time :(
}
// txRequest is the payload that Begin and Query post to the server. It is
// encoded as a JSON object with a single "statements" key.
type txRequest struct {
// Statements holds the cypher queries submitted with the request.
Statements []*CypherQuery `json:"statements"`
}
// txResponse receives the decoded reply to the requests posted by Begin and
// Query.
type txResponse struct {
// Commit is stored by Begin as the URL that Tx.Commit later posts to.
Commit string
// Results holds the result sets. unmarshal requires exactly one entry per
// submitted query and pairs entries with queries by index.
Results []struct {
// Columns is copied unchanged into the query's cypherResult.
Columns []string
// Data holds the result rows; unmarshal flattens it into a slice of
// rows by taking the Row of each element.
Data []struct {
// Row is kept as raw JSON here; decoding into the caller's Result is
// left to CypherQuery.Unmarshal.
Row []*json.RawMessage
}
// Stats is copied unchanged into the query's cypherResult and stats.
Stats *Stats
}
// Transaction carries transaction metadata; only Expires is read, and it
// is copied to Tx.Expires.
Transaction struct {
Expires string
}
// Errors lists the errors reported by the server for this request.
Errors []TxError
}
// unmarshal copies the result sets held in tr into the corresponding entries
// of qs, pairing them by position.
//
// It returns an error when the number of result sets differs from the number
// of queries, or the first error returned by CypherQuery.Unmarshal while
// decoding into a query's non-nil Result. Queries before the failing one keep
// the data already stored on them.
func (tr *txResponse) unmarshal(qs []*CypherQuery) error {
	if len(qs) != len(tr.Results) {
		return errors.New("Result count does not match query count")
	}
	// NOTE: Beginning in 2.0.0-M05, the data format returned by the
	// transaction endpoint diverged from the format returned by cypher batch.
	// At least until the final 2.0.0 release, we work around this by munging
	// the new result format into the existing cypherResult struct.
	for idx, resultSet := range tr.Results {
		// Flatten the {Row: [...]} wrappers into a plain slice of rows.
		rows := make([][]*json.RawMessage, 0, len(resultSet.Data))
		for _, entry := range resultSet.Data {
			rows = append(rows, entry.Row)
		}
		converted := cypherResult{
			Columns: resultSet.Columns,
			Data:    rows,
			Stats:   resultSet.Stats,
		}
		query := qs[idx]
		query.cr = converted
		// Decoding is only attempted when the caller supplied a destination.
		if query.Result != nil {
			if err := query.Unmarshal(query.Result); err != nil {
				return err
			}
		}
		// Stats are recorded only after decoding succeeded.
		query.stats = converted.Stats
	}
	return nil
}
// Begin opens a new transaction by posting qs (zero or more cypher queries)
// to db.HrefTransaction.
//
// A transport error is returned as-is with a nil *Tx. Any response status
// other than 201 yields a nil *Tx and the decoded NeoError. If the server
// reported errors for the statements, the new *Tx is returned together with
// TxQueryError; otherwise the results are unmarshalled into qs and the *Tx is
// returned along with any error from that step.
func (db *Database) Begin(qs []*CypherQuery) (*Tx, error) {
	reply := txResponse{}
	neoErr := NeoError{}
	resp, err := db.Session.Post(db.HrefTransaction, txRequest{Statements: qs}, &reply, &neoErr)
	if err != nil {
		return nil, err
	}
	if resp.Status() != 201 {
		return nil, neoErr
	}
	tx := &Tx{
		db:         db,
		hrefCommit: reply.Commit,
		Location:   resp.HttpResponse().Header.Get("Location"),
		Errors:     reply.Errors,
		Expires:    reply.Transaction.Expires,
	}
	if len(tx.Errors) > 0 {
		return tx, TxQueryError
	}
	// The transaction is handed back even when populating qs fails.
	return tx, reply.unmarshal(qs)
}
// Commit commits an open transaction by posting to its commit URL.
//
// If the transaction has recorded any errors, Commit returns TxQueryError
// without contacting the server. Otherwise it returns the transport error, if
// any, or the decoded NeoError when the response status is not 200.
func (t *Tx) Commit() error {
	if len(t.Errors) != 0 {
		return TxQueryError
	}
	neoErr := NeoError{}
	resp, err := t.db.Session.Post(t.hrefCommit, nil, nil, &neoErr)
	switch {
	case err != nil:
		return err
	case resp.Status() != 200:
		return neoErr
	}
	return nil // Success
}
// Query executes the statements in qs inside an open transaction by posting
// them to t.Location.
//
// It returns the transport error if the request fails, NotFound on a 404
// response, and a pointer to the decoded NeoError on any other non-200 status.
// On a 200 response it refreshes t.Expires and appends the server-reported
// errors to t.Errors; if t.Errors is then non-empty (including errors from
// earlier calls) it returns TxQueryError. Otherwise the results are
// unmarshalled into qs.
func (t *Tx) Query(qs []*CypherQuery) error {
	reply := txResponse{}
	neoErr := NeoError{}
	resp, err := t.db.Session.Post(t.Location, txRequest{Statements: qs}, &reply, &neoErr)
	if err != nil {
		return err
	}
	switch resp.Status() {
	case 200:
		// Fall out of the switch and process the reply.
	case 404:
		return NotFound
	default:
		// NOTE(review): returned by pointer here, whereas Begin, Commit and
		// Rollback return NeoError by value - confirm what callers expect.
		return &neoErr
	}
	t.Expires = reply.Transaction.Expires
	t.Errors = append(t.Errors, reply.Errors...)
	if len(t.Errors) > 0 {
		return TxQueryError
	}
	return reply.unmarshal(qs)
}
// Rollback rolls back an open transaction by sending a DELETE request to
// t.Location.
//
// It returns the transport error, if any, or the decoded NeoError when the
// response status is not 200.
func (t *Tx) Rollback() error {
	neoErr := NeoError{}
	resp, err := t.db.Session.Delete(t.Location, nil, nil, &neoErr)
	switch {
	case err != nil:
		return err
	case resp.Status() != 200:
		return neoErr
	}
	return nil // Success
}