Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(coordinator): assign static prover first and avoid reassigning failed task to same prover #1584

Merged
merged 33 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
791fcaf
feat(coordinator): assign static prover first and avoid reassigning f…
yiweichi Dec 30, 2024
71acdb3
fix: lint
yiweichi Dec 30, 2024
8ce5121
fix: GetUnassignedBatchCount
yiweichi Dec 30, 2024
3e0589e
Merge branch 'develop' into feat-coordinator-assign-logic
yiweichi Dec 30, 2024
df92616
chore: remove extra files
yiweichi Dec 30, 2024
a75075d
fix: err log
yiweichi Dec 30, 2024
1c5d88d
fix: orm GetTaskOfProver
yiweichi Dec 30, 2024
e4c0779
fix: comments
yiweichi Dec 30, 2024
0d1c303
chore: auto version bump [bot]
yiweichi Jan 6, 2025
25796df
fix: comments
yiweichi Jan 8, 2025
e72523d
Merge branch 'feat-coordinator-assign-logic' of https://github.com/sc…
yiweichi Jan 8, 2025
e51e95d
chore: auto version bump [bot]
yiweichi Jan 8, 2025
e0aa5be
fix: lint
yiweichi Jan 8, 2025
c2075c8
Merge branch 'develop' into feat-coordinator-assign-logic
yiweichi Jan 8, 2025
77d3a75
Update coordinator/internal/logic/provertask/bundle_prover_task.go
yiweichi Jan 9, 2025
5ee2388
Update coordinator/internal/logic/provertask/batch_prover_task.go
yiweichi Jan 9, 2025
816d5ce
Update coordinator/internal/logic/provertask/batch_prover_task.go
yiweichi Jan 9, 2025
8f0ab50
Update coordinator/internal/logic/provertask/bundle_prover_task.go
yiweichi Jan 9, 2025
d8ad10f
Update coordinator/internal/logic/provertask/chunk_prover_task.go
yiweichi Jan 9, 2025
64e88ce
Update coordinator/internal/orm/prover_task.go
yiweichi Jan 9, 2025
dd3c9e1
Update coordinator/internal/logic/provertask/chunk_prover_task.go
yiweichi Jan 9, 2025
b11f010
feat: handle could prover name when assign task
yiweichi Jan 12, 2025
5ad00e7
feat: handle could prover name when assign task
yiweichi Jan 12, 2025
37deef1
fix: test
yiweichi Jan 12, 2025
078ac88
add jwt IdentityHandler
yiweichi Jan 12, 2025
27ae174
fix: test
yiweichi Jan 12, 2025
37ac215
fix: test
yiweichi Jan 13, 2025
cc57700
fix: typos
yiweichi Jan 13, 2025
f725c2e
fix: comments
yiweichi Jan 13, 2025
aae19ab
fix: lint
yiweichi Jan 13, 2025
35f22b4
fix: login verify
yiweichi Jan 13, 2025
1f05c6e
Apply suggestions from code review
yiweichi Jan 14, 2025
1e63677
add comments for IsExternalProverNameMatch
yiweichi Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: handle could prover name when assign task
  • Loading branch information
yiweichi committed Jan 12, 2025
commit b11f010827f39b4a350e96dc6f2071126451f300
9 changes: 5 additions & 4 deletions coordinator/internal/controller/api/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,11 @@ func (a *AuthController) PayloadFunc(data interface{}) jwt.MapClaims {
}

return jwt.MapClaims{
types.HardForkName: v.HardForkName,
types.PublicKey: v.PublicKey,
types.ProverName: v.Message.ProverName,
types.ProverVersion: v.Message.ProverVersion,
types.HardForkName: v.HardForkName,
types.PublicKey: v.PublicKey,
types.ProverName: v.Message.ProverName,
types.ProverVersion: v.Message.ProverVersion,
types.ProverProviderTypeKey: v.Message.ProverProviderType,
}
}

Expand Down
6 changes: 6 additions & 0 deletions coordinator/internal/logic/auth/login.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ func (l *LoginLogic) Check(login *types.LoginParameter) error {
}
}
}

