-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReplica.java
417 lines (354 loc) · 10.5 KB
/
Replica.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import java.lang.*;
import java.io.*;
import java.net.*;
import java.util.*;
/**
 * One replica of a replicated song database.
 *
 * <p>The replica keeps a log of writes it knows about ({@link #writeLog}), the subset
 * of those that carry a commit sequence number ({@link #committedLog}), and a version
 * vector ({@link #V}) with one counter per replica id. It exchanges state with a single
 * peer ({@link #svr}) over short-lived TCP connections using Java object serialization;
 * each kind of message has its own fixed port.
 *
 * <p>The two sides of an exchange are {@link #receiveWriteFromReplica()} (announces this
 * replica's version vector and CSN, then collects writes) and {@link #selectToSend()}
 * (reads the peer's vector and CSN, then sends what the peer is missing). The methods
 * synchronise with fixed sleeps and socket timeouts rather than an explicit handshake.
 *
 * <p>This class performs no locking of its own.
 */
public class Replica
{
    /** Port on which writes coming from a client are accepted. */
    private static final int CLIENT_WRITE_PORT = 1234;
    /** Port on which writes coming from the peer replica are accepted / sent. */
    private static final int REPLICA_WRITE_PORT = 1237;
    /** Port used to exchange version vectors. */
    private static final int VERSION_VECTOR_PORT = 1230;
    /** Port used to exchange the highest known CSN. */
    private static final int CSN_PORT = 1236;
    /** Timeout applied to both accept() and the subsequent read, in milliseconds. */
    private static final int RECEIVE_TIMEOUT_MS = 4000;
    /** Pause before connecting, giving the receiver time to start listening. */
    private static final long PRE_SEND_DELAY_MS = 3000;
    /** Pause after a successful send. */
    private static final long POST_SEND_DELAY_MS = 1000;
    /** Pause at the end of selectToSend so both sides of a session finish together. */
    private static final long SESSION_END_DELAY_MS = 3000;
    /** Pause at the end of receiveWriteFromReplica. */
    private static final long RECEIVE_END_DELAY_MS = 1000;

    /** Current database contents, rebuilt from the committed log by {@link #mergeWithDatabase()}. */
    protected HashSet<SongEntry> database = new HashSet<SongEntry>();
    /** Backup after garbage collection. NOTE(review): never assigned in this class. */
    protected Set<SongEntry> stableDatabase;
    /** Every write this replica has accepted or received, in arrival order. */
    protected ArrayList<Write> writeLog = new ArrayList<Write>();
    /** Writes received with a CSN other than -1. */
    protected HashSet<Write> committedLog = new HashSet<Write>();
    /** Id of this replica; also its index into {@link #V}. */
    protected int id;
    /**
     * Accept time stamped onto client writes.
     * NOTE(review): nothing in this class ever advances it, so every client write is
     * stamped with the same value - confirm whether a caller or subclass increments it.
     */
    protected int acceptTime = 0;
    /** Version vector: one counter per replica id. TODO: implement removal of nodes. */
    protected ArrayList<Integer> V = new ArrayList<Integer>();
    /** Address of the peer this replica is connected to. */
    protected InetAddress svr;
    /** Commit sequence number known to this replica. */
    protected int CSN = 0;

    /**
     * Creates a replica with a zeroed version vector of {@code Init.totalReplicas} entries.
     *
     * @param id index of this replica in the version vector
     * @param ip host name or IP address of the peer replica
     * @throws IllegalArgumentException if {@code ip} cannot be resolved
     */
    Replica(int id, String ip)
    {
        this.id = id;
        for (int i = 0; i < Init.totalReplicas; i++) {
            V.add(Integer.valueOf(0));
        }
        // NOTE(review): this calls an overridable method from the constructor.
        setServerConnection(ip);
        System.out.println("Replica id is: " + id);
    }

