-
Notifications
You must be signed in to change notification settings - Fork 0
/
DHT.pas
956 lines (896 loc) · 26.4 KB
/
DHT.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
unit DHT;
{
implementation of custom dht, based on pastry and kademlia.
keyspace is divided into buckets of limited capacity
node belongs to bucket, where at least 'depth' bits match 'prefix'
node priority: old>new>dead
TODO: weight nodes by IP-Address common prefix length.
}
{
Coral like behaviour:
- backtracking when node full
- multilevels
}
{
Adaption of Coral clusters:
The buckets will prefer two kinds of nodes:
- 5 long-living nodes as in Kademlia
- 3 closest (by ttl or rtt) nodes
Deeper buckets will naturally contain less close nodes.
}
{used by: messages, fileshare}
INTERFACE
uses Classes,sysutils,ObjectModel,HostKey,Crypto,ServerLoop,RPCsrv,opcode,inifiles,Database;
TYPE
tPID=tKey20;
tPeer_ptr=^tPeer;
tBucket_ptr=^tBucket;
{A single routing-table slot: one remote node plus its liveness and verification state.}
tPeer=object
ID :tPID; {identifier of the node}
Addr :tNetAddr; {address the node is contacted at}
ReqDelta:word; {0 after an accepted message, incremented for every request sent by tBucket.Refresh; 255 marks the slot as unassigned}
LastMsg,
LastReply,
VerifiedTill :tMtime; {VerifiedTill: the verification counts as valid while this is greater than mNow}
function Assigned: boolean; {true while ReqDelta<255}
procedure Clear; {marks the slot unassigned and resets its fields}
function FullyValid: boolean; {ReqDelta below crdDontPingThr and verification not expired}
private
CRPending:boolean; {a challenge was generated by VerifyInit and no valid response has been processed yet}
Challenge:tEccKey; {challenge created by VerifyInit and transmitted by SendCheck}
end;
{One bucket of the routing table; buckets form a singly linked list starting at Table.}
tBucket=object
Prefix: tPID; {only the first Depth bits are significant, see MatchPrefix}
Depth: byte; {number of leading Prefix bits an ID must share to belong here; 0 matches everything}
ModifyTime: tMTime; {set to mNow when the bucket is created, split or a peer is inserted}
peer: array [1..6] of tPeer; {fixed-capacity slots; unused slots have ReqDelta=255}
next: ^tBucket; {next bucket in the list, nil at the end}
function MatchPrefix(const tp:tPID):boolean;
function IDString:string;
private
desperate:word; {starts at 3; tBucket.Refresh increments it when no responsive peer was found and resets it to 3 otherwise}
procedure Refresh; {periodic maintenance, re-schedules itself}
end;
{Candidate node tracked by a running search (tCustomSearch.Nodes).}
tSearchNode=record
ID :tPID;
Addr :tNetAddr; {a nil address marks the end of the used part of the list}
LastReq:tMTime; {time of the last request sent to this node by Step}
reqc:byte;{number of requests}
rplc:byte;{1=replied with cap, 2=replied with peers}
end;
{Base class of an iterative lookup towards Target.
 Descendants prepare Query and may override the virtual hooks.}
tCustomSearch=class(tTask)
Target:tPID; {ID the search converges on}
TrID:Word; {transaction ID obtained from NewMsgTr in the constructor}
Query:tCustomMemoryStream; {request sent to nodes that have not replied yet; freed in Cleanup}
Nodes:array [0..10] of tSearchNode; {candidates, kept ordered by AddNode (longest common prefix with Target first)}
constructor Create;
protected
procedure Cleanup; override;
function AddNode(const iID:tPID; const iAddr:tNetAddr) :integer; virtual; {returns the index of the node in Nodes}
procedure AddNodes(s:tStream); {reads address+ID records from s and re-schedules Step}
procedure LoadNodes; {from dht and node cache} virtual;
procedure Step; {sends the next round of requests or calls Exhausted}
procedure HandleReply(var sAddr: tNetAddr; var sPID: tPID; op:byte; st:tStream); virtual;
procedure Exhausted; virtual; {called by Step when no request could be issued}
private
procedure IntHandleReply(msg:tSMsg); {registered with NewMsgTr; parses the header and forwards to HandleReply}
end;
{Plain node lookup: the constructor builds a dhtGetNodes query for iTarget,
 loads initial candidates and performs the first Step.}
tSearch=class(tCustomSearch)
constructor Create( const iTarget: tPID );
end;
{Identifier of the local node; the unit initialization derives it from PublicKey using SHA256_Buffer.}
var MyID: tPID;
{Offers contact to the routing table under a placeholder ID built from its address.}
procedure NodeBootstrap(const contact:tNetAddr);
{Inserts or updates a node in the routing table. recv tells whether the information
 comes from a message received from the node itself. The result is true when the node
 was inserted or matched an existing entry; false for the local ID, a recently failed
 address, an ID/address conflict or when no slot is available.}
function CheckNode(const id: tPID; const addr: tNetAddr; recv:boolean): boolean;
{Returns the bucket whose prefix matches the given ID, or nil when the table is empty.}
function FindBucket(const prefix:tPID):tBucket_ptr; overload;
{Returns the head of the bucket list (nil when the table is empty).}
function GetFirstBucket:tBucket_ptr;
{Writes address (18 bytes) + ID (20 bytes) records of known peers to r, starting with the bucket of Target.}
procedure GetNodes(r:tStream; const Target: tPID; max: word);
{Sends a dhtNodes message with transaction ID TrID and peers near Target to rcpt.}
procedure SendNodes(const rcpt: tNetAddr; const Target: tPID; TrID:word);
IMPLEMENTATION
{Tuning constants. The crd* values are thresholds compared against tPeer.ReqDelta.
 Durations are added to / compared with mNow - presumably milliseconds, TODO confirm.}
