Skip to content

Commit 93af0ee

Browse files
committed
support concurrency
1 parent 3e1b63f commit 93af0ee

File tree

15 files changed

+1337
-180
lines changed

15 files changed

+1337
-180
lines changed

Diff for: models/actions/run.go

+49-100
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"code.gitea.io/gitea/modules/util"
2121
webhook_module "code.gitea.io/gitea/modules/webhook"
2222

23-
"github.com/nektos/act/pkg/jobparser"
2423
"xorm.io/builder"
2524
)
2625

@@ -47,6 +46,8 @@ type ActionRun struct {
4746
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4847
Status Status `xorm:"index"`
4948
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
49+
ConcurrencyGroup string
50+
ConcurrencyCancel bool
5051
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5152
Started timeutil.TimeStamp
5253
Stopped timeutil.TimeStamp
@@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
168169
return run.ScheduleID > 0
169170
}
170171

171-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
172+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
172173
_, err := db.GetEngine(ctx).ID(repo.ID).
173174
SetExpr("num_action_runs",
174175
builder.Select("count(*)").From("action_run").
@@ -222,119 +223,50 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
222223
return err
223224
}
224225

225-
// Iterate over each job and attempt to cancel it.
226-
for _, job := range jobs {
227-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
228-
status := job.Status
229-
if status.IsDone() {
230-
continue
231-
}
232-
233-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
234-
if job.TaskID == 0 {
235-
job.Status = StatusCancelled
236-
job.Stopped = timeutil.TimeStampNow()
237-
238-
// Update the job's status and stopped time in the database.
239-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
240-
if err != nil {
241-
return err
242-
}
243-
244-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
245-
if n == 0 {
246-
return fmt.Errorf("job has changed, try again")
247-
}
248-
249-
// Continue with the next job.
250-
continue
251-
}
252-
253-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
254-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
255-
return err
256-
}
226+
if err := CancelJobs(ctx, jobs); err != nil {
227+
return err
257228
}
258229
}
259230

260231
// Return nil to indicate successful cancellation of all running and waiting jobs.
261232
return nil
262233
}
263234

264-
// InsertRun inserts a run
265-
// The title will be cut off at 255 characters if it's longer than 255 characters.
266-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
267-
ctx, committer, err := db.TxContext(ctx)
268-
if err != nil {
269-
return err
270-
}
271-
defer committer.Close()
272-
273-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
274-
if err != nil {
275-
return err
276-
}
277-
run.Index = index
278-
run.Title = util.EllipsisDisplayString(run.Title, 255)
235+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
236+
// Iterate over each job and attempt to cancel it.
237+
for _, job := range jobs {
238+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
239+
status := job.Status
240+
if status.IsDone() {
241+
continue
242+
}
279243

280-
if err := db.Insert(ctx, run); err != nil {
281-
return err
282-
}
244+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
245+
if job.TaskID == 0 {
246+
job.Status = StatusCancelled
247+
job.Stopped = timeutil.TimeStampNow()
283248

284-
if run.Repo == nil {
285-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
286-
if err != nil {
287-
return err
288-
}
289-
run.Repo = repo
290-
}
249+
// Update the job's status and stopped time in the database.
250+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
251+
if err != nil {
252+
return err
253+
}
291254

292-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
293-
return err
294-
}
255+
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
256+
if n == 0 {
257+
return fmt.Errorf("job has changed, try again")
258+
}
295259

296-
runJobs := make([]*ActionRunJob, 0, len(jobs))
297-
var hasWaiting bool
298-
for _, v := range jobs {
299-
id, job := v.Job()
300-
needs := job.Needs()
301-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
302-
return err
260+
// Continue with the next job.
261+
continue
303262
}
304-
payload, _ := v.Marshal()
305-
status := StatusWaiting
306-
if len(needs) > 0 || run.NeedApproval {
307-
status = StatusBlocked
308-
} else {
309-
hasWaiting = true
310-
}
311-
job.Name = util.EllipsisDisplayString(job.Name, 255)
312-
runJobs = append(runJobs, &ActionRunJob{
313-
RunID: run.ID,
314-
RepoID: run.RepoID,
315-
OwnerID: run.OwnerID,
316-
CommitSHA: run.CommitSHA,
317-
IsForkPullRequest: run.IsForkPullRequest,
318-
Name: job.Name,
319-
WorkflowPayload: payload,
320-
JobID: id,
321-
Needs: needs,
322-
RunsOn: job.RunsOn(),
323-
Status: status,
324-
})
325-
}
326-
if err := db.Insert(ctx, runJobs); err != nil {
327-
return err
328-
}
329263

330-
// if there is a job in the waiting status, increase tasks version.
331-
if hasWaiting {
332-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
264+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
265+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
333266
return err
334267
}
335268
}
336-
337-
return committer.Commit()
269+
return nil
338270
}
339271

340272
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
@@ -426,7 +358,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426358
}
427359
run.Repo = repo
428360
}
429-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
361+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
430362
return err
431363
}
432364
}
@@ -435,3 +367,20 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435367
}
436368

437369
type ActionRunIndex db.ResourceIndex
370+
371+
func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (bool, error) {
372+
if actionRun.ConcurrencyGroup == "" || actionRun.ConcurrencyCancel {
373+
return false, nil
374+
}
375+
376+
concurrentRunsNum, err := db.Count[ActionRun](ctx, &FindRunOptions{
377+
RepoID: actionRun.RepoID,
378+
ConcurrencyGroup: actionRun.ConcurrencyGroup,
379+
Status: []Status{StatusWaiting, StatusRunning},
380+
})
381+
if err != nil {
382+
return false, fmt.Errorf("count running and waiting runs: %w", err)
383+
}
384+
385+
return concurrentRunsNum > 0, nil
386+
}

