diff --git a/client.go b/client.go index 0aff8734..1c2b9683 100644 --- a/client.go +++ b/client.go @@ -174,8 +174,15 @@ func (cli *Client) SyncWithContext(ctx context.Context) error { // We will keep syncing until the syncing state changes. Either because // Sync is called or StopSync is called. syncingID := cli.incrementSyncingID() - nextBatch := cli.Store.LoadNextBatch(ctx, cli.UserID) - filterID := cli.Store.LoadFilterID(ctx, cli.UserID) + nextBatch, err := cli.Store.LoadNextBatch(ctx, cli.UserID) + if err != nil { + return err + } + filterID, err := cli.Store.LoadFilterID(ctx, cli.UserID) + if err != nil { + return err + } + if filterID == "" { filterJSON := cli.Syncer.GetFilterJSON(cli.UserID) resFilter, err := cli.CreateFilter(ctx, filterJSON) @@ -183,7 +190,9 @@ func (cli *Client) SyncWithContext(ctx context.Context) error { return err } filterID = resFilter.FilterID - cli.Store.SaveFilterID(ctx, cli.UserID, filterID) + if err := cli.Store.SaveFilterID(ctx, cli.UserID, filterID); err != nil { + return err + } } lastSuccessfulSync := time.Now().Add(-cli.StreamSyncMinAge - 1*time.Hour) for { diff --git a/crypto/sql_store.go b/crypto/sql_store.go index c73a859a..64d62bc4 100644 --- a/crypto/sql_store.go +++ b/crypto/sql_store.go @@ -88,22 +88,27 @@ func (store *SQLCryptoStore) GetNextBatch(ctx context.Context) (string, error) { var _ mautrix.SyncStore = (*SQLCryptoStore)(nil) -func (store *SQLCryptoStore) SaveFilterID(ctx context.Context, _ id.UserID, _ string) {} -func (store *SQLCryptoStore) LoadFilterID(ctx context.Context, _ id.UserID) string { return "" } +func (store *SQLCryptoStore) SaveFilterID(ctx context.Context, _ id.UserID, _ string) error { + return nil +} +func (store *SQLCryptoStore) LoadFilterID(ctx context.Context, _ id.UserID) (string, error) { + return "", nil +} -func (store *SQLCryptoStore) SaveNextBatch(ctx context.Context, _ id.UserID, nextBatchToken string) { +func (store *SQLCryptoStore) SaveNextBatch(ctx context.Context, _ id.UserID, nextBatchToken string) error { err := store.PutNextBatch(ctx, nextBatchToken) if err != nil { - // TODO handle error + return fmt.Errorf("unable to store batch: %w", err) } + return nil } -func (store *SQLCryptoStore) LoadNextBatch(ctx context.Context, _ id.UserID) string { +func (store *SQLCryptoStore) LoadNextBatch(ctx context.Context, _ id.UserID) (string, error) { nb, err := store.GetNextBatch(ctx) if err != nil { - // TODO handle error + return "", fmt.Errorf("unable to load batch: %w", err) } - return nb + return nb, nil } func (store *SQLCryptoStore) FindDeviceID() (deviceID id.DeviceID) { diff --git a/syncstore.go b/syncstore.go index 8b5b3a55..6c7fc9ee 100644 --- a/syncstore.go +++ b/syncstore.go @@ -3,6 +3,7 @@ package mautrix import ( "context" "errors" + "fmt" "maunium.net/go/mautrix/id" ) @@ -16,10 +17,10 @@ var _ SyncStore = (*AccountDataStore)(nil) // provided "MemorySyncStore" which just keeps data around in-memory which is lost on // restarts. type SyncStore interface { - SaveFilterID(ctx context.Context, userID id.UserID, filterID string) - LoadFilterID(ctx context.Context, userID id.UserID) string - SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) - LoadNextBatch(ctx context.Context, userID id.UserID) string + SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error + LoadFilterID(ctx context.Context, userID id.UserID) (string, error) + SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error + LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) } // Deprecated: renamed to SyncStore @@ -36,23 +37,25 @@ type MemorySyncStore struct { } // SaveFilterID to memory. -func (s *MemorySyncStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) { +func (s *MemorySyncStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error { s.Filters[userID] = filterID + return nil } // LoadFilterID from memory. -func (s *MemorySyncStore) LoadFilterID(ctx context.Context, userID id.UserID) string { - return s.Filters[userID] +func (s *MemorySyncStore) LoadFilterID(ctx context.Context, userID id.UserID) (string, error) { + return s.Filters[userID], nil } // SaveNextBatch to memory. -func (s *MemorySyncStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) { +func (s *MemorySyncStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error { s.NextBatch[userID] = nextBatchToken + return nil } // LoadNextBatch from memory. -func (s *MemorySyncStore) LoadNextBatch(ctx context.Context, userID id.UserID) string { - return s.NextBatch[userID] +func (s *MemorySyncStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) { + return s.NextBatch[userID], nil } // NewMemorySyncStore constructs a new MemorySyncStore. @@ -76,25 +79,26 @@ type accountData struct { NextBatch string `json:"next_batch"` } -func (s *AccountDataStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) { +func (s *AccountDataStore) SaveFilterID(ctx context.Context, userID id.UserID, filterID string) error { if userID.String() != s.client.UserID.String() { panic("AccountDataStore must only be used with a single account") } s.FilterID = filterID + return nil } -func (s *AccountDataStore) LoadFilterID(ctx context.Context, userID id.UserID) string { +func (s *AccountDataStore) LoadFilterID(ctx context.Context, userID id.UserID) (string, error) { if userID.String() != s.client.UserID.String() { panic("AccountDataStore must only be used with a single account") } - return s.FilterID + return s.FilterID, nil } -func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) { +func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, nextBatchToken string) error { if userID.String() != s.client.UserID.String() { panic("AccountDataStore must only be used with a single account") } else if nextBatchToken == s.nextBatch { - return + return nil } data := accountData{ @@ -103,7 +107,7 @@ func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, err := s.client.SetAccountData(ctx, s.EventType, data) if err != nil { - s.client.Log.Warn().Err(err).Msg("Failed to save next batch token to account data") + return fmt.Errorf("failed to save next batch token to account data: %w", err) } else { s.client.Log.Debug(). Str("old_token", s.nextBatch). @@ -111,9 +115,10 @@ func (s *AccountDataStore) SaveNextBatch(ctx context.Context, userID id.UserID, Msg("Saved next batch token") s.nextBatch = nextBatchToken } + return nil } -func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) string { +func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) (string, error) { if userID.String() != s.client.UserID.String() { panic("AccountDataStore must only be used with a single account") } @@ -124,15 +129,15 @@ func (s *AccountDataStore) LoadNextBatch(ctx context.Context, userID id.UserID) if err != nil { if errors.Is(err, MNotFound) { s.client.Log.Debug().Msg("No next batch token found in account data") + return "", nil } else { - s.client.Log.Warn().Err(err).Msg("Failed to load next batch token from account data") + return "", fmt.Errorf("failed to load next batch token from account data: %w", err) } - return "" } s.nextBatch = data.NextBatch s.client.Log.Debug().Str("next_batch", data.NextBatch).Msg("Loaded next batch token from account data") - return s.nextBatch + return s.nextBatch, nil } // NewAccountDataStore returns a new AccountDataStore, which stores