const
crdRecvd=0; {ReqDelta set on message reception}
crdDoPingThr=1; {ping often when rd>=this}
crdDontPingThr=4; {ping less often when rd>this}
crdVerifyError=5; {set on verify error}
crdOthersCanReAddThr=6;{peer reinitialized when rd>this and suggested by other peer}
crdInvalidateThr=4; {request verify when rd>this}
crdReplacableThr=2; {new peer can replace old when rd>this}
cBanDuration=10*60*1000; {an address is rejected for this long after the LastReply of an unassigned slot}
cVerifiedDuration=43*60*1000; {validity period granted by a successful challenge/response}
cStichRar=7; {Refresh contacts the unresponsive peers in 1 of cStichRar runs}
cNudgeQuietThr=12*1000; {minimum silence before Refresh nudges the most quiet peer}
cRefreshWaitBase=18*1000; {base Refresh period of buckets matching MyID}
cRefreshWaitMul=600; {added to the base period per bit of bucket depth}
cRefreshWaitOther=30*1000; {Refresh period of buckets not matching MyID}
cRefreshWaitRtrDiv=3; {the period is divided by this after retrying unresponsive peers}
cRecheckThr=cNudgeQuietThr; {re-verify when the verification expires within this time}
cNodesDat='nodes.dat'; {database key of the saved node list}
cMaxNodesDat=12; {limit used by tPersist.SaveState}
cInitAdd=6; {n of peers to add from dht}
cInitWait=800; {? Init to Step delay}
cAddWait=1; {new peers to Step delay}
cStepRqc=3; {max requests per Step}
cStepMinDelay=800; {min delta of requests to same peer}
cStepPeerReqc=6; {max (unsuccessful) requests to peer}
cStepRplc=6; {? something per Step}
cStepPeriod=900; {max period between Steps}
{Head of the bucket list; nil until CheckNode creates the first bucket.}
var Table:^tBucket;
{deepest first}
{Log of this unit, created in the unit initialization.}
var log:tEventLog;
function tBucket.MatchPrefix(const tp:tPID):boolean;
{Tells whether tp falls into this bucket: a bucket of depth 0 accepts every ID,
 otherwise tp must share at least depth leading bits with prefix.}
begin
  if depth=0 then
    result:=true
  else
    result:=PrefixLength(prefix,tp)>=depth;
end;
function GetFirstBucket:tBucket_ptr;
{Head of the bucket list, nil while the routing table is empty.}
begin
  GetFirstBucket:=Table;
end;
function tPeer.Assigned: boolean;
{A slot is in use while its ReqDelta stays below the sentinel value 255.}
begin
  result:=not (ReqDelta>=255);
end;
procedure tPeer.Clear;
{Turns the slot into an unassigned one and wipes everything stored in it.}
begin
  {these three define the "free slot" state}
  ReqDelta:=255;
  Addr.Clear;
  CRPending:=false;
  {the remaining fields are reset only for tidiness}
  FillChar(ID,20,0);
  LastMsg:=0;
  LastReply:=0;
  VerifiedTill:=0;
end;
function tPeer.FullyValid: boolean;
{True when the peer answered recently enough (ReqDelta below crdDontPingThr)
 and its verification has not expired yet.}
begin
  result:=false;
  if ReqDelta>=crdDontPingThr then exit;
  result:=VerifiedTill>ServerLoop.mNow;
end;
procedure VerifyInit(b:tBucket_ptr; var peer:tPeer); forward;
function tBucket.IDString:string;
{Formats the bucket as "<hex of the significant prefix bytes>/<depth>".
 At least one prefix byte is always printed.}
var nbytes:byte;
begin
  {number of bytes needed to hold depth bits, rounded up}
  nbytes:=(depth+7) div 8;
  if nbytes=0 then nbytes:=1;
  SetLength(result,nbytes*2);
  ToHex(@result[1],prefix,nbytes);
  result:=result+'/'+IntToStr(depth);
end;
function FindBucket(const prefix:tPID):tBucket_ptr; overload;
{Walks the bucket list and returns the first bucket accepting prefix.
 Returns nil only when the table is empty (checked by the assertion).}
var bkt:tBucket_ptr;
begin
  result:=nil;
  bkt:=Table;
  while bkt<>nil do begin
    if bkt^.MatchPrefix(prefix) then begin
      {first matching is deepest}
      result:=bkt;
      break;
    end;
    bkt:=bkt^.next;
  end;
  assert(assigned(result) xor (Table=nil));
end;
(*** Mighty Split and Check procedures ***)
{Splits bucket ob into two buckets that are one bit deeper.
 ob keeps its prefix; the new bucket gets the prefix with the newly significant bit
 toggled. Every peer stays in exactly one of the two buckets. Both buckets are
 re-inserted into the list and a Refresh of the new bucket is scheduled.
 NOTE(review): ob^.next is not reset after unlinking; the only visible caller
 (CheckNode) splits buckets matching MyID - confirm that such a bucket is always
 the last list element, otherwise the stale link would corrupt the list.}
procedure SplitBucket(ob:tBucket_ptr);
{Flips bit number bit (0 = most significant bit of byte 0) of prefix.}
procedure Toggle(var prefix:tPID; bit:byte);
begin
prefix[bit div 8]:= prefix[bit div 8] xor ($80 shr (bit mod 8));
end;
var nb:tBucket_ptr;
var i:byte;
begin
log.info(' SplitBucket %s/%d',[string(ob^.prefix),ob^.depth]);
{find pref to old bucket, in order to unlink}
if ob=Table then table:=table^.next else begin
nb:=Table;
while assigned(nb) and (nb^.next<>ob) do nb:=nb^.next;
assert(assigned(nb),'old bucket not in table');
{unlink}
nb^.next:=nb^.next^.next; nb:=nil;
end;
{increase depth of this bucket}
Inc(ob^.depth);
ob^.ModifyTime:=mNow;
{create new bucket with toggled bit}
New(nb);
{full copy: the new bucket starts with the same peers, depth and timestamps}
nb^:=ob^;
Toggle(nb^.Prefix,nb^.depth-1);
{the new bucket is linked directly in front of the old one}
nb^.next:=ob;
{clear nodes that do not belong in bucket}
for i:=1 to high(tBucket.peer) do begin
if ob^.peer[i].ReqDelta>=255 then continue;
if ob^.MatchPrefix(ob^.peer[i].id)
then nb^.peer[i].Clear
else ob^.peer[i].Clear;
end;
log.debug(' -> %s/%d',[string(ob^.prefix),ob^.depth]);
for i:=1 to high(tBucket.peer) do if ob^.peer[i].ReqDelta<255
then log.debug(' -> -> %s',[string(ob^.peer[i].id)]);
log.debug(' -> %s/%d',[string(nb^.prefix),nb^.depth]);
for i:=1 to high(tBucket.peer) do if nb^.peer[i].ReqDelta<255
then log.debug(' -> -> %s',[string(nb^.peer[i].id)]);
{re-insert the pair nb -> ob; from here on ob is reused as a list cursor}
if table=nil then table:=nb else begin
ob:=Table;
while assigned(ob^.next)and (ob^.next^.depth<=nb^.depth) do ob:=ob^.next;
ob^.next:=nb;
log.debug(' -> after /%d',[ob^.depth]);
end;
Shedule(2000,@nb^.Refresh);
end;
{Inserts or updates the node (id, addr) in the routing table.
 recv is true when the data comes from a message received from the node itself,
 false when the node was only suggested by somebody else.
 Returns true when the node matched an existing entry (same ID and address) or was
 inserted. Returns false for the local ID, for an address that failed recently,
 for an ID/address conflict and when the bucket has no usable slot.
 Creates the first bucket on demand and splits the bucket containing MyID when full.}
