From 7e67f658c4f477b561f7e417476f704972cab83f Mon Sep 17 00:00:00 2001 From: Krzysztof Nozderko Date: Tue, 27 Aug 2024 15:08:55 +0200 Subject: [PATCH 1/2] SNOW-1640968 Make connection finaliser a not blocking operation (#1009) --- .../IntegrationTests/SFConnectionIT.cs | 48 +++++++++++ .../Mock/MockCloseHangingRestRequester.cs | 82 +++++++++++++++++++ .../UnitTests/SFSessionTest.cs | 11 ++- .../Client/SnowflakeDbConnection.cs | 2 +- Snowflake.Data/Core/Session/SFSession.cs | 79 ++++++++++-------- 5 files changed, 187 insertions(+), 35 deletions(-) create mode 100644 Snowflake.Data.Tests/Mock/MockCloseHangingRestRequester.cs diff --git a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs index 5f08e4051..9eaebe377 100644 --- a/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs +++ b/Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs @@ -2310,6 +2310,54 @@ public void TestOpenAsyncThrowExceptionWhenOperationIsCancelled() Assert.AreEqual(ConnectionState.Closed, connection.State); } } + + [Test] + public void TestCloseSessionWhenGarbageCollectorFinalizesConnection() + { + // arrange + var session = GetSessionFromForgottenConnection(); + Assert.NotNull(session); + Assert.NotNull(session.sessionId); + Assert.NotNull(session.sessionToken); + + // act + GC.Collect(); + Awaiter.WaitUntilConditionOrTimeout(() => session.sessionToken == null, TimeSpan.FromSeconds(15)); + + // assert + Assert.IsNull(session.sessionToken); + } + + private SFSession GetSessionFromForgottenConnection() + { + var connection = new SnowflakeDbConnection(ConnectionString + ";poolingEnabled=false;application=TestGarbageCollectorCloseSession"); + connection.Open(); + return connection.SfSession; + } + + [Test] + public void TestHangingCloseIsNotBlocking() + { + // arrange + var restRequester = new MockCloseHangingRestRequester(); + var session = new SFSession("account=test;user=test;password=test", null, restRequester); + session.Open(); + var watchClose = new Stopwatch(); + var watchClosedFinished = new Stopwatch(); + + // act + watchClose.Start(); + watchClosedFinished.Start(); + session.CloseNonBlocking(); + watchClose.Stop(); + Awaiter.WaitUntilConditionOrTimeout(() => restRequester.CloseRequests.Count > 0, TimeSpan.FromSeconds(15)); + watchClosedFinished.Stop(); + + // assert + Assert.AreEqual(1, restRequester.CloseRequests.Count); + Assert.Less(watchClose.Elapsed.Duration(), TimeSpan.FromSeconds(5)); // close executed immediately + Assert.GreaterOrEqual(watchClosedFinished.Elapsed.Duration(), TimeSpan.FromSeconds(10)); // while background task took more time + } } } diff --git a/Snowflake.Data.Tests/Mock/MockCloseHangingRestRequester.cs b/Snowflake.Data.Tests/Mock/MockCloseHangingRestRequester.cs new file mode 100644 index 000000000..35e8fbcd1 --- /dev/null +++ b/Snowflake.Data.Tests/Mock/MockCloseHangingRestRequester.cs @@ -0,0 +1,82 @@ +using System; +using System.Collections.Generic; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using Snowflake.Data.Core; + +namespace Snowflake.Data.Tests.Mock +{ + internal class MockCloseHangingRestRequester: IMockRestRequester + { + internal List CloseRequests { get; } = new(); + + public T Get(IRestRequest request) + { + return Task.Run(async () => await (GetAsync(request, CancellationToken.None)).ConfigureAwait(false)).Result; + } + + public Task GetAsync(IRestRequest request, CancellationToken cancellationToken) + { + return Task.FromResult((T)(object)null); + } + + public Task GetAsync(IRestRequest request, CancellationToken cancellationToken) + { + return Task.FromResult(null); + } + + public HttpResponseMessage Get(IRestRequest request) + { + return null; + } + + public T Post(IRestRequest postRequest) + { + return Task.Run(async () => await (PostAsync(postRequest, CancellationToken.None)).ConfigureAwait(false)).Result; + } + + public Task PostAsync(IRestRequest postRequest, CancellationToken cancellationToken) + { + SFRestRequest sfRequest = (SFRestRequest)postRequest; + if (sfRequest.jsonBody is LoginRequest) + { + LoginResponse authnResponse = new LoginResponse + { + data = new LoginResponseData() + { + sessionId = "123456789", + token = "session_token", + masterToken = "master_token", + authResponseSessionInfo = new SessionInfo(), + nameValueParameter = new List() + }, + success = true + }; + + // login request return success + return Task.FromResult((T)(object)authnResponse); + } + + if (sfRequest.Url.Query.StartsWith("?delete=true")) + { + var closeResponse = new CloseResponse() + { + code = 390111, + message = "Session no longer exists. New login required to access the service", + success = false + }; + Thread.Sleep(TimeSpan.FromSeconds(10)); + CloseRequests.Add(sfRequest); + return Task.FromResult((T)(object)closeResponse); + } + + throw new NotImplementedException(); + } + + public void setHttpClient(HttpClient httpClient) + { + // Nothing to do + } + } +} diff --git a/Snowflake.Data.Tests/UnitTests/SFSessionTest.cs b/Snowflake.Data.Tests/UnitTests/SFSessionTest.cs index a1f795026..262122b2d 100644 --- a/Snowflake.Data.Tests/UnitTests/SFSessionTest.cs +++ b/Snowflake.Data.Tests/UnitTests/SFSessionTest.cs @@ -16,12 +16,21 @@ class SFSessionTest [Test] public void TestSessionGoneWhenClose() { - Mock.MockCloseSessionGone restRequester = new Mock.MockCloseSessionGone(); + var restRequester = new MockCloseSessionGone(); SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester); sfSession.Open(); Assert.DoesNotThrow(() => sfSession.close()); } + [Test] + public void TestSessionGoneWhenCloseNonBlocking() + { + var restRequester = new MockCloseSessionGone(); + SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester); + sfSession.Open(); + Assert.DoesNotThrow(() => sfSession.CloseNonBlocking()); + } + [Test] public void TestUpdateSessionProperties() { diff --git a/Snowflake.Data/Client/SnowflakeDbConnection.cs b/Snowflake.Data/Client/SnowflakeDbConnection.cs index edc4b1e4e..9acb24f06 100755 --- a/Snowflake.Data/Client/SnowflakeDbConnection.cs +++ b/Snowflake.Data/Client/SnowflakeDbConnection.cs @@ -397,7 +397,7 @@ protected override void Dispose(bool disposing) } else { - SfSession?.close(); + SfSession?.CloseNonBlocking(); SfSession = null; _connectionState = ConnectionState.Closed; } diff --git a/Snowflake.Data/Core/Session/SFSession.cs b/Snowflake.Data/Core/Session/SFSession.cs index ceeffb001..b6a0ebf79 100755 --- a/Snowflake.Data/Core/Session/SFSession.cs +++ b/Snowflake.Data/Core/Session/SFSession.cs @@ -279,62 +279,75 @@ internal void close() if (!IsEstablished()) return; logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); stopHeartBeatForThisSession(); + var closeSessionRequest = PrepareCloseSessionRequest(); + PostCloseSession(closeSessionRequest, restRequester); + sessionToken = null; + } - // Send a close session request - var queryParams = new Dictionary(); - queryParams[RestParams.SF_QUERY_SESSION_DELETE] = "true"; - queryParams[RestParams.SF_QUERY_REQUEST_ID] = Guid.NewGuid().ToString(); - queryParams[RestParams.SF_QUERY_REQUEST_GUID] = Guid.NewGuid().ToString(); + internal void CloseNonBlocking() + { + // Nothing to do if the session is not open + if (!IsEstablished()) return; + logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); + stopHeartBeatForThisSession(); + var closeSessionRequest = PrepareCloseSessionRequest(); + Task.Run(() => PostCloseSession(closeSessionRequest, restRequester)); + sessionToken = null; + } - SFRestRequest closeSessionRequest = new SFRestRequest - { - Url = BuildUri(RestPath.SF_SESSION_PATH, queryParams), - authorizationToken = string.Format(SF_AUTHORIZATION_SNOWFLAKE_FMT, sessionToken), - sid = sessionId - }; + internal async Task CloseAsync(CancellationToken cancellationToken) + { + // Nothing to do if the session is not open + if (!IsEstablished()) return; + logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); + stopHeartBeatForThisSession(); + + var closeSessionRequest = PrepareCloseSessionRequest(); - logger.Debug($"Send closeSessionRequest"); - var response = restRequester.Post(closeSessionRequest); + logger.Debug($"Closing session async"); + var response = await restRequester.PostAsync(closeSessionRequest, cancellationToken).ConfigureAwait(false); if (!response.success) { - logger.Debug($"Failed to delete session: {sessionId}, error ignored. Code: {response.code} Message: {response.message}"); + logger.Error($"Failed to close session {sessionId}, error ignored. Code: {response.code} Message: {response.message}"); } logger.Debug($"Session closed: {sessionId}"); - // Just in case the session won't be closed twice sessionToken = null; } - internal async Task CloseAsync(CancellationToken cancellationToken) + private static void PostCloseSession(SFRestRequest closeSessionRequest, IRestRequester restRequester) { - // Nothing to do if the session is not open - if (!IsEstablished()) return; - logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}"); - stopHeartBeatForThisSession(); + try + { + logger.Debug($"Closing session"); + var response = restRequester.Post(closeSessionRequest); + if (!response.success) + { + logger.Error($"Failed to close session: {closeSessionRequest.sid}, error ignored. Code: {response.code} Message: {response.message}"); + } - // Send a close session request + logger.Debug($"Session closed: {closeSessionRequest.sid}"); + } + catch (Exception) + { + logger.Error($"Failed to close session: {closeSessionRequest.sid}, because of exception."); + throw; + } + } + + private SFRestRequest PrepareCloseSessionRequest() + { var queryParams = new Dictionary(); queryParams[RestParams.SF_QUERY_SESSION_DELETE] = "true"; queryParams[RestParams.SF_QUERY_REQUEST_ID] = Guid.NewGuid().ToString(); queryParams[RestParams.SF_QUERY_REQUEST_GUID] = Guid.NewGuid().ToString(); - SFRestRequest closeSessionRequest = new SFRestRequest() + return new SFRestRequest { Url = BuildUri(RestPath.SF_SESSION_PATH, queryParams), authorizationToken = string.Format(SF_AUTHORIZATION_SNOWFLAKE_FMT, sessionToken), sid = sessionId }; - - logger.Debug($"Send async closeSessionRequest"); - var response = await restRequester.PostAsync(closeSessionRequest, cancellationToken).ConfigureAwait(false); - if (!response.success) - { - logger.Debug($"Failed to delete session {sessionId}, error ignored. Code: {response.code} Message: {response.message}"); - } - - logger.Debug($"Session closed: {sessionId}"); - // Just in case the session won't be closed twice - sessionToken = null; } internal bool IsEstablished() => sessionToken != null; From adac81a24ca3ddc1fa25bf71e5ab163f73b534a2 Mon Sep 17 00:00:00 2001 From: David Szmolka <69192509+sfc-gh-dszmolka@users.noreply.github.com> Date: Tue, 27 Aug 2024 17:11:32 +0200 Subject: [PATCH 2/2] SNOW-1612981 loglevel for errors Debug -> Error (#1008) --- Snowflake.Data/Core/RestRequester.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Snowflake.Data/Core/RestRequester.cs b/Snowflake.Data/Core/RestRequester.cs index e9064495d..a38a11f70 100644 --- a/Snowflake.Data/Core/RestRequester.cs +++ b/Snowflake.Data/Core/RestRequester.cs @@ -118,7 +118,7 @@ protected virtual async Task SendAsync(HttpRequestMessage m .ConfigureAwait(false); if (!response.IsSuccessStatusCode) { - logger.Debug($"Failed Response: {sid} {message.Method} {message.RequestUri} StatusCode: {(int)response.StatusCode}, ReasonPhrase: '{response.ReasonPhrase}'"); + logger.Error($"Failed Response: {sid} {message.Method} {message.RequestUri} StatusCode: {(int)response.StatusCode}, ReasonPhrase: '{response.ReasonPhrase}'"); } else {