Skip to content

Commit

Permalink
Chore: some update to reflect the adjustment of Wren Engine v0.6.0 (#472
Browse files Browse the repository at this point in the history
)

* no need to deploy wren engine anymore

* fix typo

* communicate with engine/ibis server with manifest.

* remove unused method in wren engine adaptor

* add comment at wren engine adaptor

* use the limit option from ibis server instead of rewriting the sql ourself

* prevent no successful deployment found

* pass manifest when getting native sql
  • Loading branch information
onlyjackfrost committed Jul 11, 2024
1 parent 09b2933 commit 2a6d1e4
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 146 deletions.
18 changes: 15 additions & 3 deletions wren-ui/src/apollo/server/adaptors/ibisAdaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import { Manifest } from '@server/mdl/type';
import * as Errors from '@server/utils/error';
import { getConfig } from '@server/config';
import { toDockerHost } from '@server/utils';
import { CompactTable, RecommendConstraint } from '@server/services';
import {
CompactTable,
DEFAULT_PREVIEW_LIMIT,
RecommendConstraint,
} from '@server/services';
import { snakeCase } from 'lodash';
import { WREN_AI_CONNECTION_INFO } from '../repositories';
import { toIbisConnectionInfo } from '../dataSource';
Expand Down Expand Up @@ -88,18 +92,21 @@ export interface ValidationResponse {
message: string | null;
}