function CheckNode(const id: tPID; const addr: tNetAddr; recv:boolean): boolean;
{return false if node is banned}
var b:^tBucket;
var i,ifree,idup,iold:byte;
var adm,idm:boolean;
label again;
begin
Assert(not addr.isNil,'CheckNode with nil address');
CheckNode:=false;
if id=MyID then exit;
again:
b:=FindBucket(id);
if not assigned(b) then begin
{empty table: create the root bucket that matches every ID}
New(Table); b:=Table;
b^.Prefix:=MyID;
b^.Depth:=0;
b^.ModifyTime:=mNow;
b^.next:=nil;
b^.desperate:=3;
for i:=1 to high(b^.peer) do b^.peer[i].Clear;
Shedule(2000,@b^.Refresh);
end;
{order: update, free, banned, split, bad}
{scan the slots: idup = slot with same address or ID, iold = replaceable slot, ifree = first unassigned slot}
ifree:=0;idup:=0;iold:=0;
for i:=1 to high(b^.peer) do begin
adm:=(b^.peer[i].Addr=addr);
if b^.peer[i].ReqDelta<255 then begin
{the peer slot is assigned}
if adm or (b^.peer[i].ID=ID) then
idup:=i
else if b^.peer[i].ReqDelta>crdReplacableThr
then iold:=i;
end else begin
if adm and ((b^.peer[i].LastReply+cBanDuration)>ServerLoop.MNow)
then exit; {reject for recent auth failure}
if ifree=0 then ifree:=i;
end;
end;
{an unresponsive duplicate is overwritten instead of updated}
if (idup>0) and ( (recv and (b^.peer[idup].ReqDelta>crdReplacableThr))
or((not recv) and (b^.peer[idup].ReqDelta>crdOthersCanReAddThr)) )
then begin
ifree:=idup; idup:=0;
end;
if idup>0 then begin
{updating}
adm:=(b^.peer[idup].Addr=addr);
idm:=(b^.peer[idup].ID=ID);
if adm and idm then begin
CheckNode:=true;
if recv then begin
b^.peer[idup].LastMsg:=mNow;
if (b^.peer[idup].VerifiedTill > ServerLoop.MNow)
or (not PublicPoWReady) then begin
{only update by self and verified, else waiting for auth}
b^.peer[idup].LastReply:=mNow;
b^.peer[idup].ReqDelta:=0;
//log.debug('udpated',[]);
end
else VerifyInit(b,b^.peer[idup]);
{VerifyInit may have sent a check and updated LastMsg, so it is set again}
b^.peer[idup].LastMsg:=mNow;
end;
end else begin
{id-addr Conflict!}
{the stored peer is asked to prove itself; the result stays false}
// v<- wrong condition
if (b^.peer[idup].LastMsg<=cVerifiedDuration)
or ((b^.peer[idup].LastMsg-cVerifiedDuration+cNudgeQuietThr) < ServerLoop.MNow)
then
VerifyInit(b,b^.peer[idup]);
end;
end else begin
{inserting}
{a full bucket is split only when it contains the local ID; then the lookup is repeated}
if (ifree=0) and b^.MatchPrefix(MyID) then begin
SplitBucket(b);
goto again;
end;
if ifree>0 then i:=ifree
else if iold>0 then i:=iold
else exit;
{add node here}
log.info(' AddNode %s%s to %s#%d',[string(id),string(addr),b^.IDString,i]);
b^.ModifyTime:=mNow;
b^.peer[i].ID:=ID;
b^.peer[i].Addr:=Addr;
b^.peer[i].LastMsg:=mNow;
b^.peer[i].LastReply:=0;
if recv then b^.peer[i].LastReply:=mNow;
b^.peer[I].ReqDelta:=0;
b^.peer[I].VerifiedTill:=0;
b^.peer[I].CRPending:=false;
{a new peer always starts with a challenge}
VerifyInit(b,b^.peer[i]);
CheckNode:=true;
end
end;
procedure GetNodes(r:tStream; const Target: tPID; max: word; partial:boolean);
{Writes up to max peer records to r. A record is the 18-byte address followed by
 the 20-byte ID. Peers are taken from the bucket of Target first, then from the
 buckets following it and finally from the buckets preceding it.
 When partial is false only peers passing tPeer.FullyValid are written.
 Fixes over the previous version: max is honoured (callers derive it from the
 datagram size) and the wrap-around pass stops at the start bucket, so no peer is
 written twice.}
var i:integer;
var written:word;
var first,bucket:tBucket_ptr;
var wrapped:boolean;
begin
  written:=0;
  wrapped:=false;
  first:=DHT.FindBucket(Target);
  bucket:=first;
  while assigned(bucket) and (written<max) do begin
    for i:=1 to high(bucket^.peer) do begin
      if written>=max then break;
      if bucket^.peer[i].assigned
      and (partial or bucket^.peer[i].FullyValid) then begin
        r.Write(bucket^.peer[i].Addr,18);
        r.Write(bucket^.peer[i].ID,20);
        inc(written);
      end;
    end;
    bucket:=bucket^.next;
    if (bucket=nil) and (not wrapped) then begin
      {end of list reached: continue once from the head}
      bucket:=GetFirstBucket;
      wrapped:=true;
    end;
    {second pass ends where the first one started}
    if wrapped and (bucket=first) then break;
  end;
end;
procedure GetNodes(r:tStream; const Target: tPID; max: word);
inline;
{Public variant: forwards to the four-argument GetNodes with partial enabled,
 i.e. peers are not required to pass tPeer.FullyValid.}
begin
  GetNodes(r,Target,max,true);
end;
procedure SendNodes(const rcpt: tNetAddr; const Target: tPID; trid:word);
{Sends a dhtNodes message to rcpt: opcode, transaction ID, local ID and as many
 peer records near Target as fit into one datagram.
 The record budget is computed from the real record size (18-byte address +
 20-byte ID); the former divisor 36 over-estimated the capacity.}
