Skip to content

Commit

Permalink
Update test cases and implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
dilanSachi committed Jan 18, 2024
1 parent a297fcc commit 60bd19b
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private static void createStream(Http2Connection conn, int streamId) throws Http
try {
conn.local().createStream(streamId, false);
} catch (Http2Exception exception) {
throw new Http2Exception(exception.error(), "Error occured while creating stream", exception);
throw new Http2Exception(exception.error(), "Error occurred while creating stream", exception);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Stream created streamId: {}", streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
}
Channel channel = http2ClientChannel.getChannel();
if (channel == null) { // if channel is not active, forget it and fetch next one
http2ClientChannels.remove(http2ClientChannel);
removeChannel(http2ClientChannel);
return fetchTargetChannel();
}
// increment and get active stream count
Expand All @@ -89,7 +89,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
return http2ClientChannel;
} else if (activeStreamCount == maxActiveStreams) { // no more streams except this one can be opened
http2ClientChannel.markAsExhausted();
http2ClientChannels.remove(http2ClientChannel);
removeChannel(http2ClientChannel);
// When the stream count reaches maxActiveStreams, a new channel will be added only if the
// channel queue is empty. This process is synchronized as transport thread can return channels
// after being reset. If such channel is returned before the new CountDownLatch, the subsequent
Expand All @@ -104,7 +104,7 @@ synchronized Http2ClientChannel fetchTargetChannel() {
}
return http2ClientChannel;
} else {
http2ClientChannels.remove(http2ClientChannel);
removeChannel(http2ClientChannel);
return fetchTargetChannel(); // fetch the next one from the queue
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,7 @@ public void onStreamClosed(Http2Stream stream) {

@Override
public void onGoAwayReceived(int lastStreamId, long errorCode, ByteBuf debugData) {
synchronized (http2ConnectionManager) {
markAsStale();
}
markAsStale();
http2ClientChannel.inFlightMessages.forEach((streamId, outboundMsgHolder) -> {
if (streamId > lastStreamId) {
http2ClientChannel.removeInFlightMessage(streamId);
Expand All @@ -323,8 +321,10 @@ void removeFromConnectionPool() {
}

void markAsStale() {
isStale.set(true);
http2ConnectionManager.markClientChannelAsStale(httpRoute, this);
synchronized (http2ConnectionManager) {
isStale.set(true);
http2ConnectionManager.markClientChannelAsStale(httpRoute, this);
}
}

boolean hasInFlightMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ public void testExhaustedStreamId() throws Http2Exception {
HttpCarbonMessage secondMessage = MessageGenerator.generateRequest(HttpMethod.POST, testValue);
Throwable firstError = new MessageSender(httpClientConnector).sendMessageAndExpectError(secondMessage);
assertNotNull(firstError, "Expected error not received");
assertEquals(firstError.getMessage(), "No more streams can be created on this connection",
assertEquals(firstError.getMessage(), "Error occurred while creating stream",
"Expected error response not received");
assertEquals(firstError.getCause().getMessage(), "No more streams can be created on this connection",
"Expected error response not received");

//Send another request using the same client and it should not fail
HttpCarbonMessage thirdMessage = MessageGenerator.generateRequest(HttpMethod.POST, testValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ private void sendRSTStream(OutputStream outputStream) throws IOException, Interr
readSemaphore.release();
writeSemaphore.acquire();
outputStream.write(HEADER_FRAME_STREAM_07);
Thread.sleep(SLEEP_TIME);
outputStream.write(RST_STREAM_FRAME_STREAM_07);
Thread.sleep(SLEEP_TIME);
readSemaphore.release();
Thread.sleep(END_SLEEP_TIME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setup() throws InterruptedException {
@Test
private void testRSTStreamFrameForSingleStream() {
CountDownLatch latch = new CountDownLatch(1);
DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge);
DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge);
try {
latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void setup() throws InterruptedException {
@Test
private void testRSTStreamFrameWhenReadingBodyForSingleStream() {
CountDownLatch latch = new CountDownLatch(1);
DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge);
DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge);
try {
latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void setup() throws InterruptedException {
@Test
private void testSuccessfulConnection() {
CountDownLatch latch = new CountDownLatch(1);
DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(null, h2ClientWithPriorKnowledge);
DefaultHttpConnectorListener msgListener = TestUtil.sendRequestAsync(latch, h2ClientWithPriorKnowledge);
try {
latch.await(TestUtil.HTTP2_RESPONSE_TIME_OUT, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand Down

0 comments on commit 60bd19b

Please sign in to comment.