fix: avoid fan out matrix task failed due to result ref #8487

Open
wants to merge 2 commits into
base: main

Conversation

l-qing
Contributor

@l-qing l-qing commented Jan 14, 2025

fix #8324
Continuing development based on this PR: #8327

Changes

Submitter Checklist

As the author of this PR, please check off the items in this checklist:

  • Has Docs if any changes are user facing, including updates to minimum requirements e.g. Kubernetes version bumps
  • Has Tests included if any functionality added or changed
  • pre-commit Passed
  • Follows the commit message standard
  • Meets the Tekton contributor standards (including functionality, content, code)
  • Has a kind label. You can add one by adding a comment on this PR that contains /kind <type>. Valid types are bug, cleanup, design, documentation, feature, flake, misc, question, tep
  • Release notes block below has been updated with any user facing changes (API changes, bug fixes, changes requiring upgrade notices or deprecation warnings). See some examples of good release notes.
  • Release notes contains the string "action required" if the change requires additional action from users switching to the new release

Release Notes

fix: avoid fan out matrix task failed due to result ref

/kind bug

1. Steps to reproduce

$ kubectl version

Client Version: v1.32.0
Kustomize Version: v5.5.0
Server Version: v1.28.8

$ tkn version

Client version: 0.39.0
Pipeline version: v0.66.0
cat <<'EOF' | kubectl replace -f -
apiVersion: tekton.dev/v1
kind: Task
metadata:
  name: array-emitter
spec:
  results:
  - name: array
    type: array
  steps:
    - name: echo
      image: mirror.gcr.io/alpine
      script: |
        echo -n "[\"linux\",\"max\",\"windows\"]" > $(results.array.path)

---
apiVersion: tekton.dev/v1
kind: Task
metadata:
  name: platform-browsers
spec:
  params:
    - name: platform
  results:
  - name: str
    type: string
  steps:
    - name: echo
      image: mirror.gcr.io/alpine
      script: |
        echo -n "$(params.platform)" | tee $(results.str.path)

---
apiVersion: tekton.dev/v1
kind: Task
metadata:
  name: printer
spec:
  params:
    - name: platform
      default: "default-platform"
    - name: platforms
      default: []
  steps:
    - name: echo
      image: mirror.gcr.io/alpine
      args:
        - "$(params.platforms)"
      script: |
        if [ -n "$(params.platform)" ]; then
          echo "platform: $(params.platform)"
        fi
        if [ $# -gt 0 ]; then
          echo "platforms: $@"
        fi

---
apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
  name: matrixed-pr
spec:
  taskRunTemplate:
    serviceAccountName: "default"
  pipelineSpec:
    tasks:

    - name: array-emitter
      taskRef:
        name: array-emitter

    - name: platforms
      params:
        - name: test
          value: test
      matrix:
        params:
          - name: platform
            value: $(tasks.array-emitter.results.array[*])
      taskRef:
        name: platform-browsers

    - name: printer-matrix
      taskRef:
        name: printer
      matrix:
        params:
          - name: platform
            value: $(tasks.platforms.results.str[*])

    - name: printer-all-platforms
      taskRef:
        name: printer
      params:
        - name: platforms
          value: $(tasks.platforms.results.str[*])
EOF

2. Error message

invalid result reference in pipeline task "printer-matrix": unable to validate result referencing pipeline task "platforms": task spec not found

apiVersion: tekton.dev/v1
kind: PipelineRun
metadata:
  creationTimestamp: "2025-01-14T07:40:10Z"
  generation: 1
  name: matrixed-pr
  namespace: default
  resourceVersion: "26861"
  uid: feba93ac-759d-4817-8512-700eb8eef059
spec:
  pipelineSpec:
    tasks:
    - name: array-emitter
      taskRef:
        kind: Task
        name: array-emitter
    - matrix:
        params:
        - name: platform
          value: $(tasks.array-emitter.results.array[*])
      name: platforms
      params:
      - name: test
        value: test
      taskRef:
        kind: Task
        name: platform-browsers
    - matrix:
        params:
        - name: platform
          value: $(tasks.platforms.results.str[*])
      name: printer-matrix
      taskRef:
        kind: Task
        name: printer
    - name: printer-all-platforms
      params:
      - name: platforms
        value: $(tasks.platforms.results.str[*])
      taskRef:
        kind: Task
        name: printer
  taskRunTemplate:
    serviceAccountName: default
  timeouts:
    pipeline: 1h0m0s
status:
  completionTime: "2025-01-14T07:40:12Z"
  conditions:
  - lastTransitionTime: "2025-01-14T07:40:12Z"
    message: 'invalid result reference in pipeline task "printer-matrix": unable to
      validate result referencing pipeline task "platforms": task spec not found'
    reason: InvalidTaskResultReference
    status: "False"
    type: Succeeded
  pipelineSpec:
    tasks:
    - name: array-emitter
      taskRef:
        kind: Task
        name: array-emitter
    - matrix:
        params:
        - name: platform
          value: $(tasks.array-emitter.results.array[*])
      name: platforms
      params:
      - name: test
        value: test
      taskRef:
        kind: Task
        name: platform-browsers
    - matrix:
        params:
        - name: platform
          value: $(tasks.platforms.results.str[*])
      name: printer-matrix
      taskRef:
        kind: Task
        name: printer
    - name: printer-all-platforms
      params:
      - name: platforms
        value: $(tasks.platforms.results.str[*])
      taskRef:
        kind: Task
        name: printer
  provenance:
    featureFlags:
      AwaitSidecarReadiness: true
      Coschedule: workspaces
      DisableAffinityAssistant: false
      DisableCredsInit: false
      DisableInlineSpec: ""
      EnableAPIFields: beta
      EnableArtifacts: false
      EnableCELInWhenExpression: false
      EnableConciseResolverSyntax: false
      EnableKeepPodOnCancel: false
      EnableKubernetesSidecar: false
      EnableParamEnum: false
      EnableProvenanceInStatus: true
      EnableStepActions: false
      EnforceNonfalsifiability: none
      MaxResultSize: 4096
      RequireGitSSHSecretKnownHosts: false
      ResultExtractionMethod: termination-message
      RunningInEnvWithInjectedSidecars: true
      SendCloudEventsForRuns: false
      SetSecurityContext: false
      VerificationNoMatchPolicy: ignore
  startTime: "2025-01-14T07:40:12Z"

3. Analysis

a. validateResultRef: unable to validate result referencing pipeline task

if ptMap[ref.PipelineTask].ResolvedTask == nil || ptMap[ref.PipelineTask].ResolvedTask.TaskSpec == nil {
	return fmt.Errorf("unable to validate result referencing pipeline task %q: task spec not found", ref.PipelineTask)
}

b. ValidatePipelineTaskResults: invalid result reference in pipeline task

func ValidatePipelineTaskResults(state PipelineRunState) error {
	ptMap := state.ToMap()
	for _, rpt := range state {
		for _, ref := range v1.PipelineTaskResultRefs(rpt.PipelineTask) {
			if err := validateResultRef(ref, ptMap); err != nil {
				return pipelineErrors.WrapUserError(fmt.Errorf("invalid result reference in pipeline task %q: %w", rpt.PipelineTask.Name, err))

c. PipelineRun-Reconcile: calls ValidatePipelineTaskResults

if pipelineRunFacts.State.IsBeforeFirstTaskRun() {
	if err := resources.ValidatePipelineTaskResults(pipelineRunFacts.State); err != nil {
		logger.Errorf("Failed to resolve task result reference for %q with error %v", pr.Name, err)
		pr.Status.MarkFailed(v1.PipelineRunReasonInvalidTaskResultReference.String(), err.Error())
		return controller.NewPermanentError(err)
	}

d. pipelineRunFacts.State: comes from resolvePipelineState

// Second iteration
pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState)
switch {
case errors.Is(err, remote.ErrRequestInProgress):
	message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
	pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message)
	return nil
case err != nil:
	return err
default:
}
// Build PipelineRunFacts with a list of resolved pipeline tasks,
// dag tasks graph and final tasks graph
pipelineRunFacts := &resources.PipelineRunFacts{
	State: pipelineRunState,

e. resolvePipelineState: resolvedTask comes from ResolvePipelineTask

resolvedTask, err := resources.ResolvePipelineTask(ctx,
	*pr,
	fn,
	func(name string) (*v1.TaskRun, error) {
		return c.taskRunLister.TaskRuns(pr.Namespace).Get(name)
	},
	getCustomRunFunc,
	task,
	pst,
)

f. ResolvePipelineTask: calls CountCombinations

numCombinations := 1
// We want to resolve all of the result references and ignore any errors at this point since there could be
// instances where result references are missing here, but will be later skipped and resolved in
// skipBecauseResultReferencesAreMissing. The final validation is handled in CheckMissingResultReferences.
resolvedResultRefs, _, _ := ResolveResultRefs(pst, PipelineRunState{&rpt})
if err := validateArrayResultsIndex(resolvedResultRefs); err != nil {
	return nil, err
}
ApplyTaskResults(PipelineRunState{&rpt}, resolvedResultRefs)
if rpt.PipelineTask.IsMatrixed() {
	numCombinations = rpt.PipelineTask.Matrix.CountCombinations()
}
if rpt.IsCustomTask() {
	rpt.CustomRunNames = getNamesOfCustomRuns(pipelineRun.Status.ChildReferences, pipelineTask.Name, pipelineRun.Name, numCombinations)
	for _, runName := range rpt.CustomRunNames {
		run, err := getRun(runName)
		if err != nil && !kerrors.IsNotFound(err) {
			return nil, fmt.Errorf("error retrieving CustomRun %s: %w", runName, err)
		}
		if run != nil {
			rpt.CustomRuns = append(rpt.CustomRuns, run)
		}
	}
} else {
	rpt.TaskRunNames = GetNamesOfTaskRuns(pipelineRun.Status.ChildReferences, pipelineTask.Name, pipelineRun.Name, numCombinations)
	for _, taskRunName := range rpt.TaskRunNames {
		if err := rpt.setTaskRunsAndResolvedTask(ctx, taskRunName, getTask, getTaskRun, pipelineTask); err != nil {

h1. CountCombinations: calculates the number of TaskRuns

// CountCombinations returns the count of Combinations of Parameters generated from the Matrix in PipelineTask.
func (m *Matrix) CountCombinations() int {
	// Iterate over Matrix Parameters and compute count of all generated Combinations
	count := m.countGeneratedCombinationsFromParams()
	// Add any additional Combinations generated from Matrix Include Parameters
	count += m.countNewCombinationsFromInclude()
	return count
}

// countGeneratedCombinationsFromParams returns the count of Combinations of Parameters generated from the Matrix
// Parameters
func (m *Matrix) countGeneratedCombinationsFromParams() int {
	if !m.HasParams() {
		return 0
	}
	count := 1
	for _, param := range m.Params {
		count *= len(param.Value.ArrayVal)
	}
	return count
}

h2. Error: The calculated count is 0.

func (m *Matrix) countGeneratedCombinationsFromParams() int {
	if !m.HasParams() {
		return 0
	}
	count := 1
	for _, param := range m.Params {
		count *= len(param.Value.ArrayVal)

This happens because param.Value.StringVal holds $(tasks.platforms.results.str[*]) while param.Value.ArrayVal is empty, so len(param.Value.ArrayVal) is 0 and the product becomes 0.

j1. ResolvePipelineTask: calls GetNamesOfTaskRuns

rpt.TaskRunNames = GetNamesOfTaskRuns(pipelineRun.Status.ChildReferences, pipelineTask.Name, pipelineRun.Name, numCombinations)
for _, taskRunName := range rpt.TaskRunNames {
	if err := rpt.setTaskRunsAndResolvedTask(ctx, taskRunName, getTask, getTaskRun, pipelineTask); err != nil {

j2. GetNamesOfTaskRuns: calls getNewRunNames; because numberOfRuns is 0, the resulting taskRunNames is empty

func getNewRunNames(ptName, prName string, numberOfRuns int) []string {
	var taskRunNames []string
	// If it is a singular TaskRun/CustomRun, we only append the ptName
	if numberOfRuns == 1 {
		taskRunName := kmeta.ChildName(prName, "-"+ptName)
		return append(taskRunNames, taskRunName)
	}
	// For a matrix we append i to then end of the fanned out TaskRuns "matrixed-pr-taskrun-0"
	for i := range numberOfRuns {
		taskRunName := kmeta.ChildName(prName, fmt.Sprintf("-%s-%d", ptName, i))
		// check if the taskRun name ends with a matrix instance count
		if !strings.HasSuffix(taskRunName, fmt.Sprintf("-%d", i)) {
			taskRunName = kmeta.ChildName(prName, "-"+ptName)
			// kmeta.ChildName limits the size of a name to max of 63 characters based on k8s guidelines
			// truncate the name such that "-<matrix-id>" can be appended to the taskRun name
			longest := 63 - len(fmt.Sprintf("-%d", numberOfRuns))
			taskRunName = taskRunName[0:longest]
			taskRunName = fmt.Sprintf("%s-%d", taskRunName, i)
		}
		taskRunNames = append(taskRunNames, taskRunName)
	}
	return taskRunNames
}

k. ResolvePipelineTask: setTaskRunsAndResolvedTask is never called, because rpt.TaskRunNames is empty.

rpt.TaskRunNames = GetNamesOfTaskRuns(pipelineRun.Status.ChildReferences, pipelineTask.Name, pipelineRun.Name, numCombinations)
for _, taskRunName := range rpt.TaskRunNames {
	if err := rpt.setTaskRunsAndResolvedTask(ctx, taskRunName, getTask, getTaskRun, pipelineTask); err != nil {
		return nil, err
	}

l. setTaskRunsAndResolvedTask: since it never runs, ResolvedTask is never set.

func (t *ResolvedPipelineTask) setTaskRunsAndResolvedTask(
	ctx context.Context,
	taskRunName string,
	getTask resources.GetTask,
	getTaskRun resources.GetTaskRun,
	pipelineTask v1.PipelineTask,
) error {
	taskRun, err := getTaskRun(taskRunName)
	if err != nil {
		if !kerrors.IsNotFound(err) {
			return fmt.Errorf("error retrieving TaskRun %s: %w", taskRunName, err)
		}
	}
	if taskRun != nil {
		t.TaskRuns = append(t.TaskRuns, taskRun)
	}
	rt, err := resolveTask(ctx, taskRun, getTask, pipelineTask)
	if err != nil {
		return err
	}
	t.ResolvedTask = rt

m. As a result, ResolvedTask is nil for the "platforms" pipeline task, which produces the error shown at the top.
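
The last link of the chain can be sketched as follows. All types and the reproState helper are hypothetical simplifications rather than the controller's real structures; only the two error format strings are taken from the snippets in steps a and b. As a further simplification, the stand-in treats a missing map entry the same way as an unset ResolvedTask.

```go
package main

import "fmt"

type taskSpec struct{}

type resolvedTask struct {
	TaskSpec *taskSpec
}

// resolvedPipelineTask is a simplified stand-in for ResolvedPipelineTask.
type resolvedPipelineTask struct {
	Name         string
	RefersTo     []string // pipeline tasks whose results this task references
	ResolvedTask *resolvedTask
}

func validateResultRef(ref string, ptMap map[string]*resolvedPipelineTask) error {
	pt, found := ptMap[ref]
	if !found || pt.ResolvedTask == nil || pt.ResolvedTask.TaskSpec == nil {
		return fmt.Errorf("unable to validate result referencing pipeline task %q: task spec not found", ref)
	}
	return nil
}

func validatePipelineTaskResults(state []*resolvedPipelineTask) error {
	ptMap := make(map[string]*resolvedPipelineTask, len(state))
	for _, rpt := range state {
		ptMap[rpt.Name] = rpt
	}
	for _, rpt := range state {
		for _, ref := range rpt.RefersTo {
			if err := validateResultRef(ref, ptMap); err != nil {
				return fmt.Errorf("invalid result reference in pipeline task %q: %w", rpt.Name, err)
			}
		}
	}
	return nil
}

// reproState models the PipelineRun from the reproduction: the two matrixed
// tasks got zero TaskRun names, so their ResolvedTask was never set.
func reproState() []*resolvedPipelineTask {
	spec := &resolvedTask{TaskSpec: &taskSpec{}}
	return []*resolvedPipelineTask{
		{Name: "array-emitter", ResolvedTask: spec},
		{Name: "platforms", RefersTo: []string{"array-emitter"}},
		{Name: "printer-matrix", RefersTo: []string{"platforms"}},
		{Name: "printer-all-platforms", RefersTo: []string{"platforms"}, ResolvedTask: spec},
	}
}

func main() {
	fmt.Println(validatePipelineTaskResults(reproState()))
}
```

Running the sketch prints the same message as in section 2, reported for "printer-matrix" because it is the first task in the list that references "platforms".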

@tekton-robot tekton-robot added kind/bug Categorizes issue or PR as related to a bug. release-note Denotes a PR that will be considered when it comes time to generate release notes. labels Jan 14, 2025
@tekton-robot
Collaborator

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
To complete the pull request process, please assign yongxuanzhang after the PR has been reviewed.
You can assign the PR to them by writing /assign @yongxuanzhang in a comment when ready.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@tekton-robot tekton-robot added the needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. label Jan 14, 2025
@tekton-robot
Collaborator

Hi @l-qing. Thanks for your PR.

I'm waiting for a tektoncd member to verify that this patch is reasonable to test. If it is, they should reply with /ok-to-test on its own line. Until that is done, I will not automatically test new commits in this PR, but the usual testing commands by org members will still work. Regular contributors should join the org to skip this step.

Once the patch is verified, the new status will be reflected by the ok-to-test label.

I understand the commands that are listed here.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@tekton-robot tekton-robot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Jan 14, 2025
@@ -782,7 +782,7 @@ func TestPipelineTask_CountCombinations(t *testing.T) {
 	}{{
 		name: "combinations count is zero",
 		matrix: &v1.Matrix{
-			Params: v1.Params{{}}},
+			Params: v1.Params{}},

The behavior is different between having no param and having a param with an empty value.

Params: v1.Params{{
Name: "GOARCH", Value: v1.ParamValue{ArrayVal: []string{"linux/amd64", "linux/ppc64le", "linux/s390x"}},
}, {
Name: "version", Value: v1.ParamValue{StringVal: "$(tasks.platforms.results.str[*])"}},

The case this PR addresses is a matrix param whose string value references a whole array result.

@@ -8942,8 +8942,14 @@ spec:
         script: |
           echo "$(params.platform)"
   - name: b-task
-    taskRef:
-      name: mytask
+    taskSpec:

Avoid returning early due to the PipelineRunReasonCouldntGetTask error.

@@ -3801,10 +3801,10 @@ func TestResolvePipelineRunTask_WithMatrix(t *testing.T) {
 		name: "task with matrix - whole array results",
 		pt: pts[2],
 		want: &ResolvedPipelineTask{
-			TaskRunNames: nil,
+			TaskRunNames: []string{"pipelinerun-pipelinetask-with-whole-array-results"},

With this change, resolution returns a TaskSpec definition, which allows the subsequent validation to run.

matrix:
params:
- name: platform
value: $(tasks.matrix-include.results.str[*])

Added an example of the usage that this PR now supports.

@l-qing l-qing force-pushed the fix/matrix-result-ref branch from d4c465d to def2c31 Compare January 15, 2025 03:02
@l-qing
Contributor Author

l-qing commented Jan 15, 2025

It seems that this usage might bypass the DefaultMaxMatrixCombinationsCount limit.

func (m *Matrix) validateCombinationsCount(ctx context.Context) (errs *apis.FieldError) {
	matrixCombinationsCount := m.CountCombinations()
	maxMatrixCombinationsCount := config.FromContextOrDefaults(ctx).Defaults.DefaultMaxMatrixCombinationsCount
	if matrixCombinationsCount > maxMatrixCombinationsCount {
		errs = errs.Also(apis.ErrOutOfBoundsValue(matrixCombinationsCount, 0, maxMatrixCombinationsCount, "matrix"))
	}
	return errs
}
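
A rough sketch of the concern, under stated assumptions: the types are simplified stand-ins, exceedsLimit is a hypothetical reduction of validateCombinationsCount, and the limit of 2 is chosen only for illustration. A count taken while the param is still an unresolved string reference cannot see the number of values the array result will eventually hold.

```go
package main

import "fmt"

// paramValue and param are simplified stand-ins for v1.ParamValue and v1.Param.
type paramValue struct {
	StringVal string
	ArrayVal  []string
}

type param struct {
	Name  string
	Value paramValue
}

// countFromParams multiplies the ArrayVal lengths, as in the quoted
// countGeneratedCombinationsFromParams.
func countFromParams(params []param) int {
	if len(params) == 0 {
		return 0
	}
	count := 1
	for _, p := range params {
		count *= len(p.Value.ArrayVal)
	}
	return count
}

// exceedsLimit is a hypothetical reduction of validateCombinationsCount.
func exceedsLimit(params []param, limit int) bool {
	return countFromParams(params) > limit
}

func main() {
	const limit = 2 // illustrative value, not the Tekton default

	// What validation sees before the referenced task has produced its result.
	atValidation := []param{{
		Name:  "platform",
		Value: paramValue{StringVal: "$(tasks.platforms.results.str[*])"},
	}}
	// What the fan-out would actually use once the result is available.
	atRuntime := []param{{
		Name:  "platform",
		Value: paramValue{ArrayVal: []string{"linux", "max", "windows"}},
	}}

	fmt.Println(exceedsLimit(atValidation, limit)) // false: the check passes
	fmt.Println(exceedsLimit(atRuntime, limit))    // true: three runs exceed the limit
}
```

If this reading is right, the limit would need to be checked again after result references are substituted; whether the controller already does so is not shown in this conversation.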

Labels
kind/bug Categorizes issue or PR as related to a bug. needs-ok-to-test Indicates a PR that requires an org member to verify it is safe to test. release-note Denotes a PR that will be considered when it comes time to generate release notes. size/L Denotes a PR that changes 100-499 lines, ignoring generated files.
Projects
Status: Todo
Development

Successfully merging this pull request may close these issues.

InvalidTaskResultsReference when dynamically generating subsequent tasks in a pipeline using matrix
3 participants