const
  cHeaderSz=1+2+20; {opcode + trid + MyID}
  cEntrySz=18+20;   {address + ID, as written by GetNodes}
var r:tMemoryStream;
begin
  r:=tMemoryStream.Create;
  try
    r.WriteByte(opcode.dhtNodes);
    r.Write(trid,2);
    r.Write(dht.MyID,20);
    GetNodes(r,target,(cDGramSz-cHeaderSz) div cEntrySz);
    ServerLoop.SendMessage(r.Memory^,r.Size,rcpt);
  finally
    r.Free;
  end;
end;
(*** Protocol Communication ***)
procedure RecvBeatQ(msg:tSMsg);
{Handler of dhtBeatQ. Message layout: opcode, sender ID (20), target ID (20),
 mark (2), TTL byte. The sender is offered to the routing table; when accepted a
 dhtBeatR with the local ID, the echoed mark, the socket TTL and fully valid peers
 near the target is returned.
 The peer budget uses the real record size of 38 bytes (18 address + 20 ID);
 the former divisor 36 over-estimated the datagram capacity.}
const
  cHeaderSz=1+20+2+1; {opcode + MyID + mark + ttl}
  cEntrySz=18+20;     {address + ID, as written by GetNodes}
var s:tMemoryStream absolute msg.st;
var sID:tPID;
var Target:tPID;
var mark:word;
var r:tMemoryStream;
var Hops:integer;
begin
  s.skip(1);
  s.RB(sID,20);
  s.RB(Target,20);
  s.RB(mark,2);
  {hop count is computed but not used yet; the self-assignment only silences the compiler}
  Hops:=s.R1-msg.TTL;
  Hops:=hops;
  if not CheckNode(sID,msg.source,true) then exit;
  r:=tMemoryStream.Create;
  try
    r.W1(opcode.dhtBeatR);
    r.WB(MyID,20);
    r.WB(mark,2);
    r.W1(GetSocketTTL(msg.source));
    GetNodes(r,target,(cDGramSz-cHeaderSz) div cEntrySz,false);
    // todo! if list.p^.addr=msg.source then continue;
    SendMessage(r.Memory^,r.size,msg.source);
  finally
    r.Free;
  end;
end;
procedure SendBeat(const contact:tNetAddr; const forid: tPID; mark:word);
{Builds and sends a dhtBeatQ to contact: opcode, local ID, requested ID,
 mark and the TTL of the socket used for contact.}
var pkt:tMemoryStream;
begin
  pkt:=tMemoryStream.Create;
  try
    pkt.W1(opcode.dhtBeatQ);
    pkt.WB(MyID,sizeof(tPID));
    pkt.WB(ForID,sizeof(tPID));
    pkt.WB(mark,2);
    pkt.W1(GetSocketTTL(contact));
    SendMessage(pkt.Memory^,pkt.size,contact);
  finally
    pkt.Free;
  end;
end;
procedure RecvBeatR(msg:tSMsg);
{Handler of dhtBeatR. Message layout: opcode, sender ID (20), mark (2), TTL (1),
 then a list of peer records (18-byte address + 20-byte ID).
 The sender and every listed peer are offered to the routing table.
 A record is read only when all 38 bytes are available (the former test
 "Left>36" attempted a read with 37 bytes left), and records carrying a nil
 address are skipped because CheckNode asserts on them.}
const
  cEntrySz=18+20;
var ID:tPID;
var Addr:tNetAddr;
begin
  msg.st.skip(1);
  msg.st.RB(ID,20);
  msg.st.skip(3); {todo,ttl}
  if not CheckNode(ID,msg.source,true) then exit;
  while msg.st.Left>=cEntrySz do begin
    msg.st.RB(Addr,18);
    msg.st.RB(ID,20);
    if Addr.isNil then continue;
    CheckNode(ID,Addr,false);
  end;
end;
{Messages: (Still valid?)
d)VfyCh: op, SendPub, PoWork, Challenge, Ver
e)VfyRe: op, SendPub, PoWork, Response, Ver
1, 32, 36, 32, =101 +35
}
procedure SendCheck(var p:tPeer);
{Sends a dhtCheckQ to peer p: opcode, local public key, local proof of work,
 the challenge stored in p and the version string. Updates p.LastMsg.
 Requires PublicPoWReady. The stream is released through try/finally (as the
 other senders of this unit do) and the precondition is checked before
 anything is allocated.}
var r:tMemoryStream;
begin
  assert(PublicPoWReady);
  r:=tMemoryStream.Create;
  try
    r.W1(opcode.dhtCheckQ);
    r.WB(HostKey.PublicKey,sizeof(HostKey.PublicKey));
    r.WB(HostKey.PublicPoW,sizeof(HostKey.PublicPoW));
    r.WB(p.Challenge,sizeof(tEccKey));
    r.WB(VersionString[1],Length(VersionString));
    log.debug(' SendCheck: to %s size=%dB',[string(p.addr),r.Size]);
    p.LastMsg:=mNow;
    SendMessage(r.Memory^,r.Size,p.Addr);
  finally
    r.Free;
  end;
end;
procedure RecvCheckQ(msg:tSMsg);
{Handler of dhtCheckQ (challenge). Message layout: opcode, sender public key,
 sender proof of work, challenge. The sender ID is the first 20 bytes of the
 SHA-256 of its public key. When the sender is accepted by CheckNode, the local
 proof of work is ready and the sender's proof of work verifies, a dhtCheckR
 carrying the computed response is returned.
 The reply stream is now protected by try/finally.}
var id:tPID;
var pub:tEccKey;
var pow:tPoWRec;
var challenge:tEccKey;
var right_resp:tEccKey;
var r:tMemoryStream;
begin
  msg.st.skip(1);
  msg.st.RB(pub,sizeof(tEccKey));
  msg.st.RB(pow,sizeof(tPoWRec));
  msg.st.RB(challenge,sizeof(tEccKey));
  {Pub->ID}
  SHA256_Buffer(id,20{<-},Pub,sizeof(pub));
  {CheckNode}
  if not CheckNode(id,msg.source,true) then exit;
  if not PublicPoWReady then begin
    log.warn(' CheckQ: not responding to %s, PoW is not ready.',[string(msg.source)]);
    exit;
  end;
  {Verify PoW}
  if not HostKey.VerifyPoW(pow,pub) then begin
    log.warn(' CheckQ: Invalid PoW in request from %s',[string(msg.source)]);
    exit;
  end;
  {Solve C/R}
  HostKey.CreateResponse(Challenge, right_resp, pub);
  {reply}
  r:=tMemoryStream.Create;
  try
    r.w1(opcode.dhtCheckR);
    r.wb(HostKey.PublicKey,sizeof(HostKey.PublicKey));
    r.wb(HostKey.PublicPoW,sizeof(HostKey.PublicPoW));
    r.wb(right_resp,sizeof(right_resp));
    r.wb(VersionString[1],Length(VersionString));
    SendMessage(r.Memory^,r.Size,msg.source);
  finally
    r.Free;
  end;
