From e175dec499ff9ca62ebaeb0e0bf294b0ccfbec45 Mon Sep 17 00:00:00 2001 From: davidramiro Date: Wed, 30 Oct 2024 16:21:20 +0100 Subject: [PATCH] feat: concurrently poll twitch and restreamer --- internal/adapter/restreamer/restreamer.go | 28 +++++---- internal/adapter/twitch/twitch.go | 73 ++++++++++++++++------- internal/core/port/port.go | 7 ++- internal/core/service/stream.go | 34 +++++++---- main.go | 12 ++-- 5 files changed, 104 insertions(+), 50 deletions(-) diff --git a/internal/adapter/restreamer/restreamer.go b/internal/adapter/restreamer/restreamer.go index 99c468a..4c3f7bb 100644 --- a/internal/adapter/restreamer/restreamer.go +++ b/internal/adapter/restreamer/restreamer.go @@ -73,39 +73,45 @@ func fetchInfo(ctx context.Context, stream domain.StreamQuery, client *http.Clie return restreamerInfo, nil } -func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*domain.StreamQuery) ([]domain.StreamInfo, error) { - log.Info().Int("count", len(streams)).Msg("getting info for restreamer streams") +func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, + streams []*domain.StreamQuery, + wg *sync.WaitGroup, + infos chan<- []domain.StreamInfo, + errorCh chan<- error) { + defer wg.Done() - wg := sync.WaitGroup{} + log.Info().Int("count", len(streams)).Msg("getting info for restreamer streams") - wg.Add(len(streams)) + wg2 := new(sync.WaitGroup) + wg2.Add(len(streams)) - infos := make([]domain.StreamInfo, 0) + streamInfos := make([]domain.StreamInfo, 0) infoCh := make(chan domain.StreamInfo, len(streams)) errCh := make(chan error, len(streams)) client := &http.Client{} for _, stream := range streams { - go fetch(ctx, stream, client, infoCh, errCh, &wg) + go fetch(ctx, stream, client, infoCh, errCh, wg2) } - wg.Wait() + wg2.Wait() close(infoCh) close(errCh) for err := range errCh { if err != nil { - log.Error().Err(err).Msg("error getting stream info") - return nil, err + log.Error().Err(err).Msg("error getting restreamer stream info") + errorCh <- err + return } } for info := range infoCh { - infos = append(infos, info) + streamInfos = append(streamInfos, info) } - return infos, nil + infos <- streamInfos } func fetch(ctx context.Context, diff --git a/internal/adapter/twitch/twitch.go b/internal/adapter/twitch/twitch.go index 7889c5d..3299177 100644 --- a/internal/adapter/twitch/twitch.go +++ b/internal/adapter/twitch/twitch.go @@ -1,16 +1,19 @@ package twitch import ( + "bytes" "context" "encoding/json" "fmt" "github.com/rs/zerolog/log" "github.com/spf13/viper" + "io" "net/http" "net/url" "streamobserver/internal/core/domain" "streamobserver/internal/core/port" "strings" + "sync" "time" ) @@ -35,7 +38,7 @@ type twitchResponse struct { GameName string `json:"game_name"` Title string `json:"title"` ThumbnailURL string `json:"thumbnail_url"` - } `json:"data"` + } `json:"data,omitempty"` } type authToken struct { @@ -49,25 +52,33 @@ func formatTwitchPhotoUrl(url string) string { return strings.Replace(url, widthPlaceholder, "1920", 1) } -func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*domain.StreamQuery) ([]domain.StreamInfo, error) { +func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, + streams []*domain.StreamQuery, + wg *sync.WaitGroup, + infos chan<- []domain.StreamInfo, + errCh chan<- error) { + defer wg.Done() + log.Info().Int("count", len(streams)).Msg("getting info for twitch streams") err := s.authenticate(ctx) if err != nil { log.Err(err).Msg("error authenticating with twitch") - return nil, err + errCh <- err + return } bearer := "Bearer " + s.token.AccessToken base, err := url.Parse(twitchStreamsURL) if err != nil { - return nil, err + errCh <- err + return } // Query params params := url.Values{} - params.Add("type", "live") + params.Add("type", "all") params.Add("first", "100") for _, s := range streams { params.Add("user_login", s.UserID) @@ -77,7 +88,8 @@ func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*doma req, err := http.NewRequestWithContext(ctx, http.MethodGet, base.String(), nil) if err != nil { log.Err(err).Msg("error building request for twitch") - return nil, err + errCh <- err + return } req.Header.Set("Authorization", bearer) @@ -89,33 +101,50 @@ func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*doma resp, err := client.Do(req) if err != nil { log.Error().Err(err).Msg("error making request to twitch") - return nil, err + errCh <- err + return } if resp.StatusCode != http.StatusOK { log.Error().Int("StatusCode", resp.StatusCode).Interface("Response", resp). Msg("No HTTP OK from Twitch Helix.") - return nil, err + errCh <- err + return } defer resp.Body.Close() - var response twitchResponse + responseBytes, err := io.ReadAll(resp.Body) + if err != nil { + log.Err(err).Msg("error getting bytes from twitch json response") + errCh <- err + return + } + + buffer := new(bytes.Buffer) + err = json.Compact(buffer, responseBytes) + if err != nil { + log.Err(err).Msg("error compacting twitch json response") + errCh <- err + return + } - err = json.NewDecoder(resp.Body).Decode(&response) + var response twitchResponse + err = json.NewDecoder(buffer).Decode(&response) if err != nil { log.Err(err).Msg("error decoding response from twitch") - return nil, err + errCh <- err + return } - infos := make([]domain.StreamInfo, len(streams)) + streamInfos := make([]domain.StreamInfo, len(streams)) for i, s := range streams { online := false log.Debug().Str("id", s.UserID).Msg("checking if stream in response") for _, data := range response.Data { - if data.Username == s.UserID { + if strings.EqualFold(data.Username, s.UserID) { log.Debug().Msg("found, setting info") - infos[i] = domain.StreamInfo{ + streamInfos[i] = domain.StreamInfo{ Query: s, Username: data.Username, Title: fmt.Sprintf("%s: %s", data.GameName, data.Title), @@ -125,18 +154,18 @@ func (s *StreamInfoProvider) GetStreamInfos(ctx context.Context, streams []*doma } online = true } - if !online { - log.Debug().Msg("not found, setting offline info") - infos[i] = domain.StreamInfo{ - Query: s, - Username: s.UserID, - IsOnline: false, - } + } + if !online { + log.Debug().Msg("not found, setting offline info") + streamInfos[i] = domain.StreamInfo{ + Query: s, + Username: s.UserID, + IsOnline: false, } } } - return infos, nil + infos <- streamInfos } func (s *StreamInfoProvider) authenticate(ctx context.Context) error { diff --git a/internal/core/port/port.go b/internal/core/port/port.go index 214e3d7..09a4323 100644 --- a/internal/core/port/port.go +++ b/internal/core/port/port.go @@ -3,11 +3,16 @@ package port import ( "context" "streamobserver/internal/core/domain" + "sync" ) type StreamInfoProvider interface { // GetStreamInfos takes an array of streams for a single stream service and returns metadata for those that are online. - GetStreamInfos(ctx context.Context, streams []*domain.StreamQuery) ([]domain.StreamInfo, error) + GetStreamInfos(ctx context.Context, + streams []*domain.StreamQuery, + wg *sync.WaitGroup, + stream chan<- []domain.StreamInfo, + err chan<- error) // Kind returns the streaming service fetched by this provider Kind() domain.StreamKind } diff --git a/internal/core/service/stream.go b/internal/core/service/stream.go index cfab15d..79e83dc 100644 --- a/internal/core/service/stream.go +++ b/internal/core/service/stream.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/viper" "streamobserver/internal/core/domain" "streamobserver/internal/core/port" + "sync" ) type StreamService struct { @@ -33,7 +34,6 @@ func (ss *StreamService) GetStreamInfos(ctx context.Context, streams []*domain.S ctx, cancel := context.WithTimeout(ctx, viper.GetDuration("general.request_timeout")) defer cancel() - infos := make([]domain.StreamInfo, 0) twitchStreams := make([]*domain.StreamQuery, 0) restreamerStreams := make([]*domain.StreamQuery, 0) @@ -50,22 +50,36 @@ func (ss *StreamService) GetStreamInfos(ctx context.Context, streams []*domain.S Int("restreamerStreamCount", len(restreamerStreams)). Msg("getting stream infos") + wg := new(sync.WaitGroup) + + infoCh := make(chan []domain.StreamInfo, 2) + errCh := make(chan error, 2) + if len(twitchStreams) > 0 { - twitchInfos, err := ss.twitchGetter.GetStreamInfos(ctx, twitchStreams) - if err != nil { - log.Err(err).Msg("unable to fetch twitch streams") - return nil, err - } - infos = append(infos, twitchInfos...) + wg.Add(1) + go ss.twitchGetter.GetStreamInfos(ctx, twitchStreams, wg, infoCh, errCh) } if len(restreamerStreams) > 0 { - restreamerInfos, err := ss.restreamerGetter.GetStreamInfos(ctx, restreamerStreams) + wg.Add(1) + go ss.restreamerGetter.GetStreamInfos(ctx, restreamerStreams, wg, infoCh, errCh) + } + + wg.Wait() + close(errCh) + close(infoCh) + + for err := range errCh { if err != nil { - log.Err(err).Msg("unable to fetch restream streams") + log.Error().Err(err).Msg("error getting stream info") return nil, err } - infos = append(infos, restreamerInfos...) + } + + infos := make([]domain.StreamInfo, 0) + + for info := range infoCh { + infos = append(infos, info...) } return infos, nil diff --git a/main.go b/main.go index 7454330..0fe63eb 100644 --- a/main.go +++ b/main.go @@ -16,12 +16,6 @@ import ( func main() { log.Info().Str("author", "davidramiro").Msg("starting streamobserver") - if viper.GetBool("general.debug") { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } else { - zerolog.SetGlobalLevel(zerolog.InfoLevel) - } - log.Info().Msg("initializing telegram bot") viper.AddConfigPath(".") err := viper.ReadInConfig() @@ -29,6 +23,12 @@ func main() { log.Panic().Err(err).Msg("failed to read config") } + if viper.GetBool("general.debug") { + zerolog.SetGlobalLevel(zerolog.DebugLevel) + } else { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } + token := viper.GetString("telegram.apikey") b, err := bot.New(token)