Skip to content

Commit

Permalink
Remove protobuf2 from qlbridge
Browse files Browse the repository at this point in the history
  • Loading branch information
ajroetker committed Oct 8, 2024
1 parent d030357 commit c7d8a38
Show file tree
Hide file tree
Showing 29 changed files with 231 additions and 4,874 deletions.
3 changes: 0 additions & 3 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
ignore:
# - "**/*.pb.go" # ignore protobuf files
- "expr/node.pb.go"
- "plan/plan.pb.go"
- "rel/sql.pb.go"
- "schema/schema.pb.go"
- "examples/**/*"
- "datasource/mockcsvtestdata/*"
Expand Down
71 changes: 34 additions & 37 deletions datasource/sqlite/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ type (
colidx map[string]int
err error
sqlInsert string
sqlUpdate string
}
)

Expand Down Expand Up @@ -109,7 +108,7 @@ func (m *qryconn) CreateMutator(pc interface{}) (schema.ConnMutator, error) {
m.stmt = ctx.Stmt
return m, nil
}
return nil, fmt.Errorf("Expected *plan.Context but got %T", pc)
return nil, fmt.Errorf("expected *plan.Context but got %T", pc)
}

func (m *qryconn) Next() schema.Message {
Expand All @@ -122,43 +121,41 @@ func (m *qryconn) Next() schema.Message {
case <-m.exit:
return nil
default:
for {
if !m.rows.Next() {
return nil
}
//vals := make([]driver.Value, len(m.cols))
//u.Infof("expecting %d cols", len(m.cols))
readCols := make([]interface{}, len(m.cols))
writeCols := make([]driver.Value, len(m.cols))
for i := range writeCols {
readCols[i] = &writeCols[i]
}
//cols, _ := m.rows.Columns()
//u.Debugf("sqlite result cols provides %v but expecting %d", cols, len(m.cols))
if !m.rows.Next() {
return nil
}
//vals := make([]driver.Value, len(m.cols))
//u.Infof("expecting %d cols", len(m.cols))
readCols := make([]interface{}, len(m.cols))
writeCols := make([]driver.Value, len(m.cols))
for i := range writeCols {
readCols[i] = &writeCols[i]
}
//cols, _ := m.rows.Columns()
//u.Debugf("sqlite result cols provides %v but expecting %d", cols, len(m.cols))

m.err = m.rows.Scan(readCols...)
if m.err != nil {
u.Warnf("err=%v", m.err)
return nil
}
//u.Debugf("read vals: %#v", writeCols)

// This seems pretty gross, isn't there a better way to do this?
for i, col := range writeCols {
//u.Debugf("%d %s %T %v", i, m.cols[i], col, col)
switch val := col.(type) {
case []uint8:
writeCols[i] = driver.Value(string(val))
}
m.err = m.rows.Scan(readCols...)
if m.err != nil {
u.Warnf("err=%v", m.err)
return nil
}
//u.Debugf("read vals: %#v", writeCols)

// This seems pretty gross, isn't there a better way to do this?
for i, col := range writeCols {
//u.Debugf("%d %s %T %v", i, m.cols[i], col, col)
switch val := col.(type) {
case []uint8:
writeCols[i] = driver.Value(string(val))
}
msg := datasource.NewSqlDriverMessageMap(m.ct, writeCols, m.colidx)
}
msg := datasource.NewSqlDriverMessageMap(m.ct, writeCols, m.colidx)

m.ct++
m.ct++

//u.Infof("return item btreeP:%p itemP:%p cursorP:%p %v %v", m, item, m.cursor, msg.Id(), msg.Values())
//u.Debugf("return? %T %v", item, item.(*DriverItem).SqlDriverMessageMap)
return msg
}
//u.Infof("return item btreeP:%p itemP:%p cursorP:%p %v %v", m, item, m.cursor, msg.Id(), msg.Values())
//u.Debugf("return? %T %v", item, item.(*DriverItem).SqlDriverMessageMap)
return msg
}
}

Expand All @@ -170,7 +167,7 @@ func (m *qryconn) Put(ctx context.Context, key schema.Key, row interface{}) (sch
case []driver.Value:
if len(rowVals) != len(m.Columns()) {
u.Warnf("wrong column ct")
return nil, fmt.Errorf("Wrong number of columns, got %v expected %v", len(rowVals), len(m.Columns()))
return nil, fmt.Errorf("wrong number of columns, got %v expected %v", len(rowVals), len(m.Columns()))
}

id := MakeId(rowVals[m.indexCol])
Expand Down Expand Up @@ -205,7 +202,7 @@ func (m *qryconn) Put(ctx context.Context, key schema.Key, row interface{}) (sch
return NewKey(id), nil
default:
u.Warnf("not implemented %T", row)
return nil, fmt.Errorf("Expected []driver.Value but got %T", row)
return nil, fmt.Errorf("expected []driver.Value but got %T", row)
}
}

Expand Down
2 changes: 0 additions & 2 deletions datasource/sqlite/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@ var (
// - Support full predicate push down to SqlLite.
// - Support Thread-Safe wrapper around sqlite file.
type Source struct {
exit <-chan bool
schema *schema.Schema
file string // Local file path to sqlite db
db *sql.DB
mu sync.Mutex
source *Source
qryconns map[string]*qryconn
tables map[string]*schema.Table
tblmu sync.Mutex
Expand Down
82 changes: 20 additions & 62 deletions datasource/sqlite/sqlrewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ import (
"github.com/lytics/qlbridge/expr"
"github.com/lytics/qlbridge/lex"
"github.com/lytics/qlbridge/rel"
"github.com/lytics/qlbridge/value"
"github.com/lytics/qlbridge/vm"
)

type rewrite struct {
sel *rel.SqlSelect
result *rel.SqlSelect
needsPolyFill bool // do we request that features be polyfilled?
sel *rel.SqlSelect
result *rel.SqlSelect
}

func newRewriter(stmt *rel.SqlSelect) *rewrite {
Expand Down Expand Up @@ -65,28 +62,9 @@ func (m *rewrite) rewrite() (string, error) {
return m.result.String(), nil
}

// eval() returns ( value, isOk, isIdentity )
func (m *rewrite) eval(arg expr.Node) (value.Value, bool, bool) {
switch arg := arg.(type) {
case *expr.NumberNode, *expr.StringNode:
val, ok := vm.Eval(nil, arg)
return val, ok, false
case *expr.IdentityNode:
if arg.IsBooleanIdentity() {
return value.NewBoolValue(arg.Bool()), true, false
}
return value.NewStringValue(arg.Text), true, true
case *expr.ArrayNode:
val, ok := vm.Eval(nil, arg)
return val, ok, false
}
return nil, false, false
}

// Aggregations from the <select_list>
//
// SELECT <select_list> FROM ... WHERE
//
// SELECT <select_list> FROM ... WHERE
func (m *rewrite) walkSelectList() error {

m.result.Columns = m.sel.Columns
Expand All @@ -109,7 +87,7 @@ func (m *rewrite) walkSelectList() error {
newNode, err := m.selectFunc(curNode)
if err == nil {
col.Expr = newNode
} else if err != nil {
} else {
u.Error(err)
return err
}
Expand All @@ -136,10 +114,9 @@ func (m *rewrite) walkSelectList() error {
// Group By Clause: Mongo is a little weird where they move the
// group by expressions INTO the aggregation clause:
//
// operation(field) FROM x GROUP BY x,y,z
//
// db.article.aggregate([{"$group":{_id: "$author", count: {"$sum":1}}}]);
// operation(field) FROM x GROUP BY x,y,z
//
// db.article.aggregate([{"$group":{_id: "$author", count: {"$sum":1}}}]);
func (m *rewrite) walkGroupBy() error {

for _, col := range m.sel.GroupBy {
Expand Down Expand Up @@ -193,7 +170,6 @@ func (m *rewrite) selectFunc(cur expr.Node) (expr.Node, error) {
// nested bson document for mongo queries if possible.
//
// - if can't express logic we need to allow qlbridge to poly-fill
//
func (m *rewrite) walkNode(cur expr.Node) (expr.Node, error) {
//u.Debugf("WalkNode: %#v", cur)
switch curNode := cur.(type) {
Expand All @@ -206,7 +182,7 @@ func (m *rewrite) walkNode(cur expr.Node) (expr.Node, error) {
case *expr.UnaryNode:
//return m.walkUnary(curNode)
u.Warnf("not implemented: %#v", curNode)
return nil, fmt.Errorf("Not implemented urnary function: %v", curNode.String())
return nil, fmt.Errorf("not implemented urnary function: %v", curNode.String())
case *expr.FuncNode:
return m.walkFilterFunc(curNode)
case *expr.IdentityNode:
Expand All @@ -223,8 +199,7 @@ func (m *rewrite) walkNode(cur expr.Node) (expr.Node, error) {

// Tri Nodes expressions:
//
// <expression> [NOT] BETWEEN <expression> AND <expression>
//
// <expression> [NOT] BETWEEN <expression> AND <expression>
func (m *rewrite) walkFilterTri(node *expr.TriNode) (expr.Node, error) {

/*
Expand All @@ -248,22 +223,20 @@ func (m *rewrite) walkFilterTri(node *expr.TriNode) (expr.Node, error) {

// Array Nodes expressions:
//
// year IN (1990,1992) =>
//
// year IN (1990,1992) =>
func (m *rewrite) walkArrayNode(node *expr.ArrayNode) (expr.Node, error) {

return node, nil
}

// Binary Node: operations for >, >=, <, <=, =, !=, AND, OR, Like, IN
//
// x = y => db.users.find({field: {"$eq": value}})
// x != y => db.inventory.find( { qty: { $ne: 20 } } )
//
// x like "list%" => db.users.find( { user_id: /^list/ } )
// x like "%list%" => db.users.find( { user_id: /bc/ } )
// x IN [a,b,c] => db.users.find( { user_id: {"$in":[a,b,c] } } )
// x = y => db.users.find({field: {"$eq": value}})
// x != y => db.inventory.find( { qty: { $ne: 20 } } )
//
// x like "list%" => db.users.find( { user_id: /^list/ } )
// x like "%list%" => db.users.find( { user_id: /bc/ } )
// x IN [a,b,c] => db.users.find( { user_id: {"$in":[a,b,c] } } )
func (m *rewrite) walkFilterBinary(node *expr.BinaryNode) (expr.Node, error) {

// If we have to recurse deeper for AND, OR operators
Expand Down Expand Up @@ -329,9 +302,8 @@ func (m *rewrite) walkFilterBinary(node *expr.BinaryNode) (expr.Node, error) {
// Take an expression func, ensure we don't do runtime-checking (as the function)
// doesn't really exist, then map that function to a mongo operation
//
// exists(fieldname)
// regex(fieldname,value)
//
// exists(fieldname)
// regex(fieldname,value)
func (m *rewrite) walkFilterFunc(node *expr.FuncNode) (expr.Node, error) {
switch funcName := strings.ToLower(node.Name); funcName {
case "exists", "missing":
Expand All @@ -346,14 +318,15 @@ func (m *rewrite) walkFilterFunc(node *expr.FuncNode) (expr.Node, error) {
// Take an expression func, ensure we don't do runtime-checking (as the function)
// doesn't really exist, then map that function to an Mongo Aggregation/MapReduce function
//
// min, max, avg, sum, cardinality, terms
// min, max, avg, sum, cardinality, terms
//
// Single Value Aggregates:
// min, max, avg, sum, cardinality, count
//
// min, max, avg, sum, cardinality, count
//
// MultiValue aggregates:
// terms, ??
//
// terms, ??
func (m *rewrite) walkProjectionFunc(node *expr.FuncNode) (expr.Node, error) {
switch funcName := strings.ToLower(node.Name); funcName {
case "count":
Expand All @@ -363,18 +336,3 @@ func (m *rewrite) walkProjectionFunc(node *expr.FuncNode) (expr.Node, error) {
}
return node, nil
}

func eval(cur expr.Node) (value.Value, bool) {
switch curNode := cur.(type) {
case *expr.IdentityNode:
if curNode.IsBooleanIdentity() {
return value.NewBoolValue(curNode.Bool()), true
}
return value.NewStringValue(curNode.Text), true
case *expr.StringNode:
return value.NewStringValue(curNode.Text), true
default:
//u.Errorf("unrecognized T:%T %v", cur, cur)
}
return value.NilValueVal, false
}
Loading

0 comments on commit c7d8a38

Please sign in to comment.