Cleanup: two optimizations #755

Merged 3 commits on Dec 5, 2023
7 changes: 3 additions & 4 deletions flow/cmd/handler.go
@@ -47,11 +47,10 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
}

 func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
-	tableIdentifierParts := strings.Split(tableIdentifier, ".")
-	if len(tableIdentifierParts) == 1 && peerDBType != int32(protos.DBType_BIGQUERY) {
-		tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...)
+	if peerDBType != int32(protos.DBType_BIGQUERY) && !strings.ContainsRune(tableIdentifier, '.') {
+		return "public." + tableIdentifier
 	}
-	return strings.Join(tableIdentifierParts, ".")
+	return tableIdentifier
 }

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
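Why this is faster: strings.Split always allocates a slice, even when the separator never occurs, while strings.ContainsRune only scans the string, so neither path of the new code allocates an intermediate slice. A minimal standalone sketch of the same pattern (the qualify helper is illustrative, not PeerDB code):

package main

import (
	"fmt"
	"strings"
)

// qualify is a hypothetical stand-in for schemaForTableIdentifier's fast path:
// prefix the default schema only when the identifier has no dot.
func qualify(table string) string {
	if !strings.ContainsRune(table, '.') {
		return "public." + table
	}
	return table
}

func main() {
	fmt.Println(qualify("users"))        // public.users
	fmt.Println(qualify("sales.orders")) // sales.orders
}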
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -172,12 +172,12 @@ and updating the other columns (not the unchanged toast columns)
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
-	updateStmts := make([]string, 0)
+	updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
unchangedColsArray := strings.Split(cols, ", ")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
-		tmpArray := make([]string, 0)
+		tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}
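The capacity hints follow the standard Go pattern: when the final length is known up front, make([]T, 0, n) lets append fill the backing array without ever growing and copying it. A rough sketch with illustrative column names:

package main

import "fmt"

func main() {
	otherCols := []string{"id", "name", "amount"}

	// Exactly one element is appended per column, so sizing the capacity
	// up front means append never reallocates the backing array.
	tmpArray := make([]string, 0, len(otherCols))
	for _, col := range otherCols {
		tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", col, col))
	}
	fmt.Println(len(tmpArray), cap(tmpArray)) // 3 3
}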
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -186,7 +186,7 @@ type AvroSchema struct
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
-	nullableFields := map[string]bool{}
+	nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
avroType, err := GetAvroType(bqField)
@@ -197,7 +197,7 @@ func DefineAvroSchema(dstTableName string,
// If a field is nullable, its Avro type should be ["null", actualType]
if !bqField.Required {
avroType = []interface{}{"null", avroType}
-			nullableFields[bqField.Name] = true
+			nullableFields[bqField.Name] = struct{}{}
}

avroFields = append(avroFields, AvroField{
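Several files in this PR swap map[string]bool for map[string]struct{}: the empty struct occupies zero bytes, so only the keys are stored, and membership is read with the comma-ok form. Presence alone carries the information, which is also why the conversion_avro.go change below collapses nullable && ok into a single comma-ok lookup. A sketch of the idiom (field names illustrative):

package main

import "fmt"

func main() {
	// A set of nullable field names: struct{} values take no space,
	// so the map is a pure set of keys.
	nullableFields := make(map[string]struct{})
	nullableFields["email"] = struct{}{}

	// Membership via comma-ok: presence alone answers the question.
	_, nullable := nullableFields["email"]
	fmt.Println(nullable) // true

	_, nullable = nullableFields["id"]
	fmt.Println(nullable) // false
}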
9 changes: 5 additions & 4 deletions flow/connectors/postgres/client.go
@@ -572,15 +572,16 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
}

func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedToastColsLists []string) string {
-	updateStmts := make([]string, 0)
+	updateStmts := make([]string, 0, len(unchangedToastColsLists))

for _, cols := range unchangedToastColsLists {
-		unchangedColsArray := make([]string, 0)
-		for _, unchangedToastCol := range strings.Split(cols, ",") {
+		unquotedUnchangedColsArray := strings.Split(cols, ",")
+		unchangedColsArray := make([]string, 0, len(unquotedUnchangedColsArray))
+		for _, unchangedToastCol := range unquotedUnchangedColsArray {
unchangedColsArray = append(unchangedColsArray, fmt.Sprintf(`"%s"`, unchangedToastCol))
}
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
-		tmpArray := make([]string, 0)
+		tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName))
}
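Same capacity-hint pattern as the BigQuery generator above; hoisting strings.Split out of the range clause additionally lets its result's length size unchangedColsArray exactly, so none of the three slices in this function ever reallocates.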
13 changes: 5 additions & 8 deletions flow/connectors/snowflake/snowflake.go
@@ -926,15 +926,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(

// parseTableName parses a table name into schema and table name.
func parseTableName(tableName string) (*tableNameComponents, error) {
-	parts := strings.Split(tableName, ".")
-	if len(parts) != 2 {
+	schemaIdentifier, tableIdentifier, hasDot := strings.Cut(tableName, ".")
+	if !hasDot || strings.ContainsRune(tableIdentifier, '.') {
 		return nil, fmt.Errorf("invalid table name: %s", tableName)
 	}

-	return &tableNameComponents{
-		schemaIdentifier: parts[0],
-		tableIdentifier:  parts[1],
-	}, nil
+	return &tableNameComponents{schemaIdentifier, tableIdentifier}, nil
}

func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) {
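strings.Cut (Go 1.18+) splits around the first separator without building a slice, and its boolean reports whether the separator was found; the added ContainsRune check preserves the old len(parts) != 2 rejection of names with more than one dot. A standalone sketch under those assumptions (parse is a hypothetical mirror of parseTableName):

package main

import (
	"fmt"
	"strings"
)

// parse accepts exactly one dot, splitting schema from table
// without an intermediate slice.
func parse(name string) (schema, table string, err error) {
	schema, table, hasDot := strings.Cut(name, ".")
	if !hasDot || strings.ContainsRune(table, '.') {
		return "", "", fmt.Errorf("invalid table name: %s", name)
	}
	return schema, table, nil
}

func main() {
	fmt.Println(parse("analytics.events")) // analytics events <nil>
	fmt.Println(parse("events"))           // rejected: no dot
	fmt.Println(parse("a.b.c"))            // rejected: more than one dot
}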
@@ -1032,12 +1029,12 @@ and updating the other columns.
func (c *SnowflakeConnector) generateUpdateStatements(
syncedAtCol string, softDeleteCol string, softDelete bool,
allCols []string, unchangedToastCols []string) []string {
-	updateStmts := make([]string, 0)
+	updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
-		tmpArray := make([]string, 0)
+		tmpArray := make([]string, 0, len(otherCols)+2)
for _, colName := range otherCols {
quotedUpperColName := fmt.Sprintf(`"%s"`, strings.ToUpper(colName))
tmpArray = append(tmpArray, fmt.Sprintf("%s = SOURCE.%s", quotedUpperColName, quotedUpperColName))
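The +2 in the tmpArray capacity presumably reserves room for the synced-at and soft-delete column assignments this generator appends after the loop (syncedAtCol and softDeleteCol are parameters above), keeping those appends allocation-free as well.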
7 changes: 4 additions & 3 deletions flow/connectors/utils/array.go
@@ -1,15 +1,16 @@
package utils

func ArrayMinus(first []string, second []string) []string {
-	lookup := make(map[string]bool)
+	lookup := make(map[string]struct{})
// Add elements from arrayB to the lookup map
for _, element := range second {
-		lookup[element] = true
+		lookup[element] = struct{}{}
}
// Iterate over arrayA and check if the element is present in the lookup map
var result []string
for _, element := range first {
-		if !lookup[element] {
+		_, exists := lookup[element]
+		if !exists {
result = append(result, element)
}
}
2 changes: 1 addition & 1 deletion flow/connectors/utils/avro/avro_writer.go
@@ -127,7 +127,7 @@ func (p *PeerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
avroConverter := model.NewQRecordAvroConverter(
qRecord,
p.targetDWH,
-		&p.avroSchema.NullableFields,
+		p.avroSchema.NullableFields,
colNames,
)

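Dropping the & works because a Go map value is already a small header referring to shared storage; passing *map adds an indirection without preventing mutation through a copy. An illustration of that property (markNullable is hypothetical, not PeerDB code):

package main

import "fmt"

// The map header is passed by value, but it refers to the same
// underlying table, so the caller observes the insertion.
func markNullable(fields map[string]struct{}, name string) {
	fields[name] = struct{}{}
}

func main() {
	fields := make(map[string]struct{})
	markNullable(fields, "email")
	_, ok := fields["email"]
	fmt.Println(ok) // true
}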
11 changes: 2 additions & 9 deletions flow/connectors/utils/aws.go
@@ -81,18 +81,11 @@ func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error) {
stagingPath := strings.TrimPrefix(s3Path, "s3://")

// Split into bucket and prefix
-	splitPath := strings.SplitN(stagingPath, "/", 2)
-
-	bucket := splitPath[0]
-	prefix := ""
-	if len(splitPath) > 1 {
-		// Remove leading and trailing slashes from prefix
-		prefix = strings.Trim(splitPath[1], "/")
-	}
+	bucket, prefix, _ := strings.Cut(stagingPath, "/")

return &S3BucketAndPrefix{
Bucket: bucket,
-		Prefix: prefix,
+		Prefix: strings.Trim(prefix, "/"),
}, nil
}

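Discarding Cut's boolean is safe here: when stagingPath contains no slash, Cut returns the whole string as the first value and "" as the second, matching the old bucket-with-empty-prefix fallback, and strings.Trim on "" is a no-op. A quick check of that behavior (bucket names illustrative):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// With a separator present: bucket plus raw prefix.
	bucket, prefix, _ := strings.Cut("my-bucket/staging/avro/", "/")
	fmt.Println(bucket, strings.Trim(prefix, "/")) // my-bucket staging/avro

	// Without one: Cut yields the whole input and an empty prefix,
	// matching the old SplitN fallback.
	bucket, prefix, _ = strings.Cut("my-bucket", "/")
	fmt.Printf("%q %q\n", bucket, strings.Trim(prefix, "/")) // "my-bucket" ""
}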
14 changes: 7 additions & 7 deletions flow/model/conversion_avro.go
@@ -10,14 +10,14 @@ import (
type QRecordAvroConverter struct {
QRecord *QRecord
TargetDWH qvalue.QDWHType
-	NullableFields *map[string]bool
+	NullableFields map[string]struct{}
ColNames []string
}

func NewQRecordAvroConverter(
q *QRecord,
targetDWH qvalue.QDWHType,
-	nullableFields *map[string]bool,
+	nullableFields map[string]struct{},
colNames []string,
) *QRecordAvroConverter {
return &QRecordAvroConverter{
@@ -33,12 +33,12 @@ func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) {

for idx := range qac.QRecord.Entries {
key := qac.ColNames[idx]
-		nullable, ok := (*qac.NullableFields)[key]
+		_, nullable := qac.NullableFields[key]

avroConverter := qvalue.NewQValueAvroConverter(
&qac.QRecord.Entries[idx],
qac.TargetDWH,
-			nullable && ok,
+			nullable,
)
avroVal, err := avroConverter.ToAvroValue()
if err != nil {
@@ -64,15 +64,15 @@ type QRecordAvroSchema struct {

type QRecordAvroSchemaDefinition struct {
Schema string
-	NullableFields map[string]bool
+	NullableFields map[string]struct{}
}

func GetAvroSchemaDefinition(
dstTableName string,
qRecordSchema *QRecordSchema,
) (*QRecordAvroSchemaDefinition, error) {
avroFields := []QRecordAvroField{}
-	nullableFields := map[string]bool{}
+	nullableFields := make(map[string]struct{})

for _, qField := range qRecordSchema.Fields {
avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, qField.Nullable)
@@ -84,7 +84,7 @@ func GetAvroSchemaDefinition(

if qField.Nullable {
consolidatedType = []interface{}{"null", consolidatedType}
-			nullableFields[qField.Name] = true
+			nullableFields[qField.Name] = struct{}{}
}

avroFields = append(avroFields, QRecordAvroField{
6 changes: 3 additions & 3 deletions flow/model/model.go
@@ -174,13 +174,13 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {
}

type ToJSONOptions struct {
-	UnnestColumns map[string]bool
+	UnnestColumns map[string]struct{}
}

func NewToJSONOptions(unnestCols []string) *ToJSONOptions {
-	unnestColumns := make(map[string]bool)
+	unnestColumns := make(map[string]struct{})
for _, col := range unnestCols {
-		unnestColumns[col] = true
+		unnestColumns[col] = struct{}{}
}
return &ToJSONOptions{
UnnestColumns: unnestColumns,