-
Notifications
You must be signed in to change notification settings - Fork 0
/
pico_storage.py
183 lines (143 loc) · 4.16 KB
/
pico_storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#Convert to use nosql
import pickle
import os
items = []
filename = None
def everything(v):
return v
def always(v):
return True
def never(v):
return False
def next_id():
global items
id = 1
for item in items:
if item['id'] > id:
id = item['id']
return id
def open_database(name):
global filename
global items
filename = name
if os.path.exists(filename):
with open(filename, "rb") as f:
items = pickle.load(f)
else:
items = []
def commit():
global filename
global items
with open(filename, "wb") as f:
pickle.dump(items, f)
def insert(data):
items.append(data)
commit()
def query(select=everything, where=always): #default select *
return [select(item) for item in items if where(item)]
def update(where=never, content=lambda v:{}): #default update nothing
global items
for item in items:
if where(item):
new_values = content(item)
for key in new_values.keys():
item[key] = new_values[key]
commit()
return
def delete(where=never): #delete nothing by default
global items
#should lock items to this transaction
kept_items = [item for item in items if not where(item)] #make a copy of all items except those to be deleted
items = kept_items
commit()
#unlock
return
def get_items():
items = query(select=everything, where=always)
results = [(item['id'], item['task'], item['status']) for item in items]
return results
def update_status(id):
update(where=lambda v:v['id']==id, content=lambda v:{'status':(v['status']+1)%2})
def create_item(task, status):
id = next_id()
insert({"id":id, "task":task, "status":status})
return id
def get_item(id):
items = query(select=everything, where=lambda v:v['id']==id)
if len(items) == 0:
return None
results = [(item['id'], item['task'], item['status']) for item in items]
return results[0]
def delete_item(id):
delete(lambda v:v['id']==id)
def update_item(new_task, id):
update(where=lambda v:v['id']==id, content=lambda v:{'task':new_task})
#tests
import random
random.seed()
def _random_text():
rand_txt = str(random.randint(10000, 20000))
return rand_txt
def test_get_items():
print("Testing get_items()")
results = get_items()
assert type(results) is list
assert len(results) > 0
print(results)
def test_get_item():
print("Testing get_item(id)")
results = get_items()
assert len(results) > 0
id,_,_ = results[0]
result = get_item(id)
#print(result)
assert type(result) is tuple
def test_update_item():
print("Testing update_item(update, id)")
ex_task = "This is an example item: " + _random_text()
id = create_item(ex_task, 0)
_, _, status = get_item(id)
updated_task = ex_task + " UPDATED"
update_item(updated_task, id)
_, task, status = get_item(id)
assert task == updated_task
def test_create_item():
print("Testing create_item()")
ex_task = "This is an example item: " + _random_text()
create_item(ex_task, 0)
items = get_items()
found = False
for item in items:
if item[1] == ex_task:
found = True
assert found
def test_delete_item():
print("Testing delete_item(id)")
ex_task = "This is an example item: " + _random_text()
id = create_item(ex_task, 0)
delete_item(id)
check = get_item(id)
assert check == None
def test_update_status():
print("Testing update_status(id, value)")
ex_task = "This is an example item: " + _random_text()
create_item(ex_task, 0)
items = get_items()
for item in items:
if item[1] == ex_task:
ex_id = item[0]
assert item[2] == 0
update_status(ex_id)
tmp = get_item(ex_id)
#print(tmp)
assert tmp[2] == 1
#call tests if file is run by itself only
if __name__ == "__main__":
open_database("todo.pkl")
test_create_item()
test_get_items()
test_get_item()
test_update_item()
test_delete_item()
test_update_status()
print("Done")