From 489e960977fe0425f8a5a43bbfef02cc1c2a9b92 Mon Sep 17 00:00:00 2001 From: Piotr Tabor Date: Sun, 8 Dec 2024 11:43:56 -0800 Subject: [PATCH] HTTP Server: Statuses can be collected in the background (#203) * Fixed collection of statuses. * AutoConnect also supports periodic update. * Refresh. * Convurrent. * More logging. --- application/application.go | 2 +- cast/payload.go | 1 + http/handlers.go | 141 ++++++++++++++++++++++++++++++------- 3 files changed, 119 insertions(+), 25 deletions(-) diff --git a/application/application.go b/application/application.go index 8013c13..75a043b 100644 --- a/application/application.go +++ b/application/application.go @@ -393,7 +393,7 @@ func (a *Application) Update() error { if err == nil { break } - a.log("error getting receiever status: %v", err) + a.log("error getting receiver status: %v", err) a.log("unable to get status from device; attempt %d/5, retrying...", i+1) time.Sleep(time.Second * 2) } diff --git a/cast/payload.go b/cast/payload.go index 76e9591..65a3290 100644 --- a/cast/payload.go +++ b/cast/payload.go @@ -165,4 +165,5 @@ type DeviceInfo struct { Ssid string `json:"ssid"` Timezone string `json:"timezone"` UptimeSec float64 `json:"uptime"` + SsdpUdn string `json:"ssdp_udn"` } diff --git a/http/handlers.go b/http/handlers.go index 7d09e5c..5e7b566 100644 --- a/http/handlers.go +++ b/http/handlers.go @@ -4,14 +4,14 @@ import ( "context" "encoding/json" "fmt" + "golang.org/x/sync/errgroup" "net" "net/http" "strconv" + "strings" "sync" "time" - "golang.org/x/sync/errgroup" - log "github.com/sirupsen/logrus" "github.com/vishen/go-chromecast/application" "github.com/vishen/go-chromecast/dns" @@ -22,28 +22,73 @@ type Handler struct { apps map[string]application.App mux *http.ServeMux - verbose bool - autoconnect bool + verbose bool + + autoconnectPeriod time.Duration + autoconnectTicker *time.Ticker + + // autoupdatePeriodSec defines how frequently app.Update method is called in the background. + autoupdatePeriod time.Duration + autoupdateTicker *time.Ticker } func NewHandler(verbose bool) *Handler { handler := &Handler{ - verbose: verbose, - apps: map[string]application.App{}, - mux: http.NewServeMux(), - mu: sync.Mutex{}, - autoconnect: false, + verbose: verbose, + apps: map[string]application.App{}, + mux: http.NewServeMux(), + mu: sync.Mutex{}, + + autoconnectPeriod: time.Duration(-1), + autoconnectTicker: nil, + + autoupdatePeriod: time.Duration(-1), + autoupdateTicker: nil, } handler.registerHandlers() return handler } -// Autoconnect configures the handler to perform auto-discovery of all the cast devices & groups. +// AutoConnect configures the handler to perform periodic auto-discovery of all the cast devices & groups. +// It's intended to be called just after `NewHandler()`, before the handler is registered in the server. +func (h *Handler) AutoConnect(period time.Duration) error { + // Setting the autoconnect property - to allow (in future) periodic refresh of the connections. + h.autoconnectPeriod = period + if err := h.connectAllInternal("", "3"); err != nil { + return err + } + if h.autoconnectPeriod > 0 { + h.autoconnectTicker = time.NewTicker(period) + go func() { + for { + <-h.autoconnectTicker.C + if err := h.connectAllInternal("", "3"); err != nil { + log.Printf("AutoConnect issued connectAllInternal failed: %v", err) + } + } + }() + } + return nil +} + +// AutoUpdate configures the handler to perform auto-update of all the cast devices & groups. // It's intended to be called just after `NewHandler()`, before the handler is registered in the server. -func (h *Handler) Autoconnect() error { +// Thanks to AutoUpdate, /status and /statuses returns relatively recent status 'instantly'. +func (h *Handler) AutoUpdate(period time.Duration) error { // Setting the autoconnect property - to allow (in future) periodic refresh of the connections. - h.autoconnect = true - return h.connectAllInternal("", "3") + h.autoupdatePeriod = period + if h.autoupdatePeriod > 0 { + h.autoupdateTicker = time.NewTicker(period) + go func() { + for { + <-h.autoupdateTicker.C + if err := h.UpdateAll(); err != nil { + log.Printf("AutoUpdate issued UpdateAll failed: %v", err) + } + } + }() + } + return nil } func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -291,20 +336,42 @@ func (h *Handler) connectAll(w http.ResponseWriter, r *http.Request) { } func (h *Handler) connectAllInternal(iface string, waitSec string) error { - devices := h.discoverDnsEntries(context.Background(), iface, waitSec) - uuidMap := map[string]application.App{} + ctx := context.Background() + devices := h.discoverDnsEntries(ctx, iface, waitSec) + apps := make(chan *application.App, len(devices)+1) + g, ctx := errgroup.WithContext(ctx) for _, device := range devices { - app, err := h.connectInternal(device.Addr, device.Port, device.DeviceName) + g.Go(func() error { + log.Printf("Connecting to %s:%d (%s)", device.Addr, device.Port, device.DeviceName) + app, err := h.connectInternal(device.Addr, device.Port, device.DeviceName) + if err != nil { + log.Printf("Connection to %s:%d (%s) failed: %v", device.Addr, device.Port, device.DeviceName, err) + return err + } + log.Printf("Connected to %s:%d (%s)", device.Addr, device.Port, device.DeviceName) + apps <- &app + return nil + }) + } + err := g.Wait() + log.Printf("Post wait status: %v", err) + close(apps) + + // Even if we cannot connect to some of the devices, we still update the map for remaining devices. + uuidMap := map[string]application.App{} + for app := range apps { + info, err := (*app).Info() if err != nil { - return err + log.Printf("Skipping device %v", app) + } else { + uuidMap[strings.ReplaceAll(info.SsdpUdn, "-", "")] = *app } - uuidMap[device.UUID] = app } h.mu.Lock() h.apps = uuidMap h.mu.Unlock() - return nil + return err } func (h *Handler) disconnect(w http.ResponseWriter, r *http.Request) { @@ -353,7 +420,14 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) { return } h.log("status for device") - + syncUpdate := r.URL.Query().Get("syncUpdate") == "true" + if syncUpdate { + if err := app.Update(); err != nil { + h.log("error updating status: %v", err) + httpError(w, fmt.Errorf("error updating status: %w", err)) + return + } + } castApplication, castMedia, castVolume := app.Status() info, err := app.Info() if err != nil { @@ -377,8 +451,21 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) { } } +func (h *Handler) UpdateAll() error { + uuids := h.ConnectedDeviceUUIDs() + g := new(errgroup.Group) + for _, deviceUUID := range uuids { + app, ok := h.app(deviceUUID) + if ok { + g.Go(func() error { return app.Update() }) + } + } + return g.Wait() +} + func (h *Handler) statuses(w http.ResponseWriter, r *http.Request) { h.log("statuses for devices") + syncUpdate := r.URL.Query().Get("syncUpdate") == "true" uuids := h.ConnectedDeviceUUIDs() mapUUID2Ch := map[string]chan statusResponse{} g := new(errgroup.Group) @@ -388,6 +475,11 @@ func (h *Handler) statuses(w http.ResponseWriter, r *http.Request) { ch := make(chan statusResponse, 1) mapUUID2Ch[deviceUUID] = ch g.Go(func() error { + if syncUpdate { + if err := app.Update(); err != nil { + return err + } + } castApplication, castMedia, castVolume := app.Status() info, err := app.Info() if err != nil { @@ -404,8 +496,9 @@ func (h *Handler) statuses(w http.ResponseWriter, r *http.Request) { } } if err := g.Wait(); err != nil { - h.log("%v", err) - httpError(w, err) + h.log("collecting statuses failed: %v", err) + httpError(w, fmt.Errorf("collecting statuses failed: %w", err)) + return } statusResponses := map[string]statusResponse{} @@ -560,7 +653,7 @@ func (h *Handler) rewind(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() seconds := q.Get("seconds") if seconds == "" { - httpValidationError(w, "missing 'seconds' in query paramater") + httpValidationError(w, "missing 'seconds' in query parameter") return } @@ -589,7 +682,7 @@ func (h *Handler) seek(w http.ResponseWriter, r *http.Request) { q := r.URL.Query() seconds := q.Get("seconds") if seconds == "" { - httpValidationError(w, "missing 'seconds' in query paramater") + httpValidationError(w, "missing 'seconds' in query parameter") return }