Skip to content

Commit

Permalink
Check for remaining identify requests at every identify
Browse files Browse the repository at this point in the history
  • Loading branch information
abitofevrything committed Dec 31, 2023
1 parent 9768f53 commit 3a8d876
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 15 deletions.
27 changes: 16 additions & 11 deletions lib/src/gateway/gateway.dart
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,11 @@ class Gateway extends GatewayManager with EventParser {

/// Create a new [Gateway].
Gateway(this.client, this.gatewayBot, this.shards, this.totalShards, this.shardIds) : super.create() {
final logger = Logger('${client.options.loggerName}.Gateway');

const identifyDelay = Duration(seconds: 5);
final maxConcurrency = gatewayBot.sessionStartLimit.maxConcurrency;
var remainingIdentifyRequests = gatewayBot.sessionStartLimit.remaining;

// A mapping of rateLimitId (shard.id % maxConcurrency) to Futures that complete when the identify lock for that rate_limit_key is no longer used.
final identifyLocks = <int, Future<void>>{};
Expand All @@ -114,11 +117,21 @@ class Gateway extends GatewayManager with EventParser {

if (event is RequestingIdentify) {
final currentLock = identifyLocks[rateLimitKey] ?? Future.value();
identifyLocks[rateLimitKey] = currentLock.then((_) {
if (_closing) return null;
identifyLocks[rateLimitKey] = currentLock.then((_) async {
if (_closing) return;

if (remainingIdentifyRequests < client.options.minimumSessionStarts * 5) {
logger.warning('$remainingIdentifyRequests session starts remaining');
}

if (remainingIdentifyRequests < client.options.minimumSessionStarts) {
await client.close();
throw OutOfRemainingSessionsError(gatewayBot);
}

remainingIdentifyRequests--;
shard.add(Identify());
return Future.delayed(identifyDelay);
return await Future.delayed(identifyDelay);
});
}
},
Expand Down Expand Up @@ -152,14 +165,6 @@ class Gateway extends GatewayManager with EventParser {
' Remaining Session Starts: ${gatewayBot.sessionStartLimit.remaining}, Reset After: ${gatewayBot.sessionStartLimit.resetAfter}',
);

if (gatewayBot.sessionStartLimit.remaining < client.options.minimumSessionStarts * 5) {
logger.warning('${gatewayBot.sessionStartLimit.remaining} session starts remaining');
}

if (gatewayBot.sessionStartLimit.remaining < client.options.minimumSessionStarts) {
throw OutOfRemainingSessionsError(gatewayBot);
}

assert(
shardIds.every((element) => element < totalShards),
'Shard ID exceeds total shard count',
Expand Down
6 changes: 2 additions & 4 deletions lib/src/gateway/shard.dart
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,10 @@ class Shard extends Stream<ShardMessage> implements StreamSink<GatewayMessage> {
Future<void> doClose() async {
add(Dispose());

// Wait for disconnection confirmation
await firstWhere((message) => message is Disconnecting);

// Give the isolate time to shut down cleanly, but kill it if it takes too long.
try {
await drain().timeout(const Duration(seconds: 5));
// Wait for disconnection confirmation
await firstWhere((message) => message is Disconnecting).then(drain).timeout(const Duration(seconds: 5));
} on TimeoutException {
logger.warning('Isolate took too long to shut down, killing it');
isolate.kill(priority: Isolate.immediate);
Expand Down
7 changes: 7 additions & 0 deletions lib/src/gateway/shard_runner.dart
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ class ShardRunner {
} else if (message is Dispose) {
disposing = true;
connection!.close();

// We might get a dispose request while we are waiting to identify.
// Add an error to the identify stream so we break out of the wait.
identifyController.addError(
Exception('Out of remaining session starts'),
StackTrace.current,
);
} else if (message is StartShard) {
if (startCompleter.isCompleted) {
controller.add(ErrorReceived(
Expand Down

0 comments on commit 3a8d876

Please sign in to comment.