From 2827298227e0ee694c2a95a07b999598d5cbe2ca Mon Sep 17 00:00:00 2001 From: coddmeistr Date: Thu, 26 Dec 2024 23:59:36 +0300 Subject: [PATCH 1/4] vpn is method to sync --- pkg/instances/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/instances/server.go b/pkg/instances/server.go index 6582d35c9..680f4dc1d 100644 --- a/pkg/instances/server.go +++ b/pkg/instances/server.go @@ -151,6 +151,7 @@ var methodsToSync = []string{ "manual_renew", "free_renew", "cancel_renew", + "vpn", } func (s *InstancesServer) Invoke(ctx context.Context, _req *connect.Request[pb.InvokeRequest]) (*connect.Response[pb.InvokeResponse], error) { From 5eda5021005a5ce6608a84e396ccb1dc95f570e0 Mon Sep 17 00:00:00 2001 From: coddmeistr Date: Fri, 27 Dec 2024 13:31:25 +0300 Subject: [PATCH 2/4] skip daily whmcs sync --- pkg/billing/cron_whmcs_invoices_syncer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/billing/cron_whmcs_invoices_syncer.go b/pkg/billing/cron_whmcs_invoices_syncer.go index b301f6bcd..2d8fcfdc9 100644 --- a/pkg/billing/cron_whmcs_invoices_syncer.go +++ b/pkg/billing/cron_whmcs_invoices_syncer.go @@ -52,6 +52,9 @@ func (s *BillingServiceServer) WhmcsInvoicesSyncerCronJob(ctx context.Context, l delCount++ } + log.Info("Finished WHMCS Invoices syncer cron job", zap.Int("deleted", delCount)) + return + whmcsIdToInvoice := make(map[int]struct{}) for _, inv := range ncInvoices { if inv.Meta == nil { From 83c48e2a0f2a43d8b56c56125e007a8d811d782c Mon Sep 17 00:00:00 2001 From: coddmeistr Date: Fri, 27 Dec 2024 14:28:06 +0300 Subject: [PATCH 3/4] adjust states stream --- pkg/services/services.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/pkg/services/services.go b/pkg/services/services.go index 463dc9878..a2164d3da 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -953,17 +953,25 @@ func (s *ServicesServer) Stream(ctx context.Context, _req *connect.Request[pb.St s.ps.AddSub(messages, topics...) defer unsub(s.ps, messages) - for msg := range messages { - state := msg.(*spb.ObjectState) - log.Debug("state", zap.Any("state", state)) - err := srv.Send(state) - if err != nil { - log.Warn("Unable to send message", zap.Error(err)) - break + for { + select { + case <-ctx.Done(): + log.Debug("Context is cancelled. Connection was probably closed by the client") + return nil + case msg, ok := <-messages: + if !ok { + log.Error("Messages pubsub channel is closed") + return fmt.Errorf("internal connection closed") + } + state := msg.(*spb.ObjectState) + log.Debug("state", zap.Any("state", state)) + err := srv.Send(state) + if err != nil { + log.Warn("Unable to send message", zap.Error(err)) + return nil + } } } - - return nil } func unsub[T chan any](ps *pubsub.PubSub, ch chan any) { From 8986d39fe80132e20804a20f6e52c696861abeb2 Mon Sep 17 00:00:00 2001 From: coddmeistr Date: Fri, 27 Dec 2024 14:37:47 +0300 Subject: [PATCH 4/4] use channel reconnections --- pkg/services/services.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/services/services.go b/pkg/services/services.go index a2164d3da..c0d6f1513 100644 --- a/pkg/services/services.go +++ b/pkg/services/services.go @@ -930,8 +930,6 @@ func (s *ServicesServer) Stream(ctx context.Context, _req *connect.Request[pb.St log.Debug("Request received", zap.Any("req", req)) requestor := ctx.Value(nocloud.NoCloudAccount).(string) - messages := make(chan interface{}, 10) - if service, err := s.ctrl.Get(ctx, requestor, req.GetUuid()); err != nil || service.GetAccess().GetLevel() < access.Level_READ { log.Warn("Failed access check", zap.String("uuid", req.GetUuid())) return errors.New("failed access check") @@ -947,9 +945,11 @@ func (s *ServicesServer) Stream(ctx context.Context, _req *connect.Request[pb.St for i, id := range uuids { topics[i] = "instance/" + id } - s.log.Debug("topics", zap.Any("topics", topics)) + reconnections := 0 +retry: + messages := make(chan interface{}, 10) s.ps.AddSub(messages, topics...) defer unsub(s.ps, messages) @@ -960,8 +960,13 @@ func (s *ServicesServer) Stream(ctx context.Context, _req *connect.Request[pb.St return nil case msg, ok := <-messages: if !ok { - log.Error("Messages pubsub channel is closed") - return fmt.Errorf("internal connection closed") + if reconnections > 5 { + log.Error("Too many reconnections. Closing stream") + return fmt.Errorf("internal channel closed") + } + log.Warn("Messages pubsub channel is closed. Trying to resubscribe...") + reconnections++ + goto retry } state := msg.(*spb.ObjectState) log.Debug("state", zap.Any("state", state))