Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do mongo handshake and store ServerDescription on connection #2201

Merged
merged 12 commits into from
Dec 11, 2019
36 changes: 28 additions & 8 deletions mongodb/vibe/db/mongo/connection.d
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ final class MongoConnection {
int m_msgid = 1;
StreamOutputRange!(InterfaceProxy!Stream) m_outRange;
ServerDescription m_description;
/// Flag to prevent recursive connections when server closes connection while connecting
bool m_allowReconnect;
bool m_isAuthenticating;
}

enum ushort defaultPort = MongoClientSettings.defaultPort;
Expand Down Expand Up @@ -183,7 +186,9 @@ final class MongoConnection {
throw new MongoDriverException(format("Failed to connect to MongoDB server at %s:%s.", m_settings.hosts[0].name, m_settings.hosts[0].port), __FILE__, __LINE__, e);
}

if (m_settings.nextGen) {
m_allowReconnect = false;
scope (exit)
m_allowReconnect = true;
import os = std.system;
import compiler = std.compiler;
string platform = compiler.name ~ " "
Expand Down Expand Up @@ -213,12 +218,14 @@ final class MongoConnection {
enforce!MongoAuthException(doc["ok"].get!double == 1.0, "Authentication failed.");
m_description = deserializeBson!ServerDescription(doc);
});
}

m_bytesRead = 0;
if(m_settings.digest != string.init)
{
if (m_settings.authMechanism == MongoAuthMechanism.mongoDBCR)
m_isAuthenticating = true;
scope (exit)
m_isAuthenticating = false;
if (m_settings.authMechanism == MongoAuthMechanism.mongoDBCR || !m_description.satisfiesVersion(WireVersion.v30))
authenticate(); // use old mechanism if explicitly stated
else {
/**
Expand Down Expand Up @@ -438,6 +445,8 @@ final class MongoConnection {
auto bson = () @trusted { return recvBson(buf); } ();
}

// logDebugV("Received mongo response on %s:%s: %s", reqid, i, bson);

static if (is(T == Bson)) on_doc(i, bson);
else {
T doc = deserializeBson!T(bson);
Expand All @@ -450,14 +459,20 @@ final class MongoConnection {

private int send(ARGS...)(OpCode code, int response_to, ARGS args)
{
if( !connected() ) connect();
if( !connected() ) {
if (m_allowReconnect) connect();
else if (m_isAuthenticating) throw new MongoAuthException("Connection got closed while authenticating");
else throw new MongoDriverException("Connection got closed while connecting");
}
int id = nextMessageId();
sendValue(16 + sendLength(args));
sendValue(id);
sendValue(response_to);
sendValue(cast(int)code);
// sendValue!int to make sure we don't accidentally send other types after arithmetic operations/changing types
sendValue!int(16 + sendLength(args));
sendValue!int(id);
sendValue!int(response_to);
sendValue!int(cast(int)code);
foreach (a; args) sendValue(a);
m_outRange.flush();
// logDebugV("Sent mongo opcode %s (id %s) in response to %s with args %s", code, id, response_to, tuple(args));
return id;
}

Expand Down Expand Up @@ -695,6 +710,11 @@ struct ServerDescription
string primary;
string lastUpdateTime = "infinity ago";
Nullable!int logicalSessionTimeoutMinutes;

bool satisfiesVersion(WireVersion wireVersion) @safe const @nogc pure nothrow
{
return maxWireVersion >= wireVersion;
}
}

enum WireVersion : int
Expand Down