Skip to content

Commit

Permalink
Do not send header twice via malfeasance stream
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Feb 26, 2025
1 parent fdd243d commit 0166f4e
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 22 deletions.
25 changes: 14 additions & 11 deletions api/grpcserver/v2alpha1/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ func (s *MalfeasanceStreamService) Stream(
request *spacemeshv2alpha1.MalfeasanceStreamRequest,
stream spacemeshv2alpha1.MalfeasanceStreamService_StreamServer,
) error {
var sub *events.BufferedSubscription[events.EventMalfeasance]
if request.Watch {
matcher := malfeasanceMatcher{request}
var err error
sub, err = events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())
}

Check warning on line 155 in api/grpcserver/v2alpha1/malfeasance.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/malfeasance.go#L154-L155

Added lines #L154 - L155 were not covered by tests
defer sub.Close()
if err := stream.SendHeader(metadata.MD{}); err != nil {
return err
}

Check warning on line 159 in api/grpcserver/v2alpha1/malfeasance.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2alpha1/malfeasance.go#L158-L159

Added lines #L158 - L159 were not covered by tests
}

legacyProofs, err := fetchLegacyFromDB(
stream.Context(),
s.db,
Expand Down Expand Up @@ -187,20 +201,9 @@ func (s *MalfeasanceStreamService) Stream(
return nil
}

matcher := malfeasanceMatcher{request}
sub, err := events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer sub.Close()
eventsOut := sub.Out()
eventsFull := sub.Full()

if err := stream.SendHeader(metadata.MD{}); err != nil {
ctxzap.Debug(stream.Context(), "failed to send stream header",
zap.Error(err),
)
}
for {
select {
// process pending events first
Expand Down
25 changes: 14 additions & 11 deletions api/grpcserver/v2beta1/malfeasance.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,20 @@ func (s *MalfeasanceStreamService) Stream(
request *spacemeshv2beta1.MalfeasanceStreamRequest,
stream spacemeshv2beta1.MalfeasanceStreamService_StreamServer,
) error {
var sub *events.BufferedSubscription[events.EventMalfeasance]
if request.Watch {
matcher := malfeasanceMatcher{request}
var err error
sub, err = events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())
}

Check warning on line 155 in api/grpcserver/v2beta1/malfeasance.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2beta1/malfeasance.go#L154-L155

Added lines #L154 - L155 were not covered by tests
defer sub.Close()
if err := stream.SendHeader(metadata.MD{}); err != nil {
return err
}

Check warning on line 159 in api/grpcserver/v2beta1/malfeasance.go

View check run for this annotation

Codecov / codecov/patch

api/grpcserver/v2beta1/malfeasance.go#L158-L159

Added lines #L158 - L159 were not covered by tests
}

legacyProofs, err := fetchLegacyFromDB(
stream.Context(),
s.db,
Expand Down Expand Up @@ -187,20 +201,9 @@ func (s *MalfeasanceStreamService) Stream(
return nil
}

matcher := malfeasanceMatcher{request}
sub, err := events.SubscribeMatched(matcher.match)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer sub.Close()
eventsOut := sub.Out()
eventsFull := sub.Full()

if err := stream.SendHeader(metadata.MD{}); err != nil {
ctxzap.Debug(stream.Context(), "failed to send stream header",
zap.Error(err),
)
}
for {
select {
// process pending events first
Expand Down

0 comments on commit 0166f4e

Please sign in to comment.