Skip to content

Commit

Permalink
Cleanup: two optimizations
Browse files Browse the repository at this point in the history
1. Prefer strings.Cut to strings.Split
2. Prefer map[T]struct{} to map[T]bool when empty & false are equivalent
  • Loading branch information
serprex committed Dec 5, 2023
1 parent 53235aa commit 22517e9
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 43 deletions.
7 changes: 3 additions & 4 deletions flow/cmd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ func (h *FlowRequestHandler) getPeerID(ctx context.Context, peerName string) (in
}

func schemaForTableIdentifier(tableIdentifier string, peerDBType int32) string {
tableIdentifierParts := strings.Split(tableIdentifier, ".")
if len(tableIdentifierParts) == 1 && peerDBType != int32(protos.DBType_BIGQUERY) {
tableIdentifierParts = append([]string{"public"}, tableIdentifierParts...)
if peerDBType != int32(protos.DBType_BIGQUERY) && !strings.ContainsRune(tableIdentifier, '.') {
return "public." + tableIdentifier
}
return strings.Join(tableIdentifierParts, ".")
return tableIdentifier
}

func (h *FlowRequestHandler) createCdcJobEntry(ctx context.Context,
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ and updating the other columns (not the unchanged toast columns)
7. Return the list of generated update statements.
*/
func (m *mergeStmtGenerator) generateUpdateStatements(allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
unchangedColsArray := strings.Split(cols, ", ")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("`%s` = _peerdb_deduped.%s", colName, colName))
}
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ type AvroSchema struct {
func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := []AvroField{}
nullableFields := map[string]bool{}
nullableFields := make(map[string]struct{})

for _, bqField := range dstTableMetadata.Schema {
avroType, err := GetAvroType(bqField)
Expand All @@ -197,7 +197,7 @@ func DefineAvroSchema(dstTableName string,
// If a field is nullable, its Avro type should be ["null", actualType]
if !bqField.Required {
avroType = []interface{}{"null", avroType}
nullableFields[bqField.Name] = true
nullableFields[bqField.Name] = struct{}{}
}

avroFields = append(avroFields, AvroField{
Expand Down
9 changes: 5 additions & 4 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,15 +572,16 @@ func (c *PostgresConnector) generateMergeStatement(destinationTableIdentifier st
}

func (c *PostgresConnector) generateUpdateStatement(allCols []string, unchangedToastColsLists []string) string {
updateStmts := make([]string, 0)
updateStmts := make([]string, 0, len(unchangedToastColsLists))

for _, cols := range unchangedToastColsLists {
unchangedColsArray := make([]string, 0)
for _, unchangedToastCol := range strings.Split(cols, ",") {
unquotedUnchangedColsArray := strings.Split(cols, ",")
unchangedColsArray := make([]string, 0, len(unquotedUnchangedColsArray))
for _, unchangedToastCol := range unquotedUnchangedColsArray {
unchangedColsArray = append(unchangedColsArray, fmt.Sprintf(`"%s"`, unchangedToastCol))
}
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0)
tmpArray := make([]string, 0, len(otherCols))
for _, colName := range otherCols {
tmpArray = append(tmpArray, fmt.Sprintf("%s=src.%s", colName, colName))
}
Expand Down
13 changes: 5 additions & 8 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,15 +926,12 @@ func (c *SnowflakeConnector) generateAndExecuteMergeStatement(

// parseTableName parses a table name into schema and table name.
func parseTableName(tableName string) (*tableNameComponents, error) {
parts := strings.Split(tableName, ".")
if len(parts) != 2 {
schemaIdentifier, tableIdentifier, hasDot := strings.Cut(tableName, ".")
if !hasDot || strings.ContainsRune(tableIdentifier, '.') {
return nil, fmt.Errorf("invalid table name: %s", tableName)
}

return &tableNameComponents{
schemaIdentifier: parts[0],
tableIdentifier: parts[1],
}, nil
return &tableNameComponents{schemaIdentifier, tableIdentifier}, nil
}

func (c *SnowflakeConnector) jobMetadataExists(jobName string) (bool, error) {
Expand Down Expand Up @@ -1032,12 +1029,12 @@ and updating the other columns.
func (c *SnowflakeConnector) generateUpdateStatements(
syncedAtCol string, softDeleteCol string, softDelete bool,
allCols []string, unchangedToastCols []string) []string {
updateStmts := make([]string, 0)
updateStmts := make([]string, 0, len(unchangedToastCols))

for _, cols := range unchangedToastCols {
unchangedColsArray := strings.Split(cols, ",")
otherCols := utils.ArrayMinus(allCols, unchangedColsArray)
tmpArray := make([]string, 0)
tmpArray := make([]string, 0, len(otherCols)+2)
for _, colName := range otherCols {
quotedUpperColName := fmt.Sprintf(`"%s"`, strings.ToUpper(colName))
tmpArray = append(tmpArray, fmt.Sprintf("%s = SOURCE.%s", quotedUpperColName, quotedUpperColName))
Expand Down
7 changes: 4 additions & 3 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package utils

// ArrayMinus returns the elements of first that do not appear in second,
// preserving their order of appearance in first. The result is nil when
// no element survives the subtraction.
func ArrayMinus(first []string, second []string) []string {
	// Build a set of the elements to subtract; struct{} values make this
	// a zero-width membership set.
	lookup := make(map[string]struct{}, len(second))
	for _, element := range second {
		lookup[element] = struct{}{}
	}
	// Keep only the elements of first that are absent from the set.
	var result []string
	for _, element := range first {
		if _, exists := lookup[element]; !exists {
			result = append(result, element)
		}
	}
	return result
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (p *PeerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
avroConverter := model.NewQRecordAvroConverter(
qRecord,
p.targetDWH,
&p.avroSchema.NullableFields,
p.avroSchema.NullableFields,
colNames,
)

Expand Down
11 changes: 2 additions & 9 deletions flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,11 @@ func NewS3BucketAndPrefix(s3Path string) (*S3BucketAndPrefix, error) {
stagingPath := strings.TrimPrefix(s3Path, "s3://")

// Split into bucket and prefix
splitPath := strings.SplitN(stagingPath, "/", 2)

bucket := splitPath[0]
prefix := ""
if len(splitPath) > 1 {
// Remove leading and trailing slashes from prefix
prefix = strings.Trim(splitPath[1], "/")
}
bucket, prefix, _ := strings.Cut(stagingPath, "/")

return &S3BucketAndPrefix{
Bucket: bucket,
Prefix: prefix,
Prefix: strings.Trim(prefix, "/"),
}, nil
}

Expand Down
14 changes: 7 additions & 7 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
// QRecordAvroConverter converts a QRecord into its Avro map representation
// for a given target data warehouse.
type QRecordAvroConverter struct {
	QRecord   *QRecord
	TargetDWH qvalue.QDWHType
	// NullableFields is the set of column names whose Avro type is the
	// union ["null", T]; presence in the set marks the field nullable.
	NullableFields map[string]struct{}
	ColNames       []string
}

func NewQRecordAvroConverter(
q *QRecord,
targetDWH qvalue.QDWHType,
nullableFields *map[string]bool,
nullableFields map[string]struct{},
colNames []string,
) *QRecordAvroConverter {
return &QRecordAvroConverter{
Expand All @@ -33,12 +33,12 @@ func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) {

for idx := range qac.QRecord.Entries {
key := qac.ColNames[idx]
nullable, ok := (*qac.NullableFields)[key]
_, nullable := qac.NullableFields[key]

avroConverter := qvalue.NewQValueAvroConverter(
&qac.QRecord.Entries[idx],
qac.TargetDWH,
nullable && ok,
nullable,
)
avroVal, err := avroConverter.ToAvroValue()
if err != nil {
Expand All @@ -64,15 +64,15 @@ type QRecordAvroSchema struct {

// QRecordAvroSchemaDefinition bundles a generated Avro schema (as a JSON
// string) with the set of field names that are nullable in that schema.
type QRecordAvroSchemaDefinition struct {
	Schema string
	// NullableFields holds the names of fields whose Avro type is
	// ["null", T]; membership alone carries the information.
	NullableFields map[string]struct{}
}

func GetAvroSchemaDefinition(
dstTableName string,
qRecordSchema *QRecordSchema,
) (*QRecordAvroSchemaDefinition, error) {
avroFields := []QRecordAvroField{}
nullableFields := map[string]bool{}
nullableFields := make(map[string]struct{})

for _, qField := range qRecordSchema.Fields {
avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, qField.Nullable)
Expand All @@ -84,7 +84,7 @@ func GetAvroSchemaDefinition(

if qField.Nullable {
consolidatedType = []interface{}{"null", consolidatedType}
nullableFields[qField.Name] = true
nullableFields[qField.Name] = struct{}{}
}

avroFields = append(avroFields, QRecordAvroField{
Expand Down
6 changes: 3 additions & 3 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,13 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {
}

// ToJSONOptions controls JSON serialization of records.
type ToJSONOptions struct {
	// UnnestColumns is the set of column names whose contents should be
	// unnested (flattened) into the top-level JSON object.
	UnnestColumns map[string]struct{}
}

// NewToJSONOptions builds ToJSONOptions from a list of column names to
// unnest, deduplicating them into a set.
func NewToJSONOptions(unnestCols []string) *ToJSONOptions {
	unnestColumns := make(map[string]struct{}, len(unnestCols))
	for _, col := range unnestCols {
		unnestColumns[col] = struct{}{}
	}
	return &ToJSONOptions{
		UnnestColumns: unnestColumns,
	}
}
Expand Down

0 comments on commit 22517e9

Please sign in to comment.