package riak

import (
	"errors"
	"fmt"
	"io"
	"strconv"
)

/* The RFile struct stores (large) values in Riak and behaves very similarly
to a regular os.File object. It implements the io.Reader, io.Writer and
io.Seeker interfaces. The value is split into chunks because really large
values (>10MB) can't be stored efficiently in Riak, and because growing or
changing large values is inefficient (changing a single byte would require
a PUT of the entire, possibly large, value).
*/
type RFile struct {
	client     *Client
	root       *RObject // The "root" object, holding meta-data only
	chunk      *RObject // The current chunk or segment of data
	chunk_size int
	pos        int
	size       int
}
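
// A minimal usage sketch (hedged; it assumes the default client is already
// connected, and the bucket name, key, and chunk size are purely illustrative):
//
//	f, err := CreateFile("files", "hello.txt", "text/plain", 65536)
//	if err != nil {
//		// handle the error
//	}
//	f.Write([]byte("hello, world"))
//
//	g, err := OpenFile("files", "hello.txt")
//	buf := make([]byte, g.Size())
//	g.Read(buf) // buf now holds "hello, world"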

var (
	NotFile     = errors.New("Not suitable to use as RFile")
	ErrorInFile = errors.New("Error in RFile")
)

// chunkKey returns the key for a specific chunk
func chunkKey(key string, chunkno int) string {
	return fmt.Sprintf("%v-%06d", key, chunkno)
}
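
// For example, chunkKey("report", 3) yields "report-000003"; the zero-padded
// counter keeps chunk keys sortable (the key name is illustrative).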

// CreateFile creates a new RFile. It will overwrite/truncate existing data.
func (c *Client) CreateFile(bucketname string, key string, contentType string, chunk_size int, options ...map[string]uint32) (*RFile, error) {
	bucket, err := c.Bucket(bucketname)
	if err != nil {
		return nil, err
	}
	// Create the root object, holding meta-data only
	root := bucket.NewObject(key, options...)
	root.ContentType = contentType
	root.Data = []byte{}
	root.Meta["chunk_size"] = strconv.Itoa(chunk_size)
	root.Meta["chunk_count"] = strconv.Itoa(0)
	err = root.Store()
	if err != nil {
		return nil, err
	}
	// Return the completed struct
	return &RFile{c, root, nil, chunk_size, 0, 0}, nil
}

// CreateFile creates a new RFile using the default client.
func CreateFile(bucketname string, key string, contentType string, chunk_size int, options ...map[string]uint32) (*RFile, error) {
	return defaultClient.CreateFile(bucketname, key, contentType, chunk_size, options...)
}

// OpenFile opens a file. It returns an error if the object does not exist in
// Riak yet or does not have the correct meta-tags to support File-like
// operations.
func (c *Client) OpenFile(bucketname string, key string, options ...map[string]uint32) (*RFile, error) {
	root, err := c.GetFrom(bucketname, key, options...)
	if err != nil {
		return nil, err
	}
	chunk_size, err := strconv.Atoi(root.Meta["chunk_size"])
	if err != nil || chunk_size < 1 || chunk_size > 100*1024*1024 {
		// Supports chunks up to 100MB; there is some conflicting information about
		// the maximum value size in Riak, ranging from 100KB (for low latency
		// guarantees) to 20MB.
		return nil, NotFile
	}
	chunk_count, err := strconv.Atoi(root.Meta["chunk_count"])
	if err != nil || chunk_count < 0 {
		return nil, NotFile
	}
	// Determine the size by looking at the last chunk
	if chunk_count > 0 {
		chunk, err := c.GetFrom(bucketname, chunkKey(key, chunk_count-1), options...)
		if err != nil {
			return nil, ErrorInFile
		}
		return &RFile{c, root, chunk, chunk_size, 0, (chunk_count-1)*chunk_size + len(chunk.Data)}, nil
	}
	// Otherwise the size is 0
	return &RFile{c, root, nil, chunk_size, 0, 0}, nil
}

// OpenFile opens a file using the default client.
func OpenFile(bucketname string, key string, options ...map[string]uint32) (*RFile, error) {
	return defaultClient.OpenFile(bucketname, key, options...)
}
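
// Worked example of the size calculation above: with chunk_size 65536 and
// chunk_count 3, a last chunk holding 1000 bytes gives a size of
// 2*65536 + 1000 = 132072 bytes.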

// Seek implements the io.Seeker interface. Note that seeking outside the
// current [0, size] range returns io.EOF and leaves the position unchanged.
func (r *RFile) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekStart:
		if offset < 0 || int(offset) > r.size {
			return int64(r.pos), io.EOF
		}
		r.pos = int(offset)
	case io.SeekCurrent:
		if r.pos+int(offset) < 0 || r.pos+int(offset) > r.size {
			return int64(r.pos), io.EOF
		}
		r.pos += int(offset)
	case io.SeekEnd:
		if r.size+int(offset) < 0 || r.size+int(offset) > r.size {
			return int64(r.pos), io.EOF
		}
		r.pos = r.size + int(offset)
	}
	return int64(r.pos), nil
}
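
// A hedged sketch of the seek semantics (f is an *RFile):
//
//	f.Seek(10, io.SeekStart)   // absolute: position 10
//	f.Seek(-4, io.SeekCurrent) // relative: now at position 6
//	f.Seek(0, io.SeekEnd)      // jump to the end of the file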

// Write implements the io.Writer interface.
func (r *RFile) Write(p []byte) (n int, err error) {
	wpos := 0 // Keep track of how much of p has been written
	for wpos < len(p) {
		k := chunkKey(r.root.Key, r.pos/r.chunk_size)
		// Check if a chunk must be loaded (position<size or last chunk not completely written)
		if r.pos < r.size || r.size%r.chunk_size != 0 {
			// If the current chunk happens to be the one we're looking for, skip loading.
			if !(r.chunk != nil && r.chunk.Key == k) {
				r.chunk, err = r.client.GetFrom(r.root.Bucket.name, k, r.root.Options...)
				if err != nil {
					return wpos, err
				}
			}
		} else {
			// Create a new chunk
			r.chunk, err = r.client.NewObjectIn(r.root.Bucket.name, k, r.root.Options...)
			if err != nil {
				return wpos, err
			}
			r.chunk.ContentType = r.root.ContentType
		}
		// Check where to start writing within the chunk
		cpos := r.pos % r.chunk_size
		// Determine how many bytes to write into this chunk (fill up to chunk_size)
		towrite := len(p) - wpos
		if towrite > (r.chunk_size - cpos) {
			towrite = r.chunk_size - cpos
		}
		// Check if the chunk Data is large enough, otherwise write the first part
		// and then append the last part.
		check := 0
		if len(r.chunk.Data) == 0 {
			// Specialized case for a new chunk
			r.chunk.Data = make([]byte, towrite)
			check = copy(r.chunk.Data, p[wpos:wpos+towrite])
		} else if len(r.chunk.Data) < cpos+towrite {
			if cpos == len(r.chunk.Data) {
				// Just append to the chunk
				r.chunk.Data = append(r.chunk.Data, p[wpos:wpos+towrite]...)
				check = towrite // appending a slice cannot fall short
			} else {
				// Copy the part that fits, then append the rest
				part := len(r.chunk.Data) - cpos
				check = copy(r.chunk.Data[cpos:], p[wpos:wpos+part])
				r.chunk.Data = append(r.chunk.Data, p[wpos+part:wpos+towrite]...)
				check += towrite - part
			}
		} else {
			// Copy to the chunk
			check = copy(r.chunk.Data[cpos:], p[wpos:wpos+towrite])
		}
		if check != towrite {
			return wpos, ErrorInFile // should never happen
		}
		// Save the chunk to Riak
		err = r.chunk.Store()
		if err != nil {
			return wpos, err
		}
		// Update the counters
		r.pos += towrite
		wpos += towrite
		// Update the size if necessary
		if r.pos > r.size {
			// The number of chunks is the ceiling of pos/chunk_size
			if ((r.pos+r.chunk_size-1)/r.chunk_size) > ((r.size+r.chunk_size-1)/r.chunk_size) || r.root.Meta["chunk_count"] == "0" {
				// Update the root object
				r.root.Meta["chunk_count"] = strconv.Itoa((r.pos + r.chunk_size - 1) / r.chunk_size)
				err = r.root.Store()
				if err != nil {
					return wpos, err
				}
			}
			r.size = r.pos
		}
	}
	// Return the number of bytes written
	return wpos, nil
}
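
// Because RFile implements io.Writer, data can be streamed into Riak with the
// standard library; a hedged sketch (the source file path is illustrative):
//
//	src, err := os.Open("/tmp/input.dat")
//	if err == nil {
//		defer src.Close()
//		written, err := io.Copy(f, src) // f is an *RFile from CreateFile
//	}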

// Read implements the io.Reader interface.
func (r *RFile) Read(p []byte) (n int, err error) {
	rpos := 0 // Keep track of how much of p has been read
	for rpos < len(p) {
		k := chunkKey(r.root.Key, r.pos/r.chunk_size)
		// Check if a chunk must be loaded
		if r.pos < r.size || r.size%r.chunk_size != 0 {
			// If the current chunk happens to be the one we're looking for, skip loading.
			if !(r.chunk != nil && r.chunk.Key == k) {
				r.chunk, err = r.client.GetFrom(r.root.Bucket.name, k, r.root.Options...)
				if err != nil {
					return rpos, err
				}
			}
		} else {
			// Reading beyond EOF
			return rpos, io.EOF
		}
		// Check where to start reading within the chunk
		cpos := r.pos % r.chunk_size
		// Determine how many bytes to read from this chunk (at most up to chunk_size)
		toread := len(p) - rpos
		if toread > (r.chunk_size - cpos) {
			toread = r.chunk_size - cpos
		}
		// Check if the chunk Data holds enough bytes, otherwise read what is
		// there and return EOF
		if len(r.chunk.Data[cpos:]) < toread {
			copy(p[rpos:], r.chunk.Data[cpos:])
			return rpos + len(r.chunk.Data[cpos:]), io.EOF
		}
		// Read from the chunk
		copy(p[rpos:], r.chunk.Data[cpos:cpos+toread])
		// Update the counters
		r.pos += toread
		rpos += toread
	}
	// Return the number of bytes read
	return rpos, nil
}
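
// Because RFile implements io.Reader, the whole value can be drained with the
// standard library; a hedged sketch (io.ReadAll needs Go 1.16+, older code
// would use ioutil.ReadAll):
//
//	data, err := io.ReadAll(f) // f is an *RFile; reads chunk by chunk until io.EOF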

// Size returns the current size of the file in bytes.
func (r *RFile) Size() int {
	return r.size
}

// Meta exposes the Meta information of the underlying root RObject
func (r *RFile) Meta() map[string]string {
	return r.root.Meta
}

// Indexes exposes the Indexes of the underlying root RObject
func (r *RFile) Indexes() map[string][]string {
	return r.root.Indexes
}

// Flush forces a write of the underlying root RObject (e.g. after changing Meta and/or Indexes)
func (r *RFile) Flush() error {
	return r.root.Store()
}
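
// A hedged sketch of updating meta-data through the root object (the meta key
// "author" is purely illustrative):
//
//	f.Meta()["author"] = "alice"
//	if err := f.Flush(); err != nil {
//		// handle the error
//	}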