From 9580f0a50517fca13c919628403f35e1e615b454 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=C3=A1n=20C=20McCord?= Date: Sun, 29 Mar 2020 12:48:13 -0400 Subject: [PATCH] Limit number of reloads per time Limit the number of module reloads to once per 5s. --- main.go | 158 ++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 118 insertions(+), 40 deletions(-) diff --git a/main.go b/main.go index 4751591..69db489 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,7 @@ package main import ( "archive/zip" + "context" "fmt" "io" "io/ioutil" @@ -11,6 +12,7 @@ import ( "path" "path/filepath" "strings" + "sync" "time" "github.com/CyCoreSystems/kubetemplate" @@ -24,6 +26,7 @@ const renderFlagFilename = ".asterisk-config" var maxShortDeaths = 10 var minRuntime = time.Minute +var defaultMinReloadInterval = 5 * time.Second // Service maintains an Asterisk configuration set type Service struct { @@ -138,8 +141,13 @@ func main() { // Run executes the Service func (s *Service) Run() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + renderChan := make(chan error, 1) + r := newReloader(ctx, ariUsername, s.Secret, s.Modules) + s.engine = kubetemplate.NewEngine(renderChan, s.Discoverer) defer s.engine.Close() @@ -158,6 +166,8 @@ func (s *Service) Run() error { return errors.Wrap(err, "failed to write render flag file") } + r.Reload() + s.engine.FirstRenderComplete(true) // Wait for Asterisk to come up before proceeding, so as to not interrupt @@ -182,9 +192,7 @@ func (s *Service) Run() error { return errors.Wrap(err, "failed to render configuration") } - if err := reload(ariUsername, s.Secret, s.Modules); err != nil { - return errors.Wrap(err, "failed to reload asterisk modules") - } + r.Reload() } } @@ -311,43 +319,6 @@ func waitAsterisk(username, secret string) error { } } -func reload(username, secret, modules string) (err error) { - urlFormat := "http://127.0.0.1:8088/ari/asterisk/modules/%s" - - for _, m := range strings.Split(modules, ",") { - - var r *http.Request - r, err = http.NewRequest("PUT", fmt.Sprintf(urlFormat, m), nil) - if err != nil { - return errors.Wrapf(err, "failed to construct module reload request for module %s", m) - } - r.Header.Set("Content-Type", "application/json") - r.SetBasicAuth(username, secret) - - ret, err := http.DefaultClient.Do(r) - if err != nil { - return errors.Wrapf(err, "failed to contact ARI to reload module %s", m) - } - ret.Body.Close() // nolint - - switch ret.StatusCode { - case http.StatusNoContent: - continue - case http.StatusNotFound: - return errors.Errorf("module %s not already loaded", m) - case http.StatusUnauthorized: - return errors.Errorf("module %s failed to reload due bad authentication", m) - case 409: - return errors.Errorf("module %s could not be reloaded", m) - default: - return errors.Errorf("module %s reload failed: %s", m, ret.Status) - } - } - - log.Println("reloads complete") - return nil -} - func extractSource(source, customRoot string) (err error) { if strings.HasPrefix(source, "http") { source, err = downloadSource(source) @@ -434,3 +405,110 @@ func downloadSource(uri string) (string, error) { return tf.Name(), err } + +type reloader struct { + lastReload time.Time + minReloadInterval time.Duration + + username string + secret string + + modules []string + + needReload bool + + mu sync.Mutex +} + +func newReloader(ctx context.Context, username, secret, modules string) *reloader { + r := &reloader{ + minReloadInterval: defaultMinReloadInterval, + username: username, + secret: secret, + } + + for _, m := range strings.Split(modules, ",") { + r.modules = append(r.modules, strings.TrimSpace(m)) + } + + go r.run(ctx) + + return r +} + +func (r *reloader) run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(r.minReloadInterval): + } + + if err := r.maybeRunReload(); err != nil { + log.Println("failed to reload modules", err) + } + } +} + +func (r *reloader) maybeRunReload() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.needReload { + if err := r.reload(); err != nil { + return err + } + + r.needReload = false + } + + return nil +} + +func (r *reloader) Reload() { + r.mu.Lock() + r.needReload = true + r.mu.Unlock() +} + +func (r *reloader) reload() error { + log.Println("reloading Asterisk modules") + for _, m := range r.modules { + if err := r.reloadModule(m); err != nil { + return err + } + } + log.Println("Asterisk modules reloaded") + + return nil +} + +func (r *reloader) reloadModule(name string) error { + url := fmt.Sprintf("http://127.0.0.1:8088/ari/asterisk/modules/%s", name) + + req, err := http.NewRequest("PUT", url, nil) + if err != nil { + return errors.Wrapf(err, "failed to construct module reload request for module %s", name) + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(r.username, r.secret) + + ret, err := http.DefaultClient.Do(req) + if err != nil { + return errors.Wrapf(err, "failed to contact ARI to reload module %s", name) + } + ret.Body.Close() // nolint + + switch ret.StatusCode { + case http.StatusNoContent: + return nil + case http.StatusNotFound: + return errors.Errorf("module %s not already loaded", name) + case http.StatusUnauthorized: + return errors.Errorf("module %s failed to reload due bad authentication", name) + case 409: + return errors.Errorf("module %s could not be reloaded", name) + default: + return errors.Errorf("module %s reload failed: %s", name, ret.Status) + } +}