Skip to content

Commit

Permalink
Ignore unprocessable events + only log handler fails
Browse files Browse the repository at this point in the history
The client can't assume a jetstream server will always produce valid messages, or that it won't forward evil messages that break parsing [like a JSON object with 10,000 levels nested `[[[[[[]]]]]]`](bluesky-social#24).

This change makes the reader continue after encountering any messages that can't be decompressed or unmarshalled into JSON -- they will be logged as errors but the client will continue. The handler will not be called on these events.

The sequential scheduler is also modified to no longer quit with error if its handler func returns an error -- it just logs. This matches the behaviour of the parallel scheduler. Maybe different behaviour is fine, I guess it's to taste.
  • Loading branch information
uniphil committed Nov 18, 2024
1 parent 5e9ab19 commit a12f505
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
8 changes: 4 additions & 4 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,17 @@ func (c *Client) readLoop(ctx context.Context) error {
if c.decoder != nil && c.config.Compress {
m, err := c.decoder.DecodeAll(msg, nil)
if err != nil {
c.logger.Error("failed to decompress message", "error", err)
return fmt.Errorf("failed to decompress message: %w", err)
c.logger.Error("failed to decompress message (ignoring)", "error", err)
continue
}
msg = m
}

// Unpack the message and pass it to the handler
var event models.Event
if err := json.Unmarshal(msg, &event); err != nil {
c.logger.Error("failed to unmarshal event", "error", err)
return fmt.Errorf("failed to unmarshal event: %w", err)
c.logger.Error("failed to unmarshal event (ignoring)", "error", err, msg)
continue
}

if err := c.Scheduler.AddWork(ctx, event.Did, &event); err != nil {
Expand Down
6 changes: 4 additions & 2 deletions pkg/client/schedulers/sequential/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (p *Scheduler) Shutdown() {
func (s *Scheduler) AddWork(ctx context.Context, repo string, val *models.Event) error {
s.itemsAdded.Inc()
s.itemsActive.Inc()
err := s.handleEvent(ctx, val)
if err := s.handleEvent(ctx, val); err != nil {
s.logger.Error("event handler failed", "error", err)
}
s.itemsProcessed.Inc()
return err
return nil
}

0 comments on commit a12f505

Please sign in to comment.