Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…nector-net into SNOW-715524-SSO-Token-Cache

# Conflicts:
#	Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs
  • Loading branch information
sfc-gh-ext-simba-lf committed Aug 27, 2024
2 parents d53955a + adac81a commit d6ff05a
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 36 deletions.
48 changes: 48 additions & 0 deletions Snowflake.Data.Tests/IntegrationTests/SFConnectionIT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2411,6 +2411,54 @@ public void TestSSOConnectionWithTokenCachingAsync()
Assert.AreEqual(ConnectionState.Closed, conn.State);
}
}

[Test]
public void TestCloseSessionWhenGarbageCollectorFinalizesConnection()
{
// arrange
var session = GetSessionFromForgottenConnection();
Assert.NotNull(session);
Assert.NotNull(session.sessionId);
Assert.NotNull(session.sessionToken);

// act
GC.Collect();
Awaiter.WaitUntilConditionOrTimeout(() => session.sessionToken == null, TimeSpan.FromSeconds(15));

// assert
Assert.IsNull(session.sessionToken);
}

private SFSession GetSessionFromForgottenConnection()
{
var connection = new SnowflakeDbConnection(ConnectionString + ";poolingEnabled=false;application=TestGarbageCollectorCloseSession");
connection.Open();
return connection.SfSession;
}

[Test]
public void TestHangingCloseIsNotBlocking()
{
// arrange
var restRequester = new MockCloseHangingRestRequester();
var session = new SFSession("account=test;user=test;password=test", null, restRequester);
session.Open();
var watchClose = new Stopwatch();
var watchClosedFinished = new Stopwatch();

// act
watchClose.Start();
watchClosedFinished.Start();
session.CloseNonBlocking();
watchClose.Stop();
Awaiter.WaitUntilConditionOrTimeout(() => restRequester.CloseRequests.Count > 0, TimeSpan.FromSeconds(15));
watchClosedFinished.Stop();

// assert
Assert.AreEqual(1, restRequester.CloseRequests.Count);
Assert.Less(watchClose.Elapsed.Duration(), TimeSpan.FromSeconds(5)); // close executed immediately
Assert.GreaterOrEqual(watchClosedFinished.Elapsed.Duration(), TimeSpan.FromSeconds(10)); // while background task took more time
}
}
}

Expand Down
82 changes: 82 additions & 0 deletions Snowflake.Data.Tests/Mock/MockCloseHangingRestRequester.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Snowflake.Data.Core;

namespace Snowflake.Data.Tests.Mock
{
internal class MockCloseHangingRestRequester: IMockRestRequester
{
internal List<SFRestRequest> CloseRequests { get; } = new();

public T Get<T>(IRestRequest request)
{
return Task.Run(async () => await (GetAsync<T>(request, CancellationToken.None)).ConfigureAwait(false)).Result;
}

public Task<T> GetAsync<T>(IRestRequest request, CancellationToken cancellationToken)
{
return Task.FromResult<T>((T)(object)null);
}

public Task<HttpResponseMessage> GetAsync(IRestRequest request, CancellationToken cancellationToken)
{
return Task.FromResult<HttpResponseMessage>(null);
}

public HttpResponseMessage Get(IRestRequest request)
{
return null;
}

public T Post<T>(IRestRequest postRequest)
{
return Task.Run(async () => await (PostAsync<T>(postRequest, CancellationToken.None)).ConfigureAwait(false)).Result;
}

public Task<T> PostAsync<T>(IRestRequest postRequest, CancellationToken cancellationToken)
{
SFRestRequest sfRequest = (SFRestRequest)postRequest;
if (sfRequest.jsonBody is LoginRequest)
{
LoginResponse authnResponse = new LoginResponse
{
data = new LoginResponseData()
{
sessionId = "123456789",
token = "session_token",
masterToken = "master_token",
authResponseSessionInfo = new SessionInfo(),
nameValueParameter = new List<NameValueParameter>()
},
success = true
};

// login request return success
return Task.FromResult<T>((T)(object)authnResponse);
}

if (sfRequest.Url.Query.StartsWith("?delete=true"))
{
var closeResponse = new CloseResponse()
{
code = 390111,
message = "Session no longer exists. New login required to access the service",
success = false
};
Thread.Sleep(TimeSpan.FromSeconds(10));
CloseRequests.Add(sfRequest);
return Task.FromResult<T>((T)(object)closeResponse);
}

throw new NotImplementedException();
}

public void setHttpClient(HttpClient httpClient)
{
// Nothing to do
}
}
}
11 changes: 10 additions & 1 deletion Snowflake.Data.Tests/UnitTests/SFSessionTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,21 @@ class SFSessionTest
[Test]
public void TestSessionGoneWhenClose()
{
Mock.MockCloseSessionGone restRequester = new Mock.MockCloseSessionGone();
var restRequester = new MockCloseSessionGone();
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
Assert.DoesNotThrow(() => sfSession.close());
}

[Test]
public void TestSessionGoneWhenCloseNonBlocking()
{
var restRequester = new MockCloseSessionGone();
SFSession sfSession = new SFSession("account=test;user=test;password=test", null, restRequester);
sfSession.Open();
Assert.DoesNotThrow(() => sfSession.CloseNonBlocking());
}

