Actor是计算机科学领域中的一个并行计算模型,它把actors当做通用的并行计算原语:一个actor对接收到的消息做出响应,进行本地决策,可以创建更多的actor,或者发送更多的消息;同时准备接收下一条消息。
在Actor理论中,一切都被认为是actor,这和面向对象语言里一切都被看成对象很类似。但包括面向对象语言在内的软件通常是顺序执行的,而Actor模型本质上则是并发的。
每个Actor都有一个(只有一个)Mailbox。Mailbox相当于是一个小型的队列,一旦Sender发送消息,就是将该消息入队到Mailbox中。入队的顺序按照消息发送的时间顺序。Mailbox有多种实现,默认为FIFO。但也可以根据优先级考虑出队顺序,实现算法则不相同。
-
actor的mailbox容量是无限的,不会造成写入时的阻塞
-
每个actor中所有消息共用一个mailbox(channel)。
-
actor并不关心消息的发送方(writer),可以对各模块间的逻辑进行解耦合。
-
actor可以部署在不同节点上。
- 因为Actor被设计为异步模型,同步调用的性能不高。
Props为声明如何创建Actors提供了基础,下面的例子通过定义如何处理消息的函数声明定义了Actor Props:
var props Props = actor.FromFunc(func(c Context) {
// process messages
})
另外,可以创建一个结构体,通过定义一个Receive方法,实现了Actor的接口:
type MyActor struct {}
func (a *MyActor) Receive(c Context) {
// process messages
}
var props Props = actor.FromProducer(func() Actor { return &MyActor{} })
Spawn和SpawnNamed使用给定的props去创建Actor的运行实例。一旦启动Actor就开始准备处理发来的消息。用系统给定的唯一名称来启动actor,使用:
pid := actor.Spawn(props)
结果返回唯一的PID。自己命名PID请使用 SpawnNamed 来启动Actor。
每次一个actor启动时,一个新的邮箱会被创建并关联PID。消息会发送到该邮箱然后actor来处理这些消息。
Actor通过Receive方法来处理消息,此函数定义为:
Receive(c actor.Context)
系统会保证该方法被同步调用,因此无需做另外的保护措施。
PID是向actors发送消息的主要接口,PID.Tell方法用于向该PID异步的发送消息:
pid.Tell("Hello World")
根据不同的业务需求,actors之间的通讯可以异步或者同步进行,不论任何时候,actors总是通过PID来进行通讯。
当使用PID.Request或者PID.RequestFuture来进行消息发送时,接受消息的actor将会通过Context.Sender方法来回应发送者,该方法返回发送者的PID。
同步通讯方面,actor使用Future来实现,actor再继续下一步之前会等待结果获取。
向actor发送消息并等待结果获取,请使用RequestFuture方法,该方法会返回一个Future:
f := actor.RequestFuture(pid,"Hello", 50 * time.Millisecond)
res, err := f.Result() // waits for pid to reply */
protoactor-go目前可以每秒在两个actor之间传递200万条消息,并且能够保证消息的顺序。
/app/go/bin/go build -o "/tmp/Build performanceTest.go and rungo"
/app/gopath/src/github.com/ontio/ontology-eventbus/example/performanceTest.go
start at time: 1516953710985385134
end at time 1516953716291953904
run time:10000000 elapsed time:5306 ms
protoactor-go目前可以在串行同步调用的情况下每秒在client和server间传递超过50万条消息!
goos: linux
goarch: amd64
pkg: github.com/ontio/ontology-eventbus/example/benchmark
benchmark iter time/iter bytes alloc allocs
--------- ---- --------- ----------- ------
BenchmarkSyncTest-4 1000000 1967 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1987 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1952 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1975 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
BenchmarkSyncTest-4 1000000 1987 ns/op 432 B/op 13 allocs/op
testing: BenchmarkSyncTest-4 left GOMAXPROCS set to 1
PASS
ok github.com/ontio/ontology-eventbus/example/benchmarks 10.984s
type Hello struct{ Who string }
type HelloActor struct{}
func (state *HelloActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case Hello:
fmt.Printf("Hello %v\n", msg.Who)
}
}
func main() {
props := actor.FromProducer(func() actor.Actor { return &HelloActor{} })
pid := actor.Spawn(props)
pid.Tell(Hello{Who: "Roger"})
console.ReadLine()
}
本例主要描述两个actor之间如何进行异步通讯,主要定义actor接收到信息之后的行为(Receive),包括处理方式和处理后的消息发送给哪个actor,异步通讯保证了actor的利用率。
type ping struct{ val int }
type pingActor struct{}
func (state *pingActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
fmt.Println("Started, initialize actor here")
case *actor.Stopping:
fmt.Println("Stopping, actor is about shut down")
case *actor.Restarting:
fmt.Println("Restarting, actor is about restart")
case *ping:
val := msg.val
if val < 10000000 {
context.Sender().Request(&ping{val: val + 1}, context.Self())
} else {
end := time.Now().UnixNano()
fmt.Printf("%s end %d\n", context.Self().Id, end)
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
props := actor.FromProducer(func() actor.Actor { return &pingActor{} })
actora := actor.Spawn(props)
actorb := actor.Spawn(props)
fmt.Printf("begin time %d\n", time.Now().UnixNano())
actora.Request(&ping{val: 1}, actorb)
time.Sleep(10 * time.Second)
actora.Stop()
actorb.Stop()
console.ReadLine()
}
本例主要描述如何与actor(server)进行同步通讯,客户端将需求消息发送给actor,并等待actor的返回结果,该需求可能需要多个actor共同协作完成,多个actor之间采用上面例子中的异步通讯来进行处理,最后处理结果返回给client。
type Request struct {
Who string
}
type Response struct {
Welcome string
}
type Server struct {}
func (server *Server) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *actor.Started:
fmt.Println("Started, initialize server actor here")
case *actor.Stopping:
fmt.Println("Stopping, actor is about shut down")
case *actor.Restarting:
fmt.Println("Restarting, actor is about restart")
case *message.Request:
fmt.Println("Receive message", msg.Who)
context.Sender().Request(&message.Response{Welcome: "Welcome!"}, context.Self())
}
}
func (server *Server) Start() *actor.PID{
props := actor.FromProducer(func() actor.Actor { return &Server{} })
pid := actor.Spawn(props)
return pid
}
func (server *Server) Stop(pid *actor.PID) {
pid.Stop()
}
type Client struct {}
//Call the server synchronously
func (client *Client) SyncCall(serverPID *actor.PID) (interface{}, error) {
future := serverPID.RequestFuture(&message.Request{Who: "Ontology"}, 10*time.Second)
result, err := future.Result()
return result, err
}
func main() {
server := &server.Server{}
client := &client.Client{}
serverPID := server.Start()
result, err := client.SyncCall(serverPID)
if err != nil {
fmt.Println("ERROR:", err)
}
fmt.Println(result)
}
Actor可以通过EventHub 进行广播和订阅操作,支持ALL,ROUNDROBIN,RANDOM的广播模式
package main
import (
"github.com/ontio/ontology-eventbus/eventhub"
"fmt"
"github.com/ontio/ontology-eventbus/actor"
"time"
)
type PubMessage struct{
message string
}
type ResponseMessage struct{
message string
}
func main() {
eh:= eventhub.GlobalEventHub
subprops := actor.FromFunc(func(context actor.Context) {
switch msg := context.Message().(type) {
case PubMessage:
fmt.Println(context.Self().Id + " get message "+msg.message)
context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())
default:
}
})
pubprops := actor.FromFunc(func(context actor.Context) {
switch msg := context.Message().(type) {
case ResponseMessage:
fmt.Println(context.Self().Id + " get message "+msg.message)
//context.Sender().Request(ResponseMessage{"response message from "+context.Self().Id },context.Self())
default:
//fmt.Println("unknown message type")
}
})
publisher, _ := actor.SpawnNamed(pubprops, "publisher")
sub1, _ := actor.SpawnNamed(subprops, "sub1")
sub2, _ := actor.SpawnNamed(subprops, "sub2")
sub3, _ := actor.SpawnNamed(subprops, "sub3")
topic:= "TEST"
eh.Subscribe(topic,sub1)
eh.Subscribe(topic,sub2)
eh.Subscribe(topic,sub3)
event := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyAll}
eh.Publish(event)
time.Sleep(2*time.Second)
fmt.Println("before unsubscribe sleeping...")
eh.Unsubscribe(topic,sub2)
eh.Publish(event)
time.Sleep(2*time.Second)
fmt.Println("random event...")
randomevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRandom}
for i:=0 ;i<10;i++{
eh.Publish(randomevent)
}
time.Sleep(2*time.Second)
fmt.Println("roundrobin event...")
roundevent := eventhub.Event{Publisher:publisher,Message:PubMessage{"hello fellows"},Topic:topic,Policy:eventhub.PublishPolicyRoundRobin}
for i:=0 ;i<10;i++{
eh.Publish(roundevent)
}
time.Sleep(2*time.Second)
}
本项目采用两种方式实现跨节点通讯,分别是grpc和zeromq,对应于项目中的remote和zmqremote包,使用过程中请根据需求选择所需导入的包(接口是一样的)。
使用zero mq 需要安装libzmq [https://github.com/zeromq/libzmq]
模式 | 256B大小消息 | 512B大小消息 | 1k大小消息 | 10k大小消息 | 100k大小消息 | 1M大小消息 | 4M大小消息 | 8M大小消息 |
---|---|---|---|---|---|---|---|---|
grpc | 120000/s | 100000/s | 85000/s | 40000/s | 4600/s | 490/s | 123/s | 超出grpc默认4m限制 |
zeromq | 170000/s | 140000/s | 10000/s | 45000/s | 4900/s | 500/s | 123/s | 62/s |
该模块默认采用protobuf进行序列化和反序列化,使用过程中需要定义一系列的protobuf消息结构,为了和Ontology项目中采用的序列化方式集成,减轻系统改造工作量,目前也支持个性化的序列化和反序列化方式:
目前采用的方式是定义了一个通用的系统消息,消息结构为:消息类型 + 消息内容(序列化之后的数据),针对Ontology的常用结构体,目前枚举了六种常用的消息类型(address, block, header, Transaction, TxAttribute, VMCode),如下所示:
enum MsgType {
ADDRESS_MSG_TYPE = 0;
BLOCK_MSG_TYPE = 1;
HEADER_MSG_TYPE = 2;
TX_MSG_TYPE = 3;
TX_ATT_MSG_TYPE = 4;
VM_CODE_MSG_TYPE = 5;
}
message MsgData {
MsgType msgType = 1;
bytes data = 2;
}
使用时现将所需传输的数据采用自定义的方式序列化,然后构造MsgData{msgType:xx, data:xx}, msgType按照上述枚举定义,data即为自定义序列化之后的数据。接收到的消息同样如此,收到消息之后按照msgType来执行相应的反序列化方法去反序列化data。简单的案例如下:
server.go
func main() {
log.Debug("test")
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
zmqremote.Start("127.0.0.1:8080")
props := actor.
FromFunc(
func(context actor.Context) {
switch context.Message().(type) {
case *zmqremote.MsgData:
switch MsgData.MsgType:
case 0: //反序列化MsgData.Data
case 1: //反序列化MsgData.Data
case 2: //反序列化MsgData.Data
case 3: //反序列化MsgData.Data
case 4: //反序列化MsgData.Data
case 5: //反序列化MsgData.Data
context.Sender().Tell(&zmqremote.MsgData{MsgType: 1, Data: []byte("123")})
}
}).
WithMailbox(mailbox.Bounded(1000000))
pid, _ := actor.SpawnNamed(props, "remote")
fmt.Println(pid)
for {
time.Sleep(1 * time.Second)
}
}
client.go
func main() {
log.Debug("test")
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
var wg sync.WaitGroup
messageCount := 500
zmqremote.Start("127.0.0.1:8081")
props := actor.
FromProducer(newLocalActor(&wg, messageCount)).
WithMailbox(mailbox.Bounded(1000000))
pid := actor.Spawn(props)
fmt.Println(pid)
remotePid := actor.NewPID("127.0.0.1:8080", "remote")
wg.Add(1)
start := time.Now()
fmt.Println("Starting to send")
message := &zmqremote.MsgData{MsgType: 1, Data: []byte("123")}
for i := 0; i < messageCount; i++ {
remotePid.Request(message, pid)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Elapsed %s", elapsed)
x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
fmt.Printf("Msg per sec %v", x)
}
node2/main.go
func main() {
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
remote.Start("127.0.0.1:8080")
var sender *actor.PID
props := actor.
FromFunc(
func(context actor.Context) {
switch msg := context.Message().(type) {
case *messages.StartRemote:
fmt.Println("Starting")
sender = msg.Sender
context.Respond(&messages.Start{})
case *messages.Ping:
sender.Tell(&messages.Pong{})
}
}).
WithMailbox(mailbox.Bounded(1000000))
actor.SpawnNamed(props, "remote")
for{
time.Sleep(1 * time.Second)
}
}
node1/main.go
type localActor struct {
count int
wgStop *sync.WaitGroup
messageCount int
}
func (state *localActor) Receive(context actor.Context) {
switch context.Message().(type) {
case *messages.Pong:
state.count++
if state.count%50000 == 0 {
fmt.Println(state.count)
}
if state.count == state.messageCount {
state.wgStop.Done()
}
}
}
func newLocalActor(stop *sync.WaitGroup, messageCount int) actor.Producer {
return func() actor.Actor {
return &localActor{
wgStop: stop,
messageCount: messageCount,
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU() * 1)
runtime.GC()
var wg sync.WaitGroup
messageCount := 50000
//remote.DefaultSerializerID = 1
remote.Start("127.0.0.1:8081")
props := actor.
FromProducer(newLocalActor(&wg, messageCount)).
WithMailbox(mailbox.Bounded(1000000))
pid := actor.Spawn(props)
remotePid := actor.NewPID("127.0.0.1:8080", "remote")
remotePid.
RequestFuture(&messages.StartRemote{
Sender: pid,
}, 5*time.Second).
Wait()
wg.Add(1)
start := time.Now()
fmt.Println("Starting to send")
bb := bytes.NewBuffer([]byte(""))
for i := 0; i < 2000; i++ {
bb.WriteString("1234567890")
}
message := &messages.Ping{Data: bb.Bytes()}
for i := 0; i < messageCount; i++ {
remotePid.Tell(message)
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("Elapsed %s", elapsed)
x := int(float32(messageCount*2) / (float32(elapsed) / float32(time.Second)))
fmt.Printf("Msg per sec %v", x)
}
messages/protos.proto
protobuf文件的生成命令: protoc -I=$GOPATH/src -I=$GOPATH/src/github.com/gogo/protobuf/protobuf/ --gogoslick_out=plugins=grpc:. /path/to/protos.proto
syntax = "proto3";
package messages;
import "github.com/ontio/ontology-eventbus/actor/protos.proto";
message Start {}
message StartRemote {
actor.PID Sender = 1;
}
message Ping {
bytes Data = 1;
}
message Pong {}
代码详见example/testRemoteCrypto目录以及example/testSyncCrypto目录
测试环境为微软云机器
模式 | 256B大小消息 | 512B大小消息 | 1k大小消息 | 10k大小消息 |
---|---|---|---|---|
一台验签(zeromq) | 3666/s | 3590/s | 3479/s | 2848/s |
两台验签(zeromq) | 7509/s | 7431/s | 7204/s | 6976/s |
指标 | 256B大小消息 | 512B大小消息 | 1k大小消息 | 10k大小消息 |
---|---|---|---|---|
验签时间 | 0.242ms | 0.247ms | 0.246ms | 0.334ms |
latency | 1.36ms | 1.31ms | 1.39ms | 1.94ms |
This Module is based on AsynkronIT/protoactor-go project, more details goes to https://github.com/AsynkronIT/protoactor-go.