-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathentanglement.go
304 lines (247 loc) · 8.95 KB
/
entanglement.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
package qpool
import (
"sync"
"time"
)
/*
Entanglement groups a set of jobs around one shared key/value state.

Every write to that state is also appended to an ordered ledger of StateChange
records, so a job that begins work late can be brought up to date by replaying
the ledger from the start (or from any sequence number).

Typical uses:
  - Jobs that each compute part of a larger result and publish intermediate
    values for the others.
  - Coordinated tasks that react to each other's progress.
  - Keeping a set of related operations in sync.
  - Fan-out/fan-in, where many jobs contribute to a single outcome.

The exported fields are read and written by the methods of this type while they
hold an internal lock; code that touches the fields directly bypasses that lock.
*/
type Entanglement struct {
	// ID identifies the entanglement.
	ID string
	// Jobs are the current members of the group.
	Jobs []Job
	// SharedState holds the most recent value written for each key.
	SharedState map[string]any
	// CreatedAt is the time the entanglement was constructed.
	CreatedAt time.Time
	// LastModified is refreshed by UpdateState, AddJob and RemoveJob; expiry is
	// measured from it.
	LastModified time.Time
	// mu is taken by the reading and updating methods of this type.
	mu sync.RWMutex
	// Dependencies is not read by any method in this file.
	// NOTE(review): presumably IDs this entanglement depends on — confirm with callers.
	Dependencies []string
	// TTL is how long the entanglement may go unmodified before IsExpired
	// reports true. Zero or a negative value disables expiry.
	TTL time.Duration
	// OnStateChange, when non-nil, is called with the state before and after
	// each change.
	OnStateChange func(oldState, newState map[string]any)

	// stateLedger is the ordered history of every recorded state change. It is
	// only ever appended to, which is what lets late jobs replay it in order.
	stateLedger []StateChange
	// quantumStates holds the latest *State stored per key by UpdateQuantumState.
	quantumStates map[string]*State
}
/*
UpdateQuantumState stores state under key in the entanglement's quantum-state
table and appends a matching entry to the state ledger.

The table is allocated on first use, so the method also works on an
Entanglement whose table was never initialised (writing to a nil map would
otherwise panic).

Parameters:
  - key: The name the state is stored under; an existing entry is replaced
  - state: The state to store

Thread-safe: The write lock is held for the whole update.
*/
func (e *Entanglement) UpdateQuantumState(key string, state *State) {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.quantumStates == nil {
		e.quantumStates = make(map[string]*State)
	}
	e.quantumStates[key] = state
	e.recordStateChangeQuantum(key, state)
}
/*
recordStateChangeQuantum appends a ledger entry for a quantum-state write.

The entry's sequence number is the ledger length before the append, so it
equals the entry's index in the ledger.

This helper does not lock; UpdateQuantumState calls it with the write lock held.
*/
func (e *Entanglement) recordStateChangeQuantum(key string, state *State) {
	next := uint64(len(e.stateLedger))
	e.stateLedger = append(e.stateLedger, StateChange{
		Sequence:  next,
		Key:       key,
		Value:     state,
		Timestamp: time.Now(),
	})
}
/*
StateChange is one entry in an entanglement's state ledger: which key was
written, the value it was given, and when. Replaying entries in ledger order
rebuilds the state step by step.
*/
type StateChange struct {
	// Timestamp is when the change was recorded.
	Timestamp time.Time
	// Key is the state key that was written.
	Key string
	// Value is the value stored under Key by this change.
	Value any
	// Sequence is the entry's position in the ledger. It is assigned as the
	// ledger length at append time, so it starts at 0 and grows by one per entry.
	Sequence uint64
}
/*
NewEntanglement creates an entanglement with the given ID, initial jobs and TTL.

The returned value has an empty shared state, an empty state ledger and an
empty quantum-state table, so every method can be used on it immediately.
CreatedAt and LastModified are set from a single clock reading and therefore
start out equal.

Parameters:
  - id: A unique identifier for the entanglement
  - jobs: Initial set of jobs to be entangled; the slice is stored as given, not copied
  - ttl: How long the entanglement may go unmodified before it counts as
    expired (zero or negative means it never expires)

Example:

	jobs := []Job{job1, job2, job3}
	entanglement := NewEntanglement("data-processing", jobs, 1*time.Hour)
*/
func NewEntanglement(id string, jobs []Job, ttl time.Duration) *Entanglement {
	// One clock reading keeps CreatedAt and LastModified identical at birth.
	now := time.Now()

	return &Entanglement{
		ID:           id,
		Jobs:         jobs,
		SharedState:  make(map[string]any),
		CreatedAt:    now,
		LastModified: now,
		TTL:          ttl,
		stateLedger:  make([]StateChange, 0),
		// Allocated here so UpdateQuantumState never writes to a nil map.
		quantumStates: make(map[string]*State),
	}
}
/*
UpdateState sets key to value in the shared state, appends the change to the
state ledger and refreshes LastModified.

If OnStateChange is set it is then called with two snapshots: the state just
before the write and the state just after it. Both snapshots are private copies,
so the callback may keep or modify them without affecting the entanglement.

The callback runs after the lock has been released. This lets it call back into
the entanglement (GetState, UpdateState, ...) without deadlocking. The price is
that when several goroutines update concurrently, their callbacks may run
concurrently and in a different order than the ledger; the ledger remains the
authoritative ordering.

Parameters:
  - key: The state key to update
  - value: The new value for the state key

Thread-safe: The state, ledger and timestamp are updated under the write lock.
*/
func (e *Entanglement) UpdateState(key string, value any) {
	e.mu.Lock()

	// Tolerate an Entanglement that was not built by its constructor.
	if e.SharedState == nil {
		e.SharedState = make(map[string]any)
	}

	// Read the callback under the lock; snapshots are only built when someone
	// is going to look at them.
	notify := e.OnStateChange

	var oldState map[string]any
	if notify != nil {
		oldState = make(map[string]any, len(e.SharedState))
		for k, v := range e.SharedState {
			oldState[k] = v
		}
	}

	// Record the change first, then apply it, using one timestamp for both.
	change := StateChange{
		Timestamp: time.Now(),
		Key:       key,
		Value:     value,
		Sequence:  uint64(len(e.stateLedger)),
	}
	e.stateLedger = append(e.stateLedger, change)
	e.SharedState[key] = value
	e.LastModified = change.Timestamp

	var newState map[string]any
	if notify != nil {
		newState = make(map[string]any, len(e.SharedState))
		for k, v := range e.SharedState {
			newState[k] = v
		}
	}

	e.mu.Unlock()

	if notify != nil {
		notify(oldState, newState)
	}
}
/*
GetStateHistory returns the recorded state changes starting at a given sequence
number, in ledger order. Jobs that start late use it to catch up on changes
they missed.

The result is a copy of the ledger entries. Callers may modify or append to it
freely; the entanglement's own history is not affected.

Parameters:
  - sinceSequence: The first sequence number to include (0 for the full history)

Returns:
  - []StateChange: The entries with sequence >= sinceSequence; an empty,
    non-nil slice when sinceSequence is past the end of the ledger

Thread-safe: The ledger is read under the read lock.
*/
func (e *Entanglement) GetStateHistory(sinceSequence uint64) []StateChange {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if sinceSequence >= uint64(len(e.stateLedger)) {
		return []StateChange{}
	}

	// Sequence numbers equal ledger indexes, so the wanted entries are a suffix.
	// Copy it: handing out a sub-slice would share the ledger's backing array.
	tail := e.stateLedger[sinceSequence:]
	history := make([]StateChange, len(tail))
	copy(history, tail)
	return history
}
/*
ReplayStateChanges walks the full state ledger in order and, for every entry,
calls OnStateChange with the state before and after that entry was applied,
starting from an empty state. This lets a job that starts late observe the
whole history of changes in the order they happened.

If OnStateChange is nil there is nothing to deliver and the method returns
immediately.

The ledger is copied under a single read lock and the callbacks run after the
lock has been released. Taking the read lock once avoids recursive read locking,
which sync.RWMutex prohibits and which can deadlock when a writer is waiting.
Running the callbacks unlocked lets them call back into the entanglement. Each
callback receives private copies of both states.

Parameters:
  - job: The job the replay is for. It is currently not consulted; the replay
    is delivered through the entanglement's OnStateChange callback.
*/
func (e *Entanglement) ReplayStateChanges(job Job) {
	e.mu.RLock()
	notify := e.OnStateChange
	var history []StateChange
	if notify != nil {
		history = make([]StateChange, len(e.stateLedger))
		copy(history, e.stateLedger)
	}
	e.mu.RUnlock()

	if notify == nil {
		return
	}

	snapshot := func(src map[string]any) map[string]any {
		dst := make(map[string]any, len(src))
		for k, v := range src {
			dst[k] = v
		}
		return dst
	}

	// current is the private working state; callbacks only ever see copies.
	current := make(map[string]any)
	for _, change := range history {
		before := snapshot(current)
		current[change.Key] = change.Value
		notify(before, snapshot(current))
	}
}
/*
GetState looks up a single key in the shared state and reports the value as it
is at the moment of the call.

Parameters:
  - key: The state key to retrieve

Returns:
  - value: The value stored under the key, or nil when the key is absent
  - exists: Whether the key is present in the state

Thread-safe: The lookup is performed under the read lock.
*/
func (e *Entanglement) GetState(key string) (any, bool) {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if stored, found := e.SharedState[key]; found {
		return stored, true
	}
	return nil, false
}
/*
AddJob appends a job to the entanglement's job list and refreshes LastModified.

Parameters:
  - job: The job to add to the entanglement

Thread-safe: The job list and timestamp are updated under the write lock.
*/
func (e *Entanglement) AddJob(job Job) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.Jobs, e.LastModified = append(e.Jobs, job), time.Now()
}
/*
RemoveJob deletes the first job whose ID equals jobID from the entanglement's
job list, keeping the remaining jobs in their original order. LastModified is
refreshed only when a job was actually removed.

Parameters:
  - jobID: The ID of the job to remove

Returns:
  - bool: True if a matching job was found and removed, false otherwise

Thread-safe: The job list and timestamp are updated under the write lock.
*/
func (e *Entanglement) RemoveJob(jobID string) bool {
	e.mu.Lock()
	defer e.mu.Unlock()

	for idx := range e.Jobs {
		if e.Jobs[idx].ID != jobID {
			continue
		}
		// Slide the tail one slot left over the removed entry, then drop the
		// now-duplicated last slot.
		copy(e.Jobs[idx:], e.Jobs[idx+1:])
		e.Jobs = e.Jobs[:len(e.Jobs)-1]
		e.LastModified = time.Now()
		return true
	}
	return false
}
/*
IsExpired reports whether the entanglement has gone unmodified for longer than
its TTL. The age is measured from LastModified, not from creation, so any
update that refreshes LastModified also postpones expiry.

Returns:
  - bool: True if the entanglement has expired, false otherwise

Note: A TTL of 0 or less means the entanglement never expires.

Thread-safe: LastModified is written by other methods under the write lock, so
it is read here under the read lock to avoid a data race. As a consequence this
method must not be called by code that is already holding the write lock.
*/
func (e *Entanglement) IsExpired() bool {
	e.mu.RLock()
	defer e.mu.RUnlock()

	if e.TTL <= 0 {
		return false
	}
	return time.Since(e.LastModified) > e.TTL
}