if login.Message.ProverProviderType != types.ProverProviderTypeInternal && login.Message.ProverProviderType != types.ProverProviderTypeExternal {
log.Error("invalid prover_provider_type", "value", login.Message.ProverProviderType, "prover name", login.Message.ProverName, "prover_version", login.Message.ProverVersion)
return errors.New("invalid prover provider type.")
}

return nil
}

Expand Down
17 changes: 10 additions & 7 deletions coordinator/internal/logic/provertask/batch_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -23,6 +22,7 @@ import (
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
cutils "scroll-tech/coordinator/internal/utils"
)

// BatchProverTask is prover task implement for batch proof
Expand Down Expand Up @@ -64,7 +64,7 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) {
unassignedBatchCount, getCountError := bp.batchOrm.GetUnassignedBatchCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getCountError != nil {
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
Expand Down Expand Up @@ -102,14 +102,17 @@ func (bp *BatchProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}

// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBatch, tmpBatchTask.Hash, 2)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
log.Error("failed to get prover tasks", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBatchTask.Hash, "error", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid {
log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
for i := 0; i < len(proverTasks); i++ {
if proverTasks[i].ProverName == taskCtx.ProverName ||
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
log.Debug("get empty batch, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
}
}

rowsAffected, updateAttemptsErr := bp.batchOrm.UpdateBatchAttempts(ctx.Copy(), tmpBatchTask.Index, tmpBatchTask.ActiveAttempts, tmpBatchTask.TotalAttempts)
Expand Down
17 changes: 10 additions & 7 deletions coordinator/internal/logic/provertask/bundle_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -22,6 +21,7 @@ import (
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
cutils "scroll-tech/coordinator/internal/utils"
)

// BundleProverTask is prover task implement for bundle proof
Expand Down Expand Up @@ -64,7 +64,7 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat

maxActiveAttempts := bp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := bp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) {
unassignedBundleCount, getCountError := bp.bundleOrm.GetUnassignedBundleCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts)
if getCountError != nil {
log.Error("failed to get unassigned batch proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
Expand Down Expand Up @@ -102,14 +102,17 @@ func (bp *BundleProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinat
}

// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := bp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeBatch, tmpBundleTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
proverTasks, getTaskError := bp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeBundle, tmpBundleTask.Hash, 2)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeBatch.String(), "taskID", tmpBundleTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
log.Error("failed to get prover tasks", "proof_type", message.ProofTypeBundle.String(), "taskID", tmpBundleTask.Hash, "error", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid {
log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
for i := 0; i < len(proverTasks); i++ {
if proverTasks[i].ProverName == taskCtx.ProverName ||
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
log.Debug("get empty bundle, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
}
}

rowsAffected, updateAttemptsErr := bp.bundleOrm.UpdateBundleAttempts(ctx.Copy(), tmpBundleTask.Hash, tmpBundleTask.ActiveAttempts, tmpBundleTask.TotalAttempts)
Expand Down
18 changes: 11 additions & 7 deletions coordinator/internal/logic/provertask/chunk_prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/gin-gonic/gin"
Expand All @@ -22,6 +21,7 @@ import (
"scroll-tech/coordinator/internal/config"
"scroll-tech/coordinator/internal/orm"
coordinatorType "scroll-tech/coordinator/internal/types"
cutils "scroll-tech/coordinator/internal/utils"
)

// ChunkProverTask the chunk prover task
Expand Down Expand Up @@ -62,7 +62,7 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato

maxActiveAttempts := cp.cfg.ProverManager.ProversPerSession
maxTotalAttempts := cp.cfg.ProverManager.SessionAttempts
if strings.HasPrefix(taskCtx.ProverName, ExternalProverNamePrefix) {
if taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) {
unassignedChunkCount, getCountError := cp.chunkOrm.GetUnassignedChunkCount(ctx.Copy(), maxActiveAttempts, maxTotalAttempts, getTaskParameter.ProverHeight)
if getCountError != nil {
log.Error("failed to get unassigned chunk proving tasks count", "height", getTaskParameter.ProverHeight, "err", err)
Expand Down Expand Up @@ -100,15 +100,19 @@ func (cp *ChunkProverTask) Assign(ctx *gin.Context, getTaskParameter *coordinato
}

// Don't dispatch the same failing job to the same prover
proverTask, getTaskError := cp.proverTaskOrm.GetTaskOfProver(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, taskCtx.PublicKey, taskCtx.ProverVersion)
proverTasks, getTaskError := cp.proverTaskOrm.GetFailedProverTasksByHash(ctx.Copy(), message.ProofTypeChunk, tmpChunkTask.Hash, 2)
if getTaskError != nil {
log.Error("failed to get prover task of prover", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "key", taskCtx.PublicKey, "Prover_version", taskCtx.ProverVersion, "error", getTaskError)
log.Error("failed to get prover tasks", "proof_type", message.ProofTypeChunk.String(), "taskID", tmpChunkTask.Hash, "error", getTaskError)
return nil, ErrCoordinatorInternalFailure
}
if proverTask != nil && types.ProverProveStatus(proverTask.ProvingStatus) == types.ProverProofInvalid {
log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
for i := 0; i < len(proverTasks); i++ {
if proverTasks[i].ProverName == taskCtx.ProverName ||
taskCtx.ProverProviderType == uint8(coordinatorType.ProverProviderTypeExternal) && cutils.IsExternalProverNameMatch(proverTasks[i].ProverName, taskCtx.ProverName) {
log.Debug("get empty chunk, the prover already failed this task", "height", getTaskParameter.ProverHeight)
return nil, nil
}
}

rowsAffected, updateAttemptsErr := cp.chunkOrm.UpdateChunkAttempts(ctx.Copy(), tmpChunkTask.Index, tmpChunkTask.ActiveAttempts, tmpChunkTask.TotalAttempts)
if updateAttemptsErr != nil {
log.Error("failed to update chunk attempts", "height", getTaskParameter.ProverHeight, "err", updateAttemptsErr)
Expand Down
15 changes: 11 additions & 4 deletions coordinator/internal/logic/provertask/prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type BaseProverTask struct {
}

type proverTaskContext struct {
PublicKey string
ProverName string
ProverVersion string
HardForkNames map[string]struct{}
PublicKey string
ProverName string
ProverVersion string
ProverProviderType uint8
HardForkNames map[string]struct{}
}

// checkParameter check the prover task parameter illegal
Expand All @@ -81,6 +82,12 @@ func (b *BaseProverTask) checkParameter(ctx *gin.Context) (*proverTaskContext, e
}
ptc.ProverVersion = proverVersion.(string)

ProverProviderType, ProverProviderTypeExist := ctx.Get(coordinatorType.ProverProviderTypeKey)
if !ProverProviderTypeExist {
return nil, errors.New("get prover provider type from context failed")
}
ptc.ProverProviderType = ProverProviderType.(uint8)

hardForkNamesStr, hardForkNameExist := ctx.Get(coordinatorType.HardForkName)
if !hardForkNameExist {
return nil, errors.New("get hard fork name from context failed")
Expand Down
39 changes: 21 additions & 18 deletions coordinator/internal/orm/prover_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,27 @@ func (o *ProverTask) GetProverTasksByHashes(ctx context.Context, taskType messag
return proverTasks, nil
}

// GetFailedProverTasksByHash retrieves the failed ProverTask records associated with the specified hash.
// The returned prover task objects are sorted in descending order by their ids.
func (o *ProverTask) GetFailedProverTasksByHash(ctx context.Context, taskType message.ProofType, hash string, limit int) ([]*ProverTask, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type", int(taskType))
db = db.Where("task_id ?", hash)
db = db.Where("proving_status = ?", int(types.ProverProofInvalid))
db = db.Order("id desc")

if limit != 0 {
db = db.Limit(limit)
}

var proverTasks []*ProverTask
if err := db.Find(&proverTasks).Error; err != nil {
return nil, fmt.Errorf("ProverTask.GetFailedProverTasksByHash error: %w, hash: %v", err, hash)
}
return proverTasks, nil
}

// GetProverTaskByUUIDAndPublicKey get prover task taskID by uuid and public key
func (o *ProverTask) GetProverTaskByUUIDAndPublicKey(ctx context.Context, uuid, publicKey string) (*ProverTask, error) {
db := o.db.WithContext(ctx)
Expand Down Expand Up @@ -148,24 +169,6 @@ func (o *ProverTask) GetAssignedTaskOfOtherProvers(ctx context.Context, taskType
return proverTasks, nil
}

// GetTaskOfOtherProvers get the chunk/batch task of prover
func (o *ProverTask) GetTaskOfProver(ctx context.Context, taskType message.ProofType, taskID, proverPublicKey, proverVersion string) (*ProverTask, error) {
db := o.db.WithContext(ctx)
db = db.Model(&ProverTask{})
db = db.Where("task_type", int(taskType))
db = db.Where("task_id", taskID)
db = db.Where("prover_public_key", proverPublicKey)
db = db.Where("prover_version", proverVersion)
db = db.Limit(1)

var proverTask ProverTask
err := db.Find(&proverTask).Error
if err != nil {
return nil, fmt.Errorf("ProverTask.GetTaskOfProver error: %w, taskID: %v, publicKey:%s", err, taskID, proverPublicKey)
}
return &proverTask, nil
}

// GetProvingStatusByTaskID retrieves the proving status of a prover task
func (o *ProverTask) GetProvingStatusByTaskID(ctx context.Context, taskType message.ProofType, taskID string) (types.ProverProveStatus, error) {
db := o.db.WithContext(ctx)
Expand Down
13 changes: 8 additions & 5 deletions coordinator/internal/types/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const (
ProverName = "prover_name"
// ProverVersion the prover version for context
ProverVersion = "prover_version"
// ProverProviderTypeKey the prover provider type for context
ProverProviderTypeKey = "prover_provider_type"
// HardForkName the hard fork name for context
HardForkName = "hard_fork_name"
)
Expand All @@ -30,11 +32,12 @@ type LoginSchema struct {

// Message the login message struct
type Message struct {
Challenge string `form:"challenge" json:"challenge" binding:"required"`
ProverVersion string `form:"prover_version" json:"prover_version" binding:"required"`
ProverName string `form:"prover_name" json:"prover_name" binding:"required"`
ProverTypes []ProverType `form:"prover_types" json:"prover_types"`
VKs []string `form:"vks" json:"vks"`
Challenge string `form:"challenge" json:"challenge" binding:"required"`
ProverVersion string `form:"prover_version" json:"prover_version" binding:"required"`
ProverName string `form:"prover_name" json:"prover_name" binding:"required"`
ProverProviderType ProverProviderType `form:"prover_provider_type" json:"prover_provider_type" binding:"required"`
ProverTypes []ProverType `form:"prover_types" json:"prover_types"`
VKs []string `form:"vks" json:"vks"`
}

// LoginParameterWithHardForkName constructs new payload for login
Expand Down
23 changes: 23 additions & 0 deletions coordinator/internal/types/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,26 @@ func MakeProverType(proofType message.ProofType) ProverType {
return ProverTypeUndefined
}
}

// ProverProviderType represents the type of prover provider.
type ProverProviderType uint8

func (r ProverProviderType) String() string {
switch r {
case ProverProviderTypeInternal:
return "prover provider type internal"
case ProverProviderTypeExternal:
return "prover provider type external"
default:
return fmt.Sprintf("prover provider type type: %d", r)
}
}

const (
// ProverProviderTypeUndefined is an unknown prover provider type
ProverProviderTypeUndefined ProverProviderType = iota
// ProverProviderTypeInternal is an internal prover provider type
ProverProviderTypeInternal
// ProverProviderTypeExternal is an external prover provider type
ProverProviderTypeExternal
)
15 changes: 15 additions & 0 deletions coordinator/internal/utils/prover_name.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package utils

import "strings"

func IsExternalProverNameMatch(localName, remoteName string) bool {
local := strings.Split(localName, "_")
remote := strings.Split(remoteName, "_")

if len(local) < 3 || len(remote) < 3 {
return false
}

// note the name of cloud prover is in fact in the format of "cloud_prover_{provider-name}_index"
return local[0] == remote[0] && local[1] == remote[1] && local[2] == remote[2]
}