end;
procedure RecvCheckR(msg:tSMsg);
{Handler of dhtCheckR (challenge response). Message layout: opcode, sender
 public key, sender proof of work, response. The sender ID is derived from the
 public key. On a valid proof of work and response the peer becomes verified for
 cVerifiedDuration; on failure its slot is marked unassigned (ReqDelta=255),
 which together with LastReply makes CheckNode reject the address for
 cBanDuration.
 Hardening over the previous version: the message is ignored unless it refers to
 an assigned slot, a challenge is actually pending for that slot and the message
 comes from the address stored for the peer. Without these checks an old response
 could be replayed, and anybody knowing a peer's public key could get the peer
 banned by sending a bogus response.}
var b:^tBucket;
var i,slot:byte;
var id:tPID;
var pub:tEccKey;
var pow:tPoWRec;
var resp:tEccKey;
var right_resp:tEccKey;
begin
  msg.st.skip(1);
  msg.st.RB(pub,sizeof(tEccKey));
  msg.st.RB(pow,sizeof(tPoWRec));
  msg.st.RB(resp,sizeof(tEccKey));
  {Pub->ID}
  SHA256_Buffer(id,sizeof(id),Pub,sizeof(pub));
  {ID->bkt:idx}
  b:=FindBucket(id);
  if not assigned(b) then exit;
  slot:=0;
  for i:=1 to high(b^.peer) do
    if b^.peer[i].Assigned and (b^.peer[i].ID=id) then begin
      slot:=i;
      break;
    end;
  {drop banned n unknown}
  if slot=0 then exit;
  {drop unsolicited or replayed responses}
  if not b^.peer[slot].CrPending then exit;
  {drop responses that do not come from the challenged address}
  if not (b^.peer[slot].Addr=msg.source) then exit;
  b^.peer[slot].LastMsg:=mNow;
  b^.peer[slot].LastReply:=mNow;
  {Verify PoW}
  if not HostKey.VerifyPoW(pow,pub) then begin
    b^.peer[slot].ReqDelta:=255;
    log.warn(' CheckR: Invalid PoW in response from %s',[string(msg.source)]);
    exit;
  end;
  {Verify C/R}
  HostKey.CreateResponse(b^.peer[slot].Challenge, right_resp, pub);
  if CompareByte(resp,right_resp,sizeof(right_resp))<>0 then begin
    log.warn(' CheckR: Invalid C/R in response from %s',[string(msg.source)]);
    b^.peer[slot].ReqDelta:=255;
    exit;
  end;
  {set node verified, rqd, last}
  b^.peer[slot].VerifiedTill:=ServerLoop.mNow+cVerifiedDuration;
  b^.peer[slot].ReqDelta:=0;
  b^.peer[slot].CrPending:=false;
  log.info(' CheckR: Valid response from %s',[string(msg.source)]);
end;
procedure RecvGetNodes(Msg:tSMsg);
{Handler of dhtGetNodes. Message layout: opcode, transaction ID (2),
 sender ID (20), target ID (20). When CheckNode accepts the sender,
 the peers near the target are sent back with the same transaction ID.}
var transaction:word;
var senderID:tPID;
var wanted:tKey20;
begin
  msg.st.skip(1);
  msg.st.rb(transaction,2);
  msg.st.RB(senderID,20);
  if DHT.CheckNode(senderID, msg.source, true) then begin
    msg.st.rb(wanted,20);
    log.debug(' RecvGetNodes from %s',[string(msg.source)]);
    SendNodes(msg.source, wanted, transaction);
  end;
end;
procedure NodeBootstrap(const contact:tNetAddr);
{Registers a contact whose real ID is unknown. A placeholder ID is made up:
 byte 0 is the complement of the first byte of MyID, bytes 1..18 hold a copy
 of the address and the last byte stays zero. The node is then offered to the
 routing table as a suggestion (recv=false).}
var placeholder:tPID;
begin
  fillchar(placeholder,sizeof(placeholder),0);
  Move(contact,placeholder[1],18);
  placeholder[0]:= not MyID[0];
  CheckNode(placeholder,contact,false);
end;
(*** The bucket Refresh procedure ***)
{BUG: after PoW is generated CRPending nodes are not verified}
{Periodic maintenance of one bucket; re-schedules itself at the end.
 - peers with ReqDelta>=crdDoPingThr are contacted again (normally only while
   ReqDelta<=crdDontPingThr; in a "stich" run only those above it)
 - among the responsive peers the one with the oldest LastReply is nudged when it
   was quiet for cNudgeQuietThr or has a challenge pending
 - the waiting time depends on whether the bucket contains MyID and on retries}
procedure tBucket.Refresh;
var my,rtr,stich:boolean;
var i,ol:byte;
var wait:LongWord;
var debug:ansistring;
{Contacts peer[i]: re-sends a pending challenge, starts a verification that is
 missing or about to expire, or otherwise sends a beat for a random target inside
 this bucket's prefix. Always increments the peer's ReqDelta.}
