Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

提交pg库表元数据功能代码 #2086

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ require (
github.com/swaggo/swag v1.6.7
github.com/ungerik/go-dry v0.0.0-20210209114055-a3e162a9e62e
github.com/urfave/cli/v2 v2.8.1
github.com/vektah/gqlparser/v2 v2.5.1
github.com/lib/pq v1.10.2
golang.org/x/net v0.15.0
google.golang.org/grpc v1.50.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0
Expand Down
55 changes: 38 additions & 17 deletions sqle/server/auditplan/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@ type Meta struct {
}

const (
TypeDefault = "default"
TypeMySQLSlowLog = "mysql_slow_log"
TypeMySQLMybatis = "mysql_mybatis"
TypeMySQLSchemaMeta = "mysql_schema_meta"
TypeMySQLProcesslist = "mysql_processlist"
TypeAliRdsMySQLSlowLog = "ali_rds_mysql_slow_log"
TypeAliRdsMySQLAuditLog = "ali_rds_mysql_audit_log"
TypeHuaweiRdsMySQLSlowLog = "huawei_rds_mysql_slow_log"
TypeOracleTopSQL = "oracle_top_sql"
TypeTiDBAuditLog = "tidb_audit_log"
TypeAllAppExtract = "all_app_extract"
TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
TypeSQLFile = "sql_file"
TypeDefault = "default"
TypeMySQLSlowLog = "mysql_slow_log"
TypeMySQLMybatis = "mysql_mybatis"
TypeMySQLSchemaMeta = "mysql_schema_meta"
TypeMySQLProcesslist = "mysql_processlist"
TypeAliRdsMySQLSlowLog = "ali_rds_mysql_slow_log"
TypeAliRdsMySQLAuditLog = "ali_rds_mysql_audit_log"
TypeOracleTopSQL = "oracle_top_sql"
TypeTiDBAuditLog = "tidb_audit_log"
TypeAllAppExtract = "all_app_extract"
TypeBaiduRdsMySQLSlowLog = "baidu_rds_mysql_slow_log"
TypeSQLFile = "sql_file"
TypePostgreSQLSchemaMeta = "Postgresql_schema_meta"
)

const (
InstanceTypeAll = ""
InstanceTypeMySQL = "MySQL"
InstanceTypeOracle = "Oracle"
InstanceTypeTiDB = "TiDB"
InstanceTypeAll = ""
InstanceTypeMySQL = "MySQL"
InstanceTypeOracle = "Oracle"
InstanceTypeTiDB = "TiDB"
InstanceTypePostgreSQL = "PostgreSQL"
)

const (
Expand Down Expand Up @@ -371,6 +372,26 @@ var Metas = []Meta{
InstanceType: InstanceTypeAll,
CreateTask: NewDefaultTask,
},
{
Type: TypePostgreSQLSchemaMeta,
Desc: "库表元数据",
InstanceType: InstanceTypePostgreSQL,
CreateTask: NewPostgreSQLSchemaMetaTask,
Params: []*params.Param{
{
Key: paramKeyCollectIntervalMinute,
Desc: "采集周期(分钟)",
Value: "60",
Type: params.ParamTypeInt,
},
{
Key: "collect_view",
Desc: "是否采集视图信息",
Value: "0",
Type: params.ParamTypeBool,
},
},
},
}

var MetaMap = map[string]Meta{}
Expand Down
267 changes: 267 additions & 0 deletions sqle/server/auditplan/task.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/actiontech/sqle/sqle/driver"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -1550,3 +1551,269 @@ func NewBaiduRdsMySQLSlowLogTask(entry *logrus.Entry, ap *model.AuditPlan) Task

return b
}

// PostgreSQL库表元数据
type PostgreSQLSchemaMetaTask struct {
*sqlCollector
}

func NewPostgreSQLSchemaMetaTask(entry *logrus.Entry, ap *model.AuditPlan) Task {
sqlCollector := newSQLCollector(entry, ap)
task := &PostgreSQLSchemaMetaTask{
sqlCollector,
}
sqlCollector.do = task.collectorDo
return task
}

