Skip to content

Commit

Permalink
tracker: dumb implementation, also still needs recorder serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
kipukun committed May 14, 2024
1 parent b87fb8e commit 48957de
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 54 deletions.
1 change: 1 addition & 0 deletions proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func NewServer(ctx context.Context, cfg config.Config, manager radio.ManagerServ
Handler: r,
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 5,
BaseContext: func(l net.Listener) context.Context { return ctx },
}

return srv, nil
Expand Down
25 changes: 0 additions & 25 deletions tracker/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package tracker

import (
"context"
"net"
"net/http"
"time"

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/website"
"github.com/rs/zerolog"
"github.com/rs/zerolog/hlog"
)
Expand Down Expand Up @@ -73,29 +71,6 @@ func ListenerRemove(ctx context.Context, recorder *Recorder) http.HandlerFunc {
}
}

func NewServer(ctx context.Context, addr string, recorder *Recorder) *http.Server {
r := website.NewRouter()

r.Use(
hlog.NewHandler(*zerolog.Ctx(ctx)),
hlog.RemoteAddrHandler("ip"),
hlog.UserAgentHandler("user_agent"),
hlog.RequestIDHandler("req_id", "Request-Id"),
hlog.URLHandler("url"),
hlog.MethodHandler("method"),
hlog.ProtoHandler("protocol"),
hlog.AccessHandler(zerologLoggerFunc),
)
r.Post("/listener_joined", ListenerAdd(ctx, recorder))
r.Post("/listener_left", ListenerRemove(ctx, recorder))

return &http.Server{
Addr: addr,
Handler: r,
BaseContext: func(l net.Listener) context.Context { return ctx },
}
}