Diff for: models/actions/run_job.go

+115-4
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,17 @@ type ActionRunJob struct {
3333
RunsOn []string `xorm:"JSON TEXT"`
3434
TaskID int64 // the latest task of the job
3535
Status Status `xorm:"index"`
36-
Started timeutil.TimeStamp
37-
Stopped timeutil.TimeStamp
38-
Created timeutil.TimeStamp `xorm:"created"`
39-
Updated timeutil.TimeStamp `xorm:"updated index"`
36+
37+
RawConcurrencyGroup string // raw concurrency.group
38+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
39+
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
40+
ConcurrencyGroup string // evaluated concurrency.group
41+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
42+
43+
Started timeutil.TimeStamp
44+
Stopped timeutil.TimeStamp
45+
Created timeutil.TimeStamp `xorm:"created"`
46+
Updated timeutil.TimeStamp `xorm:"updated index"`
4047
}
4148

4249
func init() {
@@ -184,3 +191,107 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
184191
return StatusUnknown // it shouldn't happen
185192
}
186193
}
194+
195+
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
196+
if job.RawConcurrencyGroup == "" {
197+
return false, nil
198+
}
199+
if !job.IsConcurrencyEvaluated {
200+
return false, ErrUnevaluatedConcurrency{
201+
Group: job.RawConcurrencyGroup,
202+
CancelInProgress: job.RawConcurrencyCancel,
203+
}
204+
}
205+
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
206+
return false, nil
207+
}
208+
209+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
210+
RepoID: job.RepoID,
211+
ConcurrencyGroup: job.ConcurrencyGroup,
212+
Statuses: []Status{StatusRunning, StatusWaiting},
213+
})
214+
if err != nil {
215+
return false, fmt.Errorf("count running and waiting jobs: %w", err)
216+
}
217+
if concurrentJobsNum > 0 {
218+
return true, nil
219+
}
220+
221+
if err := job.LoadRun(ctx); err != nil {
222+
return false, fmt.Errorf("load run: %w", err)
223+
}
224+
225+
return ShouldBlockRunByConcurrency(ctx, job.Run)
226+
}
227+
228+
func CancelPreviousJobsByConcurrency(ctx context.Context, job *ActionRunJob) error {
229+
if job.RawConcurrencyGroup != "" {
230+
if !job.IsConcurrencyEvaluated {
231+
return ErrUnevaluatedConcurrency{
232+
Group: job.RawConcurrencyGroup,
233+
CancelInProgress: job.RawConcurrencyCancel,
234+
}
235+
}
236+
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
237+
// cancel previous jobs in the same concurrency group
238+
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
239+
RepoID: job.RepoID,
240+
ConcurrencyGroup: job.ConcurrencyGroup,
241+
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
242+
})
243+
if err != nil {
244+
return fmt.Errorf("find previous jobs: %w", err)
245+
}
246+
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
247+
if err := CancelJobs(ctx, previousJobs); err != nil {
248+
return fmt.Errorf("cancel previous jobs: %w", err)
249+
}
250+
}
251+
}
252+
253+
if err := job.LoadRun(ctx); err != nil {
254+
return fmt.Errorf("load run: %w", err)
255+
}
256+
if job.Run.ConcurrencyGroup != "" && job.Run.ConcurrencyCancel {
257+
// cancel previous runs in the same concurrency group
258+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
259+
RepoID: job.RepoID,
260+
ConcurrencyGroup: job.Run.ConcurrencyGroup,
261+
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
262+
})
263+
if err != nil {
264+
return fmt.Errorf("find runs: %w", err)
265+
}
266+
for _, run := range runs {
267+
if run.ID == job.Run.ID {
268+
continue
269+
}
270+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
271+
RunID: run.ID,
272+
})
273+
if err != nil {
274+
return fmt.Errorf("find run %d jobs: %w", run.ID, err)
275+
}
276+
if err := CancelJobs(ctx, jobs); err != nil {
277+
return fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
278+
}
279+
}
280+
}
281+
282+
return nil
283+
}
284+
285+
type ErrUnevaluatedConcurrency struct {
286+
Group string
287+
CancelInProgress string
288+
}
289+
290+
func IsErrUnevaluatedConcurrency(err error) bool {
291+
_, ok := err.(ErrUnevaluatedConcurrency)
292+
return ok
293+
}
294+
295+
func (err ErrUnevaluatedConcurrency) Error() string {
296+
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] has not been evaluated", err.Group, err.CancelInProgress)
297+
}

Diff for: models/actions/run_job_list.go

+16-6
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,21 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
4646
return jobs.LoadRuns(ctx, withRepo)
4747
}
4848

49+
func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) {
50+
runList := make(RunList, 0, len(runIDs))
51+
err := db.GetEngine(ctx).In("id", runIDs).Find(&runList)
52+
return runList, err
53+
}
54+
4955
type FindRunJobOptions struct {
5056
db.ListOptions
51-
RunID int64
52-
RepoID int64
53-
OwnerID int64
54-
CommitSHA string
55-
Statuses []Status
56-
UpdatedBefore timeutil.TimeStamp
57+
RunID int64
58+
RepoID int64
59+
OwnerID int64
60+
CommitSHA string
61+
Statuses []Status
62+
UpdatedBefore timeutil.TimeStamp
63+
ConcurrencyGroup string
5764
}
5865

5966
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -76,5 +83,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
7683
if opts.UpdatedBefore > 0 {
7784
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
7885
}
86+
if opts.ConcurrencyGroup != "" {
87+
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
88+
}
7989
return cond
8090
}

0 commit comments

Comments
 (0)