Skip to content

Commit a4ef169

Browse files
committed
Resolve all PipelineResources first before continuing
As part of #1184 I need to call `GetSetup` on all PipelineResources early on in PipelineRun execution. Since PipelineRuns declare all their resource up front, I wanted to be able to resolve all of them at once, then call `GetSetup` on all of them. Also, as Pipelines got more complex (we added Conditions) it turned out we were retrieving the resources in a few different places. Also in #1324 @pritidesai is making it so that these Resources can be provided by spec. By resolving all of this up front at once, we can simplify the logic later on. And you can see in this commit that we are able to reduce the responsibilities of ResolvePipelineRun a bit too!
1 parent 3873c3c commit a4ef169

File tree

7 files changed

+219
-296
lines changed

7 files changed

+219
-296
lines changed

pkg/reconciler/pipelinerun/pipelinerun.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,7 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
242242
})
243243
return nil
244244
}
245-
providedResources, err := resources.GetResourcesFromBindings(p, pr)
246-
if err != nil {
245+
if err := resources.ValidateResourceBindings(p, pr); err != nil {
247246
// This Run has failed, so we need to mark it as failed and stop reconciling it
248247
pr.Status.SetCondition(&apis.Condition{
249248
Type: apis.ConditionSucceeded,
@@ -254,6 +253,18 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
254253
})
255254
return nil
256255
}
256+
providedResources, err := resources.GetResourcesFromBindings(pr, c.resourceLister.PipelineResources(pr.Namespace).Get)
257+
if err != nil {
258+
// This Run has failed, so we need to mark it as failed and stop reconciling it
259+
pr.Status.SetCondition(&apis.Condition{
260+
Type: apis.ConditionSucceeded,
261+
Status: corev1.ConditionFalse,
262+
Reason: ReasonCouldntGetResource,
263+
Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
264+
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
265+
})
266+
return nil
267+
}
257268

258269
// Ensure that the parameters from the PipelineRun are overriding Pipeline parameters with the same type.
259270
// Weird substitution issues can occur if this is not validated (ApplyParameters() does not verify type).
@@ -284,7 +295,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
284295
func(name string) (v1alpha1.TaskInterface, error) {
285296
return c.clusterTaskLister.Get(name)
286297
},
287-
c.resourceLister.PipelineResources(pr.Namespace).Get,
288298
func(name string) (*v1alpha1.Condition, error) {
289299
return c.conditionLister.Conditions(pr.Namespace).Get(name)
290300
},
@@ -301,14 +311,6 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1alpha1.PipelineRun) er
301311
Message: fmt.Sprintf("Pipeline %s can't be Run; it contains Tasks that don't exist: %s",
302312
fmt.Sprintf("%s/%s", p.Namespace, p.Name), err),
303313
})
304-
case *resources.ResourceNotFoundError:
305-
pr.Status.SetCondition(&apis.Condition{
306-
Type: apis.ConditionSucceeded,
307-
Status: corev1.ConditionFalse,
308-
Reason: ReasonCouldntGetResource,
309-
Message: fmt.Sprintf("PipelineRun %s can't be Run; it tries to bind Resources that don't exist: %s",
310-
fmt.Sprintf("%s/%s", p.Namespace, pr.Name), err),
311-
})
312314
case *resources.ConditionNotFoundError:
313315
pr.Status.SetCondition(&apis.Condition{
314316
Type: apis.ConditionSucceeded,

pkg/reconciler/pipelinerun/pipelinerun_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,8 @@ func TestReconcile(t *testing.T) {
225225
)
226226

227227
// ignore IgnoreUnexported ignore both after and before steps fields
228-
if d := cmp.Diff(actual, expectedTaskRun, cmpopts.SortSlices(func(x, y v1alpha1.TaskResourceBinding) bool { return x.Name < y.Name })); d != "" {
229-
t.Errorf("expected to see TaskRun %v created. Diff %s", expectedTaskRun, d)
228+
if d := cmp.Diff(expectedTaskRun, actual, cmpopts.SortSlices(func(x, y v1alpha1.TaskResourceBinding) bool { return x.Name < y.Name })); d != "" {
229+
t.Errorf("expected to see TaskRun %v created. Diff (-want, +got): %s", expectedTaskRun, d)
230230
}
231231
// test taskrun is able to recreate correct pipeline-pvc-name
232232
if expectedTaskRun.GetPipelineRunPVCName() != "test-pipeline-run-success-pvc" {

pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go

Lines changed: 60 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (t ResolvedPipelineRunTask) IsSuccessful() bool {
9696
return false
9797
}
9898

99-
// IsFailed returns true only if the taskrun itself has failed
99+
// IsFailure returns true only if the taskrun itself has failed
100100
func (t ResolvedPipelineRunTask) IsFailure() bool {
101101
if t.TaskRun == nil {
102102
return false
@@ -169,12 +169,24 @@ func (state PipelineRunState) SuccessfulPipelineTaskNames() []string {
169169
// GetTaskRun is a function that will retrieve the TaskRun name.
170170
type GetTaskRun func(name string) (*v1alpha1.TaskRun, error)
171171

172-
// GetResourcesFromBindings will validate that all PipelineResources declared in Pipeline p are bound in PipelineRun pr
173-
// and if so, will return a map from the declared name of the PipelineResource (which is how the PipelineResource will
174-
// be referred to in the PipelineRun) to the ResourceRef.
175-
func GetResourcesFromBindings(p *v1alpha1.Pipeline, pr *v1alpha1.PipelineRun) (map[string]v1alpha1.PipelineResourceRef, error) {
176-
resources := map[string]v1alpha1.PipelineResourceRef{}
172+
// GetResourcesFromBindings will retreive all Resources bound in PipelineRun pr and return a map
173+
// from the declared name of the PipelineResource (which is how the PipelineResource will
174+
// be referred to in the PipelineRun) to the PipelineResource, obtained via getResource.
175+
func GetResourcesFromBindings(pr *v1alpha1.PipelineRun, getResource resources.GetResource) (map[string]*v1alpha1.PipelineResource, error) {
176+
resources := map[string]*v1alpha1.PipelineResource{}
177177

178+
for _, resource := range pr.Spec.Resources {
179+
r, err := getResource(resource.ResourceRef.Name)
180+
if err != nil {
181+
return resources, xerrors.Errorf("Error following resource reference for %s: %w", resource.Name, err)
182+
}
183+
resources[resource.Name] = r
184+
}
185+
return resources, nil
186+
}
187+
188+
// ValidateResourceBindings validate that the PipelineResources declared in Pipeline p are bound in PipelineRun.
189+
func ValidateResourceBindings(p *v1alpha1.Pipeline, pr *v1alpha1.PipelineRun) error {
178190
required := make([]string, 0, len(p.Spec.Resources))
179191
for _, resource := range p.Spec.Resources {
180192
required = append(required, resource.Name)
@@ -183,42 +195,10 @@ func GetResourcesFromBindings(p *v1alpha1.Pipeline, pr *v1alpha1.PipelineRun) (m
183195
for _, resource := range pr.Spec.Resources {
184196
provided = append(provided, resource.Name)
185197
}
186-
err := list.IsSame(required, provided)
187-
if err != nil {
188-
return resources, xerrors.Errorf("PipelineRun bound resources didn't match Pipeline: %w", err)
198+
if err := list.IsSame(required, provided); err != nil {
199+
return xerrors.Errorf("PipelineRun bound resources didn't match Pipeline: %w", err)
189200
}
190-
191-
for _, resource := range pr.Spec.Resources {
192-
resources[resource.Name] = resource.ResourceRef
193-
}
194-
return resources, nil
195-
}
196-
197-
func getPipelineRunTaskResources(pt v1alpha1.PipelineTask, providedResources map[string]v1alpha1.PipelineResourceRef) ([]v1alpha1.TaskResourceBinding, []v1alpha1.TaskResourceBinding, error) {
198-
inputs, outputs := []v1alpha1.TaskResourceBinding{}, []v1alpha1.TaskResourceBinding{}
199-
if pt.Resources != nil {
200-
for _, taskInput := range pt.Resources.Inputs {
201-
resource, ok := providedResources[taskInput.Resource]
202-
if !ok {
203-
return inputs, outputs, xerrors.Errorf("pipelineTask tried to use input resource %s not present in declared resources", taskInput.Resource)
204-
}
205-
inputs = append(inputs, v1alpha1.TaskResourceBinding{
206-
Name: taskInput.Name,
207-
ResourceRef: resource,
208-
})
209-
}
210-
for _, taskOutput := range pt.Resources.Outputs {
211-
resource, ok := providedResources[taskOutput.Resource]
212-
if !ok {
213-
return outputs, outputs, xerrors.Errorf("pipelineTask tried to use output resource %s not present in declared resources", taskOutput.Resource)
214-
}
215-
outputs = append(outputs, v1alpha1.TaskResourceBinding{
216-
Name: taskOutput.Name,
217-
ResourceRef: resource,
218-
})
219-
}
220-
}
221-
return inputs, outputs, nil
201+
return nil
222202
}
223203

224204
// TaskNotFoundError indicates that the resolution failed because a referenced Task couldn't be retrieved
@@ -231,15 +211,6 @@ func (e *TaskNotFoundError) Error() string {
231211
return fmt.Sprintf("Couldn't retrieve Task %q: %s", e.Name, e.Msg)
232212
}
233213

234-
// ResourceNotFoundError indicates that the resolution failed because a referenced PipelineResource couldn't be retrieved
235-
type ResourceNotFoundError struct {
236-
Msg string
237-
}
238-
239-
func (e *ResourceNotFoundError) Error() string {
240-
return fmt.Sprintf("Couldn't retrieve PipelineResource: %s", e.Msg)
241-
}
242-
243214
type ConditionNotFoundError struct {
244215
Name string
245216
Msg string
@@ -252,17 +223,15 @@ func (e *ConditionNotFoundError) Error() string {
252223
// ResolvePipelineRun retrieves all Tasks instances which are reference by tasks, getting
253224
// instances from getTask. If it is unable to retrieve an instance of a referenced Task, it
254225
// will return an error, otherwise it returns a list of all of the Tasks retrieved.
255-
// It will retrieve the Resources needed for the TaskRun as well using getResource and the mapping
256-
// of providedResources.
226+
// It will retrieve the Resources needed for the TaskRun using the mapping of providedResources.
257227
func ResolvePipelineRun(
258228
pipelineRun v1alpha1.PipelineRun,
259229
getTask resources.GetTask,
260230
getTaskRun resources.GetTaskRun,
261231
getClusterTask resources.GetClusterTask,
262-
getResource resources.GetResource,
263232
getCondition GetCondition,
264233
tasks []v1alpha1.PipelineTask,
265-
providedResources map[string]v1alpha1.PipelineResourceRef,
234+
providedResources map[string]*v1alpha1.PipelineResource,
266235
) (PipelineRunState, error) {
267236

268237
state := []*ResolvedPipelineRunTask{}
@@ -289,16 +258,10 @@ func ResolvePipelineRun(
289258
}
290259
}
291260

292-
// Get all the resources that this task will be using, if any
293-
inputs, outputs, err := getPipelineRunTaskResources(pt, providedResources)
294-
if err != nil {
295-
return nil, xerrors.Errorf("unexpected error which should have been caught by Pipeline webhook: %w", err)
296-
}
297-
298261
spec := t.TaskSpec()
299-
rtr, err := resources.ResolveTaskResources(&spec, t.TaskMetadata().Name, pt.TaskRef.Kind, inputs, outputs, getResource)
262+
rtr, err := ResolvePipelineTaskResources(pt, &spec, t.TaskMetadata().Name, pt.TaskRef.Kind, providedResources)
300263
if err != nil {
301-
return nil, &ResourceNotFoundError{Msg: err.Error()}
264+
return nil, xerrors.Errorf("couldn't match referenced resources with declared resources: %w", err)
302265
}
303266
rprt.ResolvedTaskResources = rtr
304267

@@ -314,7 +277,7 @@ func ResolvePipelineRun(
314277

315278
// Get all conditions that this pipelineTask will be using, if any
316279
if len(pt.Conditions) > 0 {
317-
rcc, err := resolveConditionChecks(&pt, pipelineRun.Status.TaskRuns, rprt.TaskRunName, getTaskRun, getCondition, getResource, providedResources)
280+
rcc, err := resolveConditionChecks(&pt, pipelineRun.Status.TaskRuns, rprt.TaskRunName, getTaskRun, getCondition, providedResources)
318281
if err != nil {
319282
return nil, err
320283
}
@@ -360,7 +323,6 @@ func GetPipelineConditionStatus(pr *v1alpha1.PipelineRun, state PipelineRunState
360323
// 2. Any one TaskRun has failed - >Failed. This should change with #1020 and #1023
361324
// 3. All tasks are done or are skipped (i.e. condition check failed).-> Success
362325
// 4. A Task or Condition is running right now or there are things left to run -> Running
363-
364326
if pr.IsTimedOut() {
365327
return &apis.Condition{
366328
Type: apis.ConditionSucceeded,
@@ -489,10 +451,7 @@ func ValidateFrom(state PipelineRunState) error {
489451
return nil
490452
}
491453

492-
func resolveConditionChecks(pt *v1alpha1.PipelineTask,
493-
taskRunStatus map[string]*v1alpha1.PipelineRunTaskRunStatus,
494-
taskRunName string, getTaskRun resources.GetTaskRun, getCondition GetCondition,
495-
getResource resources.GetResource, providedResources map[string]v1alpha1.PipelineResourceRef) ([]*ResolvedConditionCheck, error) {
454+
func resolveConditionChecks(pt *v1alpha1.PipelineTask, taskRunStatus map[string]*v1alpha1.PipelineRunTaskRunStatus, taskRunName string, getTaskRun resources.GetTaskRun, getCondition GetCondition, providedResources map[string]*v1alpha1.PipelineResource) ([]*ResolvedConditionCheck, error) {
496455
rccs := []*ResolvedConditionCheck{}
497456
for _, ptc := range pt.Conditions {
498457
cName := ptc.ConditionRef
@@ -510,47 +469,53 @@ func resolveConditionChecks(pt *v1alpha1.PipelineTask,
510469
return nil, xerrors.Errorf("error retrieving ConditionCheck %s for taskRun name %s : %w", conditionCheckName, taskRunName, err)
511470
}
512471
}
472+
conditionResources := map[string]*v1alpha1.PipelineResource{}
473+
for _, declared := range ptc.Resources {
474+
r, ok := providedResources[declared.Resource]
475+
if !ok {
476+
return nil, xerrors.Errorf("resources %s missing for condition %s in pipeline task %s", declared.Resource, cName, pt.Name)
477+
}
478+
conditionResources[declared.Name] = r
479+
}
513480

514481
rcc := ResolvedConditionCheck{
515482
Condition: c,
516483
ConditionCheckName: conditionCheckName,
517484
ConditionCheck: v1alpha1.NewConditionCheck(cctr),
518485
PipelineTaskCondition: &ptc,
519-
}
520-
521-
if len(ptc.Resources) > 0 {
522-
r, err := resolveConditionResources(ptc.Resources, getResource, providedResources)
523-
if err != nil {
524-
return nil, xerrors.Errorf("cloud not resolve resources for condition %s in pipeline task %s: %w", cName, pt.Name, err)
525-
}
526-
rcc.ResolvedResources = r
486+
ResolvedResources: conditionResources,
527487
}
528488

529489
rccs = append(rccs, &rcc)
530490
}
531491
return rccs, nil
532492
}
533493

534-
func resolveConditionResources(prc []v1alpha1.PipelineConditionResource,
535-
getResource resources.GetResource,
536-
providedResources map[string]v1alpha1.PipelineResourceRef,
537-
) (map[string]*v1alpha1.PipelineResource, error) {
538-
rr := make(map[string]*v1alpha1.PipelineResource)
539-
for _, r := range prc {
540-
// First get a ref to actual resource name from its bound name
541-
resourceRef, ok := providedResources[r.Resource]
542-
if !ok {
543-
return nil, xerrors.Errorf("resource %s not present in declared resources", r.Resource)
494+
// ResolvePipelineTaskResources matches PipelineResources referenced by pt inputs and outputs with the
495+
// providedResources and returns an instance of ResolvedTaskResources.
496+
func ResolvePipelineTaskResources(pt v1alpha1.PipelineTask, ts *v1alpha1.TaskSpec, taskName string, kind v1alpha1.TaskKind, providedResources map[string]*v1alpha1.PipelineResource) (*resources.ResolvedTaskResources, error) {
497+
rtr := resources.ResolvedTaskResources{
498+
TaskName: taskName,
499+
TaskSpec: ts,
500+
Kind: kind,
501+
Inputs: map[string]*v1alpha1.PipelineResource{},
502+
Outputs: map[string]*v1alpha1.PipelineResource{},
503+
}
504+
if pt.Resources != nil {
505+
for _, taskInput := range pt.Resources.Inputs {
506+
resource, ok := providedResources[taskInput.Resource]
507+
if !ok {
508+
return nil, xerrors.Errorf("pipelineTask tried to use input resource %s not present in declared resources", taskInput.Resource)
509+
}
510+
rtr.Inputs[taskInput.Name] = resource
544511
}
545-
546-
// Next, fetch the actual resource definition
547-
gotResource, err := getResource(resourceRef.Name)
548-
if err != nil {
549-
return nil, xerrors.Errorf("could not retrieve resource %s: %w", r.Name, err)
512+
for _, taskOutput := range pt.Resources.Outputs {
513+
resource, ok := providedResources[taskOutput.Resource]
514+
if !ok {
515+
return nil, xerrors.Errorf("pipelineTask tried to use output resource %s not present in declared resources", taskOutput.Resource)
516+
}
517+
rtr.Outputs[taskOutput.Name] = resource
550518
}
551-
552-
// Finally add it to the resolved resources map
553-
rr[r.Name] = gotResource
554519
}
555-
return rr, nil
520+
return &rtr, nil
556521
}

0 commit comments

Comments
 (0)