func zerologLoggerFunc(r *http.Request, status, size int, duration time.Duration) {
hlog.FromRequest(r).Info().
Int("status_code", status).
Expand Down
6 changes: 4 additions & 2 deletions tracker/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ func TestListenerAddAndRemove(t *testing.T) {
defer cancel()
cfg := config.TestConfig()

recorder := NewRecorder(ctx, cfg)
dummy := NewServer(ctx, "", recorder)
dummy, err := NewServer(ctx, cfg)
if err != nil {
t.Fatal(err)
}

srv := httptest.NewServer(dummy.Handler)

Check failure on line 27 in tracker/http_test.go

View workflow job for this annotation

GitHub Actions / test

dummy.Handler undefined (type *Server has no field or method Handler)
defer srv.Close()
Expand Down
45 changes: 18 additions & 27 deletions tracker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package tracker

import (
"context"
"net"
"time"

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/config"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/rpc"
"github.com/R-a-dio/valkyrie/util/graceful"
"github.com/rs/zerolog"
"google.golang.org/grpc"
)

const (
Expand All @@ -24,43 +22,36 @@ const (
RemoveStalePeriod = time.Minute * 5
)

func NewGRPCServer(ctx context.Context, lts radio.ListenerTrackerService) *grpc.Server {
gs := rpc.NewGrpcServer(ctx)
rpc.RegisterListenerTrackerServer(gs, rpc.NewListenerTracker(lts))
return gs
}

func Execute(ctx context.Context, cfg config.Config) error {
// setup recorder
var recorder = NewRecorder(ctx, cfg)

// setup periodic task to update the manager of our listener count
go PeriodicallyUpdateListeners(ctx, cfg.Manager, recorder, UpdateListenersTickrate)
// setup periodic task to keep recorder state in sync with icecast
go PeriodicallySyncListeners(ctx, cfg, recorder, SyncListenersTickrate)
const op errors.Op = "tracker/Execute"

// setup the HTTP server that icecast will be poking
srv := NewServer(ctx, cfg.Conf().Tracker.ListenAddr, recorder)

// setup the GRPC server that the rest will be poking
grpcSrv := NewGRPCServer(ctx, recorder)
// and a listener for the GRPC server
ln, err := net.Listen("tcp", cfg.Conf().Tracker.RPCAddr)
srv, err := NewServer(ctx, cfg)
if err != nil {
return err
}

errCh := make(chan error, 2)
go func() {
errCh <- grpcSrv.Serve(ln)
}()
if graceful.IsChild() {
var done func()
ctx, done = graceful.WithSync(ctx)

err = srv.handleResume(ctx, cfg)
if err != nil {
return err
}
done()
}

errCh := make(chan error, 1)
go func() {
errCh <- srv.ListenAndServe()
errCh <- srv.Start(ctx)
}()

select {
case <-ctx.Done():
return srv.Close()
case <-graceful.Signal:
return srv.handleRestart(ctx, cfg)
case err := <-errCh:
return err
}
Expand Down
237 changes: 237 additions & 0 deletions tracker/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package tracker

import (
"context"
"fmt"
"net"
"net/http"
"os"
"sync"
"time"

radio "github.com/R-a-dio/valkyrie"
"github.com/R-a-dio/valkyrie/config"
"github.com/R-a-dio/valkyrie/errors"
"github.com/R-a-dio/valkyrie/proxy/compat"
"github.com/R-a-dio/valkyrie/rpc"
"github.com/R-a-dio/valkyrie/util/graceful"
"github.com/R-a-dio/valkyrie/website"
"github.com/rs/zerolog"
"github.com/rs/zerolog/hlog"
"google.golang.org/grpc"
)

type Server struct {
cfg config.Config
mu sync.Mutex
rpcSrv *grpc.Server
rpcLn net.Listener
httpSrv *http.Server
httpLn net.Listener
}

func NewGRPCServer(ctx context.Context, lts radio.ListenerTrackerService) *grpc.Server {
gs := rpc.NewGrpcServer(ctx)
rpc.RegisterListenerTrackerServer(gs, rpc.NewListenerTracker(lts))
return gs
}

func NewServer(ctx context.Context, cfg config.Config) (*Server, error) {
const op errors.Op = "tracker.NewServer"

// setup recorder
var recorder = NewRecorder(ctx, cfg)

// setup periodic task to update the manager of our listener count
go PeriodicallyUpdateListeners(ctx, cfg.Manager, recorder, UpdateListenersTickrate)
// setup periodic task to keep recorder state in sync with icecast
go PeriodicallySyncListeners(ctx, cfg, recorder, SyncListenersTickrate)

srv := new(Server)
r := website.NewRouter()

r.Use(
hlog.NewHandler(*zerolog.Ctx(ctx)),
hlog.RemoteAddrHandler("ip"),
hlog.UserAgentHandler("user_agent"),
hlog.RequestIDHandler("req_id", "Request-Id"),
hlog.URLHandler("url"),
hlog.MethodHandler("method"),
hlog.ProtoHandler("protocol"),
hlog.AccessHandler(zerologLoggerFunc),
)
r.Post("/listener_joined", ListenerAdd(ctx, recorder))
r.Post("/listener_left", ListenerRemove(ctx, recorder))

var err error

// setup the GRPC server that the rest will be poking
srv.rpcSrv = NewGRPCServer(ctx, recorder)
// and a listener for the GRPC server
srv.rpcLn, err = net.Listen("tcp", cfg.Conf().Tracker.RPCAddr)
if err != nil {
return nil, errors.E(op, err)
}

srv.httpSrv = &http.Server{
Addr: cfg.Conf().Tracker.ListenAddr,
Handler: r,
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 5,
BaseContext: func(l net.Listener) context.Context { return ctx },
}
srv.httpLn, err = net.Listen("tcp", cfg.Conf().Tracker.ListenAddr)
if err != nil {
return nil, errors.E(op, err)
}

return srv, nil
}

func (s *Server) Close() error {
s.rpcSrv.Stop()
return s.httpSrv.Close()
}

func (s *Server) Start(ctx context.Context) error {
logger := zerolog.Ctx(ctx)
errCh := make(chan error, 2)
go func() {
s.mu.Lock()
ln := s.rpcLn
s.mu.Unlock()
logger.Info().Str("address", ln.Addr().String()).Msg("tracker started listening [grpc]")
errCh <- s.rpcSrv.Serve(ln)
}()
go func() {
s.mu.Lock()
ln := s.httpLn
s.mu.Unlock()
logger.Info().Str("address", ln.Addr().String()).Msg("tracker started listening [http]")
errCh <- s.httpSrv.Serve(ln)
}()

select {
case <-ctx.Done():
return s.Close()
case err := <-errCh:
return err
}

}

func (srv *Server) handleResume(ctx context.Context, cfg config.Config) error {
parent, err := graceful.FD2Unix(3)
if err != nil {
return err
}
defer parent.Close()

return srv.readSelf(ctx, cfg, parent)
}

func (srv *Server) handleRestart(ctx context.Context, cfg config.Config) error {
dst, err := graceful.StartChild()
if err != nil {
return err
}
defer dst.Close()

return srv.writeSelf(dst)
}

type httpWire string
type rpcWire string

// type wireServer struct {
// HTTPServer, RPCServer string
// }

func (srv *Server) writeSelf(dst *net.UnixConn) error {
var hw httpWire
var rw rpcWire
rw = rpcWire(srv.cfg.Conf().Tracker.RPCAddr)
hw = httpWire(srv.cfg.Conf().Tracker.ListenAddr)

rpcFd, err := getFile(srv.rpcLn)
if err != nil {
return fmt.Errorf("fd failure in server: %w", err)
}
defer rpcFd.Close()

httpFd, err := getFile(srv.httpLn)
if err != nil {
return fmt.Errorf("fd failure in server: %w", err)
}
defer httpFd.Close()

err = graceful.WriteJSONFile(dst, rw, rpcFd)
if err != nil {
return err
}
err = graceful.WriteJSONFile(dst, hw, httpFd)
if err != nil {
return err
}

// TODO(kipu): serialize recorder
// return srv.recorder.writeSelf(dst)
return nil
}

func (srv *Server) readSelf(ctx context.Context, cfg config.Config, src *net.UnixConn) error {
var hw httpWire
var rw rpcWire

zerolog.Ctx(ctx).Info().Msg("tracker: resuming server")
rpcFile, err := graceful.ReadJSONFile(src, &rw)
if err != nil {
return err
}
defer rpcFile.Close()
zerolog.Ctx(ctx).Info().Str("rw", string(rw)).Msg("resume")

httpFile, err := graceful.ReadJSONFile(src, &rw)
if err != nil {
return err
}
defer rpcFile.Close()

zerolog.Ctx(ctx).Info().Str("hw", string(hw)).Msg("resume")

srv.mu.Lock()
defer srv.mu.Unlock()

srv.httpLn, err = net.FileListener(httpFile)
if err != nil {
return err
}
srv.rpcLn, err = net.FileListener(rpcFile)
if err != nil {
return err
}

// TODO(kipu): serialize recorder
// return srv.recorder.readSelf(ctx, cfg, src)
return nil
}

func getFile(un any) (*os.File, error) {
if un == nil {
return nil, errors.New("nil passed to getFile")
}

if f, ok := un.(interface{ File() (*os.File, error) }); ok {
return f.File()
}

switch v := un.(type) {
case *compat.Listener:
return getFile(v.Listener)
case *compat.Conn:
return getFile(v.Conn)
default:
fmt.Printf("unknown type in getFile: %#v", un)
panic("unknown type")
}
}

0 comments on commit 48957de

Please sign in to comment.