diff --git a/go.mod b/go.mod index 67f5eff2c..958c9c7e7 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,8 @@ require ( github.com/coreos/etcd v3.3.18+incompatible // indirect github.com/go-redis/redis/v7 v7.0.0-beta.5 github.com/google/uuid v1.1.1 // indirect + github.com/klauspost/cpuid v1.2.3 // indirect + github.com/klauspost/reedsolomon v1.9.3 // indirect github.com/pion/rtcp v1.2.1 github.com/pion/rtp v1.3.1 github.com/pion/stun v0.3.3 @@ -18,5 +20,11 @@ require ( github.com/shirou/gopsutil v2.19.12+incompatible github.com/spf13/viper v1.6.2 github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71 + github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect + github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b // indirect + github.com/tjfoc/gmsm v1.2.0 // indirect + github.com/xtaci/kcp-go v5.4.20+incompatible + github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37 // indirect go.etcd.io/etcd v3.3.18+incompatible + golang.org/x/crypto v0.0.0-20191219195013-becbf705a915 ) diff --git a/go.sum b/go.sum index 8d4e34225..0292e9b58 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,10 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs= +github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/reedsolomon v1.9.3 h1:N/VzgeMfHmLc+KHMD1UL/tNkfXAt8FnUqlgXGIduwAY= +github.com/klauspost/reedsolomon v1.9.3/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -183,10 +187,20 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= +github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 h1:89CEmDvlq/F7SJEOqkIdNDGJXrQIhuIx9D2DBXjavSU= +github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEvslTq+iOEAMDLSzhVuOt5BRZ05WirO+b09GHQU= +github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b h1:fj5tQ8acgNUr6O8LEplsxDhUIe2573iLkJc+PqnzZTI= +github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4= +github.com/tjfoc/gmsm v1.2.0 h1:oTXUFetR8GphwGmUUxWFxrRZJTaDcZo1Lt2mRxlVzEI= +github.com/tjfoc/gmsm v1.2.0/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/xtaci/kcp-go v5.4.20+incompatible h1:TN1uey3Raw0sTz0Fg8GkfM0uH3YwzhnZWQ1bABv5xAg= +github.com/xtaci/kcp-go v5.4.20+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE= +github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37 h1:EWU6Pktpas0n8lLQwDsRyZfmkPeRbdgPtW609es+/9E= +github.com/xtaci/lossyconn v0.0.0-20200209145036-adba10fffc37/go.mod h1:HpMP7DB2CyokmAh4lp0EQnnWhmycP/TvwBGzvuie+H0= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v3.3.18+incompatible h1:5aomL5mqoKHxw6NG+oYgsowk8tU8aOalo2IdZxdWHkw= @@ -204,6 +218,8 @@ golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20191205180655-e7c4368fe9dd/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vKV/xzVTO7XPAwm8xbf4w2g= golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20191219195013-becbf705a915 h1:aJ0ex187qoXrJHPo8ZasVTASQB7llQP6YeNzgDALPRk= +golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= diff --git a/pkg/rtc/init.go b/pkg/rtc/init.go deleted file mode 100644 index 0b77f5a7a..000000000 --- a/pkg/rtc/init.go +++ /dev/null @@ -1,57 +0,0 @@ -package rtc - -import ( - "time" - - "github.com/pion/ion/pkg/log" - "github.com/pion/ion/pkg/rtc/rtpengine" -) - -const ( - statCycle = 3 * time.Second -) - -var ( - - //CleanChannel return the dead pub's mid - CleanChannel = make(chan string) -) - -// Init init port and ice urls -func Init(port int, ices []string) { - - //init ice - initICE(ices) - - // show stat about all pipelines - go stat() - - // accept relay conn - connCh := rtpengine.Serve(port) - go func() { - for { - select { - case conn := <-connCh: - t := newRTPTransport(conn) - if t != nil { - t.receiveRTP() - } - mid := t.getMID() - cnt := 0 - for mid == "" && cnt < 10 { - mid = t.getMID() - time.Sleep(time.Millisecond) - cnt++ - } - if mid == "" && cnt >= 10 { - log.Infof("mid == \"\" && cnt >=10 return") - return - } - log.Infof("accept new rtp mid=%s conn=%s", mid, conn.RemoteAddr().String()) - if p := addPipeline(mid); p != nil { - p.addPub(mid, t) - } - } - } - }() -} diff --git a/pkg/rtc/pipeline.go b/pkg/rtc/pipeline.go deleted file mode 100644 index 94d65c9fe..000000000 --- a/pkg/rtc/pipeline.go +++ /dev/null @@ -1,373 +0,0 @@ -package rtc - -import ( - "errors" - "sync" - "time" - - "github.com/bluele/gcache" - "github.com/pion/ion/pkg/log" - "github.com/pion/ion/pkg/rtc/plugins" - "github.com/pion/ion/pkg/util" - "github.com/pion/rtcp" - "github.com/pion/rtp" -) - -const ( - maxWriteErr = 100 - maxSize = 1024 - jbPlugin = "jitterBuffer" - liveCycle = 6 * time.Second - checkCycle = 3 * time.Second -) - -var ( - errInvalidPlugin = errors.New("plugin is nil") -) - -// pipeline is a rtp pipeline -// pipline has three loops, in handler and out -// |-----in-----|-----handler------|---------out---------| -// +--->sub -// | -// pub--->pubCh-->plugin...-->subCh---+--->sub -// | -// +--->sub -type pipeline struct { - pub Transport - subs map[string]Transport - subLock sync.RWMutex - plugins []plugin - pluginLock sync.RWMutex - pubCh chan *rtp.Packet - subCh chan *rtp.Packet - stop bool - pubLive gcache.Cache - live bool - rtcpCh chan rtcp.Packet -} - -func newPipeline(id string) *pipeline { - jb := plugins.NewJitterBuffer(jbPlugin) - p := &pipeline{ - subs: make(map[string]Transport), - pubCh: make(chan *rtp.Packet, maxSize), - subCh: make(chan *rtp.Packet, maxSize), - pubLive: gcache.New(maxSize).Simple().Build(), - live: true, - rtcpCh: jb.GetRTCPChan(), - } - p.addPlugin(jbPlugin, jb) - p.start() - return p -} - -func (p *pipeline) check() { - go func() { - ticker := time.NewTicker(checkCycle) - defer ticker.Stop() - for range ticker.C { - if p.stop { - return - } - pub := p.getPub() - if pub != nil { - val, err := p.pubLive.Get(pub.ID()) - if err != nil || val == "" { - log.Warnf("pub is not alive val=%v err=%v", val, err) - p.live = false - } - } - } - }() -} - -func (p *pipeline) in() { - go func() { - defer util.Recover("[pipeline.in]") - count := uint64(0) - for { - if p.stop { - return - } - pub := p.getPub() - if pub == nil { - time.Sleep(10 * time.Millisecond) - continue - } - rtp, err := pub.ReadRTP() - if err == nil { - // log.Infof("rtp.Extension=%t rtp.ExtensionProfile=%x rtp.ExtensionPayload=%x", rtp.Extension, rtp.ExtensionProfile, rtp.ExtensionPayload) - p.pubCh <- rtp - if count%300 == 0 { - p.pubLive.SetWithExpire(p.getPub().ID(), "live", liveCycle) - } - count++ - } else { - log.Errorf("pipeline.in err=%v", err) - } - } - }() -} - -func (p *pipeline) handle() { - go func() { - defer util.Recover("[pipeline.handle]") - count := uint64(0) - for { - if p.stop { - return - } - - pkt := <-p.pubCh - log.Debugf("pkt := <-p.pubCh %v", pkt) - p.subCh <- pkt - log.Debugf("p.subCh <- pkt %v", pkt) - if pkt == nil { - continue - } - //only buffer video - if util.IsVideo(pkt.PayloadType) { - if count%3000 == 0 { - // Init args: (ssrc uint32, pt uint8, rembCycle int, pliCycle int) - p.getPlugin(jbPlugin).Init(pkt.SSRC, pkt.PayloadType, 2, 1) - } - p.getPlugin(jbPlugin).PushRTP(pkt) - count++ - } - } - }() -} - -func (p *pipeline) out() { - go func() { - defer util.Recover("[pipeline.out]") - for { - if p.stop { - return - } - - pkt := <-p.subCh - log.Debugf("pkt := <-p.subCh %v", pkt) - if pkt == nil { - continue - } - // nonblock sending - go func() { - for _, t := range p.getSubs() { - if t == nil { - log.Errorf("Transport is nil") - continue - } - - // log.Infof("pipeline.out WriteRTP %v:%v to %v ", pkt.SSRC, pkt.SequenceNumber, t.ID()) - if err := t.WriteRTP(pkt); err != nil { - log.Errorf("wt.WriteRTP err=%v", err) - // del sub when err is increasing - if t.writeErrTotal() > maxWriteErr { - p.delSub(t.ID()) - } - } - t.writeErrReset() - } - }() - } - }() -} - -func (p *pipeline) jitter() { - go func() { - defer util.Recover("[pipeline.out]") - for { - if p.stop { - return - } - - pkt := <-p.rtcpCh - switch pkt.(type) { - case *rtcp.TransportLayerNack, *rtcp.ReceiverEstimatedMaximumBitrate, *rtcp.PictureLossIndication: - log.Infof("pipeline.jitter p.getPub().WriteRTCP %v", pkt) - p.getPub().WriteRTCP(pkt) - } - } - }() -} - -func (p *pipeline) start() { - p.in() - p.out() - p.handle() - p.check() - p.jitter() -} - -func (p *pipeline) addPub(id string, t Transport) Transport { - p.pub = t - return t -} - -func (p *pipeline) isRtpPub() bool { - if p.pub != nil { - switch p.pub.(type) { - case *RTPTransport: - return true - } - } - return false -} - -func (p *pipeline) delPub() { - // first close pub - if p.pub != nil { - p.pub.Close() - } -} - -func (p *pipeline) getPub() Transport { - return p.pub -} - -func (p *pipeline) addSub(id string, t Transport) Transport { - p.subLock.Lock() - defer p.subLock.Unlock() - p.subs[id] = t - log.Infof("pipeline.AddSub id=%s t=%p", id, t) - return t -} - -func (p *pipeline) getSub(id string) Transport { - p.subLock.Lock() - defer p.subLock.Unlock() - // log.Infof("pipeline.GetSub id=%s p.subs=%v", id, p.subs) - return p.subs[id] -} - -func (p *pipeline) getSubByAddr(addr string) Transport { - p.subLock.RLock() - defer p.subLock.RUnlock() - for _, sub := range p.subs { - switch sub.(type) { - case *RTPTransport: - rt := sub.(*RTPTransport) - if rt.getAddr() == addr { - return rt - } - } - } - return nil -} - -func (p *pipeline) getSubs() map[string]Transport { - p.subLock.RLock() - defer p.subLock.RUnlock() - return p.subs -} - -func (p *pipeline) noSub() bool { - p.subLock.RLock() - defer p.subLock.RUnlock() - isNoSub := len(p.subs) == 0 - log.Infof("pipeline.noSub %v", isNoSub) - return isNoSub -} - -func (p *pipeline) delSub(id string) { - p.subLock.Lock() - defer p.subLock.Unlock() - if p.subs[id] != nil { - p.subs[id].Close() - } - delete(p.subs, id) - log.Infof("pipeline.DelSub id=%s", id) -} - -func (p *pipeline) delSubs() { - p.subLock.Lock() - defer p.subLock.Unlock() - for _, sub := range p.subs { - if sub != nil { - sub.Close() - } - } -} - -func (p *pipeline) addPlugin(id string, i plugin) { - p.pluginLock.Lock() - defer p.pluginLock.Unlock() - p.plugins = append(p.plugins, i) -} - -func (p *pipeline) getPlugin(id string) plugin { - p.pluginLock.RLock() - defer p.pluginLock.RUnlock() - for i := 0; i < len(p.plugins); i++ { - if p.plugins[i].ID() == id { - return p.plugins[i] - } - } - return nil -} - -func (p *pipeline) delPlugin(id string) { - p.pluginLock.Lock() - defer p.pluginLock.Unlock() - for i := 0; i < len(p.plugins); i++ { - if p.plugins[i].ID() == id { - p.plugins[i].Stop() - p.plugins = append(p.plugins[:i], p.plugins[i+1:]...) - } - } -} - -func (p *pipeline) delPlugins() { - p.pluginLock.Lock() - defer p.pluginLock.Unlock() - for _, plugin := range p.plugins { - plugin.Stop() - } -} - -// Close release all -func (p *pipeline) Close() { - if p.stop { - return - } - p.delPub() - p.stop = true - p.delPlugins() - p.delSubs() -} - -func (p *pipeline) writeRTP(sid string, ssrc uint32, sn uint16) bool { - if p.pub == nil { - return false - } - hd := p.getPlugin(jbPlugin) - if hd != nil { - jb := hd.(*plugins.JitterBuffer) - pkt := jb.GetPacket(ssrc, sn) - if pkt == nil { - // log.Infof("pipeline.writeRTP pkt not found sid=%s ssrc=%d sn=%d pkt=%v", sid, ssrc, sn, pkt) - return false - } - sub := p.getSub(sid) - if sub != nil { - sub.WriteRTP(pkt) - // log.Infof("pipeline.writeRTP sid=%s ssrc=%d sn=%d", sid, ssrc, sn) - return true - } - } - return false -} - -func (p *pipeline) IsLive() bool { - return p.live -} - -func (p *pipeline) PushRTCP(pkt rtcp.Packet) error { - jbPlugin := p.getPlugin(jbPlugin) - if jbPlugin == nil { - return errInvalidPlugin - } - return jbPlugin.PushRTCP(pkt) -} diff --git a/pkg/rtc/plugin.go b/pkg/rtc/plugin.go deleted file mode 100644 index 395f7f950..000000000 --- a/pkg/rtc/plugin.go +++ /dev/null @@ -1,17 +0,0 @@ -package rtc - -import ( - "github.com/pion/rtcp" - "github.com/pion/rtp" -) - -//rtp in--->plugin1--->plugin2--->out -//rtcp in<---plugin1<---plugin2<---out -//TODO: maybe https://www.chromium.org/developers/design-documents/video -type plugin interface { - ID() string - Init(...interface{}) - PushRTP(*rtp.Packet) error - PushRTCP(rtcp.Packet) error - Stop() -} diff --git a/pkg/rtc/plugins/plugin.go b/pkg/rtc/plugins/plugin.go new file mode 100644 index 000000000..e1d14b12d --- /dev/null +++ b/pkg/rtc/plugins/plugin.go @@ -0,0 +1,15 @@ +package plugins + +import ( + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +// Plugin some interfaces +type Plugin interface { + ID() string + Init(...interface{}) + PushRTP(*rtp.Packet) error + PushRTCP(rtcp.Packet) error + Stop() +} diff --git a/pkg/rtc/router.go b/pkg/rtc/router.go new file mode 100644 index 000000000..1caf3633a --- /dev/null +++ b/pkg/rtc/router.go @@ -0,0 +1,386 @@ +package rtc + +import ( + "errors" + "sync" + "time" + + "github.com/pion/ion/pkg/log" + "github.com/pion/ion/pkg/rtc/plugins" + "github.com/pion/ion/pkg/rtc/transport" + "github.com/pion/ion/pkg/util" + "github.com/pion/rtcp" + "github.com/pion/rtp" +) + +const ( + maxWriteErr = 100 + maxSize = 1024 + jbPlugin = "jitterBuffer" + liveCycle = 6 * time.Second +) + +var ( + errInvalidPlugin = errors.New("plugin is nil") +) + +// Router is a rtp Router +// +--->sub +// | +// pub--->pubCh-->plugin...-->subCh---+--->sub +// | +// +--->sub + +type rtcpInfo struct { + id string + rtcp.Packet +} + +// Router is rtp router +type Router struct { + pub transport.Transport + subs map[string]transport.Transport + subLock sync.RWMutex + plugins []plugins.Plugin + pluginLock sync.RWMutex + pubCh chan *rtp.Packet + subCh chan *rtp.Packet + stop bool + liveTime time.Time + jbRtcpCh chan rtcp.Packet +} + +// NewRouter return a new Router +func NewRouter(id string) *Router { + log.Infof("NewRouter id=%s", id) + jb := plugins.NewJitterBuffer(jbPlugin) + r := &Router{ + subs: make(map[string]transport.Transport), + pubCh: make(chan *rtp.Packet, maxSize), + subCh: make(chan *rtp.Packet, maxSize), + liveTime: time.Now().Add(liveCycle), + jbRtcpCh: jb.GetRTCPChan(), + } + r.AddPlugin(jbPlugin, jb) + r.start() + return r +} + +func (r *Router) in() { + go func() { + defer util.Recover("[Router.in]") + count := uint64(0) + for { + if r.stop { + return + } + pub := r.GetPub() + if pub == nil { + time.Sleep(10 * time.Millisecond) + continue + } + rtp, err := pub.ReadRTP() + if err == nil { + // log.Infof("rtp.Extension=%t rtp.ExtensionProfile=%x rtp.ExtensionPayload=%x", rtp.Extension, rtp.ExtensionProfile, rtp.ExtensionPayload) + r.pubCh <- rtp + if count%300 == 0 { + r.liveTime = time.Now().Add(liveCycle) + } + count++ + } else { + log.Errorf("Router.in err=%v", err) + } + } + }() +} + +func (r *Router) handle() { + go func() { + defer util.Recover("[Router.handle]") + count := uint64(0) + for { + if r.stop { + return + } + + pkt := <-r.pubCh + log.Debugf("pkt := <-r.pubCh %v", pkt) + r.subCh <- pkt + log.Debugf("r.subCh <- pkt %v", pkt) + if pkt == nil { + continue + } + //only buffer video + if util.IsVideo(pkt.PayloadType) { + if count%3000 == 0 { + // Init args: (ssrc uint32, pt uint8, rembCycle int, pliCycle int) + r.GetPlugin(jbPlugin).Init(pkt.SSRC, pkt.PayloadType, 2, 1) + } + r.GetPlugin(jbPlugin).PushRTP(pkt) + count++ + } + } + }() +} + +func (r *Router) out() { + go func() { + defer util.Recover("[Router.out]") + for { + if r.stop { + return + } + + pkt := <-r.subCh + log.Debugf("pkt := <-r.subCh %v", pkt) + if pkt == nil { + continue + } + // nonblock sending + go func() { + for _, t := range r.GetSubs() { + if t == nil { + log.Errorf("Transport is nil") + continue + } + + // log.Infof("Router.out WriteRTP %v:%v to %v ", pkt.SSRC, pkt.SequenceNumber, t.ID()) + if err := t.WriteRTP(pkt); err != nil { + log.Errorf("wt.WriteRTP err=%v", err) + // del sub when err is increasing + if t.WriteErrTotal() > maxWriteErr { + r.DelSub(t.ID()) + } + } + t.WriteErrReset() + } + }() + } + }() +} + +func (r *Router) jitter() { + go func() { + defer util.Recover("[Router.out]") + for { + if r.stop { + return + } + + pkt := <-r.jbRtcpCh + switch pkt.(type) { + case *rtcp.TransportLayerNack, *rtcp.ReceiverEstimatedMaximumBitrate, *rtcp.PictureLossIndication: + log.Infof("Router.jitter r.GetPub().WriteRTCP %v", pkt) + r.GetPub().WriteRTCP(pkt) + } + + } + }() +} + +func (r *Router) start() { + r.in() + r.out() + r.handle() + r.jitter() +} + +// AddPub add a pub transport +func (r *Router) AddPub(id string, t transport.Transport) transport.Transport { + log.Infof("AddPub id=%s", id) + r.pub = t + return t +} + +// DelPub del pub +func (r *Router) DelPub() { + log.Infof("Router.DelPub %v", r.pub) + // first close pub + if r.pub != nil { + r.pub.Close() + } + r.pub = nil +} + +// GetPub get pub +func (r *Router) GetPub() transport.Transport { + log.Infof("Router.GetPub %v", r.pub) + return r.pub +} + +// AddSub add a pub to router +func (r *Router) AddSub(id string, t transport.Transport) transport.Transport { + r.subLock.Lock() + defer r.subLock.Unlock() + r.subs[id] = t + log.Infof("Router.AddSub id=%s t=%p", id, t) + go func() { + for { + pkt := <-t.GetRTCPChan() + if r.stop { + return + } + switch pkt.(type) { + case *rtcp.TransportLayerNack: + log.Debugf("rtptransport got nack: %+v", pkt) + nack := pkt.(*rtcp.TransportLayerNack) + for _, nackPair := range nack.Nacks { + if !r.writeRTP(id, nack.MediaSSRC, nackPair.PacketID) { + n := &rtcp.TransportLayerNack{ + //origin ssrc + SenderSSRC: nack.SenderSSRC, + MediaSSRC: nack.MediaSSRC, + Nacks: []rtcp.NackPair{rtcp.NackPair{PacketID: nackPair.PacketID}}, + } + if r.pub != nil { + r.GetPub().WriteRTCP(n) + } + } + } + + default: + r.PushRTCP(pkt) + } + } + }() + return t +} + +// GetSub get a sub by id +func (r *Router) GetSub(id string) transport.Transport { + r.subLock.Lock() + defer r.subLock.Unlock() + log.Infof("Router.GetSub id=%s sub=%v", id, r.subs[id]) + return r.subs[id] +} + +// GetSubs get all subs +func (r *Router) GetSubs() map[string]transport.Transport { + r.subLock.RLock() + defer r.subLock.RUnlock() + log.Infof("Router.GetSubs len=%v", len(r.subs)) + return r.subs +} + +// HasNoneSub check if sub == 0 +func (r *Router) HasNoneSub() bool { + r.subLock.RLock() + defer r.subLock.RUnlock() + isNoSub := len(r.subs) == 0 + log.Infof("Router.HasNoneSub=%v", isNoSub) + return isNoSub +} + +// DelSub del sub by id +func (r *Router) DelSub(id string) { + log.Infof("Router.DelSub id=%s", id) + r.subLock.Lock() + defer r.subLock.Unlock() + if r.subs[id] != nil { + r.subs[id].Close() + } + delete(r.subs, id) +} + +// DelSubs del all sub +func (r *Router) DelSubs() { + log.Infof("Router.DelSubs") + r.subLock.Lock() + defer r.subLock.Unlock() + for _, sub := range r.subs { + if sub != nil { + sub.Close() + } + } + r.subs = nil +} + +// AddPlugin add a plugin +func (r *Router) AddPlugin(id string, i plugins.Plugin) { + r.pluginLock.Lock() + defer r.pluginLock.Unlock() + r.plugins = append(r.plugins, i) +} + +// GetPlugin get plugin by id +func (r *Router) GetPlugin(id string) plugins.Plugin { + r.pluginLock.RLock() + defer r.pluginLock.RUnlock() + for i := 0; i < len(r.plugins); i++ { + if r.plugins[i].ID() == id { + return r.plugins[i] + } + } + return nil +} + +// DelPlugin del plugin +func (r *Router) DelPlugin(id string) { + r.pluginLock.Lock() + defer r.pluginLock.Unlock() + for i := 0; i < len(r.plugins); i++ { + if r.plugins[i].ID() == id { + r.plugins[i].Stop() + r.plugins = append(r.plugins[:i], r.plugins[i+1:]...) + } + } +} + +// DelPlugins del all plugins +func (r *Router) DelPlugins() { + r.pluginLock.Lock() + defer r.pluginLock.Unlock() + for _, plugin := range r.plugins { + plugin.Stop() + } +} + +// Close release all +func (r *Router) Close() { + if r.stop { + return + } + r.DelPub() + r.stop = true + r.DelPlugins() + r.DelSubs() +} + +func (r *Router) writeRTP(sid string, ssrc uint32, sn uint16) bool { + if r.pub == nil { + return false + } + hd := r.GetPlugin(jbPlugin) + if hd != nil { + jb := hd.(*plugins.JitterBuffer) + pkt := jb.GetPacket(ssrc, sn) + if pkt == nil { + // log.Infof("Router.writeRTP pkt not found sid=%s ssrc=%d sn=%d pkt=%v", sid, ssrc, sn, pkt) + return false + } + sub := r.GetSub(sid) + if sub != nil { + sub.WriteRTP(pkt) + // log.Infof("Router.writeRTP sid=%s ssrc=%d sn=%d", sid, ssrc, sn) + return true + } + } + return false +} + +// Alive return router status +func (r *Router) Alive() bool { + if r.liveTime.Before(time.Now()) { + return false + } + return true +} + +// PushRTCP push rtcp to jitterbuffer +func (r *Router) PushRTCP(pkt rtcp.Packet) error { + jbPlugin := r.GetPlugin(jbPlugin) + if jbPlugin == nil { + return errInvalidPlugin + } + return jbPlugin.PushRTCP(pkt) +} diff --git a/pkg/rtc/rtc.go b/pkg/rtc/rtc.go index b1bbec90c..2fd3be2cf 100644 --- a/pkg/rtc/rtc.go +++ b/pkg/rtc/rtc.go @@ -2,199 +2,168 @@ package rtc import ( "fmt" - "strings" "sync" "time" "github.com/pion/ion/pkg/log" "github.com/pion/ion/pkg/rtc/plugins" + "github.com/pion/ion/pkg/rtc/rtpengine" + "github.com/pion/ion/pkg/rtc/transport" ) -var ( - pipes = make(map[string]*pipeline) - pipeLock sync.RWMutex +const ( + statCycle = 3 * time.Second + maxCleanSize = 100 ) -// DelPipeline delete pub -func DelPipeline(mid string) { - log.Infof("DelPub mid=%s", mid) - p := getPipeline(mid) - if p == nil { - log.Infof("DelPub p=nil") - return - } - p.Close() - pipeLock.Lock() - defer pipeLock.Unlock() - delete(pipes, mid) -} +var ( + routers = make(map[string]*Router) + routerLock sync.RWMutex -// Close close all pipeline -func Close() { - pipeLock.Lock() - for mid, pipeline := range pipes { - if pipeline != nil { - pipeline.Close() - delete(pipes, mid) - } - } - pipeLock.Unlock() -} + //CleanChannel return the dead pub's mid + CleanChannel = make(chan string, maxCleanSize) -// GetPub get pub -func GetPub(mid string) Transport { - p := getPipeline(mid) - if p == nil { - return nil - } - return p.getPub() -} + stop bool +) -// GetWebRtcMIDByPID .. -func GetWebRtcMIDByPID(id string) []string { - m := getPipelinesByPrefix(id) - var mids []string +// Init port and ice urls +func Init(port int, ices []string, kcpKey, kcpSalt string) error { - //find webrtc pub mid - for mid, p := range m { - switch p.getPub().(type) { - case *WebRTCTransport: - default: - mids = append(mids, mid) - } + //init ice urls and trickle-ICE + transport.InitWebRTC(ices, true) + + // show stat about all pipelines + go check() + + var connCh chan *transport.RTPTransport + var err error + // accept relay rtptransport + if kcpKey != "" && kcpSalt != "" { + connCh, err = rtpengine.ServeWithKCP(port, kcpKey, kcpSalt) + } else { + connCh, err = rtpengine.Serve(port) + } + if err != nil { + log.Errorf("rtc.Init err=%v", err) + return err } - return mids + go func() { + for { + if stop { + return + } + select { + case rtpTransport := <-connCh: + id := rtpTransport.ID() + cnt := 0 + for id == "" && cnt < 100 { + id = rtpTransport.ID() + time.Sleep(time.Millisecond) + cnt++ + } + if id == "" && cnt >= 100 { + log.Errorf("invalid id from incoming rtp transport") + return + } + log.Infof("accept new rtp id=%s conn=%s", id, rtpTransport.RemoteAddr().String()) + if router := AddRouter(id); router != nil { + router.AddPub(id, rtpTransport) + } + } + } + }() + return nil } -// GetSubs get sub by mid -func GetSubs(mid string) map[string]Transport { - p := getPipeline(mid) - if p == nil { - return nil +// GetOrNewRouter get router from map +func GetOrNewRouter(id string) *Router { + log.Infof("rtc.GetOrNewRouter id=%s", id) + router := GetRouter(id) + if router == nil { + return AddRouter(id) } - return p.getSubs() + return router } -// DelSubFromAllPub del all sub by id -func DelSubFromAllPub(id string) map[string]string { - log.Infof("DelSubFromAllPub id=%v", id) - m := make(map[string]string) - pipeLock.Lock() - defer pipeLock.Unlock() - for mid, p := range pipes { - p.delSub(id) - if p.noSub() && p.isRtpPub() { - m[mid] = mid - } - } - return m +// GetRouter get router from map +func GetRouter(id string) *Router { + log.Infof("rtc.GetRouter id=%s", id) + routerLock.RLock() + defer routerLock.RUnlock() + return routers[id] } -// DelSub del sub -func DelSub(mid, id string) { - p := getPipeline(mid) - if p == nil { - return - } - p.delSub(id) +// AddRouter add a new router +func AddRouter(id string) *Router { + log.Infof("rtc.AddRouter id=%s", id) + routerLock.Lock() + defer routerLock.Unlock() + routers[id] = NewRouter(id) + return routers[id] } -// NewWebRTCTransport new a webrtc transport -func NewWebRTCTransport(mid, id string, isPub bool) *WebRTCTransport { - log.Infof("rtc.NewWebRTCTransport mid=%v id=%v isPub=%v", mid, id, isPub) - p := getPipeline(mid) - if p == nil { - p = addPipeline(mid) - } - if isPub { - wt := newWebRTCTransport(mid) - p.addPub(mid, wt) - return wt +// DelRouter delete pub +func DelRouter(id string) { + log.Infof("DelRouter id=%s", id) + router := GetRouter(id) + if router == nil { + return } - wt := newWebRTCTransport(id) - p.addSub(id, wt) - return wt + router.Close() + routerLock.Lock() + defer routerLock.Unlock() + delete(routers, id) } -// NewRTPTransportSub new a rtp transport suber -func NewRTPTransportSub(mid, sid, addr string) { - log.Infof("rtc.NewRTPTransportSub mid=%v sid=%v addr=%v", mid, sid, addr) - p := getPipeline(mid) - if p == nil { - p = addPipeline(mid) +// Close close all Router +func Close() { + if stop { + return } - if p.getSubByAddr(addr) == nil { - p.addSub(sid, newPubRTPTransport(sid, mid, addr)) + stop = true + routerLock.Lock() + defer routerLock.Unlock() + for id, router := range routers { + if router != nil { + router.Close() + delete(routers, id) + } } } -// stat show all pipelines' stat -func stat() { +// check show all Routers' stat +func check() { t := time.NewTicker(statCycle) - for range t.C { - info := "\n----------------rtc-----------------\n" - pipeLock.Lock() - - for id, pipeline := range pipes { - if !pipeline.IsLive() { - pipeline.Close() - delete(pipes, id) - CleanChannel <- id - log.Infof("Stat delete %v", id) - } - info += "pub: " + id + "\n" - info += pipeline.getPlugin(jbPlugin).(*plugins.JitterBuffer).Stat() - subs := pipeline.getSubs() - if len(subs) < 6 { - for id := range subs { - info += fmt.Sprintf("sub: %s\n\n", id) + for { + select { + case <-t.C: + info := "\n----------------rtc-----------------\n" + routerLock.Lock() + + for id, Router := range routers { + if !Router.Alive() { + Router.Close() + delete(routers, id) + CleanChannel <- id + log.Infof("Stat delete %v", id) + } + info += "pub: " + id + "\n" + info += Router.GetPlugin(jbPlugin).(*plugins.JitterBuffer).Stat() + subs := Router.GetSubs() + if len(subs) < 6 { + for id := range subs { + info += fmt.Sprintf("sub: %s\n\n", id) + } + } else { + info += fmt.Sprintf("subs: %d\n\n", len(subs)) } - } else { - info += fmt.Sprintf("subs: %d\n\n", len(subs)) } - } - pipeLock.Unlock() - log.Infof(info) - } -} - -// DelSubFromAllPubByPrefix del sub from all pipelines by prefix -func DelSubFromAllPubByPrefix(id string) map[string]string { - log.Infof("DelSubFromAllPubByPrefix id=%s", id) - m := make(map[string]string) - pipeLock.Lock() - defer pipeLock.Unlock() - for mid, p := range pipes { - p.delSub(id) - if p.noSub() && p.isRtpPub() { - m[mid] = mid - } - } - return m -} - -func getPipelinesByPrefix(id string) map[string]*pipeline { - pipeLock.RLock() - defer pipeLock.RUnlock() - m := make(map[string]*pipeline) - for mid := range pipes { - if strings.Contains(mid, id) { - m[mid] = pipes[mid] + routerLock.Unlock() + log.Infof(info) + default: + if stop { + return + } } } - return m -} - -func getPipeline(mid string) *pipeline { - // log.Infof("getPipeline mid=%v", mid) - pipeLock.RLock() - defer pipeLock.RUnlock() - return pipes[mid] -} - -func addPipeline(id string) *pipeline { - pipeLock.Lock() - defer pipeLock.Unlock() - pipes[id] = newPipeline(id) - return pipes[id] } diff --git a/pkg/rtc/rtc_test.go b/pkg/rtc/rtc_test.go index a77d2d6c2..661426b40 100644 --- a/pkg/rtc/rtc_test.go +++ b/pkg/rtc/rtc_test.go @@ -6,25 +6,154 @@ import ( "time" "github.com/pion/ion/pkg/rtc/rtpengine" + "github.com/pion/ion/pkg/rtc/transport" + "github.com/pion/rtp" + "github.com/pion/webrtc/v2" ) -func TestEngine(t *testing.T) { - connCh := rtpengine.Serve(6789) +func TestRTPEngineAcceptAndRead(t *testing.T) { + connCh, err := rtpengine.Serve(6789) + if err != nil { + t.Fatal("TestRTPEngineAcceptAndRead ", err) + } + + go func() { + for { + select { + case rtpTransport := <-connCh: + fmt.Println("accept new conn from connCh", rtpTransport.RemoteAddr().String()) + go func() { + for { + // must read otherwise can't get new conn + pkt, _ := rtpTransport.ReadRTP() + fmt.Println("read rtp", pkt) + } + }() + } + } + }() + + for i := 0; i < 1; i++ { + rawPkt := []byte{ + 0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64, + 0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e, + } + + rtp := &rtp.Packet{} + rtpTransport := transport.NewOutRTPTransport("awsome", "0.0.0.0:6789") + if err := rtp.Unmarshal(rawPkt); err == nil { + rtpTransport.WriteRTP(rtp) + } else { + fmt.Println("rtpTransport.WriteRTP ", err) + } + time.Sleep(time.Second) + } +} + +func TestRTPEngineAcceptKCPAndRead(t *testing.T) { + connCh, err := rtpengine.ServeWithKCP(1234, "key", "salt") + if err != nil { + t.Fatal("TestRTPEngineAcceptKCPAndRead ", err) + } go func() { for { select { - case conn := <-connCh: - fmt.Println("conn from connCh", conn.RemoteAddr()) - b := make([]byte, 4000) + case rtpTransport := <-connCh: + fmt.Println("accept new conn over kcp from connCh", rtpTransport.RemoteAddr().String()) go func() { for { - n, err := conn.Read(b) - fmt.Println("conn read", n, err) + // must read otherwise can't get new conn + pkt, _ := rtpTransport.ReadRTP() + fmt.Println("read rtp over kcp", pkt) } }() } } }() - time.Sleep(time.Second) - select {} + + for i := 0; i < 1; i++ { + rawPkt := []byte{ + 0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64, + 0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e, + } + + rtp := &rtp.Packet{} + rtpTransport := transport.NewOutRTPTransportWithKCP("awsome", "0.0.0.0:1234", "key", "salt") + if err := rtp.Unmarshal(rawPkt); err == nil { + rtpTransport.WriteRTP(rtp) + } else { + fmt.Println("rtpTransport.WriteRTP ", err) + } + time.Sleep(time.Second) + } +} + +func TestWebRTCTransportP2P(t *testing.T) { + options := make(map[string]interface{}) + options["codec"] = "vp8" + + // new pub + pub := transport.NewWebRTCTransport("pub", options) + if pub == nil { + t.Fatal("pub == nil") + } + + // pub add track + _, err := pub.AddTrack(476325762, webrtc.DefaultPayloadTypeVP8) + if err != nil { + t.Fatalf("pub.AddTrack err=%v", err) + } + + // pub create offer + offer, err := pub.Offer() + if err != nil { + t.Fatalf("pub.Offer err=%v", err) + } + + // new sub + sub := transport.NewWebRTCTransport("sub", options) + + // sub answer offer + options = make(map[string]interface{}) + options["publish"] = "true" + answer, err := sub.Answer(offer, options) + if err != nil { + t.Fatalf("err=%v answer=%v", err, answer) + } + + // pub set remote sdp + err = pub.SetRemoteSDP(answer) + if err != nil { + t.Fatalf("err=%v", err) + } + + go func() { + for { + rtp, err := sub.ReadRTP() + fmt.Printf("rtp=%v err=%v\n", rtp, err) + } + }() + + count := 0 + for { + if count > 1 { + return + } + rawPkt := []byte{ + 0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64, + 0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e, + } + + rtp := &rtp.Packet{} + err := rtp.Unmarshal(rawPkt) + if err != nil { + t.Fatal("rtp.Unmarshal", err) + } + err = pub.WriteRTP(rtp) + if err != nil { + t.Fatal("pub.WriteRTP ", err) + } + time.Sleep(time.Second) + count++ + } } diff --git a/pkg/rtc/rtpengine/engine.go b/pkg/rtc/rtpengine/engine.go index 98bd13d0f..88fdfd067 100644 --- a/pkg/rtc/rtpengine/engine.go +++ b/pkg/rtc/rtpengine/engine.go @@ -1,58 +1,101 @@ package rtpengine import ( + "crypto/sha1" "net" - "sync" + + "fmt" "github.com/pion/ion/pkg/log" "github.com/pion/ion/pkg/rtc/rtpengine/udp" + "github.com/pion/ion/pkg/rtc/transport" + kcp "github.com/xtaci/kcp-go" + "golang.org/x/crypto/pbkdf2" +) + +const ( + maxRtpConnSize = 1024 ) var ( - listener *udp.Listener - stopCh = make(chan struct{}) - wg sync.WaitGroup + listener *udp.Listener + kcpListener *kcp.Listener + stop bool ) // Serve listen on a port and accept udp conn -func Serve(port int) chan *udp.Conn { - log.Infof("UDP listening:%d", port) +// func Serve(port int) chan *udp.Conn { +func Serve(port int) (chan *transport.RTPTransport, error) { + log.Infof("rtpengine.Serve port=%d ", port) if listener != nil { listener.Close() } - ch := make(chan *udp.Conn, 100) + ch := make(chan *transport.RTPTransport, maxRtpConnSize) var err error listener, err = udp.Listen("udp", &net.UDPAddr{IP: net.IPv4zero, Port: port}) if err != nil { log.Errorf("failed to listen %v", err) - return nil + return nil, err } - wg.Add(1) go func() { for { - select { - case <-stopCh: - wg.Done() + if stop { return - default: - conn, err := listener.Accept() - if err != nil { - log.Errorf("failed to accept conn %v", err) - continue - } - log.Infof("accept new rtp conn %s", conn.RemoteAddr().String()) - - ch <- conn } + conn, err := listener.Accept() + if err != nil { + log.Errorf("failed to accept conn %v", err) + continue + } + log.Infof("accept new rtp conn %s", conn.RemoteAddr().String()) + + ch <- transport.NewRTPTransport(conn) } }() - return ch + return ch, nil } -// Close close listening loop +// ServeWithKCP accept kcp conn +func ServeWithKCP(port int, kcpPwd, kcpSalt string) (chan *transport.RTPTransport, error) { + log.Infof("kcp Serve port=%d", port) + if kcpListener != nil { + kcpListener.Close() + } + ch := make(chan *transport.RTPTransport, maxRtpConnSize) + var err error + key := pbkdf2.Key([]byte(kcpPwd), []byte(kcpSalt), 1024, 32, sha1.New) + block, _ := kcp.NewAESBlockCrypt(key) + kcpListener, err = kcp.ListenWithOptions(fmt.Sprintf("0.0.0.0:%d", port), block, 10, 3) + if err != nil { + log.Errorf("kcp Listen err=%v", err) + return nil, err + } + + go func() { + for { + if stop { + return + } + conn, err := kcpListener.AcceptKCP() + if err != nil { + log.Errorf("failed to accept conn %v", err) + continue + } + log.Infof("accept new kcp conn %s", conn.RemoteAddr().String()) + + ch <- transport.NewRTPTransport(conn) + } + }() + return ch, nil +} + +// Close close listener and break loop func Close() { - close(stopCh) - wg.Wait() + if !stop { + return + } + stop = true listener.Close() + kcpListener.Close() } diff --git a/pkg/rtc/rtpengine/udp/conn.go b/pkg/rtc/rtpengine/udp/conn.go index 83d5f4f20..cee0e90df 100644 --- a/pkg/rtc/rtpengine/udp/conn.go +++ b/pkg/rtc/rtpengine/udp/conn.go @@ -4,7 +4,6 @@ package udp import ( "context" "errors" - "fmt" "io" "net" "sync" @@ -121,7 +120,6 @@ func (l *Listener) readLoop() { if err != nil { return } - fmt.Println("Listener.readLoop raddr=%v", raddr) conn, err := l.getConn(raddr) if err != nil { continue @@ -136,7 +134,6 @@ func (l *Listener) getConn(raddr net.Addr) (*Conn, error) { l.connLock.Lock() defer l.connLock.Unlock() conn, ok := l.conns[raddr.String()] - fmt.Println("Listener.getConn raddr=%v ok=%v", raddr, ok) if !ok { if !l.accepting.Load().(bool) { return nil, errClosedListener diff --git a/pkg/rtc/rtptransport.go b/pkg/rtc/rtptransport.go deleted file mode 100644 index ee4d49937..000000000 --- a/pkg/rtc/rtptransport.go +++ /dev/null @@ -1,309 +0,0 @@ -package rtc - -import ( - "net" - "strconv" - "strings" - "sync" - - "github.com/pion/ion/pkg/log" - "github.com/pion/ion/pkg/rtc/rtpengine/muxrtp" - "github.com/pion/ion/pkg/rtc/rtpengine/muxrtp/mux" - "github.com/pion/ion/pkg/util" - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/pion/webrtc/v2" -) - -const ( - extSentInit = 30 - receiveMTU = 8192 -) - -// RTPTransport .. -type RTPTransport struct { - rtpSession *muxrtp.SessionRTP - rtcpSession *muxrtp.SessionRTCP - rtpEndpoint *mux.Endpoint - rtcpEndpoint *mux.Endpoint - conn net.Conn - mux *mux.Mux - rtpCh chan *rtp.Packet - ssrcPT map[uint32]uint8 - ssrcPTLock sync.RWMutex - stop bool - extSent int - // id == mid if this is a pub - // id != mid if this is a sub - id string - mid string - idLock sync.RWMutex - addr string - writeErrCnt int -} - -func newRTPTransport(conn net.Conn) *RTPTransport { - t := &RTPTransport{ - conn: conn, - rtpCh: make(chan *rtp.Packet, 1000), - ssrcPT: make(map[uint32]uint8), - extSent: extSentInit, - } - config := mux.Config{ - Conn: conn, - BufferSize: receiveMTU, - } - t.mux = mux.NewMux(config) - t.rtpEndpoint = t.newEndpoint(mux.MatchRTP) - t.rtcpEndpoint = t.newEndpoint(mux.MatchRTCP) - var err error - t.rtpSession, err = muxrtp.NewSessionRTP(t.rtpEndpoint) - if err != nil { - log.Errorf(err.Error()) - return nil - } - t.rtcpSession, err = muxrtp.NewSessionRTCP(t.rtcpEndpoint) - if err != nil { - log.Errorf(err.Error()) - return nil - } - return t -} - -func newPubRTPTransport(id, mid, addr string) *RTPTransport { - n := strings.Index(addr, ":") - if n == 0 { - return nil - } - ip := addr[:n] - port, _ := strconv.Atoi(addr[n+1:]) - - srcAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0} - dstAddr := &net.UDPAddr{IP: net.ParseIP(ip), Port: port} - conn, err := net.DialUDP("udp", srcAddr, dstAddr) - if err != nil { - log.Errorf(err.Error()) - return nil - } - t := newRTPTransport(conn) - t.id = id - t.mid = mid - t.addr = addr - t.receiveRTCP() - log.Infof("newSubRTPTransport %s %d", ip, port) - return t -} - -// ID return id -func (t *RTPTransport) ID() string { - return t.id -} - -// Close release all -func (t *RTPTransport) Close() { - if t.stop { - return - } - t.stop = true - t.rtpSession.Close() - t.rtcpSession.Close() - t.rtpEndpoint.Close() - t.rtcpEndpoint.Close() - t.mux.Close() - t.conn.Close() -} - -// newEndpoint registers a new endpoint on the underlying mux. -func (t *RTPTransport) newEndpoint(f mux.MatchFunc) *mux.Endpoint { - return t.mux.NewEndpoint(f) -} - -func (t *RTPTransport) receiveRTP() { - go func() { - for { - readStream, ssrc, err := t.rtpSession.AcceptStream() - if err != nil { - log.Warnf("Failed to accept stream %v ", err) - //for non-blocking ReadRTP() - t.rtpCh <- nil - continue - } - go func() { - rtpBuf := make([]byte, receiveMTU) - for { - _, pkt, err := readStream.ReadRTP(rtpBuf) - if err != nil { - log.Warnf("Failed to read rtp %v %d ", err, ssrc) - //for non-blocking ReadRTP() - t.rtpCh <- nil - continue - // return - } - log.Debugf("RTPTransport.receiveRTP pkt=%v", pkt) - if t.getMID() == "" { - t.idLock.Lock() - t.mid = util.GetIDFromRTP(pkt) - t.idLock.Unlock() - } - t.rtpCh <- pkt - t.ssrcPTLock.Lock() - t.ssrcPT[pkt.Header.SSRC] = pkt.Header.PayloadType - t.ssrcPTLock.Unlock() - - // log.Debugf("got RTP: %+v", pkt.Header) - } - }() - } - }() -} - -// ReadRTP read rtp from transport -func (t *RTPTransport) ReadRTP() (*rtp.Packet, error) { - return <-t.rtpCh, nil -} - -// rtp sub receive rtcp -func (t *RTPTransport) receiveRTCP() { - go func() { - for { - readStream, ssrc, err := t.rtcpSession.AcceptStream() - if err != nil { - log.Warnf("Failed to accept RTCP %v ", err) - return - } - - go func() { - rtcpBuf := make([]byte, receiveMTU) - for { - rtcps, err := readStream.ReadRTCP(rtcpBuf) - if err != nil { - log.Warnf("Failed to read rtcp %v %d ", err, ssrc) - return - } - log.Debugf("got RTCPs: %+v ", rtcps) - for _, pkt := range rtcps { - switch pkt.(type) { - case *rtcp.PictureLossIndication: - log.Infof("got pli pipeline not need send key frame!") - case *rtcp.TransportLayerNack: - log.Debugf("rtptransport got nack: %+v", pkt) - nack := pkt.(*rtcp.TransportLayerNack) - for _, nackPair := range nack.Nacks { - p := getPipeline(t.mid) - if p != nil { - if !p.writeRTP(t.id, nack.MediaSSRC, nackPair.PacketID) { - n := &rtcp.TransportLayerNack{ - //origin ssrc - SenderSSRC: nack.SenderSSRC, - MediaSSRC: nack.MediaSSRC, - Nacks: []rtcp.NackPair{rtcp.NackPair{PacketID: nackPair.PacketID}}, - } - log.Debugf("getPipeline(t.mid).GetPub().sendNack(n) %v", n) - p.getPub().WriteRTCP(n) - } - } - } - } - } - } - }() - } - }() -} - -// WriteRTP send rtp packet -func (t *RTPTransport) WriteRTP(rtp *rtp.Packet) error { - log.Debugf("RTPTransport.WriteRTP rtp=%v", rtp) - writeStream, err := t.rtpSession.OpenWriteStream() - if err != nil { - t.writeErrCnt++ - return err - } - - if t.extSent > 0 { - util.SetIDToRTP(rtp, t.mid) - } - - _, err = writeStream.WriteRTP(&rtp.Header, rtp.Payload) - if err == nil && t.extSent > 0 { - t.extSent-- - } - if err != nil { - log.Errorf(err.Error()) - t.writeErrCnt++ - } - return err -} - -// WriteRawRTCP write rtcp data -func (t *RTPTransport) WriteRawRTCP(data []byte) (int, error) { - writeStream, err := t.rtcpSession.OpenWriteStream() - if err != nil { - return 0, err - } - return writeStream.WriteRawRTCP(data) -} - -// WriteRTCP send rtp header and payload -// func (t *RTPTransport) WriteRTCP(header *rtcp.Header, payload []byte) (int, error) { -// writeStream, err := t.rtcpSession.OpenWriteStream() -// if err != nil { -// return 0, err -// } -// return writeStream.WriteRTCP(header, payload) -// } - -// used by rtp pub, tell remote ion to send key frame -func (t *RTPTransport) sendPLI() { - t.ssrcPTLock.RLock() - for ssrc, pt := range t.ssrcPT { - if pt == webrtc.DefaultPayloadTypeVP8 || pt == webrtc.DefaultPayloadTypeH264 || pt == webrtc.DefaultPayloadTypeVP9 { - pli := rtcp.PictureLossIndication{MediaSSRC: ssrc} - data, err := pli.Marshal() - if err != nil { - log.Warnf("pli marshal failed: %v", err) - return - } - t.WriteRawRTCP(data) - log.Infof("RTPTransport.SendPLI ssrc=%d pt=%v", ssrc, pt) - } - } - t.ssrcPTLock.RUnlock() -} - -// SSRCPT playload type and ssrc -func (t *RTPTransport) SSRCPT() map[uint32]uint8 { - t.ssrcPTLock.RLock() - defer t.ssrcPTLock.RUnlock() - return t.ssrcPT -} - -func (t *RTPTransport) getMID() string { - t.idLock.RLock() - defer t.idLock.RUnlock() - return t.mid -} - -func (t *RTPTransport) getAddr() string { - return t.addr -} - -func (t *RTPTransport) WriteRTCP(pkt rtcp.Packet) error { - bin, err := pkt.Marshal() - if err != nil { - return err - } - _, err = t.WriteRawRTCP(bin) - if err != nil { - return err - } - return err -} - -func (t *RTPTransport) writeErrTotal() int { - return t.writeErrCnt -} - -func (t *RTPTransport) writeErrReset() { - t.writeErrCnt = 0 -} diff --git a/pkg/rtc/rtptransport_test.go b/pkg/rtc/rtptransport_test.go deleted file mode 100644 index 873ab03a3..000000000 --- a/pkg/rtc/rtptransport_test.go +++ /dev/null @@ -1,23 +0,0 @@ -package rtc - -import ( - "fmt" - "testing" - - "github.com/pion/rtp" -) - -func TestNewRTPTransport(t *testing.T) { - rawPkt := []byte{ - 0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64, - 0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e, - } - - p := &rtp.Packet{} - r1 := newPubRTPTransport("1", "1", "0.0.0.0:6789") - if err := p.Unmarshal(rawPkt); err == nil { - r1.WriteRTP(p) - } else { - fmt.Println("r1.WriteRTP ", err) - } -} diff --git a/pkg/rtc/transport/rtptransport.go b/pkg/rtc/transport/rtptransport.go new file mode 100644 index 000000000..7c8a2eefe --- /dev/null +++ b/pkg/rtc/transport/rtptransport.go @@ -0,0 +1,325 @@ +package transport + +import ( + "crypto/sha1" + "errors" + "net" + "strconv" + "strings" + "sync" + + "github.com/pion/ion/pkg/log" + "github.com/pion/ion/pkg/rtc/rtpengine/muxrtp" + "github.com/pion/ion/pkg/rtc/rtpengine/muxrtp/mux" + "github.com/pion/ion/pkg/util" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/xtaci/kcp-go" + "golang.org/x/crypto/pbkdf2" +) + +const ( + extSentInit = 30 + receiveMTU = 8192 + maxPktSize = 1024 +) + +var ( + errInvalidConn = errors.New("invalid conn") + errInvalidAddr = errors.New("invalid addr") +) + +// RTPTransport .. +type RTPTransport struct { + rtpSession *muxrtp.SessionRTP + rtcpSession *muxrtp.SessionRTCP + rtpEndpoint *mux.Endpoint + rtcpEndpoint *mux.Endpoint + conn net.Conn + mux *mux.Mux + rtpCh chan *rtp.Packet + ssrcPT map[uint32]uint8 + ssrcPTLock sync.RWMutex + stop bool + extSent int + id string + idLock sync.RWMutex + writeErrCnt int + rtcpCh chan rtcp.Packet +} + +// NewRTPTransport create a RTPTransport by net.Conn +func NewRTPTransport(conn net.Conn) *RTPTransport { + if conn == nil { + log.Errorf("NewRTPTransport err=%v", errInvalidConn) + return nil + } + t := &RTPTransport{ + conn: conn, + rtpCh: make(chan *rtp.Packet, maxPktSize), + ssrcPT: make(map[uint32]uint8), + extSent: extSentInit, + rtcpCh: make(chan rtcp.Packet, maxPktSize), + } + config := mux.Config{ + Conn: conn, + BufferSize: receiveMTU, + } + t.mux = mux.NewMux(config) + t.rtpEndpoint = t.newEndpoint(mux.MatchRTP) + t.rtcpEndpoint = t.newEndpoint(mux.MatchRTCP) + var err error + t.rtpSession, err = muxrtp.NewSessionRTP(t.rtpEndpoint) + if err != nil { + log.Errorf(err.Error()) + return nil + } + t.rtcpSession, err = muxrtp.NewSessionRTCP(t.rtcpEndpoint) + if err != nil { + log.Errorf(err.Error()) + return nil + } + t.receiveRTP() + return t +} + +// NewOutRTPTransport new a outgoing RTPTransport +func NewOutRTPTransport(id, addr string) *RTPTransport { + n := strings.Index(addr, ":") + if n == 0 { + log.Errorf("NewOutRTPTransport err=%v", errInvalidAddr) + return nil + } + ip := addr[:n] + port, _ := strconv.Atoi(addr[n+1:]) + + srcAddr := &net.UDPAddr{IP: net.IPv4zero, Port: 0} + dstAddr := &net.UDPAddr{IP: net.ParseIP(ip), Port: port} + conn, err := net.DialUDP("udp", srcAddr, dstAddr) + if err != nil { + log.Errorf(err.Error()) + return nil + } + r := NewRTPTransport(conn) + r.receiveRTCP() + log.Infof("NewOutRTPTransport %s %s", id, addr) + r.idLock.Lock() + defer r.idLock.Unlock() + r.id = id + return r +} + +// NewOutRTPTransportWithKCP new a outgoing RTPTransport by kcp +func NewOutRTPTransportWithKCP(id, addr string, kcpKey, kcpSalt string) *RTPTransport { + key := pbkdf2.Key([]byte(kcpKey), []byte(kcpSalt), 1024, 32, sha1.New) + block, _ := kcp.NewAESBlockCrypt(key) + + // dial to the echo server + conn, err := kcp.DialWithOptions(addr, block, 10, 3) + if err != nil { + log.Errorf("NewOutRTPTransportWithKCP err=%v", err) + } + r := NewRTPTransport(conn) + r.receiveRTCP() + log.Infof("NewOutRTPTransportWithKCP %s %s", id, addr) + r.idLock.Lock() + defer r.idLock.Unlock() + r.id = id + return r +} + +// ID return id +func (r *RTPTransport) ID() string { + r.idLock.RLock() + defer r.idLock.RUnlock() + return r.id +} + +// Type return type of transport +func (r *RTPTransport) Type() int { + return TypeRTPTransport +} + +// Close release all +func (r *RTPTransport) Close() { + if r.stop { + return + } + r.stop = true + r.rtpSession.Close() + r.rtcpSession.Close() + r.rtpEndpoint.Close() + r.rtcpEndpoint.Close() + r.mux.Close() + r.conn.Close() +} + +// newEndpoint registers a new endpoint on the underlying mux. +func (r *RTPTransport) newEndpoint(f mux.MatchFunc) *mux.Endpoint { + return r.mux.NewEndpoint(f) +} + +// ReceiveRTP receive rtp +func (r *RTPTransport) receiveRTP() { + go func() { + for { + readStream, ssrc, err := r.rtpSession.AcceptStream() + if err != nil { + log.Warnf("Failed to accept stream %v ", err) + //for non-blocking ReadRTP() + r.rtpCh <- nil + continue + } + go func() { + rtpBuf := make([]byte, receiveMTU) + for { + if r.stop { + return + } + _, pkt, err := readStream.ReadRTP(rtpBuf) + if err != nil { + log.Warnf("Failed to read rtp %v %d ", err, ssrc) + //for non-blocking ReadRTP() + r.rtpCh <- nil + continue + // return + } + + log.Debugf("RTPTransport.receiveRTP pkt=%v", pkt) + r.idLock.Lock() + if r.id == "" { + r.id = util.GetIDFromRTP(pkt) + } + r.idLock.Unlock() + + r.rtpCh <- pkt + r.ssrcPTLock.Lock() + r.ssrcPT[pkt.Header.SSRC] = pkt.Header.PayloadType + r.ssrcPTLock.Unlock() + // log.Debugf("got RTP: %+v", pkt.Header) + } + }() + } + }() +} + +// ReadRTP read rtp from transport +func (r *RTPTransport) ReadRTP() (*rtp.Packet, error) { + return <-r.rtpCh, nil +} + +// rtp sub receive rtcp +func (r *RTPTransport) receiveRTCP() { + go func() { + for { + readStream, ssrc, err := r.rtcpSession.AcceptStream() + if err != nil { + log.Warnf("Failed to accept RTCP %v ", err) + return + } + + go func() { + rtcpBuf := make([]byte, receiveMTU) + for { + if r.stop { + return + } + rtcps, err := readStream.ReadRTCP(rtcpBuf) + if err != nil { + log.Warnf("Failed to read rtcp %v %d ", err, ssrc) + return + } + log.Debugf("got RTCPs: %+v ", rtcps) + for _, pkt := range rtcps { + switch pkt.(type) { + case *rtcp.PictureLossIndication: + log.Debugf("got pli, not need send key frame!") + case *rtcp.TransportLayerNack: + log.Debugf("rtptransport got nack: %+v", pkt) + r.rtcpCh <- pkt + } + } + } + }() + } + }() +} + +// WriteRTP send rtp packet +func (r *RTPTransport) WriteRTP(rtp *rtp.Packet) error { + log.Debugf("RTPTransport.WriteRTP rtp=%v", rtp) + writeStream, err := r.rtpSession.OpenWriteStream() + if err != nil { + r.writeErrCnt++ + return err + } + + if r.extSent > 0 { + r.idLock.Lock() + util.SetIDToRTP(rtp, r.id) + r.idLock.Unlock() + } + + _, err = writeStream.WriteRTP(&rtp.Header, rtp.Payload) + if err == nil && r.extSent > 0 { + r.extSent-- + } + if err != nil { + log.Errorf(err.Error()) + r.writeErrCnt++ + } + return err +} + +// WriteRawRTCP write rtcp data +func (r *RTPTransport) WriteRawRTCP(data []byte) (int, error) { + writeStream, err := r.rtcpSession.OpenWriteStream() + if err != nil { + return 0, err + } + return writeStream.WriteRawRTCP(data) +} + +// SSRCPT playload type and ssrc +func (r *RTPTransport) SSRCPT() map[uint32]uint8 { + r.ssrcPTLock.RLock() + defer r.ssrcPTLock.RUnlock() + return r.ssrcPT +} + +// WriteRTCP write rtcp +func (r *RTPTransport) WriteRTCP(pkt rtcp.Packet) error { + bin, err := pkt.Marshal() + if err != nil { + return err + } + _, err = r.WriteRawRTCP(bin) + if err != nil { + return err + } + return err +} + +// WriteErrTotal return write error +func (r *RTPTransport) WriteErrTotal() int { + return r.writeErrCnt +} + +// WriteErrReset reset write error +func (r *RTPTransport) WriteErrReset() { + r.writeErrCnt = 0 +} + +// GetRTCPChan return a rtcp channel +func (r *RTPTransport) GetRTCPChan() chan rtcp.Packet { + return r.rtcpCh +} + +// RemoteAddr return remote addr +func (r *RTPTransport) RemoteAddr() net.Addr { + if r.conn == nil { + log.Errorf("RemoteAddr err=%v", errInvalidConn) + return nil + } + return r.conn.RemoteAddr() +} diff --git a/pkg/rtc/transport/rtptransport_test.go b/pkg/rtc/transport/rtptransport_test.go new file mode 100644 index 000000000..0ebbee924 --- /dev/null +++ b/pkg/rtc/transport/rtptransport_test.go @@ -0,0 +1,47 @@ +package transport + +import ( + "testing" + + "github.com/pion/rtp" +) + +func TestNewRTPTransport(t *testing.T) { + rawPkt := []byte{ + 0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64, + 0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e, + } + + p := &rtp.Packet{} + rtpTransport := NewOutRTPTransport("awsome", "0.0.0.0:6789") + err := p.Unmarshal(rawPkt) + if err != nil { + t.Fatalf("rtp Unmarshal err=%v", err) + return + } + err = rtpTransport.WriteRTP(p) + if err != nil { + t.Fatalf("rtpTransport.WriteRTP err=%v", err) + return + } +} + +func TestNewRTPTransportKCP(t *testing.T) { + rawPkt := []byte{ + 0x90, 0xe0, 0x69, 0x8f, 0xd9, 0xc2, 0x93, 0xda, 0x1c, 0x64, + 0x27, 0x82, 0x00, 0x01, 0x00, 0x01, 0xFF, 0xFF, 0xFF, 0xFF, 0x98, 0x36, 0xbe, 0x88, 0x9e, + } + + p := &rtp.Packet{} + rtpTransport := NewOutRTPTransportWithKCP("awsome", "0.0.0.0:6789", "a", "b") + err := p.Unmarshal(rawPkt) + if err != nil { + t.Fatalf("rtp Unmarshal err=%v", err) + return + } + err = rtpTransport.WriteRTP(p) + if err != nil { + t.Fatalf("rtpTransport.WriteRTP err=%v", err) + return + } +} diff --git a/pkg/rtc/transport.go b/pkg/rtc/transport/transport.go similarity index 53% rename from pkg/rtc/transport.go rename to pkg/rtc/transport/transport.go index c01c1541e..394b780da 100644 --- a/pkg/rtc/transport.go +++ b/pkg/rtc/transport/transport.go @@ -1,17 +1,27 @@ -package rtc +package transport import ( "github.com/pion/rtcp" "github.com/pion/rtp" ) +// type of transport +const ( + TypeWebRTCTransport = iota + TypeRTPTransport + + TypeUnkown = -1 +) + // Transport is a interface type Transport interface { ID() string + Type() int ReadRTP() (*rtp.Packet, error) WriteRTP(*rtp.Packet) error WriteRTCP(rtcp.Packet) error + GetRTCPChan() chan rtcp.Packet Close() - writeErrTotal() int - writeErrReset() + WriteErrTotal() int + WriteErrReset() } diff --git a/pkg/rtc/transport/util.go b/pkg/rtc/transport/util.go new file mode 100644 index 000000000..937167e7e --- /dev/null +++ b/pkg/rtc/transport/util.go @@ -0,0 +1,33 @@ +package transport + +import ( + "strings" +) + +// KvOK check flag and value is "true" +func KvOK(m map[string]interface{}, k, v string) bool { + str := "" + val, ok := m[k] + if ok { + str, ok = val.(string) + if ok { + if strings.EqualFold(str, v) { + return true + } + } + } + return false +} + +// ValUpper get upper string by key +func ValUpper(m map[string]interface{}, k string) string { + str := "" + val, ok := m[k] + if ok { + str, ok = val.(string) + if ok { + return strings.ToUpper(str) + } + } + return "" +} diff --git a/pkg/rtc/transport/util_test.go b/pkg/rtc/transport/util_test.go new file mode 100644 index 000000000..bd3bbfa8b --- /dev/null +++ b/pkg/rtc/transport/util_test.go @@ -0,0 +1,29 @@ +package transport + +import ( + "testing" +) + +func TestKvOK(t *testing.T) { + m := make(map[string]interface{}) + m["abc"] = "true" + if !KvOK(m, "abc", "true") { + t.Fatal("flag is not true!") + } + m["abc"] = 1 + if KvOK(m, "abc", "true") { + t.Fatal("flag is not true!") + } +} + +func TestValUpper(t *testing.T) { + m := make(map[string]interface{}) + m["abc"] = "true" + if ValUpper(m, "abc") != "TRUE" { + t.Fatal("val is not true!") + } + m["abc"] = 1 + if ValUpper(m, "abc") != "" { + t.Fatal("ValUpper error!") + } +} diff --git a/pkg/rtc/transport/webrtctransport.go b/pkg/rtc/transport/webrtctransport.go new file mode 100644 index 000000000..24fb46f4d --- /dev/null +++ b/pkg/rtc/transport/webrtctransport.go @@ -0,0 +1,417 @@ +package transport + +import ( + "errors" + "io" + + "sync" + + "github.com/pion/ion/pkg/log" + "github.com/pion/rtcp" + "github.com/pion/rtp" + "github.com/pion/webrtc/v2" +) + +const ( + maxChanSize = 100 +) + +var ( + // only support unified plan + cfg = webrtc.Configuration{ + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } + + setting webrtc.SettingEngine + + errChanClosed = errors.New("channel closed") + errInvalidTrack = errors.New("track is nil") + errInvalidPacket = errors.New("packet is nil") + errInvalidPC = errors.New("pc is nil") + errInvalidOptions = errors.New("invalid options") +) + +// InitWebRTC init WebRTCTransport setting +func InitWebRTC(ices []string, trickICE bool) { + cfg.ICEServers = []webrtc.ICEServer{ + { + URLs: ices, + }, + } + setting.SetTrickle(trickICE) +} + +// WebRTCTransport contains pc incoming and outgoing tracks +type WebRTCTransport struct { + mediaEngine webrtc.MediaEngine + api *webrtc.API + id string + pc *webrtc.PeerConnection + outTracks map[uint32]*webrtc.Track + outTrackLock sync.RWMutex + inTracks map[uint32]*webrtc.Track + inTrackLock sync.RWMutex + writeErrCnt int + + rtpCh chan *rtp.Packet + rtcpCh chan rtcp.Packet + stop bool + pendingCandidates []*webrtc.ICECandidate + candidateLock sync.RWMutex + candidateCh chan *webrtc.ICECandidate + alive bool +} + +func (w *WebRTCTransport) init(options map[string]interface{}) error { + w.mediaEngine = webrtc.MediaEngine{} + w.mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) + + rtcpfb := []webrtc.RTCPFeedback{ + webrtc.RTCPFeedback{ + Type: webrtc.TypeRTCPFBTransportCC, + }, + } + tcc := KvOK(options, "transport-cc", "true") + dc := KvOK(options, "data-channel", "true") + codec := ValUpper(options, "codec") + + if codec == webrtc.H264 && tcc { + w.mediaEngine.RegisterCodec(webrtc.NewRTPH264CodecExt(webrtc.DefaultPayloadTypeH264, 90000, rtcpfb)) + } else if codec == webrtc.VP8 && tcc { + w.mediaEngine.RegisterCodec(webrtc.NewRTPVP8CodecExt(webrtc.DefaultPayloadTypeVP8, 90000, rtcpfb)) + } else if codec == webrtc.VP8 { + w.mediaEngine.RegisterCodec(webrtc.NewRTPVP8Codec(webrtc.DefaultPayloadTypeVP8, 90000)) + } else if codec == webrtc.VP9 { + w.mediaEngine.RegisterCodec(webrtc.NewRTPVP9Codec(webrtc.DefaultPayloadTypeVP9, 90000)) + } else { + w.mediaEngine.RegisterCodec(webrtc.NewRTPH264Codec(webrtc.DefaultPayloadTypeH264, 90000)) + } + + if !dc { + setting.DetachDataChannels() + } + w.api = webrtc.NewAPI(webrtc.WithMediaEngine(w.mediaEngine), webrtc.WithSettingEngine(setting)) + return nil +} + +// NewWebRTCTransport create a WebRTCTransport +// options: +// "video" = webrtc.H264[default] webrtc.VP8 webrtc.VP9 +// "audio" = webrtc.Opus[default] webrtc.PCMA webrtc.PCMU webrtc.G722 +// "transport-cc" = "true" or "false"[default] +// "data-channel" = "true" or "false"[default] +func NewWebRTCTransport(id string, options map[string]interface{}) *WebRTCTransport { + w := &WebRTCTransport{ + id: id, + outTracks: make(map[uint32]*webrtc.Track), + inTracks: make(map[uint32]*webrtc.Track), + rtpCh: make(chan *rtp.Packet, maxChanSize), + rtcpCh: make(chan rtcp.Packet, maxChanSize), + candidateCh: make(chan *webrtc.ICECandidate, maxChanSize), + alive: true, + } + err := w.init(options) + if err != nil { + log.Errorf("NewWebRTCTransport init %v", err) + return nil + } + + w.pc, err = w.api.NewPeerConnection(cfg) + if err != nil { + log.Errorf("NewWebRTCTransport api.NewPeerConnection %v", err) + return nil + } + + _, err = w.pc.AddTransceiver(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendrecv}) + if err != nil { + log.Errorf("w.pc.AddTransceiver video %v", err) + return nil + } + + _, err = w.pc.AddTransceiver(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendrecv}) + if err != nil { + log.Errorf("w.pc.AddTransceiver audio %v", err) + return nil + } + + w.pc.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + + remoteSDP := w.pc.RemoteDescription() + if remoteSDP == nil { + w.candidateLock.Lock() + defer w.candidateLock.Unlock() + w.pendingCandidates = append(w.pendingCandidates, c) + } else { + w.candidateCh <- c + } + }) + + w.pc.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { + if connectionState == webrtc.ICEConnectionStateDisconnected { + log.Errorf("webrtc ice disconnected") + w.alive = false + } + }) + + return w +} + +// ID return id +func (w *WebRTCTransport) ID() string { + return w.id +} + +// Type return type of transport +func (w *WebRTCTransport) Type() int { + return TypeWebRTCTransport +} + +// Offer return a offer +func (w *WebRTCTransport) Offer() (webrtc.SessionDescription, error) { + if w.pc == nil { + return webrtc.SessionDescription{}, errInvalidPC + } + offer, err := w.pc.CreateOffer(nil) + if err != nil { + return webrtc.SessionDescription{}, err + } + err = w.pc.SetLocalDescription(offer) + if err != nil { + return webrtc.SessionDescription{}, err + } + return offer, nil +} + +// SetRemoteSDP after Offer() +func (w *WebRTCTransport) SetRemoteSDP(sdp webrtc.SessionDescription) error { + if w.pc == nil { + return errInvalidPC + } + return w.pc.SetRemoteDescription(sdp) +} + +// AddTrack add track to pc +func (w *WebRTCTransport) AddTrack(ssrc uint32, pt uint8) (*webrtc.Track, error) { + if w.pc == nil { + return nil, errInvalidPC + } + track, err := w.pc.NewTrack(pt, ssrc, "pion", "pion") + if err != nil { + return nil, err + } + if _, err = w.pc.AddTrack(track); err != nil { + return nil, err + } + + w.outTrackLock.Lock() + w.outTracks[ssrc] = track + w.outTrackLock.Unlock() + return track, nil +} + +// AddCandidate add candidate to pc +func (w *WebRTCTransport) AddCandidate(candidate string) error { + if w.pc == nil { + return errInvalidPC + } + + err := w.pc.AddICECandidate(webrtc.ICECandidateInit{Candidate: string(candidate)}) + if err != nil { + return err + } + return nil +} + +// Answer answer to pub or sub +func (w *WebRTCTransport) Answer(offer webrtc.SessionDescription, options map[string]interface{}) (webrtc.SessionDescription, error) { + isPub := KvOK(options, "publish", "true") + if isPub { + w.pc.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { + w.inTrackLock.Lock() + w.inTracks[remoteTrack.SSRC()] = remoteTrack + w.inTrackLock.Unlock() + // TODO replace with broadcast when receiving rtp failed + // etcdKeepFunc(remoteTrack.SSRC(), remoteTrack.PayloadType()) + w.receiveInTrackRTP(remoteTrack) + }) + } else { + ssrcPT, _ := options["ssrcpt"] + if ssrcPT == nil { + return webrtc.SessionDescription{}, errInvalidOptions + } + ssrcPTMap, _ := ssrcPT.(map[uint32]uint8) + if len(ssrcPTMap) == 0 { + return webrtc.SessionDescription{}, errInvalidOptions + } + + for ssrc, pt := range ssrcPTMap { + track, _ := w.pc.NewTrack(pt, ssrc, "pion", "pion") + if track != nil { + w.pc.AddTrack(track) + w.outTrackLock.Lock() + w.outTracks[ssrc] = track + w.outTrackLock.Unlock() + } + } + w.receiveOutTrackRTCP() + } + + err := w.pc.SetRemoteDescription(offer) + if err != nil { + log.Errorf("pc.SetRemoteDescription %v", err) + return webrtc.SessionDescription{}, err + } + + answer, err := w.pc.CreateAnswer(nil) + if err != nil { + log.Errorf("pc.CreateAnswer answer=%v err=%v", answer, err) + return webrtc.SessionDescription{}, err + } + + err = w.pc.SetLocalDescription(answer) + if err != nil { + log.Errorf("pc.SetLocalDescription answer=%v err=%v", answer, err) + } + go func() { + w.candidateLock.Lock() + defer w.candidateLock.Unlock() + for _, candidate := range w.pendingCandidates { + w.candidateCh <- candidate + } + w.pendingCandidates = nil + }() + return answer, err +} + +// receiveInTrackRTP receive all incoming tracks' rtp and sent to one channel +func (w *WebRTCTransport) receiveInTrackRTP(remoteTrack *webrtc.Track) { + for { + if w.stop { + return + } + + rtp, err := remoteTrack.ReadRTP() + if err != nil { + if err == io.EOF { + return + } + log.Errorf("rtp err => %v", err) + } + w.rtpCh <- rtp + } +} + +// ReadRTP read rtp packet +func (w *WebRTCTransport) ReadRTP() (*rtp.Packet, error) { + rtp, ok := <-w.rtpCh + if !ok { + return nil, errChanClosed + } + return rtp, nil +} + +// WriteRTP send rtp packet to outgoing tracks +func (w *WebRTCTransport) WriteRTP(pkt *rtp.Packet) error { + if pkt == nil { + return errInvalidPacket + } + w.outTrackLock.RLock() + track := w.outTracks[pkt.SSRC] + w.outTrackLock.RUnlock() + + if track == nil { + log.Errorf("WebRTCTransport.WriteRTP track==nil pkt.SSRC=%d", pkt.SSRC) + return errInvalidTrack + } + + log.Debugf("WebRTCTransport.WriteRTP pkt=%v", pkt) + err := track.WriteRTP(pkt) + if err != nil { + log.Errorf(err.Error()) + w.writeErrCnt++ + return err + } + return nil +} + +// Close all +func (w *WebRTCTransport) Close() { + if w.stop { + return + } + log.Infof("WebRTCTransport.Close t.ID()=%v", w.ID()) + // close pc first, otherwise remoteTrack.ReadRTP will be blocked + w.pc.Close() + w.stop = true +} + +// receive rtcp from outgoing tracks +func (w *WebRTCTransport) receiveOutTrackRTCP() { + go func() { + for _, sender := range w.pc.GetSenders() { + for { + if w.stop { + return + } + + pkts, err := sender.ReadRTCP() + if err != nil { + if err == io.EOF { + return + } + log.Errorf("rtcp err => %v", err) + } + + for _, pkt := range pkts { + w.rtcpCh <- pkt + } + } + + } + }() +} + +// GetInTracks return incoming tracks +func (w *WebRTCTransport) GetInTracks() map[uint32]*webrtc.Track { + w.inTrackLock.RLock() + defer w.inTrackLock.RUnlock() + return w.inTracks +} + +// GetOutTracks return incoming tracks +func (w *WebRTCTransport) GetOutTracks() map[uint32]*webrtc.Track { + w.outTrackLock.RLock() + defer w.outTrackLock.RUnlock() + return w.outTracks +} + +// WriteRTCP write rtcp packet to pc +func (w *WebRTCTransport) WriteRTCP(pkt rtcp.Packet) error { + if w.pc == nil { + return errInvalidPC + } + return w.pc.WriteRTCP([]rtcp.Packet{pkt}) +} + +// WriteErrTotal return write error +func (w *WebRTCTransport) WriteErrTotal() int { + return w.writeErrCnt +} + +// WriteErrReset reset write error +func (w *WebRTCTransport) WriteErrReset() { + w.writeErrCnt = 0 +} + +// GetRTCPChan return a rtcp channel +func (w *WebRTCTransport) GetRTCPChan() chan rtcp.Packet { + return w.rtcpCh +} + +// GetCandidateChan return a candidate channel +func (w *WebRTCTransport) GetCandidateChan() chan *webrtc.ICECandidate { + return w.candidateCh +} diff --git a/pkg/rtc/transport/webrtctransport_test.go b/pkg/rtc/transport/webrtctransport_test.go new file mode 100644 index 000000000..09dec28c1 --- /dev/null +++ b/pkg/rtc/transport/webrtctransport_test.go @@ -0,0 +1,45 @@ +package transport + +import ( + "testing" + + "github.com/pion/webrtc/v2" +) + +func TestWebRTCTransportOffer(t *testing.T) { + options := make(map[string]interface{}) + options["codec"] = "h264" + options["transport-cc"] = "" + pub := NewWebRTCTransport("pub", options) + _, err := pub.Offer() + if err != nil { + t.Fatalf("err=%v", err) + } +} + +func TestWebRTCTransportAnswer(t *testing.T) { + options := make(map[string]interface{}) + options["codec"] = "h264" + options["transport-cc"] = "" + pub := NewWebRTCTransport("pub", options) + offer, err := pub.Offer() + if err != nil { + t.Fatalf("err=%v", err) + } + + _, err = pub.AddTrack(12345, webrtc.DefaultPayloadTypeH264) + if err != nil { + t.Fatalf("err=%v", err) + } + + sub := NewWebRTCTransport("sub", options) + options["subscribe"] = "" + options["ssrcpt"] = make(map[uint32]uint8) + for ssrc, track := range pub.GetOutTracks() { + options["ssrcpt"].(map[uint32]uint8)[ssrc] = track.PayloadType() + } + answer, err := sub.Answer(offer, options) + if err != nil { + t.Fatalf("err=%v answer=%v", err, answer) + } +} diff --git a/pkg/rtc/webrtctransport.go b/pkg/rtc/webrtctransport.go deleted file mode 100644 index d77d16e37..000000000 --- a/pkg/rtc/webrtctransport.go +++ /dev/null @@ -1,343 +0,0 @@ -package rtc - -import ( - "errors" - "io" - "strings" - - "sync" - - "github.com/pion/ion/pkg/log" - "github.com/pion/rtcp" - "github.com/pion/rtp" - "github.com/pion/webrtc/v2" -) - -var ( - cfg webrtc.Configuration - - errChanClosed = errors.New("channel closed") - errInvalidTrack = errors.New("track is nil") - errInvalidPacket = errors.New("packet is nil") - errInvalidPC = errors.New("pc is nil") -) - -func initICE(ices []string) { - cfg = webrtc.Configuration{ - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - ICEServers: []webrtc.ICEServer{ - { - URLs: ices, - }, - }, - } -} - -// WebRTCTransport .. -type WebRTCTransport struct { - id string - pc *webrtc.PeerConnection - track map[uint32]*webrtc.Track - trackLock sync.RWMutex - stop bool - rtpCh chan *rtp.Packet - ssrcPT map[uint32]uint8 - ssrcPTLock sync.RWMutex - byteRate uint64 - hasVideo bool - hasAudio bool - hasScreen bool - writeErrCnt int -} - -func newWebRTCTransport(id string) *WebRTCTransport { - w := &WebRTCTransport{ - id: id, - track: make(map[uint32]*webrtc.Track), - rtpCh: make(chan *rtp.Packet, 1000), - ssrcPT: make(map[uint32]uint8), - } - - return w -} - -// ID return id -func (t *WebRTCTransport) ID() string { - return t.id -} - -// AnswerPublish answer to pub -func (t *WebRTCTransport) AnswerPublish(rid string, offer webrtc.SessionDescription, options map[string]interface{}, fn func(ssrc uint32, pt uint8)) (answer webrtc.SessionDescription, err error) { - if options == nil { - return webrtc.SessionDescription{}, errors.New("invalid options") - } - mediaEngine := webrtc.MediaEngine{} - mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) - - // only register one video codec which client need - rtcpfb := []webrtc.RTCPFeedback{ - webrtc.RTCPFeedback{ - Type: webrtc.TypeRTCPFBTransportCC, - }, - } - if codec, ok := options["codec"]; ok { - codecStr := codec.(string) - if strings.EqualFold(codecStr, "h264") { - mediaEngine.RegisterCodec(webrtc.NewRTPH264CodecExt(webrtc.DefaultPayloadTypeH264, 90000, rtcpfb)) - } else if strings.EqualFold(codecStr, "vp9") { - mediaEngine.RegisterCodec(webrtc.NewRTPVP9Codec(webrtc.DefaultPayloadTypeVP9, 90000)) - } else { - // mediaEngine.RegisterCodec(webrtc.NewRTPVP8CodecExt(webrtc.DefaultPayloadTypeVP8, 90000, rtcpfb)) - mediaEngine.RegisterCodec(webrtc.NewRTPVP8Codec(webrtc.DefaultPayloadTypeVP8, 90000)) - } - } - - //check video audio screen - if v, ok := options["video"].(bool); ok { - t.hasVideo = v - } - if a, ok := options["audio"].(bool); ok { - t.hasAudio = a - } - if s, ok := options["screen"].(bool); ok { - t.hasScreen = s - } - - api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) - t.pc, err = api.NewPeerConnection(cfg) - if err != nil { - log.Errorf("api.NewPeerConnection %v", err) - return webrtc.SessionDescription{}, err - } - - // Allow us to receive 1 video track - _, err = t.pc.AddTransceiver(webrtc.RTPCodecTypeVideo, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}) - if err != nil { - log.Errorf("pc.AddTransceiver video %v", err) - return webrtc.SessionDescription{}, err - } - - // Allow us to receive 1 audio track - _, err = t.pc.AddTransceiver(webrtc.RTPCodecTypeAudio, webrtc.RtpTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}) - if err != nil { - log.Errorf("pc.AddTransceiver audio %v", err) - return webrtc.SessionDescription{}, err - } - - t.pc.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { - t.ssrcPTLock.Lock() - t.ssrcPT[remoteTrack.SSRC()] = remoteTrack.PayloadType() - t.ssrcPTLock.Unlock() - fn(remoteTrack.SSRC(), remoteTrack.PayloadType()) - t.receiveRTP(remoteTrack) - }) - - err = t.pc.SetRemoteDescription(offer) - if err != nil { - log.Errorf("pc.SetRemoteDescription %v", err) - return webrtc.SessionDescription{}, err - } - - answer, err = t.pc.CreateAnswer(nil) - if err != nil { - log.Errorf("SetLocalDescription answer=%v err=%v", answer, err) - } - err = t.pc.SetLocalDescription(answer) - //TODO recently not use, fix panic? - // t.pubReceiveRTCP() - - if err != nil { - log.Errorf("SetLocalDescription answer=%v err=%v", answer, err) - } - return answer, err -} - -// AnswerSubscribe answer to sub -func (t *WebRTCTransport) AnswerSubscribe(offer webrtc.SessionDescription, ssrcPT map[uint32]uint8, mid string) (answer webrtc.SessionDescription, err error) { - mediaEngine := webrtc.MediaEngine{} - mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) - - rtcpfb := []webrtc.RTCPFeedback{ - webrtc.RTCPFeedback{ - Type: webrtc.TypeRTCPFBTransportCC, - }, - } - // mediaEngine.RegisterCodec(webrtc.NewRTPH264CodecExt(webrtc.DefaultPayloadTypeH264, 90000, rtcpfb)) - // mediaEngine.RegisterCodec(webrtc.NewRTPVP9Codec(webrtc.DefaultPayloadTypeVP9, 90000)) - mediaEngine.RegisterCodec(webrtc.NewRTPVP8CodecExt(webrtc.DefaultPayloadTypeVP8, 90000, rtcpfb)) - - api := webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) - t.pc, err = api.NewPeerConnection(cfg) - if err != nil { - return webrtc.SessionDescription{}, err - } - - var track *webrtc.Track - for ssrc, pt := range ssrcPT { - if pt == webrtc.DefaultPayloadTypeVP8 || - pt == webrtc.DefaultPayloadTypeVP9 || - pt == webrtc.DefaultPayloadTypeH264 { - track, _ = t.pc.NewTrack(pt, ssrc, "video", "pion") - } else { - track, _ = t.pc.NewTrack(pt, ssrc, "audio", "pion") - } - if track != nil { - t.pc.AddTrack(track) - t.trackLock.Lock() - t.track[ssrc] = track - t.trackLock.Unlock() - } - } - - err = t.pc.SetRemoteDescription(offer) - if err != nil { - return webrtc.SessionDescription{}, err - } - - answer, err = t.pc.CreateAnswer(nil) - err = t.pc.SetLocalDescription(answer) - t.subReadRTCP(mid) - return answer, err -} - -func (t *WebRTCTransport) receiveRTP(remoteTrack *webrtc.Track) { - for { - if t.stop { - return - } - - rtp, err := remoteTrack.ReadRTP() - if err != nil { - if err == io.EOF { - return - } - log.Errorf("rtp err => %v", err) - } - t.rtpCh <- rtp - } -} - -// ReadRTP read rtp packet -func (t *WebRTCTransport) ReadRTP() (*rtp.Packet, error) { - rtp, ok := <-t.rtpCh - if !ok { - return nil, errChanClosed - } - return rtp, nil -} - -// WriteRTP send rtp packet -func (t *WebRTCTransport) WriteRTP(pkt *rtp.Packet) error { - if pkt == nil { - return errInvalidPacket - } - t.trackLock.RLock() - track := t.track[pkt.SSRC] - t.trackLock.RUnlock() - - if track == nil { - log.Errorf("WebRTCTransport.WriteRTP track==nil pkt.SSRC=%d", pkt.SSRC) - return errInvalidTrack - } - - log.Debugf("WebRTCTransport.WriteRTP pkt=%v", pkt) - // log.Infof("rtp.Extension=%t rtp.ExtensionProfile=%x rtp.ExtensionPayload=%x", pkt.Extension, pkt.ExtensionProfile, pkt.ExtensionPayload) - err := track.WriteRTP(pkt) - if err != nil { - log.Errorf(err.Error()) - t.writeErrCnt++ - return err - } - return nil -} - -// Close all -func (t *WebRTCTransport) Close() { - if t.stop { - return - } - log.Infof("WebRTCTransport.Close t.ID()=%v", t.ID()) - // close pc first, otherwise remoteTrack.ReadRTP will be blocked - t.pc.Close() - t.stop = true -} - -func (t *WebRTCTransport) subReadRTCP(mid string) { - senders := t.pc.GetSenders() - for i := 0; i < len(senders); i++ { - go func(i int) { - for { - select { - default: - if t.stop { - return - } - - p := getPipeline(mid) - if p == nil { - continue - } - - pkt, err := senders[i].ReadRTCP() - if err != nil { - if err == io.EOF { - return - } - log.Errorf("rtcp err => %v", err) - } - - for i := 0; i < len(pkt); i++ { - // log.Infof("pkt[i]=%v", pkt[i]) - switch pkt[i].(type) { - case *rtcp.TransportLayerNack: - // log.Infof("WebRTCTransport.subReadRTCP rtcp.TransportLayerNack pkt[i]=%v", pkt[i]) - nack := pkt[i].(*rtcp.TransportLayerNack) - // for _, nackPair := range nack.Nacks { - // sns := nackPair.PacketList() - // for _, sn := range sns { - // if !p.writeRTP(t.id, nack.MediaSSRC, sn) { - // // log.Errorf("p.writePacket failed t.id=%v sn=%v", t.id, sn) - // if pub := p.getPub(); pub != nil { - // n := &rtcp.TransportLayerNack{ - // //origin ssrc - // SenderSSRC: nack.SenderSSRC, - // MediaSSRC: nack.MediaSSRC, - // Nacks: []rtcp.NackPair{rtcp.NackPair{PacketID: sn}}, - // } - // pub.WriteRTCP(n) - // } - // } - // } - // } - p.getPub().WriteRTCP(nack) - default: - p.PushRTCP(pkt[i]) - } - } - } - } - }(i) - } -} - -// SSRCPT get SSRC and PayloadType -func (t *WebRTCTransport) SSRCPT() map[uint32]uint8 { - t.ssrcPTLock.RLock() - defer t.ssrcPTLock.RUnlock() - return t.ssrcPT -} - -func (t *WebRTCTransport) WriteRTCP(pkt rtcp.Packet) error { - if t.pc == nil { - return errInvalidPC - } - return t.pc.WriteRTCP([]rtcp.Packet{pkt}) -} - -func (t *WebRTCTransport) writeErrTotal() int { - return t.writeErrCnt -} - -func (t *WebRTCTransport) writeErrReset() { - t.writeErrCnt = 0 -} diff --git a/scripts/mac/allRestart.sh b/scripts/allRestart.sh similarity index 77% rename from scripts/mac/allRestart.sh rename to scripts/allRestart.sh index 964a04457..661a9e41f 100755 --- a/scripts/mac/allRestart.sh +++ b/scripts/allRestart.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR mkdir -p $APP_DIR/logs @@ -26,8 +26,8 @@ do done # run command -$APP_DIR/scripts/mac/allStop.sh -$APP_DIR/scripts/mac/allStart.sh +$APP_DIR/scripts/allStop.sh +$APP_DIR/scripts/allStart.sh diff --git a/scripts/mac/allStart.sh b/scripts/allStart.sh similarity index 67% rename from scripts/mac/allStart.sh rename to scripts/allStart.sh index e9999b83c..c48dd6dc5 100755 --- a/scripts/mac/allStart.sh +++ b/scripts/allStart.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR mkdir -p $APP_DIR/logs @@ -27,24 +27,22 @@ done # run command echo "------------etcd--------------" -$APP_DIR/scripts/mac/etcdStart.sh +$APP_DIR/scripts/etcdStart.sh -echo "------------redis--------------" -$APP_DIR/scripts/mac/redisStart.sh - -echo "-----------rabbitmq---------------" -$APP_DIR/scripts/mac/mqStart.sh +echo "------------nats-server--------------" +$APP_DIR/scripts/natsStart.sh -sleep 10 +echo "------------redis--------------" +$APP_DIR/scripts/redisStart.sh echo "------------islb--------------" -$APP_DIR/scripts/mac/islbStart.sh +$APP_DIR/scripts/islbStart.sh echo "------------ion--------------" -$APP_DIR/scripts/mac/ionStart.sh +$APP_DIR/scripts/ionStart.sh echo "------------web--------------" -$APP_DIR/scripts/mac/webStart.sh +$APP_DIR/scripts/webStart.sh echo "--------------------------" diff --git a/scripts/mac/allStop.sh b/scripts/allStop.sh similarity index 67% rename from scripts/mac/allStop.sh rename to scripts/allStop.sh index 4eab1c5c8..4d99b2d99 100755 --- a/scripts/mac/allStop.sh +++ b/scripts/allStop.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR mkdir -p $APP_DIR/logs @@ -25,23 +25,24 @@ do esac done -echo "------------web--------------" -$APP_DIR/scripts/mac/webStop.sh +# run command +echo "------------etcd--------------" +$APP_DIR/scripts/etcdStop.sh -echo "------------ion--------------" -$APP_DIR/scripts/mac/ionStop.sh +echo "------------nats-server--------------" +$APP_DIR/scripts/natsStop.sh -echo "------------islb--------------" -$APP_DIR/scripts/mac/islbStop.sh +echo "------------redis--------------" +$APP_DIR/scripts/redisStop.sh -echo "------------etcd--------------" -$APP_DIR/scripts/mac/etcdStop.sh +echo "------------islb--------------" +$APP_DIR/scripts/islbStop.sh -echo "------------redis--------------" -$APP_DIR/scripts/mac/redisStop.sh +echo "------------ion--------------" +$APP_DIR/scripts/ionStop.sh -echo "-----------rabbitmq---------------" -$APP_DIR/scripts/mac/mqStop.sh +echo "------------web--------------" +$APP_DIR/scripts/webStop.sh echo "--------------------------" diff --git a/scripts/build.sh b/scripts/build.sh new file mode 100755 index 000000000..c9fede60e --- /dev/null +++ b/scripts/build.sh @@ -0,0 +1,61 @@ +#!/bin/bash +APP_DIR=$(cd `dirname $0`/../; pwd) +OS_TYPE="" +. $APP_DIR/scripts/common +cd $APP_DIR +EXE1=ion +EXE2=islb + +COMMAND1=$APP_DIR/bin/$EXE1 +COMMAND2=$APP_DIR/bin/$EXE2 + + +help() +{ + echo "" + echo "build script" + echo "Usage: ./build.sh [-h]" + echo "" +} + +while getopts "o:h" arg +do + case $arg in + h) + help; + exit 0 + ;; + o) + OS_TYPE=$OPTARG + ;; + ?) + echo "No argument needed. Ignore them all!" + ;; + esac +done + +if [[ "$OS_TYPE" == "Darwin" || "$OS_TYPE" == "mac" || "$OS_TYPE" == "darwin" ]];then + export CGO_ENABLED=1 + export GOOS=darwin +fi + +if [[ "$OS_TYPE" == "Ubuntu" || "$OS_TYPE" =~ "CentOS" || "$OS_TYPE" == "ubuntu" || "$OS_TYPE" =~ "centos" || "$OS_TYPE" =~ "linux" || "$OS_TYPE" =~ "Linux" ]];then + export GOOS=linux +fi + +echo "-------------build ion----------" +echo "go build -o $COMMAND1" +cd $APP_DIR/cmd/ion +go build -o $COMMAND1 + +echo "-------------build islb----------" +echo "go build -o $COMMAND2" +cd $APP_DIR/cmd/islb +go build -o $COMMAND2 + +cd $APP_DIR +echo "------------tar ion-----------" +tar cvzf ion.tar.gz bin/ion configs/ion.toml configs/cert.pem configs/key.pem scripts/ionStart.sh scripts/ionStop.sh + +echo "------------tar islb-----------" +tar cvzf islb.tar.gz bin/islb configs/islb.toml configs/cert.pem configs/key.pem scripts/ionStart.sh scripts/ionStop.sh diff --git a/scripts/centos/allRestart.sh b/scripts/centos/allRestart.sh deleted file mode 100755 index d08d7ecd1..000000000 --- a/scripts/centos/allRestart.sh +++ /dev/null @@ -1,33 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -mkdir -p $APP_DIR/logs - -help() -{ - echo "" - echo "start script" - echo "Usage: ./allRestart.sh [-h]" - echo "" -} - -while getopts "h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -# run command -$APP_DIR/scripts/centos/allStop.sh -$APP_DIR/scripts/centos/allStart.sh - - - diff --git a/scripts/centos/allStart.sh b/scripts/centos/allStart.sh deleted file mode 100755 index b235ced77..000000000 --- a/scripts/centos/allStart.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -mkdir -p $APP_DIR/logs - -help() -{ - echo "" - echo "start script" - echo "Usage: ./allRestart.sh [-h]" - echo "" -} - -while getopts "h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -# run command -echo "------------etcd--------------" -$APP_DIR/scripts/centos/etcdStart.sh - -echo "------------redis--------------" -$APP_DIR/scripts/centos/redisStart.sh - -echo "-----------rabbitmq---------------" -$APP_DIR/scripts/centos/mqStart.sh - -echo "------------islb--------------" -$APP_DIR/scripts/centos/islbStart.sh - -echo "------------ion--------------" -$APP_DIR/scripts/centos/ionStart.sh - - -echo "------------web--------------" -$APP_DIR/scripts/centos/webStart.sh -echo "--------------------------" - - - diff --git a/scripts/centos/allStop.sh b/scripts/centos/allStop.sh deleted file mode 100755 index 94fbaa9bc..000000000 --- a/scripts/centos/allStop.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -mkdir -p $APP_DIR/logs - -help() -{ - echo "" - echo "start script" - echo "Usage: ./allRestart.sh [-h]" - echo "" -} - -while getopts "h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -# run command -echo "------------etcd--------------" -$APP_DIR/scripts/centos/etcdStop.sh - -echo "------------redis--------------" -$APP_DIR/scripts/centos/redisStop.sh - -echo "-----------rabbitmq---------------" -$APP_DIR/scripts/centos/mqStop.sh - -echo "------------islb--------------" -$APP_DIR/scripts/centos/islbStop.sh - -echo "------------ion--------------" -$APP_DIR/scripts/centos/ionStop.sh - - -echo "------------web--------------" -$APP_DIR/scripts/centos/webStop.sh -echo "--------------------------" - - - diff --git a/scripts/centos/build.sh b/scripts/centos/build.sh deleted file mode 100755 index 6b893e130..000000000 --- a/scripts/centos/build.sh +++ /dev/null @@ -1,105 +0,0 @@ -#!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -EXE1=ion -EXE2=islb - -COMMAND1=$APP_DIR/bin/$EXE1 -COMMAND2=$APP_DIR/bin/$EXE2 - - -if [ -f /etc/os-release ]; then - # freedesktop.org and systemd - . /etc/os-release - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=$NAME - unset NAME - OS_VER=$VERSION_ID -elif type lsb_release >/dev/null 2>&1; then - # linuxbase.org - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=$(lsb_release -si) - OS_VER=$(lsb_release -sr) -elif [ -f /etc/lsb-release ]; then - # For some versions of Debian/Ubuntu without lsb_release command - . /etc/lsb-release - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=$DISTRIB_ID - OS_VER=$DISTRIB_RELEASE -elif [ -f /etc/debian_version ]; then - # Older Debian/Ubuntu/etc. - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=Debian - OS_VER=$(cat /etc/debian_version) -elif [ -f /etc/SuSe-release ]; then - # Older SuSE/etc. - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - ... -elif [ -f /etc/redhat-release ]; then - # Older Red Hat, CentOS, etc. - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - ... -else - # Fall back to uname, e.g. "Linux ", also works for BSD, etc. - OS_TYPE=$(uname -s) - OS_VER=$(uname -r) - CPU=`sysctl -n machdep.cpu.thread_count` - MEM=`sysctl -n hw.memsize` -fi - - -help() -{ - echo "" - echo "build script" - echo "Usage: ./build.sh [-h]" - echo "" -} - -while getopts "o:h" arg -do - case $arg in - h) - help; - exit 0 - ;; - o) - OS_TYPE=$OPTARG - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -if [[ "$OS_TYPE" == "Darwin" || "$OS_TYPE" == "mac" || "$OS_TYPE" == "darwin" ]];then - export CGO_ENABLED=1 - export GOOS=darwin -fi - -if [[ "$OS_TYPE" == "Ubuntu" || "$OS_TYPE" =~ "CentOS" || "$OS_TYPE" == "ubuntu" || "$OS_TYPE" =~ "centos" || "$OS_TYPE" =~ "linux" || "$OS_TYPE" =~ "Linux" ]];then - export GOOS=linux -fi - -echo "-------------build ion----------" -echo "go build -o $COMMAND1" -cd $APP_DIR/cmd/ion -go build -o $COMMAND1 - -echo "-------------build islb----------" -echo "go build -o $COMMAND2" -cd $APP_DIR/cmd/islb -go build -o $COMMAND2 - -cd $APP_DIR -echo "------------tar ion-----------" -tar cvzf ion.tar.gz bin/ion configs/ion.toml configs/cert.pem configs/key.pem scripts/centos/ionStart.sh scripts/centos/ionStop.sh - -echo "------------tar islb-----------" -tar cvzf islb.tar.gz bin/islb configs/islb.toml configs/cert.pem configs/key.pem scripts/centos/ionStart.sh scripts/centos/ionStop.sh diff --git a/scripts/centos/etcdStart.sh b/scripts/centos/etcdStart.sh deleted file mode 100755 index 5dc69a0c3..000000000 --- a/scripts/centos/etcdStart.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -sudo systemctl start etcd -echo "start etcd ok!" diff --git a/scripts/centos/etcdStop.sh b/scripts/centos/etcdStop.sh deleted file mode 100755 index ab3ba48b4..000000000 --- a/scripts/centos/etcdStop.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -sudo systemctl stop etcd -echo "stop etcd ok!" - diff --git a/scripts/centos/installDeps.sh b/scripts/centos/installDeps.sh deleted file mode 100755 index 76fe0d776..000000000 --- a/scripts/centos/installDeps.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) - -sudo yum install epel-release -sudo yum install -y etcd redis rabbitmq-server nodejs -cd $APP_DIR/sdk/js -npm i diff --git a/scripts/centos/ionStart.sh b/scripts/centos/ionStart.sh deleted file mode 100755 index 3e114e582..000000000 --- a/scripts/centos/ionStart.sh +++ /dev/null @@ -1,62 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -mkdir -p $APP_DIR/logs -EXE=ion -COMMAND=$APP_DIR/bin/$EXE -CONFIG=$APP_DIR/configs/ion.toml -PID_FILE=$APP_DIR/configs/ion.pid -LOG_FILE=$APP_DIR/logs/ion.log - -help() -{ - echo "" - echo "start script" - echo "Usage: ./ionStart.sh [-h]" - echo "" -} - -while getopts "h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - - -count=`ps -ef |grep " $COMMAND " |grep -v "grep" |wc -l` -if [ 0 != $count ];then - ps aux | grep " $COMMAND " | grep -v "grep" - echo "already start" - exit 1; -fi - -if [ ! -r $CONFIG ]; then - echo "$CONFIG not exsist" - exit 1; -fi - -## build first -cd $APP_DIR/cmd/ion -go build -o $COMMAND -cd $APP_DIR - -## run command -echo "nohup $COMMAND -c $CONFIG >>$LOG_FILE 2>&1 &" -nohup $COMMAND -c $CONFIG >>$LOG_FILE 2>&1 & -pid=$! -echo "$pid" > $PID_FILE -rpid=`ps aux | grep $pid |grep -v "grep" | awk '{print $2}'` -if [[ $pid != $rpid ]];then - echo "start failly." - rm $PID_FILE - exit 1 -fi - diff --git a/scripts/centos/islbStop.sh b/scripts/centos/islbStop.sh deleted file mode 100755 index b62f19a87..000000000 --- a/scripts/centos/islbStop.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR - -PID_FILE=$APP_DIR/configs/islb.pid #pid file, default: worker.pid - -help() -{ - echo "" - echo "stop script" - echo "Usage:./islbStop.sh [-h]" - echo "" -} - -while getopts "p:h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -echo "stop process..." -PID=`cat $PID_FILE` -if [ ! -n "$PID" ]; then - echo "pid not exist" - exit 1; -fi -SUB_PIDS=`pgrep -P $PID` -if [ -n "$SUB_PIDS" ]; then - GRANDSON_PIDS=`pgrep -P $SUB_PIDS` -fi - -echo "kill $PID $SUB_PIDS $GRANDSON_PIDS" -kill $PID $SUB_PIDS $GRANDSON_PIDS -rm -rf $PID_FILE -echo "finish stop process..." - diff --git a/scripts/centos/mqStart.sh b/scripts/centos/mqStart.sh deleted file mode 100755 index 264eba2e8..000000000 --- a/scripts/centos/mqStart.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -sudo systemctl start rabbitmq-server -echo "start rabbitmq-server ok!" diff --git a/scripts/centos/mqStop.sh b/scripts/centos/mqStop.sh deleted file mode 100755 index e54029f22..000000000 --- a/scripts/centos/mqStop.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -sudo systemctl stop rabbitmq-server -echo "stop rabbitmq-server ok!" - diff --git a/scripts/centos/redisStart.sh b/scripts/centos/redisStart.sh deleted file mode 100755 index 2faa24347..000000000 --- a/scripts/centos/redisStart.sh +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/bash - -sudo systemctl start redis -echo "start redis ok!" diff --git a/scripts/centos/redisStop.sh b/scripts/centos/redisStop.sh deleted file mode 100755 index 106905215..000000000 --- a/scripts/centos/redisStop.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -sudo systemctl stop redis -echo "stop redis ok!" - diff --git a/scripts/common b/scripts/common new file mode 100755 index 000000000..d6546d1d2 --- /dev/null +++ b/scripts/common @@ -0,0 +1,172 @@ +#!/bin/bash +#adwpc + +CPU='' +MEM='' +OS_TYPE='' +OS_VER='' +CUR_DIR=$(cd `dirname $0`; pwd) +FNAME=`basename $0` +LOG="$CUR_DIR/$FNAME.log" +ERR="$CUR_DIR/$FNAME.err" + +#check os +if [ -f /etc/os-release ]; then + # freedesktop.org and systemd + . /etc/os-release + CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` + MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` + OS_TYPE=$NAME + unset NAME + OS_VER=$VERSION_ID +elif type lsb_release >/dev/null 2>&1; then + # linuxbase.org + CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` + MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` + OS_TYPE=$(lsb_release -si) + OS_VER=$(lsb_release -sr) +elif [ -f /etc/lsb-release ]; then + # For some versions of Debian/Ubuntu without lsb_release command + . /etc/lsb-release + CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` + MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` + OS_TYPE=$DISTRIB_ID + OS_VER=$DISTRIB_RELEASE +elif [ -f /etc/debian_version ]; then + # Older Debian/Ubuntu/etc. + CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` + MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` + OS_TYPE=Debian + OS_VER=$(cat /etc/debian_version) +elif [ -f /etc/SuSe-release ]; then + # Older SuSE/etc. + CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` + MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` + ... +elif [ -f /etc/redhat-release ]; then + # Older Red Hat, CentOS, etc. + CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` + MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` + ... +else + # Fall back to uname, e.g. "Linux ", also works for BSD, etc. + OS_TYPE=$(uname -s) + OS_VER=$(uname -r) + CPU=`sysctl -n machdep.cpu.thread_count` + MEM=`sysctl -n hw.memsize` +fi + + +#check if cmd exist +function exist() { + # $1 --help > /dev/null + type "$1" > /dev/null 2>&1 + if [ "$?" -eq 0 ] ; then + return 0 + else + return 1 + fi +} + +#echol LOGLEVEL ... +function echol() +{ + local mode="\033[0m" + case "$1" in + INFO) mode="\033[34;1m";;#bule + USER) mode="\033[32;1m";;#green + WARN) mode="\033[33;1m";;#yellow + ERROR) mode="\033[31;1m";;#red + *) mode="\033[35;1m";;#pink + esac + echo -e "$mode$@\033[0m" + echo -e "$@" >> "$LOG" +} + + +#run cmd {params...} +function run() +{ + echol "$@" + eval $@ 1>>"$LOG" 2>>"$ERR" + local ret=$? + if [[ $ret -ne 0 ]];then + eval $@ 1>>"$LOG" 2>>"$ERR" + if [[ $ret -eq 2 ]];then + #e.g. make distclean fail return 2 + echol WARN "warning:$@, ret=$ret" + else + echol ERROR "failed:$@, ret=$ret" + fi + # exit -3 + fi +} + +#mv to tmp +function saferm() +{ + # local name=`echo "$1" | awk -F'/' '{print $NF}' | awk -F'.' '{print $1}'` + local name="${1%/}" + name="${name##*/}" + mv $1 "/tmp/$name`date +%Y%m%d%H%M%S`" > /dev/null 2>&1 +} + + +#download url +#dl url {rename} +function wgetdl() +{ + local file="${1##*/}" + local rename="$2" + echol "$FUNCNAME:$@" + if [ ! -f "$file" ];then + rm -fr "$file" + if [ "$rename" = "" ];then + run wget --no-verbose -c "$1" > /dev/null + else + run wget --no-verbose -c -O "$2" "$1" > /dev/null + fi + fi + echol "success:$@" +} + + +#download repo to yum.repos.d +function dlrepo() +{ + cd /etc/yum.repos.d + run sudo wget --no-verbose -c "$1" + cd - +} + +function rmrepo() { + cd /etc/yum.repos.d + run sudo rm "$1" + cd - +} + +#unzip file +#uz file +function uz() +{ + echol "$@" + local ftype=`file "$1"` # Note ' and ` is different + case "$ftype" in + "$1: Zip archive"*) + run unzip "$1" > /dev/null;; + "$1: gzip compressed"*) + run tar zxvf "$1" > /dev/null;; + "$1: bzip2 compressed"*) + run tar jxvf "$1" > /dev/null;; + "$1: xz compressed data"*) + run tar xf "$1" > /dev/null;; + "$1: 7-zip archive data"*) + run 7za x "$1" > /dev/null;; + *) + echol ERROR "failed:File $1 can not be unzip" + return;; + esac + echol "success:$@" +} + + diff --git a/scripts/etcdStart.sh b/scripts/etcdStart.sh new file mode 100755 index 000000000..3c6019ab1 --- /dev/null +++ b/scripts/etcdStart.sh @@ -0,0 +1,13 @@ +#!/bin/bash +APP_DIR=$(cd `dirname $0`/../;pwd) +OS_TYPE="" +. $APP_DIR/scripts/common + + +if [[ "$OS_TYPE" =~ "Darwin" ]];then + brew services start etcd +else + sudo systemctl start etcd +fi + +echo "start etcd ok!" diff --git a/scripts/etcdStop.sh b/scripts/etcdStop.sh new file mode 100755 index 000000000..8be15c999 --- /dev/null +++ b/scripts/etcdStop.sh @@ -0,0 +1,13 @@ +#!/bin/bash +APP_DIR=$(cd `dirname $0`/../;pwd) +OS_TYPE="" +. $APP_DIR/scripts/common + +if [[ "$OS" =~ "Darwin" ]];then + brew services stop etcd +else + sudo systemctl stop etcd +fi + +echo "stop etcd ok!" + diff --git a/scripts/install-run-deps.sh b/scripts/install-run-deps.sh new file mode 100755 index 000000000..2777b06fa --- /dev/null +++ b/scripts/install-run-deps.sh @@ -0,0 +1,119 @@ +#!/bin/bash + +export ROOT_DIR=$(cd `dirname $0`/../; pwd) +export SRV_DIR=${ROOT_DIR}/deps + +export ETCD_VER=v3.3.18 +export NATS_VER=v2.1.4 +export REDIS_VER=5.0.7 +export TIDB_VER=3.0.9 + +export ETCD_DIR=${SRV_DIR}/etcd-server +export NATS_DIR=${SRV_DIR}/nats-server +export REDIS_DIR=${SRV_DIR}/redis-server +export TIDB_DIR=${SRV_DIR}/tidb-server + +case $(uname | tr '[:upper:]' '[:lower:]') in + linux*) + export OS=linux + ;; + darwin*) + export OS=darwin + ;; + msys*) + export OS=windows + ;; + *) + export OS=notset + ;; +esac + +function install_etcd_server() { + GITHUB_URL=https://github.com/etcd-io/etcd/releases/download + DOWNLOAD_URL=${GITHUB_URL} + + if [ "$OS" == "linux" ]; then + rm -f /tmp/etcd-${ETCD_VER}-${OS}-amd64.tar.gz + curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-${OS}-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-${OS}-amd64.tar.gz + tar xf /tmp/etcd-${ETCD_VER}-${OS}-amd64.tar.gz -C /tmp && rm -f /tmp/etcd-${ETCD_VER}-${OS}-amd64.tar.gz + else + rm -f /tmp/etcd-${ETCD_VER}-${OS}-amd64.zip + curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-${OS}-amd64.zip -o /tmp/etcd-${ETCD_VER}-${OS}-amd64.zip + unzip /tmp/etcd-${ETCD_VER}-${OS}-amd64.zip -d /tmp && rm -f /tmp/etcd-${ETCD_VER}-${OS}-amd64.zip + fi + + mv /tmp/etcd-${ETCD_VER}-${OS}-amd64/* ${ETCD_DIR} && rm -rf /tmp/etcd-${ETCD_VER}-${OS}-amd64 + + ${ETCD_DIR}/etcd --version + ${ETCD_DIR}/etcdctl --version +} + +function install_nats_server() { + GITHUB_URL=https://github.com/nats-io/nats-server/releases/download + DOWNLOAD_URL=${GITHUB_URL} + + rm -f /tmp/nats-server-${NATS_VER}-${OS}-amd64.zip + curl -L ${DOWNLOAD_URL}/${NATS_VER}/nats-server-${NATS_VER}-${OS}-amd64.zip -o /tmp/nats-server-${NATS_VER}-${OS}-amd64.zip + unzip /tmp/nats-server-${NATS_VER}-${OS}-amd64.zip -d /tmp && rm -f /tmp/nats-server-${NATS_VER}-${OS}-amd64.zip + mv /tmp/nats-server-${NATS_VER}-${OS}-amd64/* ${NATS_DIR} && rm -rf /tmp/nats-server-${NATS_VER}-${OS}-amd64 + ${NATS_DIR}/nats-server --version +} + +function install_redis_server() { + DOWNLOAD_URL=http://download.redis.io/releases/redis-${REDIS_VER}.tar.gz + + rm -f /tmp/redis-${REDIS_VER}.tar.gz + curl -L ${DOWNLOAD_URL} -o /tmp/redis-${REDIS_VER}.tar.gz + tar zxvf /tmp/redis-${REDIS_VER}.tar.gz -C /tmp && rm -f /tmp/redis-${REDIS_VER}.tar.gz + cd /tmp/redis-${REDIS_VER} && make + cp -rf /tmp/redis-${REDIS_VER}/src/redis-{server,cli} ${REDIS_DIR} && rm -f /tmp/redis-${REDIS_VER} + ${REDIS_DIR}/redis-server --version + ${REDIS_DIR}/redis-cli --version +} + +function install_tidb_server() { + DOWNLOAD_URL=https://github.com/pingcap/tidb/archive/v${TIDB_VER}.tar.gz + + rm -f /tmp/tidb-${TIDB_VER}.tar.gz + curl -L ${DOWNLOAD_URL} -o /tmp/tidb-${TIDB_VER}.tar.gz + tar zxvf /tmp/tidb-${TIDB_VER}.tar.gz -C /tmp && rm -f /tmp/tidb-${TIDB_VER}.tar.gz + cd /tmp/tidb-${TIDB_VER} && make + cp -rf /tmp/tidb-${TIDB_VER}/bin/tidb-server ${TIDB_DIR} && rm -rf /tmp/tidb-${TIDB_VER} + ${TIDB_DIR}/tidb-server -V +} + +echo "Install run dependencies." + +if [ ! -d $ROOT_DIR/deps ]; then + mkdir -p $ROOT_DIR/deps/{nats-server,redis-server,etcd-server,tidb-server} +fi + +if [ ! -f ${ETCD_DIR}/etcd ]; then + echo "Install ETCD for ${OS}." + install_etcd_server +else + echo "ECTD for ${OS} installed." +fi + +if [ ! -f ${NATS_DIR}/nats-server ]; then + echo "Install NATS-Server for ${OS}." + install_nats_server +else + echo "NATS-Server for ${OS} installed." +fi + +if [ ! -f ${REDIS_DIR}/redis-server ]; then + echo "Install Redis-Server for ${OS}." + install_redis_server +else + echo "Redis-Server for ${OS} installed." +fi + +if [ ! -f ${TIDB_DIR}/tidb-server ]; then + echo "Install TiDB-Server for ${OS}." + install_tidb_server +else + echo "TiDB-Server for ${OS} installed." +fi + +echo "Done" diff --git a/scripts/installDeps.sh b/scripts/installDeps.sh new file mode 100755 index 000000000..9be2c237b --- /dev/null +++ b/scripts/installDeps.sh @@ -0,0 +1,29 @@ +#!/bin/bash +APP_DIR=$(cd `dirname $0`/../;pwd) +OS_TYPE="" +. $APP_DIR/scripts/common + + +# centos7 +if [[ "$OS_TYPE" =~ "Ubuntu" ]];then + sudo apt-get install -y etcd redis-server rabbitmq-server nodejs-legacy npm + sudo npm install -g n + sudo n stable +fi + +if [[ "$OS_TYPE" =~ "CentOS" ]];then + npm config set registry http://registry.cnpmjs.org/ + sudo yum install epel-release + sudo yum install -y etcd redis rabbitmq-server nodejs + wgetdl https://github.com/nats-io/nats-server/releases/download/v2.1.4/nats-server-v2.1.4-amd64.rpm + rpm -ivh nats-server-v2.1.4-amd64.rpm +fi + +if [[ "$OS_TYPE" =~ "Darwin" ]];then + brew install etcd redis rabbitmq nodejs nats-server +fi + +exit + +cd $APP_DIR/sdk/js +npm i diff --git a/scripts/mac/ionStart.sh b/scripts/ionStart.sh similarity index 96% rename from scripts/mac/ionStart.sh rename to scripts/ionStart.sh index 3e114e582..a7175fea4 100755 --- a/scripts/mac/ionStart.sh +++ b/scripts/ionStart.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR mkdir -p $APP_DIR/logs EXE=ion diff --git a/scripts/centos/ionStop.sh b/scripts/ionStop.sh similarity index 94% rename from scripts/centos/ionStop.sh rename to scripts/ionStop.sh index 6bbf36242..3f3b9d533 100755 --- a/scripts/centos/ionStop.sh +++ b/scripts/ionStop.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR PID_FILE=$APP_DIR/configs/ion.pid #pid file, default: worker.pid diff --git a/scripts/centos/islbStart.sh b/scripts/islbStart.sh similarity index 96% rename from scripts/centos/islbStart.sh rename to scripts/islbStart.sh index afdef2c47..5d956b309 100755 --- a/scripts/centos/islbStart.sh +++ b/scripts/islbStart.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR mkdir -p $APP_DIR/logs EXE=islb diff --git a/scripts/mac/islbStop.sh b/scripts/islbStop.sh similarity index 94% rename from scripts/mac/islbStop.sh rename to scripts/islbStop.sh index b62f19a87..111be7f26 100755 --- a/scripts/mac/islbStop.sh +++ b/scripts/islbStop.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR PID_FILE=$APP_DIR/configs/islb.pid #pid file, default: worker.pid diff --git a/scripts/mac/build.sh b/scripts/mac/build.sh deleted file mode 100755 index f696a866d..000000000 --- a/scripts/mac/build.sh +++ /dev/null @@ -1,105 +0,0 @@ -#!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -EXE1=ion -EXE2=islb - -COMMAND1=$APP_DIR/bin/$EXE1 -COMMAND2=$APP_DIR/bin/$EXE2 - - -if [ -f /etc/os-release ]; then - # freedesktop.org and systemd - . /etc/os-release - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=$NAME - unset NAME - OS_VER=$VERSION_ID -elif type lsb_release >/dev/null 2>&1; then - # linuxbase.org - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=$(lsb_release -si) - OS_VER=$(lsb_release -sr) -elif [ -f /etc/lsb-release ]; then - # For some versions of Debian/Ubuntu without lsb_release command - . /etc/lsb-release - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=$DISTRIB_ID - OS_VER=$DISTRIB_RELEASE -elif [ -f /etc/debian_version ]; then - # Older Debian/Ubuntu/etc. - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - OS_TYPE=Debian - OS_VER=$(cat /etc/debian_version) -elif [ -f /etc/SuSe-release ]; then - # Older SuSE/etc. - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - ... -elif [ -f /etc/redhat-release ]; then - # Older Red Hat, CentOS, etc. - CPU=`cat /proc/cpuinfo | grep "processor" | wc -l` - MEM=`free -b|grep "Mem"|awk -F' ' '{print $2}'` - ... -else - # Fall back to uname, e.g. "Linux ", also works for BSD, etc. - OS_TYPE=$(uname -s) - OS_VER=$(uname -r) - CPU=`sysctl -n machdep.cpu.thread_count` - MEM=`sysctl -n hw.memsize` -fi - - -help() -{ - echo "" - echo "build script" - echo "Usage: ./build.sh [-h]" - echo "" -} - -while getopts "o:h" arg -do - case $arg in - h) - help; - exit 0 - ;; - o) - OS_TYPE=$OPTARG - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -if [[ "$OS_TYPE" == "Darwin" || "$OS_TYPE" == "mac" || "$OS_TYPE" == "darwin" ]];then - export CGO_ENABLED=1 - export GOOS=darwin -fi - -if [[ "$OS_TYPE" == "Ubuntu" || "$OS_TYPE" =~ "CentOS" || "$OS_TYPE" == "ubuntu" || "$OS_TYPE" =~ "centos" || "$OS_TYPE" =~ "linux" || "$OS_TYPE" =~ "Linux" ]];then - export GOOS=linux -fi - -echo "-------------build ion----------" -echo "go build -o $COMMAND1" -cd $APP_DIR/cmd/ion -go build -o $COMMAND1 - -echo "-------------build islb----------" -echo "go build -o $COMMAND2" -cd $APP_DIR/cmd/islb -go build -o $COMMAND2 - -cd $APP_DIR -echo "------------tar ion-----------" -tar cvzf ion.tar.gz bin/ion configs/ion.toml configs/cert.pem configs/key.pem scripts/mac/ionStart.sh scripts/mac/ionStop.sh - -echo "------------tar islb-----------" -tar cvzf islb.tar.gz bin/islb configs/islb.toml configs/cert.pem configs/key.pem scripts/mac/ionStart.sh scripts/mac/ionStop.sh diff --git a/scripts/mac/etcdStart.sh b/scripts/mac/etcdStart.sh deleted file mode 100755 index a43a9d19e..000000000 --- a/scripts/mac/etcdStart.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -brew services start etcd diff --git a/scripts/mac/etcdStop.sh b/scripts/mac/etcdStop.sh deleted file mode 100755 index c0ef4a6fd..000000000 --- a/scripts/mac/etcdStop.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -brew services stop etcd diff --git a/scripts/mac/installDeps.sh b/scripts/mac/installDeps.sh deleted file mode 100755 index 79beb4576..000000000 --- a/scripts/mac/installDeps.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) - -brew install etcd redis rabbitmq nodejs -cd $APP_DIR/sdk/js -npm i diff --git a/scripts/mac/ionStop.sh b/scripts/mac/ionStop.sh deleted file mode 100755 index 6bbf36242..000000000 --- a/scripts/mac/ionStop.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR - -PID_FILE=$APP_DIR/configs/ion.pid #pid file, default: worker.pid - -help() -{ - echo "" - echo "stop script" - echo "Usage:./ionStop.sh [-h]" - echo "" -} - -while getopts "p:h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - -echo "stop process..." -PID=`cat $PID_FILE` -if [ ! -n "$PID" ]; then - echo "pid not exist" - exit 1; -fi -SUB_PIDS=`pgrep -P $PID` -if [ -n "$SUB_PIDS" ]; then - GRANDSON_PIDS=`pgrep -P $SUB_PIDS` -fi - -echo "kill $PID $SUB_PIDS $GRANDSON_PIDS" -kill $PID $SUB_PIDS $GRANDSON_PIDS -rm -rf $PID_FILE -echo "finish stop process..." - diff --git a/scripts/mac/islbStart.sh b/scripts/mac/islbStart.sh deleted file mode 100755 index 0a0b7af7e..000000000 --- a/scripts/mac/islbStart.sh +++ /dev/null @@ -1,62 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR -mkdir -p $APP_DIR/logs -EXE=islb -COMMAND=$APP_DIR/bin/$EXE -CONFIG=$APP_DIR/configs/islb.toml -PID_FILE=$APP_DIR/configs/islb.pid -LOG_FILE=$APP_DIR/logs/islb.log - -help() -{ - echo "" - echo "start script" - echo "Usage: ./islbStart.sh [-h]" - echo "" -} - -while getopts "h" arg -do - case $arg in - h) - help; - exit 0 - ;; - ?) - echo "No argument needed. Ignore them all!" - ;; - esac -done - - -count=`ps -ef |grep " $COMMAND " |grep -v "grep" |wc -l` -if [ 0 != $count ];then - ps aux | grep " $COMMAND " | grep -v "grep" - echo "already start" - exit 1; -fi - -if [ ! -r $CONFIG ]; then - echo "$CONFIG not exsist" - exit 1; -fi - -## build first -cd $APP_DIR/cmd/islb -go build -o $COMMAND -cd $APP_DIR - -## run command -echo "nohup $COMMAND -c $CONFIG >> $LOG_FILE 2>&1 &" -nohup $COMMAND -c $CONFIG >> $LOG_FILE 2>&1 & -pid=$! -echo "$pid" > $PID_FILE -rpid=`ps aux | grep $pid |grep -v "grep" | awk '{print $2}'` -if [[ $pid != $rpid ]];then - echo "start failly." - rm $PID_FILE - exit 1 -fi - diff --git a/scripts/mac/mqStart.sh b/scripts/mac/mqStart.sh deleted file mode 100755 index f92ebbd39..000000000 --- a/scripts/mac/mqStart.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -brew services start rabbitmq diff --git a/scripts/mac/mqStop.sh b/scripts/mac/mqStop.sh deleted file mode 100755 index 3cc8172f3..000000000 --- a/scripts/mac/mqStop.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -brew services stop rabbitmq diff --git a/scripts/mac/redisStart.sh b/scripts/mac/redisStart.sh deleted file mode 100755 index 3ef32aec8..000000000 --- a/scripts/mac/redisStart.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -brew services start redis diff --git a/scripts/mac/redisStop.sh b/scripts/mac/redisStop.sh deleted file mode 100755 index 8f3f4c01e..000000000 --- a/scripts/mac/redisStop.sh +++ /dev/null @@ -1,2 +0,0 @@ -#!/bin/bash -brew services stop redis diff --git a/scripts/mac/webStart.sh b/scripts/mac/webStart.sh deleted file mode 100755 index af7e394ac..000000000 --- a/scripts/mac/webStart.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) -cd $APP_DIR/sdk/js -npm i - -nohup npm start 2>&1& echo $! > $APP_DIR/configs/node.pid -echo "start web ok" - diff --git a/scripts/mac/webStop.sh b/scripts/mac/webStop.sh deleted file mode 100755 index eb083bfdc..000000000 --- a/scripts/mac/webStop.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -APP_DIR=$(cd `dirname $0`/../../; pwd) - -PID=`cat $APP_DIR/configs/node.pid` -if [ ! -n "$PID" ]; then - echo "pid not exist" - exit 1; -fi -SUB_PIDS=`pgrep -P $PID` -if [ -n "$SUB_PIDS" ]; then - GRANDSON_PIDS=`pgrep -P $SUB_PIDS` -fi - -echo "kill $PID $SUB_PIDS $GRANDSON_PIDS" -kill $PID $SUB_PIDS $GRANDSON_PIDS -echo "stop web ok" - diff --git a/scripts/natsStart.sh b/scripts/natsStart.sh new file mode 100755 index 000000000..3c53d8103 --- /dev/null +++ b/scripts/natsStart.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +APP_DIR=$(cd `dirname $0`/../; pwd) +cd $APP_DIR +OS_TYPE="" +. $APP_DIR/scripts/common + +mkdir -p $APP_DIR/logs +EXE=nats-server +COMMAND=$EXE +#CONFIG=$APP_DIR/configs/ +PID_FILE=$APP_DIR/configs/nats-server.pid +LOG_FILE=$APP_DIR/logs/nats-server.log + +help() +{ + echo "" + echo "start script" + echo "Usage: ./natsStart.sh [-h]" + echo "" +} + +while getopts "h" arg +do + case $arg in + h) + help; + exit 0 + ;; + ?) + echo "No argument needed. Ignore them all!" + ;; + esac +done + + +count=`ps -ef |grep " $COMMAND " |grep -v "grep" |wc -l` +if [ 0 != $count ];then + ps aux | grep " $COMMAND " | grep -v "grep" + echo "already start" + exit 1; +fi + +# if [ ! -r $CONFIG ]; then + # echo "$CONFIG not exsist" + # exit 1; +# fi +if [[ "$OS_TYPE" =~ "Darwin" ]];then + brew services start nats-server +else + ## run command + echo "nohup $COMMAND >>$LOG_FILE 2>&1 &" + nohup $COMMAND >>$LOG_FILE 2>&1 & + pid=$! + echo "$pid" > $PID_FILE + rpid=`ps aux | grep $pid |grep -v "grep" | awk '{print $2}'` + if [[ $pid != $rpid ]];then + echo "start failly." + rm $PID_FILE + exit 1 + fi +fi diff --git a/scripts/natsStop.sh b/scripts/natsStop.sh new file mode 100755 index 000000000..ff2941876 --- /dev/null +++ b/scripts/natsStop.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +APP_DIR=$(cd `dirname $0`/../; pwd) +cd $APP_DIR +OS_TYPE="" +. $APP_DIR/scripts/common + + +PID_FILE=$APP_DIR/configs/nats-server.pid #pid file, default: worker.pid + +help() +{ + echo "" + echo "stop script" + echo "Usage:./natsStop.sh [-h]" + echo "" +} + +while getopts "p:h" arg +do + case $arg in + h) + help; + exit 0 + ;; + ?) + echo "No argument needed. Ignore them all!" + ;; + esac +done + +if [[ "$OS_TYPE" =~ "Darwin" ]];then + brew services start nats-server +else + echo "stop process..." + PID=`cat $PID_FILE` + if [ ! -n "$PID" ]; then + echo "pid not exist" + exit 1; + fi + SUB_PIDS=`pgrep -P $PID` + if [ -n "$SUB_PIDS" ]; then + GRANDSON_PIDS=`pgrep -P $SUB_PIDS` + fi + + echo "kill $PID $SUB_PIDS $GRANDSON_PIDS" + kill $PID $SUB_PIDS $GRANDSON_PIDS + rm -rf $PID_FILE + echo "finish stop process..." +fi diff --git a/scripts/redisStart.sh b/scripts/redisStart.sh new file mode 100755 index 000000000..9eba048f9 --- /dev/null +++ b/scripts/redisStart.sh @@ -0,0 +1,13 @@ +#!/bin/bash +APP_DIR=$(cd `dirname $0`/../;pwd) +OS_TYPE="" +. $APP_DIR/scripts/common + + +if [[ "$OS_TYPE" =~ "Darwin" ]];then + brew services start redis +else + sudo systemctl start redis +fi + +echo "start redis ok!" diff --git a/scripts/redisStop.sh b/scripts/redisStop.sh new file mode 100755 index 000000000..c1b530d93 --- /dev/null +++ b/scripts/redisStop.sh @@ -0,0 +1,14 @@ +#!/bin/bash +APP_DIR=$(cd `dirname $0`/../;pwd) +OS_TYPE="" +. $APP_DIR/scripts/common + + +if [[ "$OS_TYPE" =~ "Darwin" ]];then + brew services stop redis +else + sudo systemctl stop redis +fi + +echo "stop redis ok!" + diff --git a/scripts/ubuntu/installDeps.sh b/scripts/ubuntu/installDeps.sh deleted file mode 100644 index 77936d970..000000000 --- a/scripts/ubuntu/installDeps.sh +++ /dev/null @@ -1,12 +0,0 @@ -#!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) - -sudo apt-get install -y etcd redis-server rabbitmq-server nodejs-legacy npm - -sudo npm install -g n -sudo n stable - -npm config set registry http://registry.cnpmjs.org/ - -cd $APP_DIR/sdk/js -npm i diff --git a/scripts/centos/webStart.sh b/scripts/webStart.sh similarity index 79% rename from scripts/centos/webStart.sh rename to scripts/webStart.sh index be90714f5..56b5294c2 100755 --- a/scripts/centos/webStart.sh +++ b/scripts/webStart.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) cd $APP_DIR/sdk/js npm i cd $APP_DIR/sdk/js/demo diff --git a/scripts/centos/webStop.sh b/scripts/webStop.sh similarity index 88% rename from scripts/centos/webStop.sh rename to scripts/webStop.sh index eb083bfdc..3e3f64475 100755 --- a/scripts/centos/webStop.sh +++ b/scripts/webStop.sh @@ -1,6 +1,6 @@ #!/bin/bash -APP_DIR=$(cd `dirname $0`/../../; pwd) +APP_DIR=$(cd `dirname $0`/../; pwd) PID=`cat $APP_DIR/configs/node.pid` if [ ! -n "$PID" ]; then