-
-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathloader.lisp
231 lines (199 loc) · 9.53 KB
/
loader.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
(in-package #:org.shirakumo.fraf.trial)
(defclass staging-area ()
  ((load-state
    :reader load-state
    :initform (make-hash-table :test 'eq)
    :documentation "EQ table mapping each staged object to its current state keyword.")
   (observers
    :reader observers
    :initform (make-hash-table :test 'eq)
    :documentation "EQ table mapping an object to the vector of observers watching it.")
   (loader
    :accessor loader
    :initarg :loader
    :initform NIL
    :documentation "Loader that receives progress reports while staging, or NIL."))
  (:documentation "Bookkeeping for one staging pass: per-object load states and their observers."))
(defgeneric dependencies (object)
  (:documentation "Returns the list of objects that get staged before OBJECT."))

(defgeneric stage (object staging-area)
  (:documentation "Stages OBJECT into STAGING-AREA."))

(defgeneric unstage (object staging-area)
  (:documentation "Releases OBJECT and resets its recorded state in STAGING-AREA to NIL."))

(defgeneric observe-load-state (observer changing new-state staging-area)
  (:documentation "Called on OBSERVER when CHANGING reaches NEW-STATE in STAGING-AREA.
NEW-STATE is one of :LOADED, :ALLOCATED or NIL."))

(defgeneric change-state (staging-area object new-state)
  (:documentation "Records NEW-STATE for OBJECT in STAGING-AREA and notifies the observers of OBJECT."))

(defgeneric register-load-observer (staging-area observer changing)
  (:documentation "Subscribes OBSERVER to state changes of CHANGING in STAGING-AREA."))
;; Fallback: an object has no dependencies unless a more specific method says otherwise.
(defmethod dependencies (object)
  '())
;; Fallback primary method: staging an arbitrary object does nothing by itself.
(defmethod stage (object (area staging-area))
  NIL)
;; Fallback: observers ignore load-state changes unless they specialize this function.
(defmethod observe-load-state (object changing new-state (area staging-area))
  NIL)
(defmethod change-state ((area staging-area) object new-state)
  "Record NEW-STATE for OBJECT in AREA and notify the observers of OBJECT.
The raw NEW-STATE is what gets stored. Observers receive a normalized state:
:WAS-LOADED is reported as :LOADED and :WAS-ALLOCATED as :ALLOCATED. Any state
outside the ones listed in the ECASE signals an error before anything is stored."
  (let ((reported (ecase new-state
                    ((:loaded :was-loaded) :loaded)
                    ((:allocated :was-allocated) :allocated)
                    ((NIL) NIL))))
    (setf (gethash object (load-state area)) new-state)
    ;; An object without registered observers iterates over an empty vector.
    (let ((watchers (gethash object (observers area) #())))
      (loop for watcher across watchers
            do (observe-load-state watcher object reported area)))))
(defmethod register-load-observer ((area staging-area) observer changing)
  "Subscribe OBSERVER to state changes of CHANGING within AREA.
Registering the same observer twice for the same object is a no-op. On a
first registration, if CHANGING already has a loaded or allocated state on
record, OBSERVER is notified of it immediately."
  (let ((watchers (gethash changing (observers area))))
    (unless (find observer watchers)
      ;; Create the observer vector on first use for this object.
      (unless watchers
        (setf watchers (make-array 0 :adjustable T :fill-pointer T)
              (gethash changing (observers area)) watchers))
      (vector-push-extend observer watchers)
      ;; Backfill for current state if registration occurs live.
      (case (gethash changing (load-state area))
        ((:loaded :was-loaded)
         (observe-load-state observer changing :loaded area))
        ((:allocated :was-allocated)
         (observe-load-state observer changing :allocated area))))))
(defmethod restage (object (area staging-area))
  "Forget the state recorded for OBJECT in AREA, then stage it again."
  (let ((states (load-state area)))
    ;; A NIL state is what the :AROUND method on STAGE treats as not yet staged.
    (setf (gethash object states) NIL))
  (stage object area))
(defmethod stage :around (object (area staging-area))
  "Guard staging of OBJECT so that it runs at most once per AREA.
An object with no recorded state is marked :TENTATIVE, progress is reported
to the loader of AREA if there is one, and the next method runs; if the state
is still :TENTATIVE afterwards it becomes :DONE. Re-entering while OBJECT is
:TENTATIVE signals an error unless OBJECT is an ENTITY. Any other recorded
state makes this a no-op."
  (let* ((states (load-state area))
         (current (gethash object states)))
    (cond ((eq current :tentative)
           (unless (typep object 'entity)
             (error "Circular staging on ~a!" object)))
          ((null current)
           (setf (gethash object states) :tentative)
           (let ((reporter (loader area)))
             (when reporter
               (let ((staged (hash-table-count states)))
                 ;; The reported total is the next multiple of 1000 strictly above the count.
                 (progress reporter staged (* 1000 (1+ (truncate staged 1000)))))))
           (prog1 (call-next-method)
             (when (eq :tentative (gethash object states))
               (setf (gethash object states) :done)))))))
(defmethod stage :before (object (area staging-area))
  "Stage everything OBJECT depends on before OBJECT itself is staged."
  (loop for dependency in (dependencies object)
        do (stage dependency area)))
(defmethod stage ((objects cons) (area staging-area))
  "Stage each element of the list OBJECTS into AREA, in order."
  (loop for object in objects
        do (stage object area)))
;; After a container has been staged, stage each of its children into the same area.
;; The children are enumerated with FOR:FOR's OVER clause on the container.
(defmethod stage :after ((container container) (area staging-area))
(for:for ((child over container))
(stage child area)))
(defmethod stage :before ((object resource) (area staging-area))
  "Stage the generator of the resource OBJECT first, if it has one."
  (let ((source (generator object)))
    (when source
      (stage source area))))
(defmethod stage ((object resource) (area staging-area))
  "Make sure the resource OBJECT is allocated and record the outcome in AREA.
Records :WAS-ALLOCATED when OBJECT was already allocated; otherwise allocates
it and records :ALLOCATED."
  (if (allocated-p object)
      (change-state area object :was-allocated)
      (progn
        (allocate object)
        (change-state area object :allocated))))
(defmethod stage ((object asset) (area staging-area))
  "Make sure the asset OBJECT is loaded and record the outcome in AREA.
Records :WAS-LOADED when OBJECT was already loaded; otherwise loads it and
records :LOADED."
  (let ((already-loaded (loaded-p object)))
    (unless already-loaded
      (load object))
    (change-state area object (if already-loaded :was-loaded :loaded))))
(defmethod stage ((other staging-area) (area staging-area))
  "Copy every state recorded in OTHER into AREA, overwriting entries AREA already has."
  (let ((target (load-state area)))
    (maphash (lambda (resource state)
               (setf (gethash resource target) state))
             (load-state other))))
(defmethod unstage ((object resource) (area staging-area))
  "Deallocate the resource OBJECT if it is allocated, then reset its state in AREA to NIL."
  (when (allocated-p object)
    (deallocate object))
  ;; Observers of OBJECT are notified with a NIL state.
  (change-state area object NIL))
;; Deallocate the asset OBJECT, then reset its state in AREA to NIL (observers are notified).
;; NOTE(review): unlike the RESOURCE method this deallocates unconditionally, and unlike
;; FINALIZE on a loader it does not call UNLOAD first -- confirm that this is intended.
(defmethod unstage ((object asset) (area staging-area))
(deallocate object)
(change-state area object NIL))
(defmethod abort-commit ((area staging-area))
  "Unstage every object that AREA itself loaded or allocated.
Objects recorded as :WAS-LOADED or :WAS-ALLOCATED, or in any other state,
are left alone."
  (maphash (lambda (resource state)
             (when (member state '(:loaded :allocated))
               (unstage resource area)))
           (load-state area)))
(defmethod finalize ((area staging-area))
  "Finalizing a staging area is the same as aborting it."
  (abort-commit area))
(defclass loader ()
  ((loaded
    :reader loaded
    :initform (make-hash-table :test 'eq)
    :documentation "EQ table mapping objects persisted by COMMIT to their recorded status.")
   (current-area
    :accessor current-area
    :initform NIL
    :documentation "Staging area of the commit currently in progress, or NIL."))
  (:documentation "Carries out committed staging areas and remembers what has been loaded."))
(defgeneric commit (staging-area loader &key unload)
  (:documentation "Carries out the loads recorded in STAGING-AREA through LOADER.
When UNLOAD is true, things LOADER had loaded or allocated that have no state
in STAGING-AREA are released first. The first argument may also be an
arbitrary object, which is staged and then committed."))

(defgeneric abort-commit (loader)
  (:documentation "Aborts a commit and releases what it had loaded or allocated.
Methods exist for loaders and for staging areas."))

(defgeneric progress (loader so-far total)
  (:documentation "Hook through which LOADER is told that SO-FAR out of TOTAL units of work are done."))
(defmethod finalize ((loader loader))
  "Release everything recorded in the table of LOADER, then empty that table.
Resources are deallocated if they are currently allocated. Assets are
unloaded if they are currently loaded and then deallocated unconditionally."
  (let ((table (loaded loader)))
    (flet ((release (loadable status)
             (declare (ignore status))
             (typecase loadable
               (resource
                (when (allocated-p loadable)
                  (deallocate loadable)))
               (asset
                (when (loaded-p loadable)
                  (unload loadable))
                (deallocate loadable)))))
      (maphash #'release table))
    (clrhash table)))
;; Default progress hook: a plain loader ignores progress reports.
(defmethod progress ((loader loader) so-far total)
  NIL)
(defun dependency-sort-loads (sequence &key (status (make-hash-table :test 'eq)))
  "Reorder SEQUENCE in place so that dependencies come before their dependents.
SEQUENCE must be a vector with a fill pointer: it is emptied and then refilled
by a depth-first walk over DEPENDENCIES, and finally returned. Dependencies
that were not in SEQUENCE to begin with end up in it as well. STATUS is the
table used to mark visited objects; objects already marked in it are not
added again. A dependency loop produces a warning rather than an error."
  (let ((pending (copy-seq sequence)))
    (labels ((visit (object)
               (let ((mark (gethash object status :invalid)))
                 (cond ((eq mark :invalid)
                        ;; Mark as in progress so that a path leading back here is noticed.
                        (setf (gethash object status) :temporary)
                        (loop for dependency in (dependencies object)
                              when dependency
                                do (visit dependency))
                        (setf (gethash object status) :validated)
                        (vector-push-extend object sequence))
                       ((eq mark :temporary)
                        (warn "Dependency loop detected on ~a." object))))))
      ;; TODO: It's possible we might be able to perform tarjan in-place
      ;;       to avoid potentially copying thousands of elements here.
      (setf (fill-pointer sequence) 0)
      (loop for object across pending
            do (visit object))
      sequence)))
;; Commit AREA through LOADER.
;;  - If AREA already is the current area of LOADER, nothing is done.
;;  - If a different area is current, the states of AREA are staged into that
;;    current area instead of starting a second commit.
;;  - Otherwise AREA becomes the current area and the steps below run.
;; UNLOAD (default T) controls whether previously persisted objects that have
;; no state in AREA are released before loading.
;; The value of LOAD-WITH is returned from the last branch.
(defmethod commit ((area staging-area) (loader loader) &key (unload T))
(cond ((eq area (current-area loader)))
((current-area loader)
(stage area (current-area loader)))
(T
;; NOTE(review): WITH-UNWIND-PROTECTION and WITH-CLEANUP-ON-FAILURE are defined
;; elsewhere; presumably the first form of each is the cleanup that runs on
;; exit / on failure respectively -- confirm against their definitions.
(with-unwind-protection (setf (current-area loader) NIL)
(setf (current-area loader) area)
;; This restart is what ABORT-COMMIT on a loader looks for and invokes.
(with-simple-restart (abort-commit "Abort the load operation")
(with-cleanup-on-failure (abort-commit area)
(when unload
;; Only entries persisted as :LOADED or :ALLOCATED are candidates, and only
;; when AREA has no (non-NIL) state for them. Such entries are released
;; according to their type and dropped from the loader's table.
(loop for resource being the hash-keys of (loaded loader) using (hash-value status)
do (case status
((:loaded :allocated)
(unless (gethash resource (load-state area))
(typecase resource
(asset
(when (loaded-p resource)
(unload resource)))
(resource
(when (allocated-p resource)
(deallocate resource))))
(remhash resource (loaded loader))))))
;; Trigger a garbage collection through TG:GC after unloading; the :FULL
;; argument is T unless the NX feature is present.
(tg:gc :full #-nx T #+nx NIL))
;; Persist load state into loader
(loop for loadable being the hash-keys of (load-state area) using (hash-value status)
do (case status
((:loaded :allocated)
(setf (gethash loadable (loaded loader)) status))))
;; Collect everything of type LOADABLE that is not yet LOADED-P. Note that
;; each LOADABLE is also recorded in the loader's table with its raw status
;; here, whatever that status is.
(let ((to-load (make-array 0 :adjustable T :fill-pointer T)))
(loop for resource being the hash-keys of (load-state area) using (hash-value status)
do (when (typep resource 'loadable)
(setf (gethash resource (loaded loader)) status)
(unless (loaded-p resource)
(vector-push-extend resource to-load))))
;; Load in dependency order, then report completion as 100 of 100.
;; NOTE(review): TO-LOAD is the freshly created vector and is never used as
;; a key of (LOADED LOADER), so the REMHASH below looks like a no-op --
;; confirm what was meant to be removed here.
(prog1 (load-with loader (dependency-sort-loads to-load))
(remhash to-load (loaded loader))
(progress loader 100 100)))))))))
(defmethod commit (object (loader loader) &rest args)
  "Stage OBJECT and commit it through LOADER.
If LOADER is in the middle of a commit, OBJECT is merely staged into the area
of that commit. Otherwise a fresh staging area is created for LOADER, OBJECT
is staged into it, and that area is committed with ARGS passed along."
  (v:info :trial.loader "Incrementally loading ~a" object)
  (let ((active (current-area loader)))
    (cond (active
           (stage object active))
          (T
           (with-timing-report (:info :trial.loader)
             (let ((area (make-instance 'staging-area :loader loader)))
               (with-cleanup-on-failure (abort-commit area)
                 (stage object area))
               (apply #'commit area loader args)))))))
(defmethod abort-commit ((loader loader))
  "Abort the commit LOADER is currently performing, if any.
When an ABORT-COMMIT restart is available it is invoked. Otherwise, if LOADER
has a current area, that area is aborted and the slot is reset to NIL."
  (if (find-restart 'abort-commit)
      (invoke-restart 'abort-commit)
      (let ((area (current-area loader)))
        (when area
          (abort-commit area)
          (setf (current-area loader) NIL)))))
(defmethod load-with ((loader loader) (resources vector))
  "Load every element of RESOURCES in order, reporting progress to LOADER.
After each element has been loaded, PROGRESS is called with the number of
elements loaded so far and the total number of elements, so the last report
made here is TOTAL out of TOTAL."
  (let ((total (length resources)))
    (loop for resource across resources
          ;; Count from 1: the report happens after the load has finished, so
          ;; the resource that was just loaded belongs to the completed count.
          for completed from 1
          do (load resource)
             (progress loader completed total))))
(defmethod commit ((area staging-area) (loader (eql T)) &rest args)
  "Commit AREA through the loader stored in AREA itself, passing ARGS along unchanged."
  (let ((own-loader (loader area)))
    (apply #'commit area own-loader args)))
(defclass streamed-loader (task-runner loader)
  ((context
    :accessor context
    :initform NIL
    :documentation "Child context used while loading; NIL until LOAD-WITH first creates it."))
  (:documentation "A loader that is also a TASK-RUNNER and performs its loads within its own child context."))
(defmethod finalize :after ((loader streamed-loader))
  "Finalize the context LOADER created for its loads, if it has one.
The slot is reset to NIL afterwards so that a later LOAD-WITH creates a fresh
child context instead of reusing the finalized one. A loader that never
created a context is left alone."
  (let ((context (context loader)))
    (when context
      (finalize context)
      (setf (context loader) NIL))))
(defmethod load-with ((loader streamed-loader) (resources vector))
  "Load every element of RESOURCES, in order, through the task runner of LOADER.
A child context of *CONTEXT* is created on first use and kept in LOADER; the
loads are evaluated with that context via WITH-CONTEXT."
  ;; Create the context lazily; later calls reuse it.
  (or (context loader)
      (setf (context loader) (create-child-context *context*)))
  (with-eval-in-task-thread (:runner loader)
    (with-context ((context loader) :reentrant T)
      (loop for resource across resources
            do (load resource)))))
;; Aborting a commit is not supported for streamed loaders yet: this method
;; only invokes IMPLEMENT!.
;; NOTE(review): IMPLEMENT! is defined elsewhere; presumably it signals a
;; "not implemented" condition -- confirm against its definition.
(defmethod abort-commit ((loader streamed-loader))
;; TODO: implement commit abort for streamed loaders
(implement!))