Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: resolve compatibilities for Metabase (#340) #352

Merged
merged 10 commits into from
Jan 9, 2025
58 changes: 58 additions & 0 deletions catalog/internal_macro.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package catalog

import "strings"

type MacroDefinition struct {
Params []string
DDL string
}

type InternalMacro struct {
Schema string
Name string
IsTableMacro bool
// A macro can be overloaded with multiple definitions, each with a different set of parameters.
// https://duckdb.org/docs/sql/statements/create_macro.html#overloading
Definitions []MacroDefinition
}

func (v *InternalMacro) QualifiedName() string {
if strings.ToLower(v.Schema) == "pg_catalog" {
return "__sys__." + v.Name
}
return v.Schema + "." + v.Name
}

var InternalMacros = []InternalMacro{
{
Schema: "information_schema",
Name: "_pg_expandarray",
IsTableMacro: true,
Definitions: []MacroDefinition{
{
Params: []string{"a"},
DDL: `SELECT STRUCT_PACK(
x := unnest(a),
n := generate_series(1, array_length(a))
) AS item`,
},
},
},
{
Schema: "pg_catalog",
Name: "pg_get_indexdef",
IsTableMacro: false,
Definitions: []MacroDefinition{
{
Params: []string{"index_oid"},
// Do nothing currently
DDL: `''`,
},
{
Params: []string{"index_oid", "column_no", "pretty_bool"},
// Do nothing currently
DDL: `''`,
},
},
},
}
48 changes: 48 additions & 0 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ var InternalTables = struct {
PGProc InternalTable
PGClass InternalTable
PGNamespace InternalTable
PGMatViews InternalTable
}{
PersistentVariable: InternalTable{
Schema: "__sys__",
Expand Down Expand Up @@ -608,6 +609,52 @@ var InternalTables = struct {
"nspacl TEXT",
InitialData: InitialDataTables.PGNamespace,
},
// View "pg_catalog.pg_matviews"
// postgres=# \d+ pg_catalog.pg_matviews
// View "pg_catalog.pg_matviews"
// Column | Type | Collation | Nullable | Default | Storage | Description
//--------------+---------+-----------+----------+---------+----------+-------------
// schemaname | name | | | | plain |
// matviewname | name | | | | plain |
// matviewowner | name | | | | plain |
// tablespace | name | | | | plain |
// hasindexes | boolean | | | | plain |
// ispopulated | boolean | | | | plain |
// definition | text | | | | extended |
//View definition:
// SELECT n.nspname AS schemaname,
// c.relname AS matviewname,
// pg_get_userbyid(c.relowner) AS matviewowner,
// t.spcname AS tablespace,
// c.relhasindex AS hasindexes,
// c.relispopulated AS ispopulated,
// pg_get_viewdef(c.oid) AS definition
// FROM pg_class c
// LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
// LEFT JOIN pg_tablespace t ON t.oid = c.reltablespace
// WHERE c.relkind = 'm'::"char";
PGMatViews: InternalTable{
Schema: "__sys__",
Name: "pg_matviews",
KeyColumns: []string{
"schemaname",
"matviewname",
},
ValueColumns: []string{
"matviewowner",
"tablespace",
"hasindexes",
"ispopulated",
"definition",
},
DDL: "schemaname VARCHAR NOT NULL, " +
"matviewname VARCHAR NOT NULL, " +
"matviewowner VARCHAR, " +
"tablespace VARCHAR, " +
"hasindexes BOOLEAN, " +
"ispopulated BOOLEAN, " +
"definition TEXT",
},
}

var internalTables = []InternalTable{
Expand All @@ -621,6 +668,7 @@ var internalTables = []InternalTable{
InternalTables.PGProc,
InternalTables.PGClass,
InternalTables.PGNamespace,
InternalTables.PGMatViews,
}

func GetInternalTables() []InternalTable {
Expand Down
88 changes: 88 additions & 0 deletions catalog/internal_views.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package catalog

type InternalView struct {
Schema string
Name string
DDL string
}

func (v *InternalView) QualifiedName() string {
return v.Schema + "." + v.Name
}

var InternalViews = []InternalView{
{
Schema: "__sys__",
Name: "pg_stat_user_tables",
DDL: `SELECT
t.table_schema || '.' || t.table_name AS relid, -- Create a unique ID for the table
t.table_schema AS schemaname, -- Schema name
t.table_name AS relname, -- Table name
0 AS seq_scan, -- Default to 0 (DuckDB doesn't track this)
NULL AS last_seq_scan, -- Placeholder (DuckDB doesn't track this)
0 AS seq_tup_read, -- Default to 0
0 AS idx_scan, -- Default to 0
NULL AS last_idx_scan, -- Placeholder
0 AS idx_tup_fetch, -- Default to 0
0 AS n_tup_ins, -- Default to 0 (inserted tuples not tracked)
0 AS n_tup_upd, -- Default to 0 (updated tuples not tracked)
0 AS n_tup_del, -- Default to 0 (deleted tuples not tracked)
0 AS n_tup_hot_upd, -- Default to 0 (HOT updates not tracked)
0 AS n_tup_newpage_upd, -- Default to 0 (new page updates not tracked)
0 AS n_live_tup, -- Default to 0 (live tuples not tracked)
0 AS n_dead_tup, -- Default to 0 (dead tuples not tracked)
0 AS n_mod_since_analyze, -- Default to 0
0 AS n_ins_since_vacuum, -- Default to 0
NULL AS last_vacuum, -- Placeholder
NULL AS last_autovacuum, -- Placeholder
NULL AS last_analyze, -- Placeholder
NULL AS last_autoanalyze, -- Placeholder
0 AS vacuum_count, -- Default to 0
0 AS autovacuum_count, -- Default to 0
0 AS analyze_count, -- Default to 0
0 AS autoanalyze_count -- Default to 0
FROM
information_schema.tables t
WHERE
t.table_type = 'BASE TABLE'; -- Include only base tables (not views)`,
},
{
Schema: "__sys__",
Name: "pg_index",
DDL: `SELECT
ROW_NUMBER() OVER () AS indexrelid, -- Simulated unique ID for the index
t.table_oid AS indrelid, -- OID of the table
COUNT(k.column_name) AS indnatts, -- Number of columns included in the index
COUNT(k.column_name) AS indnkeyatts, -- Number of key columns in the index (same as indnatts here)
CASE
WHEN c.constraint_type = 'UNIQUE' THEN TRUE
ELSE FALSE
END AS indisunique, -- Indicates if the index is unique
CASE
WHEN c.constraint_type = 'PRIMARY KEY' THEN TRUE
ELSE FALSE
END AS indisprimary, -- Indicates if the index is a primary key
ARRAY_AGG(k.ordinal_position ORDER BY k.ordinal_position) AS indkey, -- Array of column positions
ARRAY[]::BIGINT[] AS indcollation, -- DuckDB does not support collation, set to default
ARRAY[]::BIGINT[] AS indclass, -- DuckDB does not support index class, set to default
ARRAY[]::INTEGER[] AS indoption, -- DuckDB does not support index options, set to default
NULL AS indexprs, -- DuckDB does not support expression indexes, set to NULL
NULL AS indpred -- DuckDB does not support partial indexes, set to NULL
FROM
information_schema.key_column_usage k
JOIN
information_schema.table_constraints c
ON k.constraint_name = c.constraint_name
AND k.table_name = c.table_name
JOIN
duckdb_tables() t
ON k.table_name = t.table_name
AND k.table_schema = t.schema_name
WHERE
c.constraint_type IN ('PRIMARY KEY', 'UNIQUE') -- Only select primary key and unique constraints
GROUP BY
t.table_oid, c.constraint_type, c.constraint_name
ORDER BY
t.table_oid;`,
},
}
52 changes: 44 additions & 8 deletions catalog/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
dataDir: dataDir,
}

shouldInit := true
if defaultDB == "" || defaultDB == "memory" {
prov.defaultCatalogName = "memory"
prov.dbFile = ""
Expand All @@ -66,8 +65,6 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
prov.defaultCatalogName = defaultDB
prov.dbFile = defaultDB + ".db"
prov.dsn = filepath.Join(prov.dataDir, prov.dbFile)
_, err = os.Stat(prov.dsn)
shouldInit = os.IsNotExist(err)
}

prov.connector, err = duckdb.NewConnector(prov.dsn, nil)
Expand All @@ -94,11 +91,9 @@ func NewDBProvider(defaultTimeZone, dataDir, defaultDB string) (prov *DatabasePr
}
}

if shouldInit {
err = prov.initCatalog()
if err != nil {
return nil, err
}
err = prov.initCatalog()
if err != nil {
return nil, err
}

err = prov.attachCatalogs()
Expand Down Expand Up @@ -182,6 +177,47 @@ func (prov *DatabaseProvider) initCatalog() error {
}
}

for _, v := range InternalViews {
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE SCHEMA IF NOT EXISTS "+v.Schema,
); err != nil {
return fmt.Errorf("failed to create internal schema %q: %w", v.Schema, err)
}
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE VIEW IF NOT EXISTS "+v.QualifiedName()+" AS "+v.DDL,
); err != nil {
return fmt.Errorf("failed to create internal view %q: %w", v.Name, err)
}
}