    /**
     * Resolves and stores the address of the peer replica.
     *
     * @param add host name or IP address of the peer
     * @throws IllegalArgumentException if the address cannot be resolved; unchecked so
     *         that existing callers, which declare no exceptions, keep compiling
     */
    public void setServerConnection(String add)
    {
        try {
            svr = InetAddress.getByName(add);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Cannot resolve replica address: " + add, e);
        }
    }

    /**
     * Sends one write to the peer replica. Failures are reported on standard output and
     * otherwise ignored (best effort).
     *
     * @param w the write to send
     */
    public void sendWrite(Write w)
    {
        sendObject(w, REPLICA_WRITE_PORT);
    }

    /**
     * Sends a version vector to the peer replica (best effort).
     *
     * @param V the version vector to send
     */
    public void sendVersionVector(ArrayList<Integer> V)
    {
        sendObject(V, VERSION_VECTOR_PORT);
    }

    /**
     * Sends a commit sequence number to the peer replica (best effort).
     *
     * @param CSN the value to send; it travels as a serialized {@code Integer}
     */
    public void sendCSN(int CSN)
    {
        sendObject(Integer.valueOf(CSN), CSN_PORT);
    }

    /**
     * Waits for one write from a client.
     *
     * @return the received write, or {@code null} if nothing arrived within the timeout
     *         or the payload could not be read
     * @throws IOException if the listening socket cannot be set up or closed
     */
    public Write receiveWriteClient() throws IOException
    {
        return receiveObject(CLIENT_WRITE_PORT, Write.class);
    }

    /**
     * Waits for one write from the peer replica.
     *
     * @return the received write, or {@code null} if nothing arrived within the timeout
     *         or the payload could not be read
     * @throws IOException if the listening socket cannot be set up or closed
     */
    public Write receiveWriteReplica() throws IOException
    {
        return receiveObject(REPLICA_WRITE_PORT, Write.class);
    }

    /**
     * Waits for a version vector from the peer replica.
     *
     * @return the received vector, or {@code null} if nothing arrived within the timeout
     *         or the payload could not be read
     * @throws IOException if the listening socket cannot be set up or closed
     */
    public ArrayList<Integer> receiveVersionVector() throws IOException
    {
        // Element types are erased on the wire; the sender side of this protocol
        // (sendVersionVector) only ever writes an ArrayList<Integer>.
        @SuppressWarnings("unchecked")
        ArrayList<Integer> vector =
                (ArrayList<Integer>) receiveObject(VERSION_VECTOR_PORT, ArrayList.class);
        return vector;
    }

    /**
     * Waits for a commit sequence number from the peer replica.
     *
     * @return the received value, or 0 if nothing arrived within the timeout or the
     *         payload could not be read
     * @throws IOException if the listening socket cannot be set up or closed
     */
    public int receiveCSN() throws IOException
    {
        Integer received = receiveObject(CSN_PORT, Integer.class);
        return received == null ? 0 : received.intValue();
    }

    /**
     * Accepts writes from clients until a receive attempt yields nothing. Each accepted
     * write is stamped with this replica's id and accept time, appended to the write log,
     * and counted in this replica's slot of the version vector.
     */
    public void receiveWriteFromClient()
    {
        while (true) {
            try {
                Write w = receiveWriteClient();
                if (w == null) {
                    return; // timeout or unreadable payload: no more client writes
                }
                w.setReplicaId(id);
                System.out.println("Id Assigned to write is: " + w.getReplicaId());
                w.setAcceptTime(acceptTime);
                writeLog.add(w);
                System.out.println("Song received from client: " + w.getSongEntry().getSongName());
                V.set(id, Integer.valueOf(V.get(id).intValue() + 1));
            } catch (IOException | RuntimeException e) {
                // Best effort: report and stop instead of propagating.
                System.out.println("Stopped accepting client writes: " + e);
                return;
            }
        }
    }

