-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
adding context timeouts for management queries based on reload interval #390
base: main
Are you sure you want to change the base?
Changes from all commits
9cb81ce
a8f3724
bb44de9
9c1df4f
d7f08d5
af57938
3097b91
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package gosqldriver | ||
|
||
import ( | ||
"errors" | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
var ErrInvalidConn = errors.New("invalid connection") | ||
|
||
// atomicError provides thread-safe error handling | ||
type atomicError struct { | ||
value atomic.Value | ||
mu sync.Mutex | ||
} | ||
|
||
// Set sets the error value atomically. The value must not be nil. | ||
func (ae *atomicError) Set(err error) { | ||
if err == nil { | ||
panic("atomicError: nil error value") | ||
} | ||
ae.mu.Lock() | ||
defer ae.mu.Unlock() | ||
ae.value.Store(err) | ||
} | ||
|
||
// Value returns the current error value, or nil if none is set. | ||
func (ae *atomicError) Value() error { | ||
v := ae.value.Load() | ||
if v == nil { | ||
return nil | ||
} | ||
return v.(error) | ||
} | ||
|
||
type atomicBool struct { | ||
value uint32 | ||
mu sync.Mutex | ||
} | ||
|
||
// Store sets the value of the bool regardless of the previous value | ||
func (ab *atomicBool) Store(value bool) { | ||
ab.mu.Lock() | ||
defer ab.mu.Unlock() | ||
if value { | ||
atomic.StoreUint32(&ab.value, 1) | ||
} else { | ||
atomic.StoreUint32(&ab.value, 0) | ||
} | ||
} | ||
|
||
// Load returns whether the current boolean value is true | ||
func (ab *atomicBool) Load() bool { | ||
ab.mu.Lock() | ||
defer ab.mu.Unlock() | ||
return atomic.LoadUint32(&ab.value) > 0 | ||
} | ||
|
||
// Swap sets the value of the bool and returns the old value. | ||
func (ab *atomicBool) Swap(value bool) bool { | ||
ab.mu.Lock() | ||
defer ab.mu.Unlock() | ||
if value { | ||
return atomic.SwapUint32(&ab.value, 1) > 0 | ||
} | ||
return atomic.SwapUint32(&ab.value, 0) > 0 | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -100,7 +100,7 @@ func getSQL() string { | |
/* | ||
load the physical to logical maping | ||
*/ | ||
func loadMap(ctx context.Context, db *sql.DB) error { | ||
func loadMap(ctx *context.Context, db *sql.DB, queryTimeoutInterval int) error { | ||
if logger.GetLogger().V(logger.Verbose) { | ||
logger.GetLogger().Log(logger.Verbose, "Begin loading shard map") | ||
} | ||
|
@@ -109,17 +109,18 @@ func loadMap(ctx context.Context, db *sql.DB) error { | |
logger.GetLogger().Log(logger.Verbose, "Done loading shard map") | ||
}() | ||
} | ||
|
||
conn, err := db.Conn(ctx) | ||
queryContext, cancel := context.WithTimeout(*ctx, time.Duration(queryTimeoutInterval)*time.Microsecond) | ||
defer cancel() | ||
conn, err := db.Conn(queryContext) | ||
if err != nil { | ||
return fmt.Errorf("Error (conn) loading shard map: %s", err.Error()) | ||
} | ||
defer conn.Close() | ||
stmt, err := conn.PrepareContext(ctx, getSQL()) | ||
stmt, err := conn.PrepareContext(queryContext, getSQL()) | ||
if err != nil { | ||
return fmt.Errorf("Error (stmt) loading shard map: %s", err.Error()) | ||
} | ||
rows, err := stmt.QueryContext(ctx) | ||
rows, err := stmt.QueryContext(queryContext) | ||
if err != nil { | ||
return fmt.Errorf("Error (query) loading shard map: %s", err.Error()) | ||
} | ||
|
@@ -216,7 +217,7 @@ func getWLSQL() string { | |
/* | ||
load the whitelist mapping | ||
*/ | ||
func loadWhitelist(ctx context.Context, db *sql.DB) { | ||
func loadWhitelist(ctx *context.Context, db *sql.DB, timeoutInMs int) { | ||
if logger.GetLogger().V(logger.Verbose) { | ||
logger.GetLogger().Log(logger.Verbose, "Begin loading whitelist") | ||
} | ||
|
@@ -225,19 +226,20 @@ func loadWhitelist(ctx context.Context, db *sql.DB) { | |
logger.GetLogger().Log(logger.Verbose, "Done loading whitelist") | ||
}() | ||
} | ||
|
||
conn, err := db.Conn(ctx) | ||
queryContext, cancel := context.WithTimeout(*ctx, time.Duration(timeoutInMs)*time.Microsecond) | ||
defer cancel() | ||
conn, err := db.Conn(queryContext) | ||
if err != nil { | ||
logger.GetLogger().Log(logger.Alert, "Error (conn) loading whitelist:", err) | ||
return | ||
} | ||
defer conn.Close() | ||
stmt, err := conn.PrepareContext(ctx, getWLSQL()) | ||
stmt, err := conn.PrepareContext(queryContext, getWLSQL()) | ||
if err != nil { | ||
logger.GetLogger().Log(logger.Alert, "Error (stmt) loading whitelist:", err) | ||
return | ||
} | ||
rows, err := stmt.QueryContext(ctx) | ||
rows, err := stmt.QueryContext(queryContext) | ||
if err != nil { | ||
logger.GetLogger().Log(logger.Alert, "Error (query) loading whitelist:", err) | ||
return | ||
|
@@ -291,7 +293,10 @@ func InitShardingCfg() error { | |
ctx := context.Background() | ||
var db *sql.DB | ||
var err error | ||
|
||
reloadInterval := time.Second * time.Duration(GetConfig().ShardingCfgReloadInterval) | ||
if reloadInterval < 100*time.Millisecond { | ||
reloadInterval = 100 * time.Millisecond | ||
} | ||
i := 0 | ||
for ; i < 60; i++ { | ||
for shard := 0; shard < GetConfig().NumOfShards; shard++ { | ||
|
@@ -300,13 +305,13 @@ func InitShardingCfg() error { | |
} | ||
db, err = openDb(shard) | ||
if err == nil { | ||
err = loadMap(ctx, db) | ||
err = loadMap(&ctx, db, GetConfig().ManagementQueriesTimeoutInUs) | ||
if err == nil { | ||
break | ||
} | ||
} | ||
logger.GetLogger().Log(logger.Warning, "Error <", err, "> loading the shard map from shard", shard) | ||
evt := cal.NewCalEvent(cal.EventTypeError, "no_shard_map", cal.TransOK, "Error loading shard map") | ||
evt := cal.NewCalEvent(cal.EventTypeError, "no_shard_map", cal.TransOK, fmt.Sprintf("Error loading shard map %v", err)) | ||
evt.Completed() | ||
} | ||
if err == nil { | ||
|
@@ -319,32 +324,36 @@ func InitShardingCfg() error { | |
return errors.New("Failed to load shard map, no more retry") | ||
} | ||
if GetConfig().EnableWhitelistTest { | ||
loadWhitelist(ctx, db) | ||
loadWhitelist(&ctx, db, GetConfig().ManagementQueriesTimeoutInUs) | ||
} | ||
go func() { | ||
reloadTimer := time.NewTimer(reloadInterval) //Periodic reload timer | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be replaced with NewTicker as the timer fires just once. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added reset and clean up options for timer |
||
defer reloadTimer.Stop() | ||
for { | ||
reloadInterval := time.Second * time.Duration(GetConfig().ShardingCfgReloadInterval) | ||
if reloadInterval < 100 * time.Millisecond { | ||
reloadInterval = 100 * time.Millisecond | ||
} | ||
time.Sleep(reloadInterval) | ||
for shard := 0; shard < GetConfig().NumOfShards; shard++ { | ||
if db != nil { | ||
db.Close() | ||
} | ||
db, err = openDb(shard) | ||
if err == nil { | ||
err = loadMap(ctx, db) | ||
select { | ||
case <-ctx.Done(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let us also add timeout-related tests for validation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added tests. |
||
logger.GetLogger().Log(logger.Alert, "Application main context has been closed, so exiting from shard-config data reload.") | ||
return | ||
case <-reloadTimer.C: | ||
for shard := 0; shard < GetConfig().NumOfShards; shard++ { | ||
if db != nil { | ||
db.Close() | ||
} | ||
db, err = openDb(shard) | ||
if err == nil { | ||
if shard == 0 && GetConfig().EnableWhitelistTest { | ||
loadWhitelist(ctx, db) | ||
err = loadMap(&ctx, db, GetConfig().ManagementQueriesTimeoutInUs) | ||
if err == nil { | ||
if shard == 0 && GetConfig().EnableWhitelistTest { | ||
loadWhitelist(&ctx, db, GetConfig().ManagementQueriesTimeoutInUs) | ||
} | ||
break | ||
} | ||
break | ||
} | ||
logger.GetLogger().Log(logger.Warning, "Error <", err, "> loading the shard map from shard", shard) | ||
evt := cal.NewCalEvent(cal.EventTypeError, "no_shard_map", cal.TransOK, err.Error()) | ||
evt.Completed() | ||
} | ||
logger.GetLogger().Log(logger.Warning, "Error <", err, "> loading the shard map from shard", shard) | ||
evt := cal.NewCalEvent(cal.EventTypeError, "no_shard_map", cal.TransOK, "Error loading shard map") | ||
evt.Completed() | ||
reloadTimer.Reset(reloadInterval) //Reset timer | ||
} | ||
} | ||
}() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with this but why do we need to introduce the timeTicker? I feel the existing code is simpler.