Skip to content

Commit

Permalink
HTTP Server: Statuses can be collected in the background (#203)
Browse files Browse the repository at this point in the history
* Fixed collection of statuses.

* AutoConnect also supports periodic update.

* Refresh.

* Convurrent.

* More logging.
  • Loading branch information
ptabor authored Dec 8, 2024
1 parent 2ac6d1b commit 489e960
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 25 deletions.
2 changes: 1 addition & 1 deletion application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (a *Application) Update() error {
if err == nil {
break
}
a.log("error getting receiever status: %v", err)
a.log("error getting receiver status: %v", err)
a.log("unable to get status from device; attempt %d/5, retrying...", i+1)
time.Sleep(time.Second * 2)
}
Expand Down
1 change: 1 addition & 0 deletions cast/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,4 +165,5 @@ type DeviceInfo struct {
Ssid string `json:"ssid"`
Timezone string `json:"timezone"`
UptimeSec float64 `json:"uptime"`
SsdpUdn string `json:"ssdp_udn"`
}
141 changes: 117 additions & 24 deletions http/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"encoding/json"
"fmt"
"golang.org/x/sync/errgroup"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"

"golang.org/x/sync/errgroup"

log "github.com/sirupsen/logrus"
"github.com/vishen/go-chromecast/application"
"github.com/vishen/go-chromecast/dns"
Expand All @@ -22,28 +22,73 @@ type Handler struct {
apps map[string]application.App
mux *http.ServeMux

verbose bool
autoconnect bool
verbose bool

autoconnectPeriod time.Duration
autoconnectTicker *time.Ticker

// autoupdatePeriodSec defines how frequently app.Update method is called in the background.
autoupdatePeriod time.Duration
autoupdateTicker *time.Ticker
}

func NewHandler(verbose bool) *Handler {
handler := &Handler{
verbose: verbose,
apps: map[string]application.App{},
mux: http.NewServeMux(),
mu: sync.Mutex{},
autoconnect: false,
verbose: verbose,
apps: map[string]application.App{},
mux: http.NewServeMux(),
mu: sync.Mutex{},

autoconnectPeriod: time.Duration(-1),
autoconnectTicker: nil,

autoupdatePeriod: time.Duration(-1),
autoupdateTicker: nil,
}
handler.registerHandlers()
return handler
}

// Autoconnect configures the handler to perform auto-discovery of all the cast devices & groups.
// AutoConnect configures the handler to perform periodic auto-discovery of all the cast devices & groups.
// It's intended to be called just after `NewHandler()`, before the handler is registered in the server.
func (h *Handler) AutoConnect(period time.Duration) error {
// Setting the autoconnect property - to allow (in future) periodic refresh of the connections.
h.autoconnectPeriod = period
if err := h.connectAllInternal("", "3"); err != nil {
return err
}
if h.autoconnectPeriod > 0 {
h.autoconnectTicker = time.NewTicker(period)
go func() {
for {
<-h.autoconnectTicker.C
if err := h.connectAllInternal("", "3"); err != nil {
log.Printf("AutoConnect issued connectAllInternal failed: %v", err)
}
}
}()
}
return nil
}

// AutoUpdate configures the handler to perform auto-update of all the cast devices & groups.
// It's intended to be called just after `NewHandler()`, before the handler is registered in the server.
func (h *Handler) Autoconnect() error {
// Thanks to AutoUpdate, /status and /statuses returns relatively recent status 'instantly'.
func (h *Handler) AutoUpdate(period time.Duration) error {
// Setting the autoconnect property - to allow (in future) periodic refresh of the connections.
h.autoconnect = true
return h.connectAllInternal("", "3")
h.autoupdatePeriod = period
if h.autoupdatePeriod > 0 {
h.autoupdateTicker = time.NewTicker(period)
go func() {
for {
<-h.autoupdateTicker.C
if err := h.UpdateAll(); err != nil {
log.Printf("AutoUpdate issued UpdateAll failed: %v", err)
}
}
}()
}
return nil
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -291,20 +336,42 @@ func (h *Handler) connectAll(w http.ResponseWriter, r *http.Request) {
}

func (h *Handler) connectAllInternal(iface string, waitSec string) error {
devices := h.discoverDnsEntries(context.Background(), iface, waitSec)
uuidMap := map[string]application.App{}
ctx := context.Background()
devices := h.discoverDnsEntries(ctx, iface, waitSec)
apps := make(chan *application.App, len(devices)+1)
g, ctx := errgroup.WithContext(ctx)
for _, device := range devices {
app, err := h.connectInternal(device.Addr, device.Port, device.DeviceName)
g.Go(func() error {
log.Printf("Connecting to %s:%d (%s)", device.Addr, device.Port, device.DeviceName)
app, err := h.connectInternal(device.Addr, device.Port, device.DeviceName)
if err != nil {
log.Printf("Connection to %s:%d (%s) failed: %v", device.Addr, device.Port, device.DeviceName, err)
return err
}
log.Printf("Connected to %s:%d (%s)", device.Addr, device.Port, device.DeviceName)
apps <- &app
return nil
})
}
err := g.Wait()
log.Printf("Post wait status: %v", err)
close(apps)

// Even if we cannot connect to some of the devices, we still update the map for remaining devices.
uuidMap := map[string]application.App{}
for app := range apps {
info, err := (*app).Info()
if err != nil {
return err
log.Printf("Skipping device %v", app)
} else {
uuidMap[strings.ReplaceAll(info.SsdpUdn, "-", "")] = *app
}
uuidMap[device.UUID] = app
}

h.mu.Lock()
h.apps = uuidMap
h.mu.Unlock()
return nil
return err
}

func (h *Handler) disconnect(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -353,7 +420,14 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) {
return
}
h.log("status for device")

syncUpdate := r.URL.Query().Get("syncUpdate") == "true"
if syncUpdate {
if err := app.Update(); err != nil {
h.log("error updating status: %v", err)
httpError(w, fmt.Errorf("error updating status: %w", err))
return
}
}
castApplication, castMedia, castVolume := app.Status()
info, err := app.Info()
if err != nil {
Expand All @@ -377,8 +451,21 @@ func (h *Handler) status(w http.ResponseWriter, r *http.Request) {
}
}

func (h *Handler) UpdateAll() error {
uuids := h.ConnectedDeviceUUIDs()
g := new(errgroup.Group)
for _, deviceUUID := range uuids {
app, ok := h.app(deviceUUID)
if ok {
g.Go(func() error { return app.Update() })
}
}
return g.Wait()
}

func (h *Handler) statuses(w http.ResponseWriter, r *http.Request) {
h.log("statuses for devices")
syncUpdate := r.URL.Query().Get("syncUpdate") == "true"
uuids := h.ConnectedDeviceUUIDs()
mapUUID2Ch := map[string]chan statusResponse{}
g := new(errgroup.Group)
Expand All @@ -388,6 +475,11 @@ func (h *Handler) statuses(w http.ResponseWriter, r *http.Request) {
ch := make(chan statusResponse, 1)
mapUUID2Ch[deviceUUID] = ch
g.Go(func() error {
if syncUpdate {
if err := app.Update(); err != nil {
return err
}
}
castApplication, castMedia, castVolume := app.Status()
info, err := app.Info()
if err != nil {
Expand All @@ -404,8 +496,9 @@ func (h *Handler) statuses(w http.ResponseWriter, r *http.Request) {
}
}
if err := g.Wait(); err != nil {
h.log("%v", err)
httpError(w, err)
h.log("collecting statuses failed: %v", err)
httpError(w, fmt.Errorf("collecting statuses failed: %w", err))
return
}

statusResponses := map[string]statusResponse{}
Expand Down Expand Up @@ -560,7 +653,7 @@ func (h *Handler) rewind(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
seconds := q.Get("seconds")
if seconds == "" {
httpValidationError(w, "missing 'seconds' in query paramater")
httpValidationError(w, "missing 'seconds' in query parameter")
return
}

Expand Down Expand Up @@ -589,7 +682,7 @@ func (h *Handler) seek(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()
seconds := q.Get("seconds")
if seconds == "" {
httpValidationError(w, "missing 'seconds' in query paramater")
httpValidationError(w, "missing 'seconds' in query parameter")
return
}

Expand Down

0 comments on commit 489e960

Please sign in to comment.