-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathl-rpc.go
162 lines (142 loc) · 3.52 KB
/
l-rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package main
//
// toy RPC library
//
import "io"
import "fmt"
import "sync"
import "encoding/binary"
type ToyClient struct {
mu sync.Mutex
conn io.ReadWriteCloser // connection to server
xid int64 // next unique request #
pending map[int64]chan int32 // waiting calls [xid]
}
func MakeToyClient(conn io.ReadWriteCloser) *ToyClient {
tc := &ToyClient{}
tc.conn = conn
tc.pending = map[int64]chan int32{}
tc.xid = 1
go tc.Listener()
return tc
}
func (tc *ToyClient) WriteRequest(xid int64, procNum int32, arg int32) {
binary.Write(tc.conn, binary.LittleEndian, xid)
binary.Write(tc.conn, binary.LittleEndian, procNum)
if err := binary.Write(tc.conn, binary.LittleEndian, arg); err != nil {
fmt.Printf("xx %v\n", err)
}
}
func (tc *ToyClient) ReadReply() (int64, int32) {
var xid int64
var arg int32
binary.Read(tc.conn, binary.LittleEndian, &xid)
binary.Read(tc.conn, binary.LittleEndian, &arg)
return xid, arg
}
//
// client application uses Call() to make an RPC.
// client := MakeClient(server)
// reply := client.Call(procNum, arg)
//
func (tc *ToyClient) Call(procNum int32, arg int32) int32 {
done := make(chan int32) // for tc.Listener()
tc.mu.Lock()
xid := tc.xid // allocate a unique xid
tc.xid++
tc.pending[xid] = done // for tc.Listener()
tc.WriteRequest(xid, procNum, arg) // send to server
tc.mu.Unlock()
reply := <- done // wait for reply via tc.Listener()
tc.mu.Lock()
delete(tc.pending, xid)
tc.mu.Unlock()
return reply
}
//
// listen for client requests, call the handler,
// send back replies. runs as a background thread.
//
func (tc *ToyClient) Listener() {
for {
xid, reply := tc.ReadReply()
tc.mu.Lock()
ch, ok := tc.pending[xid]
tc.mu.Unlock()
if ok {
ch <- reply
}
}
}
type ToyServer struct {
mu sync.Mutex
conn io.ReadWriteCloser // connection from client
handlers map[int32]func(int32)int32 // procedures
}
func MakeToyServer(conn io.ReadWriteCloser) *ToyServer {
ts := &ToyServer{}
ts.conn = conn
ts.handlers = map[int32](func(int32)int32){}
go ts.Dispatcher()
return ts
}
func (ts *ToyServer) WriteReply(xid int64, arg int32) {
binary.Write(ts.conn, binary.LittleEndian, xid)
binary.Write(ts.conn, binary.LittleEndian, arg)
}
func (ts *ToyServer) ReadRequest() (int64, int32, int32) {
var xid int64
var procNum int32
var arg int32
binary.Read(ts.conn, binary.LittleEndian, &xid)
binary.Read(ts.conn, binary.LittleEndian, &procNum)
binary.Read(ts.conn, binary.LittleEndian, &arg)
return xid, procNum, arg
}
//
// listen for client requests,
// dispatch them to the right handler,
// send back replies.
//
func (ts *ToyServer) Dispatcher() {
for {
xid, procNum, arg := ts.ReadRequest()
ts.mu.Lock()
fn, ok := ts.handlers[procNum]
ts.mu.Unlock()
go func() {
var reply int32
if ok {
reply = fn(arg)
}
ts.mu.Lock()
ts.WriteReply(xid, reply)
ts.mu.Unlock()
}()
}
}
type Pair struct {
r *io.PipeReader
w *io.PipeWriter
}
func (p Pair) Read(data []byte) (int, error) {
return p.r.Read(data)
}
func (p Pair) Write(data []byte) (int, error) {
return p.w.Write(data)
}
func (p Pair) Close() error {
p.r.Close()
return p.w.Close()
}
func main() {
r1, w1 := io.Pipe()
r2, w2 := io.Pipe()
cp := Pair{r : r1, w : w2}
sp := Pair{r : r2, w : w1}
tc := MakeToyClient(cp)
ts := MakeToyServer(sp)
ts.handlers[22] = func(a int32) int32 { return a+1 }
reply := tc.Call(22, 100)
fmt.Printf("Call(22, 100) -> %v\n", reply)
}