Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Nov 28, 2024
1 parent 814ce81 commit d8746c7
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 130 deletions.
44 changes: 24 additions & 20 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ class Database extends common.GrpcServiceObject {
resourceHeader_: {[k: string]: string};
request: DatabaseRequest;
databaseRole?: string | null;
labels?: {[k: string]: string} | null;
databaseDialect?: EnumKey<
typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect
> | null;
Expand Down Expand Up @@ -460,6 +461,7 @@ class Database extends common.GrpcServiceObject {
}
if (typeof poolOptions === 'object') {
this.databaseRole = poolOptions.databaseRole || null;
this.labels = poolOptions.labels || null;
}
this.formattedName_ = formattedName_;
this.instance = instance;
Expand Down Expand Up @@ -978,9 +980,7 @@ class Database extends common.GrpcServiceObject {

reqOpts.session = {};

if (options.labels) {
reqOpts.session.labels = options.labels;
}
reqOpts.session.labels = options.labels || this.labels || null;

if (options.multiplexed) {
reqOpts.session.multiplexed = options.multiplexed;
Expand All @@ -994,24 +994,28 @@ class Database extends common.GrpcServiceObject {
addLeaderAwareRoutingHeader(headers);
}

this.request<google.spanner.v1.ISession>(
{
client: 'SpannerClient',
method: 'createSession',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
(err, resp) => {
if (err) {
callback(err, null, resp!);
return;
startTrace('Database.createSession', this._traceConfig, span => {
this.request<google.spanner.v1.ISession>(
{
client: 'SpannerClient',
method: 'createSession',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
(err, resp) => {
if (err) {
setSpanError(span, err);
span.end();
callback(err, null, resp!);
return;
}
const session = this.session(resp!.name!);
session.metadata = resp;
callback(null, session, resp!);
}
const session = this.session(resp!.name!);
session.metadata = resp;
callback(null, session, resp!);
}
);
);
});
}
/**
* @typedef {array} CreateTableResponse
Expand Down
78 changes: 51 additions & 27 deletions src/multiplexed-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ import {
isProjectIdNotSetInEnvironmentError,
} from './helper';
import {GetSessionCallback} from './session-pool';
import {
ObservabilityOptions,
getActiveOrNoopSpan,
setSpanErrorAndException,
startTrace,
} from './instrument';

const MUX_SESSION_AVAILABLE = 'mux-session-available';

/**
* Interface for implementing multiplexed session logic, it should extend the
Expand Down Expand Up @@ -72,6 +80,7 @@ export class MultiplexedSession
_multiplexedSession: Session | null;
_pingHandle!: NodeJS.Timer;
_refreshHandle!: NodeJS.Timer;
_observabilityOptions?: ObservabilityOptions;
constructor(database: Database) {
super();
this.database = database;
Expand Down Expand Up @@ -118,15 +127,32 @@ export class MultiplexedSession
* @private
*/
async _createSession(): Promise<void> {
try {
const [createSessionResponse] = await this.database.createSession({
multiplexed: true,
});
this._multiplexedSession = createSessionResponse;
this.emit('mux-session-available');
} catch (e) {
this.emit('error', e);
}
const traceConfig = {
opts: this._observabilityOptions,
dbName: this.database.formattedName_,
};
return startTrace(
'MultiplexedSession.createSession',
traceConfig,
async span => {
span.addEvent('Requesting a multiplexed session');
try {
const [createSessionResponse] = await this.database.createSession({
multiplexed: true,
});
this._multiplexedSession = createSessionResponse;
span.addEvent(
`Created multiplexed session ${this._multiplexedSession.formattedName_}`
);
this.emit('mux-session-available');
} catch (e) {
setSpanErrorAndException(span, e as Error);
this.emit('error', e);
} finally {
span.end();
}
}
);
}

/**
Expand All @@ -142,7 +168,13 @@ export class MultiplexedSession
*/
_maintain(): void {
const refreshRate = this.refreshRate! * 24 * 60 * 60000;
this._refreshHandle = setInterval(() => this._refresh(), refreshRate);
this._refreshHandle = setInterval(async () => {
try {
await this._createSession();
} catch (err) {
return;
}
}, refreshRate);
this._refreshHandle.unref();
}

Expand All @@ -156,20 +188,6 @@ export class MultiplexedSession
*
* @throws {Error} If there is an issue with retrieving the session metadata or calculating the expiration time.
*/
async _refresh(): Promise<void> {
const metadata = await this._multiplexedSession?.getMetadata();
const createTime =
parseInt(metadata![0].createTime.seconds) * 1000 +
metadata![0].createTime.nanos / 1000000;

// Calculate expiration time (7 days after session creation)
const expireTime = createTime + this.refreshRate * 24 * 60 * 60 * 1000;

// If the current time exceeds the expiration time, create a new session
if (Date.now() > expireTime) {
this.createSession();
}
}

/**
* Retrieves a session asynchronously and invokes a callback with the session details.
Expand Down Expand Up @@ -217,17 +235,23 @@ export class MultiplexedSession
* @private
*/
async _getSession(): Promise<Session | null> {
const span = getActiveOrNoopSpan();
// Check if the multiplexed session is already available
if (this._multiplexedSession !== null) {
span.addEvent('Cache hit: has usable multiplexed session');
return this._multiplexedSession;
}

// Define event and promises to wait for the session to become available
const availableEvent = 'mux-session-available';
span.addEvent('Waiting for a multiplexed session to become available');
let removeListener: Function;
const promise = new Promise(resolve => {
this.once(availableEvent, resolve);
removeListener = this.removeListener.bind(this, availableEvent, resolve);
this.once(MUX_SESSION_AVAILABLE, resolve);
removeListener = this.removeListener.bind(
this,
MUX_SESSION_AVAILABLE,
resolve
);
});

try {
Expand Down
1 change: 0 additions & 1 deletion src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ export interface GetSessionMetadataResponse {
createTime?: google.protobuf.ITimestamp | null;
approximateLastUseTime?: google.protobuf.ITimestamp | null;
databaseRole?: string | null;
multiplexed?: boolean;
}

export type GetSessionMetadataCallback =
Expand Down
2 changes: 2 additions & 0 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2096,6 +2096,7 @@ describe('Database', () => {
database: database.formattedName_,
session: {
creatorRole: database.databaseRole,
labels: null,
},
});
assert.strictEqual(config.gaxOpts, gaxOptions);
Expand All @@ -2119,6 +2120,7 @@ describe('Database', () => {
database: database.formattedName_,
session: {
creatorRole: database.databaseRole,
labels: null,
},
});

Expand Down
Loading

0 comments on commit d8746c7

Please sign in to comment.