Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for class MultiplexedSession #2191

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class Database extends common.GrpcServiceObject {
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
databaseRole?: string | null;
labels?: {[k: string]: string} | null;
databaseDialect?: EnumKey<
typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect
> | null;
Expand Down Expand Up @@ -460,6 +461,7 @@ class Database extends common.GrpcServiceObject {
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
this.labels = poolOptions.labels || null;
}
this.formattedName_ = formattedName_;
this.instance = instance;
Expand Down Expand Up @@ -978,9 +980,7 @@ class Database extends common.GrpcServiceObject {

reqOpts.session = {};

if (options.labels) {
reqOpts.session.labels = options.labels;
}
reqOpts.session.labels = options.labels || this.labels || null;

if (options.multiplexed) {
reqOpts.session.multiplexed = options.multiplexed;
Expand All @@ -994,24 +994,29 @@ class Database extends common.GrpcServiceObject {
addLeaderAwareRoutingHeader(headers);
}

this.request<google.spanner.v1.ISession>(
{
client: 'SpannerClient',
method: 'createSession',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
(err, resp) => {
if (err) {
callback(err, null, resp!);
return;
startTrace('Database.createSession', this._traceConfig, span => {
this.request<google.spanner.v1.ISession>(
{
client: 'SpannerClient',
method: 'createSession',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
(err, resp) => {
if (err) {
setSpanError(span, err);
span.end();
callback(err, null, resp!);
return;
}
const session = this.session(resp!.name!);
session.metadata = resp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add span.end() before callback

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

span.end();
callback(null, session, resp!);
}
const session = this.session(resp!.name!);
session.metadata = resp;
callback(null, session, resp!);
}
);
);
});
}
/**
* @typedef {array} CreateTableResponse
Expand Down
263 changes: 263 additions & 0 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*!
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {EventEmitter} from 'events';
import {Database} from './database';
import {Session} from './session';
import {
isDatabaseNotFoundError,
isInstanceNotFoundError,
isCreateSessionPermissionError,
isDefaultCredentialsNotSetError,
isProjectIdNotSetInEnvironmentError,
} from './helper';
import {GetSessionCallback} from './session-pool';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
setSpanError,
startTrace,
} from './instrument';

export const MUX_SESSION_AVAILABLE = 'mux-session-available';

/**
* Interface for implementing multiplexed session logic, it should extend the
* {@link https://nodejs.org/api/events.html|EventEmitter} class
*
* @interface MultiplexedSessionInterface
* @extends external:{@link https://nodejs.org/api/events.html|EventEmitter}
*
* @constructs MultiplexedSessionInterface
* @param {Database} database The database to create a multiplexed session for.
*/
export interface MultiplexedSessionInterface {
/**
* When called creates a multiplexed session.
*
* @name MultiplexedSessionInterface#createSession
*/
createSession(): void;

/**
* When called returns a multiplexed session.
*
* @name MultiplexedSessionInterface#getSession
* @param {GetSessionCallback} callback The callback function.
*/
getSession(callback: GetSessionCallback): void;
}

/**
* Class used to manage connections to Spanner using multiplexed session.
*
* **You don't need to use this class directly, connections will be handled for
* you.**
*
* @class
* @extends {EventEmitter}
*/
export class MultiplexedSession
extends EventEmitter
implements MultiplexedSessionInterface
{
database: Database;
// frequency to create new mux session
refreshRate: number;
_multiplexedSession: Session | null;
_refreshHandle!: NodeJS.Timer;
_observabilityOptions?: ObservabilityOptions;
constructor(database: Database) {
super();
this.database = database;
// default frequency is 7 days
this.refreshRate = 7;
this._multiplexedSession = null;
this._observabilityOptions = database._observabilityOptions;
}

/**
* Creates a new multiplexed session and manages its maintenance.
*
* This method initiates the session creation process by calling the `_createSession` method, which returns a Promise.
*/
createSession(): void {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thought , can we make this method to return Promise<void> . This will allow us to use await with this method and if we dont want to await we can simply call this without await.

this._createSession()
.then(() => {
this._maintain();
})
.catch(err => {
if (
isDatabaseNotFoundError(err) ||
isInstanceNotFoundError(err) ||
isCreateSessionPermissionError(err) ||
isDefaultCredentialsNotSetError(err) ||
isProjectIdNotSetInEnvironmentError(err)
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
) {
return;
}
this.emit('error', err);
});
}

/**
* Creates a new multiplexed session.
*
* This method sends a request to the database to create a new session with multiplexing enabled.
* The response from the database would be an array, the first value of the array will be containing the multiplexed session.
*
* @returns {Promise<void>} A Promise that resolves when the session has been successfully created and assigned, an event
* `mux-session-available` will be emitted to signal that the session is ready.
*
* In case of error, an error will get emitted along with the erorr event.
*
* @private
*/
async _createSession(): Promise<void> {
const traceConfig = {
opts: this._observabilityOptions,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this._observabilityOptions is never assigned. Where is this value coming from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, correct the value was not assigned. I have updated the code, now the value is getting assign in the constructor

dbName: this.database.formattedName_,
};
return startTrace(
'MultiplexedSession.createSession',
traceConfig,
async span => {
span.addEvent('Requesting a multiplexed session');
try {
const [createSessionResponse] = await this.database.createSession({
multiplexed: true,
});
this._multiplexedSession = createSessionResponse;
span.addEvent(
`Created multiplexed session ${this._multiplexedSession.formattedName_}`
);
this.emit(MUX_SESSION_AVAILABLE);
} catch (e) {
setSpanError(span, e as Error);
this.emit('error', e);
} finally {
span.end();
}
}
);
}

/**
* Maintains the multiplexed session by periodically refreshing it.
*
* This method sets up a periodic refresh interval for maintaining the session. The interval duration
* is determined by the @param refreshRate option, which is provided in days.
* The default value is 7 days.
* @throws {Error} In case the multiplexed session creation will get fail, and an error occurs within `_createSession`, it is caught and ignored.
*
* @returns {void} This method does not return any value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add the behavior of this manintaner when there is an error during session creation as doc comments?
As per my understanding here we are ifgnoring the error right, assuming that the existing multiplexed session will be still valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the doc, thanks!

*
*/
_maintain(): void {
const refreshRate = this.refreshRate! * 24 * 60 * 60000;
this._refreshHandle = setInterval(async () => {
try {
await this._createSession();
} catch (err) {
return;
}
}, refreshRate);
this._refreshHandle.unref();
}

/**
* Retrieves a session asynchronously and invokes a callback with the session details.
*
* @param {GetSessionCallback} callback - The callback to be invoked once the session is acquired or an error occurs.
*
* @returns {void} This method does not return any value, as it operates asynchronously and relies on the callback.
*
*/
getSession(callback: GetSessionCallback): void {
this._acquire().then(
session => callback(null, session, session?.txn),
callback
);
}

/**
* Acquires a session asynchronously, and prepares the transaction for the session.
*
* Once a session is successfully acquired, it returns the session object (which may be `null` if unsuccessful).
*
* @returns {Promise<Session | null>}
* A Promise that resolves with the acquired session (or `null` if no session is available after retries).
*
*/
async _acquire(): Promise<Session | null> {
const session = await this._getSession();
// Prepare a transaction for a session
session!.txn = session!.transaction(
(session!.parent as Database).queryOptions_
);
return session;
}

/**
* Attempts to get a session, waiting for it to become available if necessary.
*
* Waits for the `mux-session-available` event to be emitted if the multiplexed session is not yet available.
* The method listens for these events, and once `mux-session-available` is emitted, it resolves and returns
* the session.
*
* @returns {Promise<Session | null>} A promise that resolves with the current multiplexed session if available,
* or `null` if the session is not available.
*
* @private
*
*/
async _getSession(): Promise<Session | null> {
const span = getActiveOrNoopSpan();
// Check if the multiplexed session is already available
if (this._multiplexedSession !== null) {
span.addEvent('Cache hit: has usable multiplexed session');
return this._multiplexedSession;
}

// Define event and promises to wait for the session to become available
span.addEvent('Waiting for a multiplexed session to become available');
let removeListener: Function;
let removeErrorListener: Function;
const promises = [
new Promise((_, reject) => {
this.once('error', reject);
removeErrorListener = this.removeListener.bind(this, 'error', reject);
}),
new Promise(resolve => {
this.once(MUX_SESSION_AVAILABLE, resolve);
removeListener = this.removeListener.bind(
this,
MUX_SESSION_AVAILABLE,
resolve
);
}),
];

try {
await Promise.race(promises);
} finally {
removeListener!();
removeErrorListener!();
}
// Return the multiplexed session after it becomes available
return this._multiplexedSession;
}
}
2 changes: 2 additions & 0 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,7 @@ describe('Database', () => {
database: database.formattedName_,
session: {
creatorRole: database.databaseRole,
labels: null,
},
});
assert.strictEqual(config.gaxOpts, gaxOptions);
Expand All @@ -2119,6 +2120,7 @@ describe('Database', () => {
database: database.formattedName_,
session: {
creatorRole: database.databaseRole,
labels: null,
},
});

Expand Down
Loading
Loading