for _, m := range InternalMacros {
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE SCHEMA IF NOT EXISTS "+m.Schema,
); err != nil {
return fmt.Errorf("failed to create internal schema %q: %w", m.Schema, err)
}
definitions := make([]string, 0, len(m.Definitions))
for _, d := range m.Definitions {
macroParams := strings.Join(d.Params, ", ")
var asType string
if m.IsTableMacro {
asType = "TABLE\n"
} else {
asType = "\n"
}
definitions = append(definitions, fmt.Sprintf("\n(%s) AS %s%s", macroParams, asType, d.DDL))
}
if _, err := prov.storage.ExecContext(
context.Background(),
"CREATE OR REPLACE MACRO "+m.QualifiedName()+strings.Join(definitions, ",")+";",
); err != nil {
return fmt.Errorf("failed to create internal macro %q: %w", m.Name, err)
}
}

if _, err := prov.pool.ExecContext(context.Background(), "PRAGMA enable_checkpoint_on_shutdown"); err != nil {
logrus.WithError(err).Fatalln("Failed to enable checkpoint on shutdown")
}
Expand Down
15 changes: 15 additions & 0 deletions pgserver/connection_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,26 @@ type ConvertedStatement struct {
Tag string
PgParsable bool
HasSentRowDesc bool
IsExtendedQuery bool
SubscriptionConfig *SubscriptionConfig
BackupConfig *BackupConfig
RestoreConfig *RestoreConfig
}

func (cs ConvertedStatement) WithQueryString(queryString string) ConvertedStatement {
return ConvertedStatement{
VWagen1989 marked this conversation as resolved.
Show resolved Hide resolved
String: queryString,
AST: cs.AST,
Tag: cs.Tag,
PgParsable: cs.PgParsable,
HasSentRowDesc: cs.HasSentRowDesc,
IsExtendedQuery: cs.IsExtendedQuery,
SubscriptionConfig: cs.SubscriptionConfig,
BackupConfig: cs.BackupConfig,
RestoreConfig: cs.RestoreConfig,
}
}

// copyFromStdinState tracks the metadata for an import of data into a table using a COPY FROM STDIN statement. When
// this statement is processed, the server accepts COPY DATA messages from the client with chunks of data to load
// into a table.
Expand Down
Loading
Loading