func (at *PostgreSQLSchemaMetaTask) collectorDo() {
if at.ap.InstanceName == "" {
at.logger.Warnf("instance is not configured")
return
}
if at.ap.InstanceDatabase == "" {
at.logger.Warnf("instance schema is not configured")
return
}
instance, _, err := dms.GetInstanceInProjectByName(context.Background(), string(at.ap.ProjectId), at.ap.InstanceName)
if err != nil {
at.logger.Errorf("get pg instance in project by instanceName failed: %s\n", err)
return
}

pluginMgr := driver.GetPluginManager()
if !pluginMgr.IsOptionalModuleEnabled(instance.DbType, driverV2.OptionalModuleQuery) {
at.logger.Errorf("collect pg schema meta failed: %v",
driver.NewErrPluginAPINotImplement(driverV2.OptionalModuleQuery))
return
}
plugin, err := pluginMgr.OpenPlugin(at.logger, instance.DbType, &driverV2.Config{DSN: &driverV2.DSN{
Host: instance.Host,
Port: instance.Port,
User: instance.User,
Password: instance.Password,
AdditionalParams: instance.AdditionalParams,
DatabaseName: at.ap.InstanceDatabase,
}})
if err != nil {
at.logger.Errorf("connect to instance fail, error: %v", err)
return
}

schemas, err := at.GetAllUserSchemas(plugin)
if err != nil {
at.logger.Errorf("get database=%s schemas error: %s", at.ap.InstanceDatabase, err)
return
}
if len(schemas) == 0 {
at.logger.Errorf("get database=%s schemas empty error: %s", at.ap.InstanceDatabase, err)
return
}

finalTableSqls := make([]string, 0)
finalViewSqls := make([]string, 0)
for _, schema := range schemas {
tables, err := at.ShowSchemaTablesForPg(plugin, schema)
ColdWaterLW marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
at.logger.Errorf("get schema table fail, error: %s", err)
continue
}
for _, table := range tables {
tableSqls, err := at.ShowCreateTablesForPg(plugin, at.ap.InstanceDatabase, schema, table)
ColdWaterLW marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
at.logger.Errorf("show create table fail, error: %s", err)
continue
}
if len(tableSqls) > 0 {
finalTableSqls = append(finalTableSqls, tableSqls...)
}
}
if len(finalTableSqls) > 0 {
err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalTableSqls, schema))
if err != nil {
at.logger.Errorf("save table schema meta to storage fail, error: %s", err)
}
}
}

for _, schema := range schemas {
var views []string
if at.ap.Params.GetParam("collect_view").Bool() {
views, err = at.ShowSchemaViewsForPg(plugin, schema)
ColdWaterLW marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
at.logger.Errorf("get schema view fail, error: %s", err)
continue
}
}
for _, view := range views {
viewSqls, err := at.ShowCreateViewsForPg(plugin, at.ap.InstanceDatabase, schema, view)
ColdWaterLW marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
at.logger.Errorf("show create view fail, error: %s", err)
continue
}
if len(viewSqls) > 0 {
finalViewSqls = append(finalViewSqls, viewSqls...)
}
}
if len(finalViewSqls) > 0 {
err = at.persist.OverrideAuditPlanSQLs(at.ap.ID, convertRawSQLToModelSQLs(finalViewSqls, schema))
if err != nil {
at.logger.Errorf("save view schema meta to storage fail, error: %s", err)
}
}
}
}