    /**
     * Sending half of an exchange: reads the peer's version vector and CSN, then sends
     * the committed writes whose CSN is above the peer's, followed by every logged write
     * whose accept time is not yet covered by the peer's vector.
     *
     * @throws IOException if a listening socket cannot be set up or closed
     * @throws InterruptedException if interrupted during the closing pause
     */
    public void selectToSend() throws IOException, InterruptedException
    {
        System.out.println("Waiting on V");
        ArrayList<Integer> peerVector = receiveVersionVector();
        System.out.println("V rec'd" + peerVector);
        int RCSN = receiveCSN(); // CSN reported by the peer

        // Committed writes the peer has not seen yet, lowest CSN first.
        if (RCSN < CSN) {
            for (Write w : committedWritesInCsnOrder()) {
                if (w.getCSN() > RCSN) {
                    sendWrite(w);
                }
            }
        }

        // Logged writes not covered by the peer's version vector.
        if (peerVector == null) {
            System.out.println("No version vector received; not sending tentative writes");
        } else {
            for (Write w : writeLog) {
                System.out.println("Song that is being sent: " + w.getSongEntry().getSongName());
                if (peerVector.get(w.getReplicaId()) <= w.getAcceptTime()) {
                    sendWrite(w);
                }
            }
        }

        Thread.sleep(SESSION_END_DELAY_MS);
    }

    /**
     * Receiving half of an exchange: announces this replica's version vector and CSN,
     * then collects writes from the peer until a receive attempt yields nothing.
     *
     * <p>A write whose CSN is not -1 is treated as committed: a logged write with the same
     * replica id and accept time is removed from the write log (and {@link #CSN} is
     * incremented), and the write is added to the committed log. Every received write is
     * then appended to the write log. The version vector is advanced only for writes that
     * were not already logged, so a commit notice for a known write is not counted twice.
     *
     * @throws IOException declared for callers; I/O failures while receiving are reported
     *         on standard output and end the loop
     * @throws InterruptedException if interrupted during the closing pause
     */
    public void receiveWriteFromReplica() throws IOException, InterruptedException
    {
        System.out.println("Send V done" + V);
        sendVersionVector(V);
        sendCSN(CSN);
        System.out.println("Waiting to receive write from Replica");

        while (true) {
            try {
                Write w = receiveWriteReplica();
                if (w == null) {
                    break; // timeout or unreadable payload: the peer has nothing more
                }
                System.out.println("Received write with Song Name: " + w.getSongEntry().getSongName());

                boolean alreadyLogged = false;
                if (w.getCSN() != -1) {
                    System.out.println("being committed");
                    alreadyLogged = removeLoggedCopy(w);
                    if (alreadyLogged) {
                        CSN++; // assumes commit notices arrive in CSN order
                    }
                    committedLog.add(w);
                }
                writeLog.add(w);

                if (!alreadyLogged) {
                    int origin = w.getReplicaId();
                    V.set(origin, Integer.valueOf(V.get(origin).intValue() + 1));
                }
            } catch (IOException | RuntimeException e) {
                System.out.println(e.getLocalizedMessage());
                break;
            }
        }
        Thread.sleep(RECEIVE_END_DELAY_MS);
    }

    /**
     * Applies every committed write to {@link #database}, lowest CSN first.
     * "add" inserts the entry, "delete" removes it, and "modify" replaces any entry with
     * the same song name. Writes with any other operation are ignored.
     */
    public void mergeWithDatabase()
    {
        for (Write w : committedWritesInCsnOrder()) {
            String op = w.getOp();
            SongEntry entry = w.getSongEntry();
            if ("add".equals(op)) {
                database.add(entry);
            } else if ("delete".equals(op)) {
                // NOTE(review): matches via SongEntry.equals - confirm it is defined by value.
                database.remove(entry);
            } else if ("modify".equals(op)) {
                removeSongsNamed(entry.getSongName());
                database.add(entry);
            }
        }
    }

    /** Prints the name of every song in the database. */
    public void displayDatabase()
    {
        System.out.println("Songs in database of Replica " + id + ":");
        for (SongEntry s : database) {
            System.out.println(s.getSongName());
        }
    }