procedure lSend(i: byte);
var target: tPID;
var j,z,k:integer;
begin
{todo: use random target with prefix}
if PublicPoWReady and peer[i].CrPending then SendCheck(peer[i])
else if PublicPoWReady and (peer[i].VerifiedTill < (ServerLoop.MNow+cRecheckThr))
then VerifyInit(@self,peer[i])
else begin
target:=prefix;
for j:=0 to high(tPID) do begin
{z = prefix bits belonging to byte j; k = mask of the bits that are randomized}
z:=depth-(j*8);
if z<0 then k:=255 else if z>7 then k:=0 else k:=255 shr z;
target[j]:=(prefix[j] and not k) or random(k+1);
end;
SendBeat(peer[i].addr,target,0);
end;
Inc(peer[i].ReqDelta);
end;
begin
debug:='';debug:=debug;
my:=MatchPrefix(MyID);
ol:=0; rtr:=false;
{1 of cStichRar times try to contact dead nodes in attempt to recover from network split}
debug:='DHT.Refresh('+self.IDString+')';
stich:=Random(cStichRar)=0;
for i:=1 to high(tBucket.peer)
do if peer[i].ReqDelta<255 then begin
if peer[i].ReqDelta>=crdDoPingThr then begin
if (peer[i].ReqDelta<=crdDontPingThr) xor stich then begin
{this will get rid of half-dead nodes}
//writeln(debug,' R',peer[i].ReqDelta,' ',copy(string(peer[i].id),1,6),string(peer[i].addr));
lSend(i);
rtr:=true;
end
end
{ol tracks the responsive peer (ReqDelta=0) with the oldest LastReply}
else if (ol=0) or (peer[i].LastReply<peer[ol].LastReply)
then ol:=i;
end;
{now nudge the most quiet peer, but not too often}
if (ol>0) and (((mNow-peer[ol].LastMsg)>cNudgeQuietThr)
or peer[ol].CrPending) then begin
//writeln(debug,' T',mNow-peer[ol].LastReply,' ',string(peer[ol].addr));
lSend(ol);
end;
{try to recover bucket full of bad nodes}
if (ol=0){and(not rtr)} then begin
{$note bucket recovery is not implemented}
{list.Init(Prefix); TODO
list.bans:=true;
list.maxRD:=desperate; list.Next;
if assigned(list.bkt) then begin
//writeln(debug,' V ',copy(string(list.p^.id),1,6),string(list.p^.addr));
SendBeat(list.p^.addr,prefix,0);
end else} inc(desperate);
end else desperate:=3;
{the bucket containing MyID is refreshed more often than the others; deeper buckets wait longer}
if my
then wait:=cRefreshWaitBase+(depth*cRefreshWaitMul)
else wait:=cRefreshWaitOther;
{come back sooner when unresponsive peers were retried in a regular run}
if rtr and(not stich) then wait:=wait div cRefreshWaitRtrDiv;
Shedule(wait,@Refresh);
end;
procedure VerifyInit(b:tBucket_ptr; var peer:tPeer);
{Starts a challenge/response verification of peer unless one is already pending:
 the verified state is dropped, a fresh challenge is created and, when the local
 proof of work is ready, sent at once. The bucket argument is not used.}
begin
  if peer.CrPending then exit;
  log.debug(' Verifying node %s',[string(peer.addr)]);
  peer.VerifiedTill:=0;
  peer.CrPending:=true;
  HostKey.CreateChallenge(peer.challenge);
  if PublicPoWReady then SendCheck(peer);
end;
(*** The Search Object ***)
constructor tCustomSearch.Create;
{Empties the candidate list, registers IntHandleReply under a new transaction ID
 and runs the inherited constructor.}
var slot:integer;
begin
  for slot:=low(Nodes) to high(Nodes) do
    Nodes[slot].Addr.Clear;
  NewMsgTr(TrID, @IntHandleReply);
  inherited Create;
  log.debug(' Lookup@%s: Initialize',[string(@self)]);
end;
constructor tSearch.Create( const iTarget: tPID );
{Creates a lookup of iTarget: prepares the dhtGetNodes query (opcode, transaction
 ID, local ID, target), loads the initial candidates and performs the first Step.}
begin
  inherited Create;
  Target:=iTarget;
  Query:=tMemoryStream.Create;
  Query.WriteByte(opcode.dhtGetNodes);
  Query.Write(TrID,2);
  Query.Write(DHT.MyID,20);
  Query.Write(Target,20);
  LoadNodes;
  Step;
end;
function tCustomSearch.AddNode(const iID:tPID; const iAddr:tNetAddr) :integer;
{Adds a candidate to Nodes, which is kept ordered by the length of the prefix
 shared with Target (longest first).
 Returns the index of the node, or -1 when nothing was stored: the node is the
 local one, or the list is full of candidates that are at least as close.
 A candidate with the same address that is met before the insert position is
 not duplicated; its index is returned.
 Fixes over the previous version:
 - the result is defined on every path (it was unset when iID=MyID)
 - the scan no longer reads a for-loop variable after the loop ran to completion,
   which is undefined in Pascal; a full list of closer nodes is left untouched
 - a new entry starts with LastReq=0 instead of inheriting the time stamp of the
   entry it displaced, so Step does not delay its first request}
var tpfl:byte;
var idx,j:integer;
begin
  result:=-1;
  assert(not iAddr.isNil,'AddNode with nil address');
  if iID=MyID then exit;
  tpfl:=PrefixLength(iid,Target);
  idx:=0;
  while idx<=high(nodes) do begin
    if nodes[idx].addr.isNil then break; {free tail reached}
    if nodes[idx].addr=iaddr then begin
      result:=idx; {already known}
      exit;
    end;
    if PrefixLength(nodes[idx].id,Target)<tpfl then break; {first entry that is farther}
    inc(idx);
  end;
  if idx>high(nodes) then exit; {no room: every entry is at least as close}
  log.debug(' Lookup@%s: Add [%d] %s',[string(@self),idx,string(iAddr)]);
  {make room; the last entry falls off the list}
  for j:=high(nodes)-1 downto idx do nodes[j+1]:=nodes[j];
  nodes[idx].id:=iid;
  nodes[idx].addr:=iaddr;
  nodes[idx].LastReq:=0;
  nodes[idx].reqc:=0;
  nodes[idx].rplc:=0;
  result:=idx;
end;
{One round of the lookup. Walks the candidates in order and sends the query to
 nodes that have not replied with peers yet, limited by cStepRqc requests per
 round, cStepPeerReqc requests per node and cStepMinDelay between requests to
 the same node. Calls Exhausted when no candidate qualified, otherwise
 re-schedules itself after cStepPeriod.}
