Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ydb implementation #8

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ An in-memory session module is bundled with Telegraf. The following modules are
- [SQLite](#sqlite)
- [PostgreSQL](#postgresql)
- [MySQL / MariaDB](#mysql--mariadb)
- [YDB](#ydb)

## Redis

Expand Down Expand Up @@ -147,6 +148,58 @@ bot.use(session({ store }));

To reuse an existing MySQL2 pool, use `MySQL({ pool })` instead.

## Ydb

Install the official [Ydb](https://ydb.tech) driver alongside this module.

```shell
npm i @telegraf/session ydb-sdk
```

Usage is pretty straightforward:

```TS
import { YDB } from "@telegraf/session/Ydb";

const store = YDB({

// Anonymous authentication will be performed without specifying authentication options

/* Static сredentials
authOptions: {
authType: AuthTypes.Static,
user: "username",
password: "password"
},
*/

/* Сredentials from environment vars
authOptions: {
authType: AuthTypes.Environment,
},
*/

/* Сredentials from JSON file
authOptions: {
authType: AuthTypes.Json,
jsonFileName: "full path filename"
},
*/

endpointUrl: "grpc://localhost:2136",
databaseName: "local",
tokenExpirationTimeout: 20000,
connectionTimeout: 10000,
})

const bot = new Telegraf(token, opts);
bot.use(session({ store }));

// the rest of your bot
```

To reuse an existing YDB client, use `YDB({ client })` instead.

## Background

Since [telegraf#1372](https://github.com/telegraf/telegraf/issues/1372), it has been known that all asynchronous session middleware have been prone to race-conditions. This was addressed in [telegraf#1713](https://github.com/telegraf/telegraf/pull/1713), but third-party session middleware continue to be affected. Since Telegraf 1.12.0, it's recommended that third-party plugins only provide the store parameter for session, instead of implementing session themselves. This way, they can take advantage of the safety provided by Telegraf's builtin session. Of course, if your plugin has an exceptional usecase, it may need to implement its own middleware.
Expand Down
186 changes: 186 additions & 0 deletions Ydb.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import {
StaticCredentialsAuthService,
Column,
Driver,
getCredentialsFromEnv,
TableDescription,
Types,
TypedData,
TypedValues, IAuthService, getSACredentialsFromJson, IamAuthService, AnonymousAuthService, Session as YdbSession
} from 'ydb-sdk';

import { SessionStore } from "./types";
import { defaults } from "./defaults";

//#region AuthOptions
export enum AuthTypes {
Static,
Environment,
Json
}

interface IStaticCredentials {

authType: AuthTypes.Static,
user:string,
password:string
}
interface IJSONCredentials {
authType: AuthTypes.Json,
jsonFileName: string
}
interface IEnvCredentials {
authType: AuthTypes.Environment
}

type AuthOptionsType = IStaticCredentials | IEnvCredentials | IJSONCredentials

function isStaticCredentialsAuthOptions(authOptions: AuthOptionsType): authOptions is IStaticCredentials {
return authOptions.authType === AuthTypes.Static
}
function isEnvCredentialsAuthOptions(authOptions: AuthOptionsType): authOptions is IEnvCredentials {
return authOptions.authType === AuthTypes.Environment
}
function isJsonCredentialsAuthOptions(authOptions: AuthOptionsType): authOptions is IJSONCredentials {
return authOptions.authType === AuthTypes.Json
}
//#endregion

interface NewClientOpts {

authOptions?: AuthOptionsType

tokenExpirationTimeout: number
endpointUrl: string
databaseName: string

tableName?: string
onInitError?: (err: unknown) => void
connectionTimeout: number
}
interface ExistingClientOpts {

client: Driver
tableName?: string
onInitError?: (err: unknown) => void
connectionTimeout: number
}



/** @unstable */
export function YDB<Session>(opts: NewClientOpts): SessionStore<Session>;
export function YDB<Session>(opts: ExistingClientOpts): SessionStore<Session>;
export function YDB<Session>(opts: NewClientOpts | ExistingClientOpts): SessionStore<Session> {

// this assertion is a hack to make the Database type work
const tablename = (opts.tableName ?? defaults.table)

let client: Driver;
let connection: Promise<boolean> | undefined;
let authService: IAuthService = new AnonymousAuthService()

if ("client" in opts) client = opts.client;
else {
if(opts.authOptions) {
if (isStaticCredentialsAuthOptions(opts.authOptions)) {
authService = new StaticCredentialsAuthService(
opts.authOptions.user,
opts.authOptions.password,
opts.endpointUrl,
{
tokenExpirationTimeout: opts.tokenExpirationTimeout,
})
}
if (isEnvCredentialsAuthOptions(opts.authOptions)) {
authService = getCredentialsFromEnv()
}
if (isJsonCredentialsAuthOptions(opts.authOptions)) {
authService = new IamAuthService(getSACredentialsFromJson(opts.authOptions.jsonFileName))
}
}

client = new Driver({endpoint: opts.endpointUrl, database: opts.databaseName, authService})
connection = client.ready(opts.connectionTimeout)
connection.catch(opts.onInitError)
}

const create = client.tableClient.withSession(async session => {
await session.createTable(tablename,
new TableDescription()
.withColumn(new Column('key', Types.optional(Types.TEXT)))
.withColumn(new Column('session', Types.optional(Types.TEXT)))
.withPrimaryKey('key'))
}, opts.connectionTimeout)

return {
async get(key: string) {

const query = `
DECLARE $keyValue AS TEXT;
$table_name = "${tablename}";
select key, session from $table_name
where key= $keyValue`

const params = {
'$keyValue': TypedValues.text(key),
};

let value: string | undefined | null

await connection
await create

await client.tableClient.withSession(async (ydbSession: YdbSession) => {
const result = await ydbSession.executeQuery(query, params)
if(result.resultSets.length > 0) {
const resultSet = result.resultSets[0];
const data = TypedData.createNativeObjects(resultSet);
if (data.length > 0 && data[0].hasOwnProperty("session")) value = data[0]["session"]
}
})

return value ? JSON.parse(value) : undefined;
},
async set(key: string, session: Session) {

const query = `
DECLARE $keyValue AS TEXT;
DECLARE $sessionValue AS TEXT;
$table_name = "${tablename}";
UPSERT INTO $table_name (key, session)
VALUES ( $keyValue, $sessionValue);`

const params = {
'$keyValue': TypedValues.text(key),
'$sessionValue':TypedValues.text(JSON.stringify(session))
};

await connection
await create

await client.tableClient.withSession(async (ydbSession: YdbSession) => {
await ydbSession.executeQuery(query, params)
})

},
async delete(key: string) {

const query = `
DECLARE $keyValue AS TEXT;
$table_name = "${tablename}";
delete from $table_name where key = $keyValue`

const params = {
'$keyValue': TypedValues.text(key)
};

await connection
await create

return await client.tableClient.withSession(async (ydbSession: YdbSession) => {
await ydbSession.executeQuery(query, params)
})
},
};
}
Loading