Skip to content

Commit

Permalink
Merge pull request #13 from skbkontur/ConnectionLeakFix
Browse files Browse the repository at this point in the history
Close connection if error occured while open
  • Loading branch information
gangstatracer authored Oct 5, 2020
2 parents 62bcebc + 1845283 commit 12d768d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.IO;
using System.Linq.Expressions;

using Cassandra;

using Moq;

using NUnit.Framework;

using SkbKontur.Cassandra.Local;
Expand Down Expand Up @@ -84,11 +87,34 @@ public void TestNonAuthenticatedConnection()
Assert.AreEqual("Provided username non-existent and/or password are incorrect", authenticationException.Why);
}

private void SomeActionThatRequiresAuthentication(string username, string password)
[Test]
public void TestThriftConnectionClosedAfterNonSuccessfulAuthentication()
{
var logger = new Mock<ILog>(MockBehavior.Strict);
logger.Setup(l => l.ForContext(It.IsAny<string>()))
.Returns(logger.Object);

logger.Setup(l => l.IsEnabledFor(It.IsAny<LogLevel>()))
.Returns((LogLevel level) => level == LogLevel.Error);

Expression<Action<ILog>> logAuthFailSetup = l =>
l.Log(It.Is<LogEvent>(
e => e.Exception is AuthenticationException
&& e.MessageTemplate == "Error occured while opening thrift connection. Will try to close open transports. Failed action: {ActionName}."
&& e.Properties != null
&& e.Properties.ContainsKey("ActionName")
&& e.Properties["ActionName"] as string == "login"));

logger.Setup(logAuthFailSetup).Verifiable();
Assert.Throws<AllItemsIsDeadExceptions>(() => SomeActionThatRequiresAuthentication("cassandra", "wrong_password", logger.Object));
logger.Verify(logAuthFailSetup, Times.Exactly(2));
}

private void SomeActionThatRequiresAuthentication(string username, string password, ILog logger = null)
{
var settings = node.CreateSettings();
settings.Credentials = new Credentials(username, password);
using (var cluster = new CassandraCluster(settings, new SilentLog()))
using (var cluster = new CassandraCluster(settings, logger ?? new SilentLog()))
cluster.RetrieveClusterConnection().RetrieveKeyspaces();
}

Expand Down
59 changes: 47 additions & 12 deletions Cassandra.ThriftClient/Core/ThriftConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,28 +110,63 @@ private void OpenTransport()
if (!cassandraClient.InputProtocol.Transport.Equals(cassandraClient.OutputProtocol.Transport))
cassandraClient.OutputProtocol.Transport.Open();

if (credentials != null)
cassandraClient.login(new AuthenticationRequest(new Dictionary<string, string>
{
["username"] = credentials.Username,
["password"] = credentials.Password,
}));

if (!string.IsNullOrEmpty(keyspaceName))
cassandraClient.set_keyspace(keyspaceName);
WithCloseTransportOnError(Login, "login");
WithCloseTransportOnError(SetKeyspace, "set keyspace");
}
}

private void WithCloseTransportOnError(Action action, string actionName)
{
try
{
action();
}
catch (Exception e)
{
logger.Error(e, "Error occured while opening thrift connection. Will try to close open transports. Failed action: {ActionName}.", new {ActionName = actionName});
try
{
DoCloseTransport();
}
catch (Exception closeException)
{
logger.Error(closeException, "Error occured while closing connection's transports.");
}
throw;
}
}

private void Login()
{
if (credentials != null)
cassandraClient.login(new AuthenticationRequest(new Dictionary<string, string>
{
["username"] = credentials.Username,
["password"] = credentials.Password,
}));
}

private void SetKeyspace()
{
if (!string.IsNullOrEmpty(keyspaceName))
cassandraClient.set_keyspace(keyspaceName);
}

private void CloseTransport()
{
lock (locker)
{
cassandraClient.InputProtocol.Transport.Close();
if (!cassandraClient.InputProtocol.Transport.Equals(cassandraClient.OutputProtocol.Transport))
cassandraClient.OutputProtocol.Transport.Close();
DoCloseTransport();
}
}

private void DoCloseTransport()
{
cassandraClient.InputProtocol.Transport.Close();
if (!cassandraClient.InputProtocol.Transport.Equals(cassandraClient.OutputProtocol.Transport))
cassandraClient.OutputProtocol.Transport.Close();
}

private Timestamp lastSuccessPingTimestamp;
private bool isAlive;

Expand Down
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"sdk": {
"version": "3.1.300"
"version": "3.1.402"
}
}

0 comments on commit 12d768d

Please sign in to comment.