func (at *PostgreSQLSchemaMetaTask) GetAllUserSchemas(plugin driver.Plugin) ([]string, error) {
querySql := "SELECT nspname FROM pg_namespace WHERE nspname NOT LIKE 'pg_%' AND nspname != 'information_schema'"
ColdWaterLW marked this conversation as resolved.
Show resolved Hide resolved
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) ShowSchemaTablesForPg(plugin driver.Plugin, schema string) ([]string, error) {
querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+
" where table_schema='%s' and TABLE_TYPE in ('BASE TABLE','SYSTEM VIEW')", schema)
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) ShowSchemaViewsForPg(plugin driver.Plugin, schema string) ([]string, error) {
querySql := fmt.Sprintf("select TABLE_NAME from information_schema.tables "+
" where table_schema='%s' and TABLE_TYPE='VIEW'", schema)
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) ShowCreateTablesForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) {
tables := make([]string, 0)
tableDDl := fmt.Sprintf("CREATE TABLE %s.%s(", schema, tableName)
columnsCondition := fmt.Sprintf("table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'",
database, schema, tableName)
// 获取列定义,多个英文逗号分割
columns := fmt.Sprintf("SELECT string_agg(column_name || ' ' || "+
"CASE "+
" WHEN lower(data_type) IN ('char', 'varchar', 'character', 'character varying') "+
" THEN data_type || '(' || COALESCE(character_maximum_length, 0) || ')' "+
" WHEN lower(data_type) IN ('numeric', 'decimal') "+
" THEN data_type || '(' || COALESCE(numeric_precision, 0) || ',' || COALESCE(numeric_scale, 0) || ')' "+
" WHEN lower(data_type) IN ('integer', 'smallint', 'bigint', 'text') THEN data_type "+
" ELSE data_type "+
" END "+
" || "+
" CASE "+
" WHEN column_default != '' THEN ' DEFAULT ' || column_default ELSE '' END "+
" || "+
" CASE "+
" WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END, ',\n ' ORDER BY ordinal_position) AS columns_sql"+
" FROM information_schema.columns "+
" WHERE %s GROUP BY table_name", columnsCondition)
sqls, err := at.GetResultSqls(plugin, columns)
if err != nil {
at.logger.Errorf("search column definition error:%s\n", err)
return nil, err
}
if len(sqls) == 0 {
return tables, nil
}
tableDDl += strings.Join(sqls, "")
constraintsCondition := fmt.Sprintf("n.nspname = '%s' AND C.relname = '%s'", schema, tableName)
// 获取所有约束
constraints := fmt.Sprintf("SELECT 'CONSTRAINT ' || r.conname || ' ' || "+
" pg_catalog.pg_get_constraintdef ( r.OID, TRUE ) AS constraint_definition "+
" FROM pg_catalog.pg_constraint r "+
" JOIN pg_catalog.pg_class C ON C.OID = r.conrelid "+
" JOIN pg_catalog.pg_namespace n ON n.OID = C.relnamespace "+
" WHERE %s", constraintsCondition)
sqls, err = at.GetResultSqls(plugin, constraints)
if err != nil {
at.logger.Errorf("search constraint definition error:%s\n", err)
return nil, err
}
for _, sqlContext := range sqls {
tableDDl += ",\n" + sqlContext
}
tableDDl += ")"
indexesCondition := fmt.Sprintf("schemaname = '%s' and tablename = '%s' ", schema, tableName)
// 获取索引
indexes := fmt.Sprintf("SELECT indexdef AS index_definition FROM pg_indexes "+
" WHERE %s", indexesCondition)
sqls, err = at.GetResultSqls(plugin, indexes)
if err != nil {
at.logger.Errorf("search index definition error:%s\n", err)
return nil, err
}
for _, sqlContent := range sqls {
if strings.Contains(sqlContent, "CREATE UNIQUE INDEX") {
continue
}
tableDDl += ";\n" + sqlContent
}
tables = append(tables, tableDDl)
return tables, nil
}

func (at *PostgreSQLSchemaMetaTask) ShowCreateViewsForPg(plugin driver.Plugin, database, schema, tableName string) ([]string, error) {
querySql := fmt.Sprintf(
"SELECT 'CREATE OR REPLACE VIEW ' || table_schema || '.' || table_name || ' AS ' || view_definition"+
" AS create_view_statement "+
" FROM information_schema.views "+
" WHERE table_catalog = '%s' AND table_schema = '%s' AND table_name = '%s'",
database, schema, tableName)
return at.GetResultSqls(plugin, querySql)
}

func (at *PostgreSQLSchemaMetaTask) GetResultSqls(plugin driver.Plugin, sql string) ([]string, error) {
var ret []string
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
defer cancel()

result, err := plugin.Query(ctx, sql, &driverV2.QueryConf{TimeOutSecond: 120})
if err != nil {
at.logger.Errorf("plugin.Query() failed:%s\n", err)
return nil, err
}
rows := result.Rows
for _, row := range rows {
values := row.Values
if len(values) == 0 {
continue
}
sqlContent := values[0].Value
if len(sqlContent) == 0 {
continue
}
ret = append(ret, sqlContent)
}
return ret, nil
}

func (at *PostgreSQLSchemaMetaTask) Audit() (*AuditResultResp, error) {
task, err := getTaskWithInstanceByAuditPlan(at.ap, at.persist)
if err != nil {
return nil, err
}
return at.baseTask.audit(task)
}

func (at *PostgreSQLSchemaMetaTask) GetSQLs(args map[string]interface{}) ([]Head, []map[string] /* head name */ string, uint64, error) {
auditPlanSQLs, count, err := at.persist.GetAuditPlanSQLsByReq(args)
if err != nil {
return nil, nil, count, err
}
head, rows := buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs)
return head, rows, count, nil
}

func buildPostgreSQLSchemaMetaSQLsResult(auditPlanSQLs []*model.AuditPlanSQLListDetail) ([]Head, []map[string] /* head name */ string) {
head := []Head{
{
Name: "sql",
Desc: "SQL语句",
Type: "sql",
},
}
rows := make([]map[string]string, 0, len(auditPlanSQLs))
for _, sql := range auditPlanSQLs {
rows = append(rows, map[string]string{
"sql": sql.SQLContent,
})
}
return head, rows
}