diff --git a/lib/src/gateway/gateway.dart b/lib/src/gateway/gateway.dart index b26ec4d77..b61fb52fe 100644 --- a/lib/src/gateway/gateway.dart +++ b/lib/src/gateway/gateway.dart @@ -95,8 +95,11 @@ class Gateway extends GatewayManager with EventParser { /// Create a new [Gateway]. Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() { + final logger = Logger('${client.options.loggerName}.Gateway'); + const identifyDelay = Duration(seconds: 5); final maxConcurrency = gatewayBot.sessionStartLimit.maxConcurrency; + var remainingIdentifyRequests = gatewayBot.sessionStartLimit.remaining; // A mapping of rateLimitId (shard.id % maxConcurrency) to Futures that complete when the identify lock for that rate_limit_key is no longer used. final identifyLocks = >{}; @@ -114,11 +117,21 @@ class Gateway extends GatewayManager with EventParser { if (event is RequestingIdentify) { final currentLock = identifyLocks[rateLimitKey] ?? Future.value(); - identifyLocks[rateLimitKey] = currentLock.then((_) { - if (_closing) return null; + identifyLocks[rateLimitKey] = currentLock.then((_) async { + if (_closing) return; + + if (remainingIdentifyRequests < client.options.minimumSessionStarts * 5) { + logger.warning('$remainingIdentifyRequests session starts remaining'); + } + + if (remainingIdentifyRequests < client.options.minimumSessionStarts) { + await client.close(); + throw OutOfRemainingSessionsError(gatewayBot); + } + remainingIdentifyRequests--; shard.add(Identify()); - return Future.delayed(identifyDelay); + return await Future.delayed(identifyDelay); }); } }, @@ -152,14 +165,6 @@ class Gateway extends GatewayManager with EventParser { ' Remaining Session Starts: ${gatewayBot.sessionStartLimit.remaining}, Reset After: ${gatewayBot.sessionStartLimit.resetAfter}', ); - if (gatewayBot.sessionStartLimit.remaining < client.options.minimumSessionStarts * 5) { - logger.warning('${gatewayBot.sessionStartLimit.remaining} session starts remaining'); - } - - if (gatewayBot.sessionStartLimit.remaining < client.options.minimumSessionStarts) { - throw OutOfRemainingSessionsError(gatewayBot); - } - assert( shardIds.every((element) => element < totalShards), 'Shard ID exceeds total shard count', diff --git a/lib/src/gateway/shard.dart b/lib/src/gateway/shard.dart index fa2fcfbb9..9cb888b34 100644 --- a/lib/src/gateway/shard.dart +++ b/lib/src/gateway/shard.dart @@ -174,12 +174,10 @@ class Shard extends Stream implements StreamSink { Future doClose() async { add(Dispose()); - // Wait for disconnection confirmation - await firstWhere((message) => message is Disconnecting); - // Give the isolate time to shut down cleanly, but kill it if it takes too long. try { - await drain().timeout(const Duration(seconds: 5)); + // Wait for disconnection confirmation + await firstWhere((message) => message is Disconnecting).then(drain).timeout(const Duration(seconds: 5)); } on TimeoutException { logger.warning('Isolate took too long to shut down, killing it'); isolate.kill(priority: Isolate.immediate); diff --git a/lib/src/gateway/shard_runner.dart b/lib/src/gateway/shard_runner.dart index 0f8a96bd8..59399507e 100644 --- a/lib/src/gateway/shard_runner.dart +++ b/lib/src/gateway/shard_runner.dart @@ -79,6 +79,13 @@ class ShardRunner { } else if (message is Dispose) { disposing = true; connection!.close(); + + // We might get a dispose request while we are waiting to identify. + // Add an error to the identify stream so we break out of the wait. + identifyController.addError( + Exception('Out of remaining session starts'), + StackTrace.current, + ); } else if (message is StartShard) { if (startCompleter.isCompleted) { controller.add(ErrorReceived(