    /** Prints the song name and operation of every write in the write log. */
    public void displayWriteLog()
    {
        System.out.println("Songs in Write Log of Replica " + id + ":");
        for (Write w : writeLog) {
            System.out.println("Song Name: " + w.getSongEntry().getSongName() + " Operation: " + w.getOp());
        }
    }

    /** Prints the song name and operation of every write in the committed log. */
    public void displayCommittedLog()
    {
        System.out.println("Songs in Committed Log of Replica " + id + ":");
        for (Write w : committedLog) {
            System.out.println("Song Name: " + w.getSongEntry().getSongName() + " Operation: " + w.getOp());
        }
    }

    /**
     * Connects to the peer on {@code port} and writes one serialized object, pausing
     * before and after. The socket is always closed; failures are reported, not thrown.
     */
    private void sendObject(Object payload, int port)
    {
        try {
            Thread.sleep(PRE_SEND_DELAY_MS);
            try (Socket socket = new Socket(svr, port);
                 ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream())) {
                out.writeObject(payload);
            }
            Thread.sleep(POST_SEND_DELAY_MS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            System.out.println("Interrupted while sending to " + svr + ":" + port);
        } catch (IOException e) {
            System.out.println("Could not send to " + svr + ":" + port + ": " + e.getLocalizedMessage());
        }
    }

    /**
     * Listens on {@code port} for one connection and reads one serialized object of the
     * given type. The listening socket is closed on every path so the port can be bound
     * again by the next call.
     *
     * @return the object, or {@code null} on timeout, read failure or unexpected type
     * @throws IOException if the listening socket cannot be set up or closed
     */
    private <T> T receiveObject(int port, Class<T> type) throws IOException
    {
        try (ServerSocket server = new ServerSocket()) {
            server.setReuseAddress(true); // only effective when set before bind
            server.bind(new InetSocketAddress(port));
            server.setSoTimeout(RECEIVE_TIMEOUT_MS);
            try (Socket peer = server.accept()) {
                // Set before constructing the stream: its constructor reads a header.
                peer.setSoTimeout(RECEIVE_TIMEOUT_MS);
                // NOTE(review): native deserialization of network input is unsafe if
                // the peer is not trusted.
                ObjectInputStream in = new ObjectInputStream(peer.getInputStream());
                return type.cast(in.readObject());
            } catch (IOException | ClassNotFoundException | ClassCastException e) {
                System.out.println(e.getLocalizedMessage());
                return null;
            }
        }
    }

    /**
     * Removes from the write log the first write with the same replica id and accept
     * time as {@code committed}.
     *
     * @return {@code true} if such a write was found and removed
     */
    private boolean removeLoggedCopy(Write committed)
    {
        for (Iterator<Write> it = writeLog.iterator(); it.hasNext();) {
            Write existing = it.next();
            // Objects.equals compares by value whether the getters return int or Integer.
            if (Objects.equals(existing.getReplicaId(), committed.getReplicaId())
                    && Objects.equals(existing.getAcceptTime(), committed.getAcceptTime())) {
                it.remove();
                return true;
            }
        }
        return false;
    }

    /** Removes every database entry whose song name equals {@code songName}. */
    private void removeSongsNamed(String songName)
    {
        for (Iterator<SongEntry> it = database.iterator(); it.hasNext();) {
            if (it.next().getSongName().equals(songName)) {
                it.remove();
            }
        }
    }

    /** Returns a copy of the committed log sorted by ascending CSN. */
    private List<Write> committedWritesInCsnOrder()
    {
        List<Write> ordered = new ArrayList<Write>(committedLog);
        Collections.sort(ordered, new Comparator<Write>() {
            @Override
            public int compare(Write a, Write b)
            {
                return Integer.compare(a.getCSN(), b.getCSN());
            }
        });
        return ordered;
    }
}