Skip to content

Commit

Permalink
chore: Update stack stream handling in GrpcServerConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
oriventi committed Jun 5, 2024
1 parent 975e4c3 commit 4c76bea
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 49 deletions.
2 changes: 1 addition & 1 deletion duo_backend/api/game_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ func (gm *GameManager) SetStackStream(gameId int, userId uuid.UUID, stream pb.DU
return err
}

//TODO
log.Printf("[stack stream] Received message from stack %v: %v", userId, msg)

}
}
}
Expand Down
1 change: 0 additions & 1 deletion duo_client/lib/provider/api_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ class ApiProvider extends ChangeNotifier implements AbstractServerConnection {
notifyListeners();
break;
}
case StackStateInitEvent:
case UserStatusInitEvent:
{
notifyListeners();
Expand Down
70 changes: 27 additions & 43 deletions duo_client/lib/utils/connection/grpc_server_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class GrpcServerConnection extends AbstractServerConnection {
StreamController<PlayerAction> playerActionStreamController =
StreamController<PlayerAction>.broadcast();

StreamController<StackRequest>? stackRequestStreamController;
StreamController<StackRequest>? stackRequestStreamController =
StreamController<StackRequest>.broadcast();
GrpcServerConnection({required String host}) : super() {
channel = ClientChannel(
host,
Expand Down Expand Up @@ -275,25 +276,25 @@ class GrpcServerConnection extends AbstractServerConnection {
playerStream = stream;

playerStream?.listen(
(value) {
// playerState = value;
// _notifyListeners();
debugPrint('Received Player State Event: $value');

eventController.add(PlayerStateEvent(value));
},
cancelOnError: false,
onError: (e) {
eventController.add(PlayerStateDoneEvent());
debugPrint('Error: $e');
},
onDone: () {
// playerState = null;
eventController.add(PlayerStateDoneEvent());
gameStream = null;
debugPrint('Player Stream Done');
},
);
(value) {
// playerState = value;
// _notifyListeners();
debugPrint('Received Player State Event: $value');

eventController.add(PlayerStateEvent(value));
},
cancelOnError: false,
onError: (e) {
eventController.add(PlayerStateDoneEvent());
debugPrint('Error: $e');
},
onDone: () {
// playerState = null;
eventController.add(PlayerStateDoneEvent());
gameStream = null;
debugPrint('Player Stream Done');
},
);
} catch (e) {
return -1;
}
Expand All @@ -309,13 +310,6 @@ class GrpcServerConnection extends AbstractServerConnection {
@override
Future<int> getStackStream(String token, int gameId) async {
try {
final streamEstablishedCompleter = Completer<void>();
if (!hasStackStream) {
stackRequestStreamController = StreamController<StackRequest>.broadcast(
onCancel: () =>
debugPrint('Stream controller for stack request cancelled'),
);
}
final responseStream =
client.getStackStream(stackRequestStreamController!.stream,
options: CallOptions(
Expand All @@ -324,22 +318,14 @@ class GrpcServerConnection extends AbstractServerConnection {
stackStream = responseStream;
stackStream?.listen(
(value) {
if (!streamEstablishedCompleter.isCompleted) {
if (!streamEstablishedCompleter.isCompleted) {
streamEstablishedCompleter.complete();
}
eventController.add(StackStateEvent(value));
debugPrint('card on top: ${value.placeStack.cardIdOnTop}');
debugPrint('stack stream acknowledge received');
}
eventController.add(StackStateEvent(value));
debugPrint('card on top: ${value.placeStack.cardIdOnTop}');
debugPrint('stack stream acknowledge received');
},
onError: (error) {
if (!streamEstablishedCompleter.isCompleted) {
streamEstablishedCompleter.completeError(error);
stackStream?.cancel();
stackStream = null;
eventController.add(StackStateDoneEvent());
}
stackStream?.cancel();
stackStream = null;
eventController.add(StackStateDoneEvent());
debugPrint('Error establishing stack stream: $error');
},
onDone: () {
Expand All @@ -350,8 +336,6 @@ class GrpcServerConnection extends AbstractServerConnection {
cancelOnError: false,
);

await streamEstablishedCompleter.future;
eventController.add(StackStateInitEvent());
return 0;
} catch (e) {
debugPrint('Error sending stack request: $e');
Expand Down
4 changes: 0 additions & 4 deletions duo_client/lib/utils/connection/server_events.dart
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ class StackStateEvent extends ServerEvent {
StackStateEvent(this.state);
}

class StackStateInitEvent extends ServerEvent {
StackStateInitEvent();
}

class StackStateDoneEvent extends ServerEvent {
StackStateDoneEvent();
}
Expand Down

0 comments on commit 4c76bea

Please sign in to comment.