export interface IbisQueryOptions {
export interface IbisBaseOptions {
dataSource: DataSourceName;
connectionInfo: WREN_AI_CONNECTION_INFO;
mdl: Manifest;
}
export interface IbisQueryOptions extends IbisBaseOptions {
limit?: number;
}

export interface IIbisAdaptor {
query: (
query: string,
options: IbisQueryOptions,
) => Promise<IbisQueryResponse>;
dryRun: (query: string, options: IbisQueryOptions) => Promise<boolean>;
dryRun: (query: string, options: IbisBaseOptions) => Promise<boolean>;
getTables: (
dataSource: DataSourceName,
connectionInfo: WREN_AI_CONNECTION_INFO,
Expand Down Expand Up @@ -148,6 +155,11 @@ export class IbisAdaptor implements IIbisAdaptor {
const res = await axios.post(
`${this.ibisServerEndpoint}/v2/ibis/${dataSourceUrlMap[dataSource]}/query`,
body,
{
params: {
limit: options.limit || DEFAULT_PREVIEW_LIMIT,
},
},
);
const response = res.data;
return response;
Expand Down
81 changes: 12 additions & 69 deletions wren-ui/src/apollo/server/adaptors/wrenEngineAdaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,16 @@ import axios, { AxiosResponse } from 'axios';
import { Manifest } from '../mdl/type';
import { getLogger } from '@server/utils';
import * as Errors from '@server/utils/error';
import { CompactTable } from '../services';
import { CompactTable, DEFAULT_PREVIEW_LIMIT } from '../services';

const logger = getLogger('WrenEngineAdaptor');
logger.level = 'debug';

const DEFAULT_PREVIEW_LIMIT = 500;

export enum WrenEngineDeployStatusEnum {
SUCCESS = 'SUCCESS',
FAILED = 'FAILED',
}

export interface WrenEngineDeployStatusResponse {
systemStatus: string;
version: string;
}

export interface DeployResponse {
status: WrenEngineDeployStatusEnum;
error?: string;
}

interface DeployPayload {
manifest: Manifest;
version: string;
}

export interface deployData {
manifest: Manifest;
hash: string;
}

export interface ColumnMetadata {
name: string;
type: string;
Expand Down Expand Up @@ -89,18 +67,23 @@ export interface DryRunResponse {
}

export interface IWrenEngineAdaptor {
deploy(deployData: deployData): Promise<DeployResponse>;
// duckdb data source related
prepareDuckDB(options: DuckDBPrepareOptions): Promise<void>;
listTables(): Promise<CompactTable[]>;
putSessionProps(props: Record<string, any>): Promise<void>;
queryDuckdb(sql: string): Promise<EngineQueryResponse>;
putSessionProps(props: Record<string, any>): Promise<void>;

// metadata related, used to fetch metadata of duckdb
listTables(): Promise<CompactTable[]>;

// config wren engine
patchConfig(config: Record<string, any>): Promise<void>;

// query
previewData(
sql: string,
mdl: Manifest,
limit?: number,
mdl?: Manifest,
): Promise<EngineQueryResponse>;
describeStatement(sql: string): Promise<DescribeStatementResponse>;
getNativeSQL(sql: string, options?: DryPlanOption): Promise<string>;
validateColumnIsValid(
manifest: Manifest,
Expand Down Expand Up @@ -182,33 +165,6 @@ export class WrenEngineAdaptor implements IWrenEngineAdaptor {
return this.formatToCompactTable(response);
}

public async deploy(deployData: deployData): Promise<DeployResponse> {
const { manifest, hash } = deployData;
const deployPayload = { manifest, version: hash } as DeployPayload;

try {
// skip if the model has been deployed
const resp = await this.getDeployStatus();
if (resp.version === hash) {
return { status: WrenEngineDeployStatusEnum.SUCCESS };
}

// start deploy to wren engine
await axios.post(
`${this.wrenEngineBaseEndpoint}/v1/mdl/deploy`,
deployPayload,
);
logger.debug(`WrenEngine: Deploy wren engine success, hash: ${hash}`);
return { status: WrenEngineDeployStatusEnum.SUCCESS };
} catch (err: any) {
logger.debug(`Got error when deploying to wren engine: ${err.message}`);
return {
status: WrenEngineDeployStatusEnum.FAILED,
error: `WrenEngine Error, deployment hash:${hash}: ${err.message}`,
};
}
}

public async putSessionProps(props: Record<string, any>) {
const setSessionStatements = Object.entries(props)
.map(([key, value]) => {
Expand Down Expand Up @@ -270,8 +226,8 @@ export class WrenEngineAdaptor implements IWrenEngineAdaptor {

public async previewData(
sql: string,
manifest: Manifest,
limit: number = DEFAULT_PREVIEW_LIMIT,
manifest?: Manifest,
): Promise<EngineQueryResponse> {
try {
const url = new URL(this.previewUrlPath, this.wrenEngineBaseEndpoint);
Expand All @@ -297,19 +253,6 @@ export class WrenEngineAdaptor implements IWrenEngineAdaptor {
}
}

public async describeStatement(
sql: string,
): Promise<DescribeStatementResponse> {
try {
// preview data with limit 1 to get column metadata
const res = await this.previewData(sql, 1);
return { columns: res.columns };
} catch (err: any) {
logger.debug(`Got error when describing statement: ${err.message}`);
throw err;
}
}

public async getNativeSQL(
sql: string,
options: DryPlanOption,
Expand Down
4 changes: 2 additions & 2 deletions wren-ui/src/apollo/server/data/sample.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ export const sampleDatasets: Record<string, SampleDataset> = {
},
{
name: 'BlkHome',
properties: { description: 'number of bloacks by the home team.' },
properties: { description: 'number of blocks by the home team.' },
},
{
name: 'TovHome',
Expand Down Expand Up @@ -748,7 +748,7 @@ export const sampleDatasets: Record<string, SampleDataset> = {
},
{
name: 'BlkAway',
properties: { description: 'number of bloacks by the away team.' },
properties: { description: 'number of blocks by the away team.' },
},
{
name: 'TovAway',
Expand Down
31 changes: 15 additions & 16 deletions wren-ui/src/apollo/server/resolvers/modelResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ export enum SyncStatusEnum {
UNSYNCRONIZED = 'UNSYNCRONIZED',
}

const PREVIEW_MAX_OUTPUT_ROW = 100;

export class ModelResolver {
constructor() {
// model & model column
Expand Down Expand Up @@ -147,7 +145,7 @@ export class ModelResolver {
const { manifest } = await ctx.mdlService.makeCurrentModelMDL();
const currentHash = ctx.deployService.createMDLHash(manifest, id);
const lastDeploy = await ctx.deployService.getLastDeployment(id);
const lastDeployHash = lastDeploy.hash;
const lastDeployHash = lastDeploy?.hash;
const inProgressDeployment =
await ctx.deployService.getInProgressDeployment(id);
if (inProgressDeployment) {
Expand Down Expand Up @@ -547,8 +545,7 @@ export class ModelResolver {

// create view
const project = await ctx.projectService.getCurrentProject();
const deployment = await ctx.deployService.getLastDeployment(project.id);
const mdl = deployment.manifest;
const { manifest } = await ctx.deployService.getLastDeployment(project.id);

// get sql statement of a response
const response = await ctx.askingService.getResponse(responseId);
Expand All @@ -563,9 +560,9 @@ export class ModelResolver {
// describe columns
const { columns } = await ctx.queryService.describeStatement(statement, {
project,
limit: PREVIEW_MAX_OUTPUT_ROW,
limit: 1,
modelingOnly: false,
mdl,
manifest,
});

if (isEmpty(columns)) {
Expand Down Expand Up @@ -634,9 +631,8 @@ export class ModelResolver {

const data = (await ctx.queryService.preview(sql, {
project,
limit: PREVIEW_MAX_OUTPUT_ROW,
modelingOnly: false,
mdl: manifest,
manifest,
})) as PreviewDataResponse;

return data;
Expand All @@ -653,8 +649,8 @@ export class ModelResolver {

const data = (await ctx.queryService.preview(view.statement, {
project,
limit: limit | PREVIEW_MAX_OUTPUT_ROW,
mdl: manifest,
limit,
manifest,
modelingOnly: false,
})) as PreviewDataResponse;
return data;
Expand All @@ -669,13 +665,12 @@ export class ModelResolver {
const project = projectId
? await ctx.projectService.getProjectById(projectId)
: await ctx.projectService.getCurrentProject();
const deployment = await ctx.deployService.getLastDeployment(project.id);
const mdl = deployment.manifest;
const { manifest } = await ctx.deployService.getLastDeployment(project.id);
const previewRes = await ctx.queryService.preview(sql, {
project,
limit: limit || PREVIEW_MAX_OUTPUT_ROW,
limit: limit,
modelingOnly: false,
mdl,
manifest,
dryRun,
});
return dryRun ? { dryRun: 'success' } : previewRes;
Expand All @@ -693,6 +688,7 @@ export class ModelResolver {
if (sampleDataset) {
throw new Error(`Doesn't support Native SQL`);
}
const { manifest } = await ctx.mdlService.makeCurrentModelMDL();

// get sql statement of a response
const response = await ctx.askingService.getResponse(responseId);
Expand All @@ -704,7 +700,10 @@ export class ModelResolver {
const steps = response.detail.steps;
const sql = format(constructCteSql(steps));

return await ctx.wrenEngineAdaptor.getNativeSQL(sql);
return await ctx.wrenEngineAdaptor.getNativeSQL(sql, {
manifest,
modelingOnly: false,
});
}

public async updateViewMetadata(
Expand Down
4 changes: 2 additions & 2 deletions wren-ui/src/apollo/server/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ export const typeDefs = gql`
input PreviewViewDataInput {
id: Int!
# It will return default 100 rows if not specified limit
# refer: PREVIEW_MAX_OUTPUT_ROW
# It will return default 500 rows if not specified limit
# refer: DEFAULT_PREVIEW_LIMIT
limit: Int
}
Expand Down
2 changes: 1 addition & 1 deletion wren-ui/src/apollo/server/services/askingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ export class AskingService implements IAskingService {
const sql = format(constructCteSql(steps, stepIndex));
const data = (await this.queryService.preview(sql, {
project,
mdl,
manifest: mdl,
limit,
})) as PreviewDataResponse;

Expand Down
37 changes: 15 additions & 22 deletions wren-ui/src/apollo/server/services/deployService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ import {
IWrenAIAdaptor,
WrenAIDeployStatusEnum,
} from '../adaptors/wrenAIAdaptor';
import {
IWrenEngineAdaptor,
WrenEngineDeployStatusEnum,
} from '../adaptors/wrenEngineAdaptor';
import {
Deploy,
DeployStatusEnum,
Expand Down Expand Up @@ -43,23 +39,19 @@ export interface IDeployService {

export class DeployService implements IDeployService {
private wrenAIAdaptor: IWrenAIAdaptor;
private wrenEngineAdaptor: IWrenEngineAdaptor;
private deployLogRepository: IDeployLogRepository;
private telemetry: Telemetry;

constructor({
wrenAIAdaptor,
wrenEngineAdaptor,
deployLogRepository,
telemetry,
}: {
wrenAIAdaptor: IWrenAIAdaptor;
wrenEngineAdaptor: IWrenEngineAdaptor;
deployLogRepository: IDeployLogRepository;
telemetry: Telemetry;
}) {
this.wrenAIAdaptor = wrenAIAdaptor;
this.wrenEngineAdaptor = wrenEngineAdaptor;
this.deployLogRepository = deployLogRepository;
this.telemetry = telemetry;
}
Expand Down Expand Up @@ -102,24 +94,25 @@ export class DeployService implements IDeployService {
} as Deploy;
const deploy = await this.deployLogRepository.createOne(deployData);

// deploy to wren-engine & AI-service
const [engineRes, aiRes] = await Promise.all([
this.wrenEngineAdaptor.deploy({ manifest, hash }),
this.wrenAIAdaptor.deploy({ manifest, hash }),
]);
// deploy to AI-service
const { status: aiStatus, error: aiError } =
await this.wrenAIAdaptor.deploy({
manifest,
hash,
});

// store deploy log
// update deploy status
const status =
engineRes.status === WrenEngineDeployStatusEnum.SUCCESS &&
aiRes.status === WrenAIDeployStatusEnum.SUCCESS
aiStatus === WrenAIDeployStatusEnum.SUCCESS
? DeployStatusEnum.SUCCESS
: DeployStatusEnum.FAILED;
const error = engineRes.error || aiRes.error;
await this.deployLogRepository.updateOne(deploy.id, { status, error });
if (status === DeployStatusEnum.SUCCESS) {
this.telemetry.send_event('deploy_model', { mdl: manifest });
}
return { status, error };
await this.deployLogRepository.updateOne(deploy.id, {
status,
error: aiError,
});
this.telemetry.send_event('deploy_model', { mdl: manifest });

return { status, error: aiError };
}

public createMDLHash(manifest: Manifest, projectId: number) {
Expand Down
Loading

0 comments on commit 2a6d1e4

Please sign in to comment.