procedure tCustomSearch.Step;
var ix:byte;
var r:tMemoryStream;
var rqc,rpc,again:byte;
begin
{send request to at most 3 peers,
count nodes that replied
}
{rqc = nodes selected for a request, rpc = nodes that already replied with peers, again = pass counter}
rqc:=0;rpc:=0; again:=0;
repeat
for ix:=0 to high(nodes) do begin
if nodes[ix].addr.isNil then break;
if (rqc>=cStepRqc)or(rpc>=cStepRplc) then break;
if nodes[ix].rplc>=2 then inc(rpc)
else if (nodes[ix].reqc<=cStepPeerReqc)
and(rqc<cStepRqc)
{and again...}
then begin
{the node counts towards rqc even when the request is postponed by the minimum delay}
inc(rqc);
if (mNow-nodes[ix].LastReq)<cStepMinDelay then continue;
if nodes[ix].rplc=0 then begin
ServerLoop.SendMessage(Query.Memory^,Query.Size,nodes[ix].Addr);
end else begin
{nodes with rplc=1 get a plain dhtGetNodes request instead of Query}
r:=tmemorystream.create;
//r.SetSize(43);
r.WriteByte(opcode.dhtGetNodes);
r.Write(self.TrID,2);
r.Write(DHT.MyID,20);
r.Write(Target,20);
ServerLoop.SendMessage(R.memory^,R.size,nodes[ix].Addr);
r.Free;
end;
log.debug(' Lookup@%s: [%d,%d,%d] %s',[string(@self),ix,rqc,nodes[ix].reqc,string(nodes[ix].addr)]);
inc(nodes[ix].reqc);
nodes[ix].LastReq:=MNow;
end;
end; inc(again);
until (again>1)or(rqc>=cStepRqc)or(rpc>=cStepRplc);
{nothing left to ask}
if rqc=0 then begin
log.debug(' Lookup@%s: Exhausted',[string(@self)]);
self.Exhausted;
end
else Shedule(cStepPeriod,@Step);
end;
procedure tCustomSearch.Exhausted;
{Default reaction to a search that ran out of candidates: report tevError.}
begin
  SendEvent(tevError, nil);
end;
procedure tCustomSearch.IntHandleReply(msg:tSMsg);
{Entry point for replies carrying this search's transaction ID.
 Reads the opcode, skips the transaction ID, reads the sender ID and passes the
 rest of the message to HandleReply when CheckNode accepts the sender.}
var replyOp:byte;
var senderID:tPID;
begin
  replyOp:=msg.st.readbyte;
  msg.st.skip(2{trid});
  msg.st.rb(senderID,20);
  if DHT.CheckNode(senderID, msg.source, true) then
    HandleReply(msg.source,senderID,replyOp, msg.st);
end;
procedure tCustomSearch.AddNodes(s:tStream);
{Reads peer records (18-byte address + 20-byte ID) from s, offers each one to the
 routing table and to the candidate list of this search, then runs Step after
 cAddWait.
 Fixes over the previous version:
 - a record is read only when all 38 bytes are available (the test used 36)
 - nodes accepted by the routing table are no longer skipped; previously only
   nodes refused by CheckNode became candidates, so a search could not advance
   through newly learned nodes
 - records with a nil address are ignored, CheckNode and AddNode assert on them}
const
  cEntrySz=18+20;
var id:tPID;
var ad:tNetAddr;
begin
  log.debug(' dhtLookup.AddNodes stream offset %d',[s.position]);
  while s.left>=cEntrySz do begin
    s.rb(ad,18);
    s.rb(id,20);
    if ad.isNil then continue;
    {feed the routing table; the outcome does not decide about the search}
    DHT.CheckNode(id, ad, false);
    self.AddNode(id, ad);
  end;
  UnShedule(@Step);
  Shedule(cAddWait,@Step);
end;
procedure tCustomSearch.HandleReply(var sAddr: tNetAddr; var sPID: tPID; op:byte; st:tStream);
{Processes a reply from node sPID at sAddr. The sender is added to the candidates.
 A reply from the target itself completes the search (tevComplete). A dhtNodes
 reply marks the sender as answered and imports the listed peers; any other
 opcode is only logged.}
var slot:integer;
begin
  slot:=self.AddNode(sPID, sAddr);
  if sPID=Target then begin
    log.debug(' Lookup@%s: Target Found',[string(@self)]);
    SendEvent(tevComplete,nil);
    exit;
  end;
  if OP<>opcode.dhtNodes then begin
    {not expecting any values}
    log.warn('Lookup@%s: Unknown from %s',[string(@self),string(sAddr)]);
    exit;
  end;
  log.debug(' Lookup@%s: Nodes from %s',[string(@self),string(sAddr)]);
  if slot<>-1 then nodes[slot].rplc:=2;
  self.AddNodes(st);
end;
procedure tCustomSearch.Cleanup;
{Releases what the search holds: the query stream, the transaction handler
 registration and the pending Step timer.}
begin
  log.debug(' Lookup@%s: Cleanup',[string(@self)]);
  Query.Free;
  SetMsgTr(TrID, nil);
  {this is very important}
  UnShedule(@Step);
end;
procedure tCustomSearch.LoadNodes;
{Seeds the candidate list from the routing table: first the bucket of Target,
 then the buckets after it and finally the buckets before it. Stops as soon as
 AddNode returns an index above cInitAdd.
 Fixes over the previous version:
 - an empty slot no longer ends the scan of a bucket; slots are not compacted
   (CheckNode and SplitBucket free arbitrary slots), so peers stored behind a
   free slot were never loaded
 - unassigned slots (ReqDelta=255, which includes peers that failed
   verification) are skipped
 - the wrap-around pass stops at the start bucket instead of visiting the tail
   of the list twice}
var first,bucket:tBucket_ptr;
var i,n:integer;
var wrapped:boolean;
begin
  log.debug(' Lookup@%s: Load Nodes',[string(@self)]);
  wrapped:=false;
  first:=DHT.FindBucket(Target);
  bucket:=first;
  while assigned(bucket) do begin
    for i:=1 to high(bucket^.peer) do begin
      if (not bucket^.peer[i].Assigned) or bucket^.peer[i].Addr.isNil then continue;
      n:=AddNode(bucket^.peer[i].ID,bucket^.peer[i].Addr);
      if n>cInitAdd then exit;
    end;
    bucket:=bucket^.next;
    if (bucket=nil) and (not wrapped) then begin
      {not enough nodes in best bucket, add whole dht,
      this is not fast operation optimize?}
      bucket:=GetFirstBucket;
      wrapped:=true;
    end;
    if wrapped and (bucket=first) then break;
  end;
end;
(*** Persistent data Manager ***)
{Loads the saved node list and the [bootstrap] configuration section at start-up
 and periodically saves the current nodes. The steps are chained through the
 scheduler: OpenState -> ReadState (repeated) -> ReadBS (repeated) -> SaveState (repeated).}