[Test]
public void TestUpdateSessionProperties()
{
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data/Client/SnowflakeDbConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ protected override void Dispose(bool disposing)
}
else
{
SfSession?.close();
SfSession?.CloseNonBlocking();
SfSession = null;
_connectionState = ConnectionState.Closed;
}
Expand Down
2 changes: 1 addition & 1 deletion Snowflake.Data/Core/RestRequester.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ protected virtual async Task<HttpResponseMessage> SendAsync(HttpRequestMessage m
.ConfigureAwait(false);
if (!response.IsSuccessStatusCode)
{
logger.Debug($"Failed Response: {sid} {message.Method} {message.RequestUri} StatusCode: {(int)response.StatusCode}, ReasonPhrase: '{response.ReasonPhrase}'");
logger.Error($"Failed Response: {sid} {message.Method} {message.RequestUri} StatusCode: {(int)response.StatusCode}, ReasonPhrase: '{response.ReasonPhrase}'");
}
else
{
Expand Down
79 changes: 46 additions & 33 deletions Snowflake.Data/Core/Session/SFSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -316,62 +316,75 @@ internal void close()
if (!IsEstablished()) return;
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
stopHeartBeatForThisSession();
var closeSessionRequest = PrepareCloseSessionRequest();
PostCloseSession(closeSessionRequest, restRequester);
sessionToken = null;
}

// Send a close session request
var queryParams = new Dictionary<string, string>();
queryParams[RestParams.SF_QUERY_SESSION_DELETE] = "true";
queryParams[RestParams.SF_QUERY_REQUEST_ID] = Guid.NewGuid().ToString();
queryParams[RestParams.SF_QUERY_REQUEST_GUID] = Guid.NewGuid().ToString();
internal void CloseNonBlocking()
{
// Nothing to do if the session is not open
if (!IsEstablished()) return;
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
stopHeartBeatForThisSession();
var closeSessionRequest = PrepareCloseSessionRequest();
Task.Run(() => PostCloseSession(closeSessionRequest, restRequester));
sessionToken = null;
}

SFRestRequest closeSessionRequest = new SFRestRequest
{
Url = BuildUri(RestPath.SF_SESSION_PATH, queryParams),
authorizationToken = string.Format(SF_AUTHORIZATION_SNOWFLAKE_FMT, sessionToken),
sid = sessionId
};
internal async Task CloseAsync(CancellationToken cancellationToken)
{
// Nothing to do if the session is not open
if (!IsEstablished()) return;
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
stopHeartBeatForThisSession();

var closeSessionRequest = PrepareCloseSessionRequest();

logger.Debug($"Send closeSessionRequest");
var response = restRequester.Post<CloseResponse>(closeSessionRequest);
logger.Debug($"Closing session async");
var response = await restRequester.PostAsync<CloseResponse>(closeSessionRequest, cancellationToken).ConfigureAwait(false);
if (!response.success)
{
logger.Debug($"Failed to delete session: {sessionId}, error ignored. Code: {response.code} Message: {response.message}");
logger.Error($"Failed to close session {sessionId}, error ignored. Code: {response.code} Message: {response.message}");
}

logger.Debug($"Session closed: {sessionId}");
// Just in case the session won't be closed twice
sessionToken = null;
}

internal async Task CloseAsync(CancellationToken cancellationToken)
private static void PostCloseSession(SFRestRequest closeSessionRequest, IRestRequester restRequester)
{
// Nothing to do if the session is not open
if (!IsEstablished()) return;
logger.Debug($"Closing session with id: {sessionId}, user: {_user}, database: {database}, schema: {schema}, role: {role}, warehouse: {warehouse}, connection start timestamp: {_startTime}");
stopHeartBeatForThisSession();
try
{
logger.Debug($"Closing session");
var response = restRequester.Post<CloseResponse>(closeSessionRequest);
if (!response.success)
{
logger.Error($"Failed to close session: {closeSessionRequest.sid}, error ignored. Code: {response.code} Message: {response.message}");
}

// Send a close session request
logger.Debug($"Session closed: {closeSessionRequest.sid}");
}
catch (Exception)
{
logger.Error($"Failed to close session: {closeSessionRequest.sid}, because of exception.");
throw;
}
}

private SFRestRequest PrepareCloseSessionRequest()
{
var queryParams = new Dictionary<string, string>();
queryParams[RestParams.SF_QUERY_SESSION_DELETE] = "true";
queryParams[RestParams.SF_QUERY_REQUEST_ID] = Guid.NewGuid().ToString();
queryParams[RestParams.SF_QUERY_REQUEST_GUID] = Guid.NewGuid().ToString();

SFRestRequest closeSessionRequest = new SFRestRequest()
return new SFRestRequest
{
Url = BuildUri(RestPath.SF_SESSION_PATH, queryParams),
authorizationToken = string.Format(SF_AUTHORIZATION_SNOWFLAKE_FMT, sessionToken),
sid = sessionId
};

logger.Debug($"Send async closeSessionRequest");
var response = await restRequester.PostAsync<CloseResponse>(closeSessionRequest, cancellationToken).ConfigureAwait(false);
if (!response.success)
{
logger.Debug($"Failed to delete session {sessionId}, error ignored. Code: {response.code} Message: {response.message}");
}

logger.Debug($"Session closed: {sessionId}");
// Just in case the session won't be closed twice
sessionToken = null;
}

internal bool IsEstablished() => sessionToken != null;
Expand Down

0 comments on commit d6ff05a

Please sign in to comment.