-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathCollection.txt
106 lines (87 loc) · 2.67 KB
/
Collection.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
// This is the original collection file, with support for multiple observers per single user ID.
// Multiple listeners will be added based on demand.
package com.rocketchat.common.data.lightstream.collection;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Keyed store of documents that notifies per-key observers when a document is
 * added, changed or removed.
 *
 * <p>Documents and observer queues are held in {@link ConcurrentHashMap}s, so
 * neither keys nor stored documents may be {@code null}.
 *
 * <p>Created by sachin on 11/8/17.
 *
 * @param <T> type of the key that identifies a document
 * @param <K> type of the stored document
 */
public class Collection<T, K> {

    /** Documents currently held, indexed by key. */
    ConcurrentHashMap<T, K> documents;

    /** Observers registered for each key. */
    ConcurrentHashMap<T, ConcurrentLinkedQueue<Observer<K>>> observers;

    /** Creates an empty collection with no registered observers. */
    public Collection() {
        documents = new ConcurrentHashMap<>();
        observers = new ConcurrentHashMap<>();
    }

    /**
     * Stores {@code value} under {@code key} (replacing any previous document)
     * and notifies the key's observers with {@link Type#ADDED}.
     *
     * @param key   key of the document
     * @param value document to store
     */
    public void add(T key, K value) {
        documents.put(key, value);
        publish(Type.ADDED, key, value);
    }

    /**
     * Returns the document stored under {@code key}.
     *
     * @param key key of the document
     * @return the stored document, or {@code null} if there is none
     */
    public K get(T key) {
        return documents.get(key);
    }

    /**
     * Stores {@code newValue} under {@code key} and notifies the key's
     * observers with {@link Type#CHANGED}.
     *
     * <p>If no document exists for {@code key} yet, the value is inserted.
     *
     * @param key      key of the document
     * @param newValue updated document
     */
    public void update(T key, K newValue) {
        // Keep the store in sync with what observers are told; previously the
        // new value was only published and get() kept returning the old one.
        // A null value cannot be stored in a ConcurrentHashMap, so in that case
        // only the notification is sent, as before.
        if (newValue != null) {
            documents.put(key, newValue);
        }
        publish(Type.CHANGED, key, newValue);
    }

    /**
     * Removes the document stored under {@code key} and notifies the key's
     * observers with {@link Type#REMOVED}.
     *
     * <p>Observers are notified even when nothing was stored; the document
     * passed to them is then {@code null}.
     *
     * @param key key of the document
     * @return the removed document, or {@code null} if there was none
     */
    public K remove(T key) {
        K removed = documents.remove(key);
        publish(Type.REMOVED, key, removed);
        return removed;
    }

    /**
     * Returns a snapshot of all stored documents.
     *
     * @return a new list containing the current documents; changes to the list
     *         do not affect this collection
     */
    public List<K> getData() {
        return new ArrayList<>(documents.values());
    }

    /** Removes every document. Observers are not notified. */
    public void removeAll() {
        documents.clear();
    }

    /** Kind of change reported to an {@link Observer}. */
    public enum Type {
        ADDED,
        CHANGED,
        REMOVED
    }

    /**
     * Registers {@code o} to be notified about changes to {@code key}.
     * An observer already present in the key's queue is not added again.
     *
     * @param key key to observe
     * @param o   observer to register
     */
    public void register(T key, Observer<K> o) {
        ConcurrentLinkedQueue<Observer<K>> queue = observers.get(key);
        if (queue == null) {
            ConcurrentLinkedQueue<Observer<K>> created = new ConcurrentLinkedQueue<>();
            // putIfAbsent keeps a queue that another thread may have installed
            // in the meantime instead of overwriting it and losing its observers.
            queue = observers.putIfAbsent(key, created);
            if (queue == null) {
                queue = created;
            }
        }
        if (!queue.contains(o)) {
            queue.add(o);
        }
    }

    /**
     * Removes {@code o} from the observers of {@code key}, if present.
     *
     * @param key key the observer was registered for
     * @param o   observer to remove
     */
    public void unRegister(T key, Observer<K> o) {
        // Single lookup: the key may be removed between containsKey() and get().
        ConcurrentLinkedQueue<Observer<K>> queue = observers.get(key);
        if (queue != null) {
            queue.remove(o);
        }
    }

    /**
     * Removes all observers of {@code key}.
     *
     * @param key key whose observers are dropped
     */
    public void unRegister(T key) {
        observers.remove(key);
    }

    /**
     * Calls {@link Observer#onUpdate} on every observer registered for
     * {@code key}.
     *
     * @param type     kind of change
     * @param key      key of the changed document
     * @param document document passed to the observers (may be {@code null})
     */
    private void publish(Type type, T key, K document) {
        // Single lookup: the key may be removed between containsKey() and get().
        ConcurrentLinkedQueue<Observer<K>> queue = observers.get(key);
        if (queue == null) {
            return;
        }
        for (Observer<K> observer : queue) {
            observer.onUpdate(type, document);
        }
    }

    /** Removes all observers for all keys. */
    public void unRegisterAll() {
        observers.clear();
    }

    /**
     * Callback for changes to an observed key.
     *
     * @param <K> type of the observed document
     */
    public interface Observer<K> {

        /**
         * Called when the observed document is added, changed or removed.
         *
         * @param type     kind of change
         * @param document document the change refers to
         */
        void onUpdate(Type type, K document);
    }
}