Skip to content

Commit

Permalink
feat: added schema check
Browse files Browse the repository at this point in the history
  • Loading branch information
solsticemj25 committed Dec 4, 2024
1 parent e588377 commit 1f1380b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 28 deletions.
17 changes: 16 additions & 1 deletion plugins/extractors/maxcompute/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Client struct {
client *odps.Odps
project *odps.Project
tunnel *tunnel.Tunnel

// isSchemaEnabled bool
}

func New(conf config.Config) (*Client, error) {
Expand All @@ -28,20 +30,33 @@ func New(conf config.Config) (*Client, error) {
return nil, err
}

// properties, err := project.GetAllProperties()

Check failure on line 33 in plugins/extractors/maxcompute/client/client.go

View workflow job for this annotation

GitHub Actions / golangci

commentedOutCode: may want to remove commented-out code (gocritic)
// if err != nil {
// return nil, err
// }
// isSchemaEnabled := properties.Get("odps.namespace.schema") == "true"

return &Client{
client: client,
project: project,
tunnel: tunnelInstance,
// isSchemaEnabled: isSchemaEnabled,
}, nil
}

func (c *Client) ListSchema(context.Context) (schemas []*odps.Schema, err error) {
func (c *Client) ListSchema(_ context.Context) (schemas []*odps.Schema, err error) {
err = c.project.Schemas().List(func(schema *odps.Schema, err2 error) {
if err2 != nil {
err = err2
return
}
// if c.isSchemaEnabled {
// if schema.Name() == "default" {
// schemas = append(schemas, schema)
// }
// } else {
schemas = append(schemas, schema)
// }
})

return schemas, err
Expand Down
21 changes: 12 additions & 9 deletions plugins/extractors/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
_ "embed" // used to print the embedded assets
"fmt"
"math/rand"
"time"

"github.com/aliyun/aliyun-odps-go-sdk/odps"
Expand All @@ -27,7 +28,6 @@ import (

const (
maxcomputeService = "maxcompute"
typeTable = "table"
)

type Extractor struct {
Expand Down Expand Up @@ -82,10 +82,11 @@ type Client interface {
GetTablePreview(ctx context.Context, partitionValue string, table *odps.Table, maxRows int) ([]string, *structpb.ListValue, error)
}

func New(logger log.Logger, clientFunc NewClientFunc) *Extractor {
func New(logger log.Logger, clientFunc NewClientFunc, randFn randFn) *Extractor {
e := &Extractor{
logger: logger,
newClient: clientFunc,
randFn: randFn,
}
e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config)
e.ScopeNotRequired = true
Expand Down Expand Up @@ -172,7 +173,6 @@ func (e *Extractor) processTable(ctx context.Context, schema *odps.Schema, table
return err
}

fmt.Println("********************" + tableType + "********************")
asset, err := e.buildAsset(ctx, schema, table, tableType, tableSchema)
if err != nil {
e.logger.Error("failed to build asset", "table", table.Name(), "error", err)
Expand Down Expand Up @@ -205,7 +205,7 @@ func (e *Extractor) buildAsset(ctx context.Context, schema *odps.Schema,
asset := &v1beta2.Asset{
Urn: tableURN,
Name: tableSchema.TableName,
Type: typeTable,
Type: "table",
Description: tableSchema.Comment,
CreateTime: timestamppb.New(time.Time(tableSchema.CreateTime)),
UpdateTime: timestamppb.New(time.Time(tableSchema.LastModifiedTime)),
Expand Down Expand Up @@ -387,10 +387,6 @@ func (e *Extractor) mixValuesIfNeeded(rows []interface{}, rndSeed int64) ([]inte
numRows := len(table)
numColumns := len(table[0])

if e.randFn == nil {
return nil, fmt.Errorf("randFn is not initialized")
}

rndGen := e.randFn(rndSeed)
for col := 0; col < numColumns; col++ {
for row := 0; row < numRows; row++ {
Expand Down Expand Up @@ -428,12 +424,19 @@ func contains(slice []string, item string) bool {

func init() {
if err := registry.Extractors.Register(maxcomputeService, func() plugins.Extractor {
return New(plugins.GetLog(), CreateClient)
return New(plugins.GetLog(), CreateClient, seededRandom)
}); err != nil {
panic(err)
}
}

func seededRandom(seed int64) func(max int64) int64 {
rnd := rand.New(rand.NewSource(seed)) //nolint:gosec
return func(max int64) int64 {
return rnd.Int63n(max)
}
}

func CreateClient(_ context.Context, _ log.Logger, conf config.Config) (Client, error) {
return client.New(conf)
}
16 changes: 8 additions & 8 deletions plugins/extractors/maxcompute/maxcompute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestInit(t *testing.T) {

mockClient := mocks.NewMaxComputeClient(t)
t.Run("should return error if config is invalid", func(t *testing.T) {
extr := maxcompute.New(utils.Logger, createClient(mockClient))
extr := maxcompute.New(utils.Logger, createClient(mockClient), nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, plugins.Config{
Expand All @@ -48,7 +48,7 @@ func TestInit(t *testing.T) {
})

t.Run("should return no error", func(t *testing.T) {
extr := maxcompute.New(utils.Logger, createClient(mockClient))
extr := maxcompute.New(utils.Logger, createClient(mockClient), nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := extr.Init(ctx, plugins.Config{
Expand Down Expand Up @@ -153,12 +153,12 @@ func TestExtract(t *testing.T) {
"new_table": &newTableSchema,
}

runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient)) ([]*v1beta2.Asset, error) {
runTest := func(t *testing.T, cfg plugins.Config, mockSetup func(mockClient *mocks.MaxComputeClient), randomizer func(seed int64) func(int64) int64) ([]*v1beta2.Asset, error) {
mockClient := mocks.NewMaxComputeClient(t)
if mockSetup != nil {
mockSetup(mockClient)
}
extr := maxcompute.New(utils.Logger, createClient(mockClient))
extr := maxcompute.New(utils.Logger, createClient(mockClient), randomizer)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := extr.Init(ctx, cfg)
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestExtract(t *testing.T) {
},
nil,
)
})
}, nil)

assert.Nil(t, err)
assert.NotEmpty(t, actual)
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestExtract(t *testing.T) {
},
nil,
)
})
}, nil)

assert.Nil(t, err)
assert.NotEmpty(t, actual)
Expand All @@ -291,7 +291,7 @@ func TestExtract(t *testing.T) {
mockClient.EXPECT().ListSchema(mock.Anything).Return(schema1, nil)
mockClient.EXPECT().ListTable(mock.Anything, "my_schema").Return(table1[1:], nil)
mockClient.EXPECT().GetTableSchema(mock.Anything, table1[1]).Return("MANAGED_TABLE", schemaMapping[table1[1].Name()], nil)
})
}, nil)

assert.Nil(t, err)
assert.NotEmpty(t, actual)
Expand All @@ -311,7 +311,7 @@ func TestExtract(t *testing.T) {
},
}, func(mockClient *mocks.MaxComputeClient) {
mockClient.EXPECT().ListSchema(mock.Anything).Return(nil, fmt.Errorf("ListSchema fails"))
})
}, nil)
assert.ErrorContains(t, err, "ListSchema fails")
assert.Nil(t, actual)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "MANAGED_TABLE",
"type": "table",
"url": "",
"description": "",
"data": {
Expand Down Expand Up @@ -37,7 +37,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table"
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.dummy_table",
"name": "dummy_table",
"service": "maxcompute",
"type": "VIRTUAL_VIEW",
"type": "table",
"url": "",
"description": "dummy table description",
"data": {
Expand Down Expand Up @@ -58,7 +58,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/dummy_table",
"schema": "my_schema",
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table"
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table",
"type": "VIRTUAL_VIEW"
},
"create_time": "2024-11-14T06:41:35Z",
"update_time": "2024-11-14T06:41:35Z"
Expand All @@ -84,7 +85,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "MANAGED_TABLE",
"type": "table",
"url": "",
"description": "",
"data": {
Expand Down Expand Up @@ -130,7 +131,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table"
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
Expand Down
10 changes: 6 additions & 4 deletions plugins/extractors/maxcompute/testdata/expected-assets.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.dummy_table",
"name": "dummy_table",
"service": "maxcompute",
"type": "VIRTUAL_VIEW",
"type": "table",
"url": "",
"description": "dummy table description",
"data": {
Expand Down Expand Up @@ -58,7 +58,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/dummy_table",
"schema": "my_schema",
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table"
"sql": "SELECT id, name, user_info\nFROM test-project-id.default.my_dummy_table",
"type": "VIRTUAL_VIEW"
},
"create_time": "2024-11-14T06:41:35Z",
"update_time": "2024-11-14T06:41:35Z"
Expand All @@ -74,7 +75,7 @@
"urn": "urn:maxcompute:test-project-id:table:test-project-id.my_schema.new_table",
"name": "new_table",
"service": "maxcompute",
"type": "MANAGED_TABLE",
"type": "table",
"url": "",
"description": "",
"data": {
Expand Down Expand Up @@ -120,7 +121,8 @@
"project_name": "test-project-id",
"resource_url": "/projects/test-project-id/tables/new_table",
"schema": "my_schema",
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table"
"sql": "SELECT user_id, email FROM test-project-id.my_schema.new_table",
"type": "MANAGED_TABLE"
},
"create_time": "2024-11-18T08:00:00Z",
"update_time": "2024-11-18T08:00:00Z"
Expand Down

0 comments on commit 1f1380b

Please sign in to comment.