type tPersist=object
statef:tCustomMemoryStream; {stream of saved nodes while loading; scratch stream while saving}
bootf:tStringList; {values of the 'bootstrap' config section; freed by ReadBS when done}
readcnt:longword; {number of entries processed in the current phase}
booti:integer; {index of the next bootf entry}
procedure OpenState;
procedure ReadState;
procedure ReadBS;
procedure SaveState;
end;
{The single instance, started from the unit initialization.}
var Persist:tPersist;
procedure tPersist.OpenState;
{Start of the boot sequence: resets the counters, loads the 'bootstrap' section
 of the configuration and the saved node list, then schedules ReadState.}
begin
  booti:=0;
  readcnt:=0;
  bootf:=tStringList.Create;
  ServerLoop.Config.ReadSectionValues('bootstrap',bootf,[svoIncludeInvalid]);
  statef:=Database.dbGet(dbMisc,cNodesDat,length(cNodesDat));
  shedule(1,@ReadState);
end;
procedure tPersist.ReadState;
{Processes one saved record (address + ID) per call and offers it to the routing
 table, re-scheduling itself every 200 ticks. When the stream is exhausted
 (signalled by eReadPastEoF) the stream is freed and ReadBS takes over.}
var nodeAddr:tNetAddr;
var nodeID:tPID;
begin
  try
    if statef.Left=0 then raise eReadPastEoF.Create('eof'); //temporary fix
    statef.rb(nodeAddr,sizeof(nodeAddr));
    statef.rb(nodeID,sizeof(nodeID));
    CheckNode(nodeID,nodeAddr,false);
    shedule(200,@ReadState);
    inc(readcnt);
  except
    on e: eReadPastEoF do begin
      statef.Free;
      readcnt:=0;
      shedule(1,@ReadBS);
    end;
  end;
end;
procedure tPersist.ReadBS;
{Processes one entry of the bootstrap list per call and re-schedules itself every
 300 ticks. After the last entry the list is freed and the first SaveState is
 scheduled.}
var contact:tNetAddr;
begin
  if booti<bootf.count then begin
    contact.FromString(bootf[booti]);
    inc(booti);
    NodeBootstrap(contact);
    inc(readcnt);
    shedule(300,@ReadBS);
  end else begin
    shedule(5000,@SaveState);
    bootf.Free;
  end;
end;
procedure tPersist.SaveState;
{Stores up to cMaxNodesDat peers (address + ID) with
 ReqDelta<=crdOthersCanReAddThr in the database under cNodesDat and re-schedules
 itself.
 Fixes over the previous version: the limit is enforced for every peer (it was
 only tested between buckets, so a whole extra bucket could be written) and the
 scratch stream is released even when writing fails.}
var bkt:^tBucket;
var p:integer;
var cntr:word;
begin
  statef:=tMemoryStream.create;
  try
    cntr:=0;
    bkt:=Table;
    while assigned(bkt) and (cntr<cMaxNodesDat) do begin
      for p:=1 to high(bkt^.peer) do begin
        if cntr>=cMaxNodesDat then break;
        if bkt^.peer[p].ReqDelta<=crdOthersCanReAddThr then begin
          statef.wb(bkt^.peer[p].Addr,sizeof(tNetAddr));
          statef.wb(bkt^.peer[p].id,sizeof(tPID));
          inc(cntr);
        end;
      end;
      bkt:=bkt^.next;
    end;
    Database.dbSet(dbMisc,cNodesDat,length(cNodesDat),statef);
  finally
    statef.Free;
    statef:=nil;
  end;
  shedule(61273,@SaveState);
end;
(*** Remote Procedures ***)
procedure rpcPeer(con: tRPCCon);
{Remote procedure: reads a network address from the arguments, bootstraps from it
 and answers with cRPCSuccess.}
var peerAddr:tNetAddr;
begin
  con.a.RB(peerAddr,sizeof(tNetAddr));
  DHT.NodeBootstrap(peerAddr);
  con.r.W2(cRPCSuccess);
end;
procedure rpcDump(con: tRPCCon);
{Remote procedure: dumps the routing table. The reply starts with a zero word and
 the local ID. For every bucket it holds the depth, the slot count and the age of
 the last modification, followed by one record per slot: ID, address, ReqDelta,
 age of the last message, age of the last reply, remaining verification time
 (0 when expired) and a flag byte (bit 0 = challenge pending, bit 1 = verified).}
var bkt: ^DHT.tBucket;
var i:integer;
begin
  con.r.W2(0);
  con.r.WB(DHT.MyID,20);
  bkt:=DHT.GetFirstBucket;
  while assigned(bkt) do begin
    {bucket header}
    con.r.W1(bkt^.Depth);
    con.r.w1(high(bkt^.peer));
    con.r.w4(MNow-bkt^.ModifyTime);
    {slots, including unassigned ones}
    for i:=1 to high(bkt^.peer) do begin
      con.r.wb(bkt^.peer[i].ID,20);
      con.r.Wb(bkt^.peer[i].Addr,sizeof(tNetAddr));
      con.r.W2(bkt^.peer[i].ReqDelta);
      con.r.W4(MNow-bkt^.peer[i].LastMsg);
      con.r.W4(MNow-bkt^.peer[i].LastReply);
      if bkt^.peer[i].VerifiedTill>MNow then
        con.r.W4(bkt^.peer[i].VerifiedTill-MNow)
      else
        con.r.W4(0);
      con.r.W1( ord(bkt^.peer[i].CRPending) or (ord(bkt^.peer[i].VerifiedTill>MNow) shl 1));
    end;
    bkt:=bkt^.next;
  end;
end;
{Unit initialization: creates the log, registers the message handlers and remote
 procedures, derives the local ID from the host public key and starts loading the
 saved state.}
BEGIN
ServerLoop.CreateLog(log,'DHT');
{the wire format and nodes.dat rely on address + ID taking exactly 38 bytes}
assert((sizeof(tNetAddr)+sizeof(tPID))=38);
SetupOpcode(opcode.dhtBeatQ,@recvBeatQ);
SetupOpcode(opcode.dhtBeatR,@recvBeatR);
SetupOpcode(opcode.dhtCheckQ,@recvCheckQ);
SetupOpcode(opcode.dhtCheckR,@recvCheckR);
SetupOpcode(opcode.dhtGetNodes,@recvGetNodes);
SetupRemoteProc(3605,@rpcPeer);
SetupRemoteProc(4451,@rpcDump);
{MyID = leading sizeof(MyID) bytes of SHA-256 of the public key, the same rule RecvCheckQ/RecvCheckR apply to remote keys}
SHA256_Buffer(MyID,sizeof(MyID),PublicKey,sizeof(PublicKey));
log.info(' set ID to %S from HostKey',[string(MyID)]);
Persist.OpenState;
END.