mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-06-29 07:34:26 +00:00
## Problem
When a workflow run is **cancelled**, the runner diverged from GitHub Actions:
- Main-stage `if: ${{ always() }}` / `if: ${{ cancelled() }}` steps **did not run** at all (unlike a *failed* run, where they do).
- `if: ${{ cancelled() }}` was **structurally impossible** to satisfy — it could never be true.
GitHub Actions runs `always()`/`cancelled()` steps (and post cleanup) even when a job is cancelled. This runner only honored that for action *post* steps (since #1016), leaving main-stage cleanup steps silently skipped.
## Root causes (both in `act/`)
1. **`getJobContext()`** derived the job status purely from step conclusions, so it could only ever return `"success"` or `"failure"`. Since `cancelled()` checks `Job.Status == "cancelled"`, it was impossible — and `success()` stayed *true* on a cancelled run, so the wrong `if` branch was taken everywhere.
2. **The main step pipeline** is chained with `Executor.Then()`, which short-circuits the moment `ctx.Err() != nil`. Once the server cancelled, every not-yet-started main step (including `always()` ones) was abandoned.
## Fix
- Add a per-`RunContext` `jobCancelled` flag + `markCancelled()`. `getJobContext()` now reports `"cancelled"` (taking precedence over success/failure), so `cancelled()`/`always()` are true and `success()`/`failure()` are false — matching GitHub's "only always()/cancelled() run on cancel" semantics.
- Replace the plain main-steps pipeline with `newMainStepsExecutor`. On interruption (`context.Canceled` from a server cancel, or `context.DeadlineExceeded` from the job timeout) it marks the job cancelled and runs the **remaining** steps under a fresh context (`context.WithoutCancel` + bounded timeout) so `always()`/`cancelled()` steps run for cleanup, while default-`success()` steps skip themselves. The original interrupt error is still propagated upward.
- Backstop `markCancelled()` in the post-step `Finally` so cancellations landing outside the main loop still surface the cancelled status to post steps.
Pre-steps keep normal short-circuit behavior, and reporting (`RESULT_CANCELLED`) is untouched — that remains handled by #1016.
## Reporting semantics (unchanged by this PR)
| Run state | failing post/`always()` step reported as |
| --------- | --------------------------------------------------------------------------------------------- |
| Normal | **FAILURE** |
| Timeout | **FAILURE** (deadline path preserves the job-error container) |
| Cancelled | **CANCELLED** — cancellation wins; the failing step is logged but doesn't flip the conclusion |
The new `always()` path runs under `context.WithoutCancel`, so the job-error container is preserved — a failing `always()` step records its failure at step level and does not panic in `SetJobError`.
Fixes #657
Reviewed-on: https://gitea.com/gitea/runner/pulls/1043
Reviewed-by: Zettat123 <39446+zettat123@noreply.gitea.com>
595 lines
21 KiB
Go
595 lines
21 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// Copyright 2022 The nektos/act Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package runner
|
|
|
|
import (
|
|
"archive/tar"
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"path"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
"unicode"
|
|
|
|
"gitea.com/gitea/runner/act/common"
|
|
"gitea.com/gitea/runner/act/container"
|
|
"gitea.com/gitea/runner/act/exprparser"
|
|
"gitea.com/gitea/runner/act/model"
|
|
)
|
|
|
|
const maxJobSummaryBytes = 1024 * 1024
|
|
|
|
// jobSummaryTruncationMarker is appended to a summary that exceeded the size limit
|
|
// so the rendered output makes the truncation visible instead of silently cutting off.
|
|
const jobSummaryTruncationMarker = "\n\n---\n\n*Job summary truncated: it exceeded the maximum allowed size.*\n"
|
|
|
|
var (
|
|
jobSummaryUploadRetryDelay = time.Second
|
|
// jobSummaryUploadRequestTimeout bounds a single step upload request. It is kept
|
|
// below jobSummaryUploadPhaseTimeout so one slow or unreachable request times out
|
|
// and lets the remaining steps still upload within the phase budget, instead of a
|
|
// single stuck request consuming the whole phase.
|
|
jobSummaryUploadRequestTimeout = 5 * time.Second
|
|
// jobSummaryUploadPhaseTimeout bounds the total time spent uploading all step
|
|
// summaries. The uploads run inside the job cleanup budget that is also used to
|
|
// stop and remove the container, so a slow or unreachable endpoint must not be
|
|
// allowed to consume it; this keeps the remaining budget available for teardown.
|
|
jobSummaryUploadPhaseTimeout = 15 * time.Second
|
|
)
|
|
|
|
type jobInfo interface {
|
|
matrix() map[string]any
|
|
steps() []*model.Step
|
|
startContainer() common.Executor
|
|
stopContainer() common.Executor
|
|
closeContainer() common.Executor
|
|
interpolateOutputs() common.Executor
|
|
result(result string)
|
|
}
|
|
|
|
// reportStepError emits the GitHub Actions ##[error] annotation and records
|
|
// the error against the job so the job is reported as failed.
|
|
func reportStepError(ctx context.Context, rc *RunContext, err error) {
|
|
common.Logger(ctx).Errorf("##[error]%v", err)
|
|
common.SetJobError(ctx, err)
|
|
rc.markFailed()
|
|
}
|
|
|
|
func newJobExecutor(info jobInfo, sf stepFactory, rc *RunContext) common.Executor {
|
|
steps := make([]common.Executor, 0)
|
|
preSteps := make([]common.Executor, 0)
|
|
var postExecutor common.Executor
|
|
|
|
steps = append(steps, func(ctx context.Context) error {
|
|
logger := common.Logger(ctx)
|
|
if len(info.matrix()) > 0 {
|
|
logger.Infof("Matrix: %v", info.matrix())
|
|
}
|
|
return nil
|
|
})
|
|
|
|
infoSteps := info.steps()
|
|
|
|
if len(infoSteps) == 0 {
|
|
return common.NewDebugExecutor("No steps found")
|
|
}
|
|
|
|
preSteps = append(preSteps, func(ctx context.Context) error {
|
|
// Have to be skipped for some Tests
|
|
if rc.Run == nil {
|
|
return nil
|
|
}
|
|
rc.ExprEval = rc.NewExpressionEvaluator(ctx)
|
|
// evaluate environment variables since they can contain
|
|
// GitHub's special environment variables.
|
|
for k, v := range rc.GetEnv() {
|
|
rc.Env[k] = rc.ExprEval.Interpolate(ctx, v)
|
|
}
|
|
return nil
|
|
})
|
|
|
|
for i, stepModel := range infoSteps {
|
|
if stepModel == nil {
|
|
return func(ctx context.Context) error {
|
|
return fmt.Errorf("invalid Step %v: missing run or uses key", i)
|
|
}
|
|
}
|
|
if stepModel.ID == "" {
|
|
stepModel.ID = strconv.Itoa(i)
|
|
}
|
|
stepModel.Number = i
|
|
|
|
step, err := sf.newStep(stepModel, rc)
|
|
if err != nil {
|
|
return common.NewErrorExecutor(err)
|
|
}
|
|
|
|
stepIdx := stepModel.Number
|
|
preExec := step.pre()
|
|
preSteps = append(preSteps, useStepLogger(rc, stepModel, stepStagePre, func(ctx context.Context) error {
|
|
rc.CurrentStepIndex = stepIdx
|
|
preErr := preExec(ctx)
|
|
if preErr != nil {
|
|
reportStepError(ctx, rc, preErr)
|
|
} else if ctx.Err() != nil {
|
|
reportStepError(ctx, rc, ctx.Err())
|
|
}
|
|
return preErr
|
|
}))
|
|
|
|
stepExec := step.main()
|
|
steps = append(steps, useStepLogger(rc, stepModel, stepStageMain, func(ctx context.Context) error {
|
|
rc.CurrentStepIndex = stepIdx
|
|
err := stepExec(ctx)
|
|
if err != nil {
|
|
reportStepError(ctx, rc, err)
|
|
} else if ctx.Err() != nil {
|
|
reportStepError(ctx, rc, ctx.Err())
|
|
}
|
|
return nil
|
|
}))
|
|
|
|
postFn := step.post()
|
|
postExec := useStepLogger(rc, stepModel, stepStagePost, func(ctx context.Context) error {
|
|
rc.CurrentStepIndex = stepIdx
|
|
err := postFn(ctx)
|
|
if err != nil {
|
|
reportStepError(ctx, rc, err)
|
|
} else if ctx.Err() != nil {
|
|
reportStepError(ctx, rc, ctx.Err())
|
|
}
|
|
return err
|
|
})
|
|
if postExecutor != nil {
|
|
// run the post executor in reverse order
|
|
postExecutor = postExec.Finally(postExecutor)
|
|
} else {
|
|
postExecutor = postExec
|
|
}
|
|
}
|
|
|
|
postExecutor = postExecutor.Finally(func(ctx context.Context) error {
|
|
jobError := common.JobError(ctx)
|
|
var err error
|
|
// jobError == nil keeps a failed job's container alive for post-mortem debugging when
|
|
// AutoRemove is off (the act-CLI --rm behavior; the shipped runner always sets
|
|
// AutoRemove). A cancelled run is not a failure to inspect, and the cancel-path post
|
|
// context now carries its own error container so a failing post step makes jobError
|
|
// non-nil — OR in rc.jobCancelled so cancellation still always tears the container down.
|
|
if rc.Config.AutoRemove || jobError == nil || rc.jobCancelled {
|
|
// always allow 1 min for stopping and removing the runner, even if we were cancelled
|
|
ctx, cancel := context.WithTimeout(common.WithLogger(context.Background(), common.Logger(ctx)), time.Minute)
|
|
defer cancel()
|
|
|
|
logger := common.Logger(ctx)
|
|
tryUploadJobSummary(ctx, rc)
|
|
// For Gitea
|
|
// We don't need to call `stopServiceContainers` here since it will be called by following `info.stopContainer`
|
|
// logger.Infof("Cleaning up services for job %s", rc.JobName)
|
|
// if err := rc.stopServiceContainers()(ctx); err != nil {
|
|
// logger.Errorf("Error while cleaning services: %v", err)
|
|
// }
|
|
|
|
logger.Infof("Cleaning up container for job %s", rc.JobName)
|
|
if err = info.stopContainer()(ctx); err != nil {
|
|
logger.Errorf("Error while stop job container: %v", err)
|
|
}
|
|
|
|
// For Gitea
|
|
// We don't need to call `NewDockerNetworkRemoveExecutor` here since it is called by above `info.stopContainer`
|
|
// if !rc.IsHostEnv(ctx) && rc.Config.ContainerNetworkMode == "" {
|
|
// // clean network in docker mode only
|
|
// // if the value of `ContainerNetworkMode` is empty string,
|
|
// // it means that the network to which containers are connecting is created by `runner`,
|
|
// // so, we should remove the network at last.
|
|
// networkName, _ := rc.networkName()
|
|
// logger.Infof("Cleaning up network for job %s, and network name is: %s", rc.JobName, networkName)
|
|
// if err := container.NewDockerNetworkRemoveExecutor(networkName)(ctx); err != nil {
|
|
// logger.Errorf("Error while cleaning network: %v", err)
|
|
// }
|
|
// }
|
|
}
|
|
setJobResult(ctx, info, rc, jobError == nil)
|
|
setJobOutputs(ctx, rc)
|
|
|
|
return err
|
|
})
|
|
|
|
stepsExecutor := newStepsExecutor(rc, preSteps, steps)
|
|
|
|
return common.NewPipelineExecutor(info.startContainer(), stepsExecutor.
|
|
Finally(func(ctx context.Context) error {
|
|
// Record an interrupt (backstop for interrupts that land outside the main
|
|
// step loop) so the post steps observe the cancelled/failed job status.
|
|
rc.markInterrupted(ctx.Err())
|
|
postCtx, cancel := postStepsContext(ctx)
|
|
defer cancel()
|
|
return postExecutor(postCtx)
|
|
}).
|
|
Finally(info.interpolateOutputs()).
|
|
Finally(info.closeContainer()))
|
|
}
|
|
|
|
// postStepsContext derives the context used to run the job's post/cleanup steps from the
|
|
// finished main-pipeline context. Cleanup has to run even when the run was interrupted, so the
|
|
// returned context always carries a fresh bounded deadline and is never itself cancelled.
|
|
//
|
|
// - context.Canceled (server cancel): detach from the cancelled context via a fresh root so
|
|
// the post steps can run.
|
|
// - context.DeadlineExceeded (job timeout): detach the deadline with WithoutCancel, which
|
|
// keeps the original values — including the job-error container — so the timeout failure and
|
|
// any post-step error are preserved and the job is still reported as failed.
|
|
// - otherwise: run on the live context unchanged.
|
|
func postStepsContext(ctx context.Context) (context.Context, context.CancelFunc) {
|
|
switch ctx.Err() {
|
|
case context.Canceled:
|
|
// The cancelled context is abandoned for a fresh root, which drops the job-error
|
|
// container installed at the job root. Re-attach a fresh one so a failing post step
|
|
// records its error via SetJobError instead of panicking on a nil container.
|
|
return context.WithTimeout(common.WithJobErrorContainer(common.WithLogger(context.Background(), common.Logger(ctx))), 5*time.Minute)
|
|
case context.DeadlineExceeded:
|
|
return context.WithTimeout(context.WithoutCancel(ctx), 5*time.Minute)
|
|
default:
|
|
return ctx, func() {}
|
|
}
|
|
}
|
|
|
|
// newStepsExecutor sequences the job's pre steps and main steps.
|
|
//
|
|
// The pre steps run as a normal pipeline that short-circuits on the first failure or
|
|
// cancellation. The main-steps executor then runs unconditionally — even if a pre step failed
|
|
// or the job was interrupted — so always()/cancelled()/failure() main steps still run, mirroring
|
|
// GitHub Actions. This is safe because each main step re-evaluates its own `if` (a pre-step
|
|
// failure flips the expression job status to failure, so success()-default steps skip) and
|
|
// newMainStepsExecutor detaches from an interrupted context before running the remaining steps.
|
|
//
|
|
// A pre-step failure or interrupt is still propagated so the job is reported with the correct
|
|
// conclusion; the pre error takes precedence since it happened first.
|
|
func newStepsExecutor(rc *RunContext, preSteps, steps []common.Executor) common.Executor {
|
|
preExecutor := common.NewPipelineExecutor(preSteps...)
|
|
mainExecutor := newMainStepsExecutor(rc, steps)
|
|
return func(ctx context.Context) error {
|
|
preErr := preExecutor(ctx)
|
|
mainErr := mainExecutor(ctx)
|
|
if preErr != nil {
|
|
return preErr
|
|
}
|
|
return mainErr
|
|
}
|
|
}
|
|
|
|
// newMainStepsExecutor runs the job's main-stage step executors in order. Unlike a plain
|
|
// pipeline, an interruption (context.Canceled from a server cancel, or context.DeadlineExceeded
|
|
// from the job timeout) does not abandon the remaining steps: it marks the job cancelled when
|
|
// appropriate and keeps iterating under a fresh, bounded context so steps whose `if` still
|
|
// evaluates true — always() and cancelled() — run for cleanup, mirroring GitHub Actions. Steps
|
|
// that default to success() skip themselves because success() is false once the job is no longer
|
|
// successful. The main-step wrappers report their own errors and return nil, so the loop drives
|
|
// step ordering off the context, not return values.
|
|
func newMainStepsExecutor(rc *RunContext, steps []common.Executor) common.Executor {
|
|
return func(ctx context.Context) error {
|
|
for i, step := range steps {
|
|
if ctx.Err() != nil {
|
|
return runMainStepsAfterInterrupt(ctx, rc, steps[i:])
|
|
}
|
|
_ = step(ctx)
|
|
}
|
|
// An interrupt can land during the final step, after the loop's last context
|
|
// check; record it so the post steps still observe the cancelled/failed status.
|
|
rc.markInterrupted(ctx.Err())
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// runMainStepsAfterInterrupt runs the remaining main steps after the job context was cancelled or
|
|
// timed out. It detaches from the interrupted context (keeping its values: logger and job error)
|
|
// and applies a fresh deadline so always()/cancelled() steps run to completion. The original
|
|
// interrupt error is returned so callers up the chain still see the job as cancelled/timed out.
|
|
func runMainStepsAfterInterrupt(ctx context.Context, rc *RunContext, steps []common.Executor) error {
|
|
interruptErr := ctx.Err()
|
|
rc.markInterrupted(interruptErr)
|
|
freshCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), 5*time.Minute)
|
|
defer cancel()
|
|
for _, step := range steps {
|
|
_ = step(freshCtx)
|
|
}
|
|
return interruptErr
|
|
}
|
|
|
|
func setJobResult(ctx context.Context, info jobInfo, rc *RunContext, success bool) {
|
|
logger := common.Logger(ctx)
|
|
|
|
// Matrix combinations share one *model.Job and run in parallel; serialize the
|
|
// read-modify-write of the job result so a failing combination is not lost-updated by a
|
|
// concurrent succeeding one.
|
|
job := rc.Run.Job()
|
|
var continueOnError bool
|
|
if !success {
|
|
// Use a fresh context so an expired job timeout cannot block expression evaluation.
|
|
evalCtx := common.WithLogger(context.Background(), common.Logger(ctx))
|
|
continueOnError = evaluateJobContinueOnError(evalCtx, rc, job)
|
|
}
|
|
jobResult := func() string {
|
|
defer lockJob(job)()
|
|
result := "success"
|
|
// we have only one result for a whole matrix build, so we need
|
|
// to keep an existing result state if we run a matrix
|
|
if len(info.matrix()) > 0 && job.Result != "" {
|
|
result = job.Result
|
|
}
|
|
if !success {
|
|
result = "failure"
|
|
job.SetContinueOnError(continueOnError)
|
|
}
|
|
info.result(result)
|
|
return result
|
|
}()
|
|
|
|
if rc.caller != nil {
|
|
// set reusable workflow job result
|
|
rc.caller.setReusedWorkflowJobResult(rc.JobName, jobResult) // For Gitea
|
|
return
|
|
}
|
|
|
|
jobResultMessage := "succeeded"
|
|
if jobResult != "success" {
|
|
jobResultMessage = "failed"
|
|
}
|
|
|
|
logger.WithField("jobResult", jobResult).Infof("Job %s", jobResultMessage)
|
|
}
|
|
|
|
func setJobOutputs(ctx context.Context, rc *RunContext) {
|
|
if rc.caller != nil {
|
|
// map outputs for reusable workflows
|
|
callerOutputs := make(map[string]string)
|
|
|
|
ee := rc.NewExpressionEvaluator(ctx)
|
|
|
|
for k, v := range rc.Run.Workflow.WorkflowCallConfig().Outputs {
|
|
callerOutputs[k] = ee.Interpolate(ctx, ee.Interpolate(ctx, v.Value))
|
|
}
|
|
|
|
// Matrix combinations of a reusable-workflow caller share the caller's *model.Job;
|
|
// serialize the write so parallel combos don't race on its Outputs field.
|
|
callerJob := rc.caller.runContext.Run.Job()
|
|
defer lockJob(callerJob)()
|
|
callerJob.Outputs = callerOutputs
|
|
}
|
|
}
|
|
|
|
// applyJobTimeout applies the job-level timeout-minutes to ctx, mirroring the
|
|
// step-level evaluateStepTimeout in step.go.
|
|
func applyJobTimeout(ctx context.Context, rc *RunContext, job *model.Job) (context.Context, context.CancelFunc) {
|
|
timeout := rc.ExprEval.Interpolate(ctx, job.TimeoutMinutes)
|
|
if timeout != "" {
|
|
if timeoutMinutes, err := strconv.ParseInt(timeout, 10, 64); err == nil {
|
|
return context.WithTimeout(ctx, time.Duration(timeoutMinutes)*time.Minute)
|
|
}
|
|
}
|
|
return ctx, func() {}
|
|
}
|
|
|
|
// evaluateJobContinueOnError evaluates the job-level continue-on-error expression.
|
|
func evaluateJobContinueOnError(ctx context.Context, rc *RunContext, job *model.Job) bool {
|
|
expr := strings.TrimSpace(job.RawContinueOnError)
|
|
if expr == "" {
|
|
return false
|
|
}
|
|
continueOnError, err := EvalBool(ctx, rc.NewExpressionEvaluator(ctx), expr, exprparser.DefaultStatusCheckNone)
|
|
if err != nil {
|
|
common.Logger(ctx).Warnf("continue-on-error expression %q evaluation failed: %v", expr, err)
|
|
return false
|
|
}
|
|
return continueOnError
|
|
}
|
|
|
|
func tryUploadJobSummary(ctx context.Context, rc *RunContext) {
|
|
if rc == nil || rc.JobContainer == nil || rc.Config == nil {
|
|
return
|
|
}
|
|
// Bound the whole upload phase so a slow or unreachable endpoint cannot consume
|
|
// the job cleanup budget reserved for stopping and removing the container.
|
|
ctx, cancel := context.WithTimeout(ctx, jobSummaryUploadPhaseTimeout)
|
|
defer cancel()
|
|
env := rc.GetEnv()
|
|
caps := strings.TrimSpace(env["GITEA_ACTIONS_CAPABILITIES"])
|
|
if !hasJobSummaryCapability(caps) {
|
|
// Server did not advertise support. Do not attempt upload.
|
|
return
|
|
}
|
|
runtimeURL := strings.TrimSpace(env["ACTIONS_RUNTIME_URL"])
|
|
runtimeToken := strings.TrimSpace(env["ACTIONS_RUNTIME_TOKEN"])
|
|
runID := strings.TrimSpace(env["GITEA_RUN_ID"])
|
|
if runtimeURL == "" || runtimeToken == "" || runID == "" {
|
|
return
|
|
}
|
|
if rc.Run == nil || rc.Run.Job() == nil {
|
|
return
|
|
}
|
|
// The numeric ActionRunJob ID is not exposed in the proto Task message or task context,
|
|
// but the server signs it into the ACTIONS_RUNTIME_TOKEN JWT claims. We decode the
|
|
// unverified claims to retrieve it; the server re-verifies the token on the request.
|
|
jobID := extractJobIDFromRuntimeToken(runtimeToken)
|
|
if jobID <= 0 {
|
|
return
|
|
}
|
|
|
|
base := strings.TrimRight(runtimeURL, "/") + "/_apis/pipelines/workflows/" + runID +
|
|
"/jobs/" + strconv.FormatInt(jobID, 10) + "/steps/"
|
|
actPath := rc.JobContainer.GetActPath()
|
|
// Reuse a single client across all step uploads so connections can be pooled.
|
|
client := &http.Client{Timeout: jobSummaryUploadRequestTimeout}
|
|
for i := range rc.Run.Job().Steps {
|
|
summaryPath := path.Join(actPath, "workflow", "step-summary-"+strconv.Itoa(i)+".md")
|
|
body, ok := readSingleFileFromContainerArchive(ctx, rc.JobContainer, summaryPath, maxJobSummaryBytes)
|
|
if !ok || len(body) == 0 {
|
|
continue
|
|
}
|
|
uploadJobSummary(ctx, client, base+strconv.Itoa(i)+"/summary", runtimeToken, body)
|
|
}
|
|
}
|
|
|
|
// extractJobIDFromRuntimeToken returns the JobID claim from an ACTIONS_RUNTIME_TOKEN JWT
|
|
// without verifying its signature. Returns 0 if the token is unparseable or has no JobID.
|
|
func extractJobIDFromRuntimeToken(token string) int64 {
|
|
parts := strings.Split(token, ".")
|
|
if len(parts) != 3 {
|
|
return 0
|
|
}
|
|
payload, err := base64.RawURLEncoding.DecodeString(parts[1])
|
|
if err != nil {
|
|
return 0
|
|
}
|
|
var claims struct {
|
|
JobID int64 `json:"JobID"`
|
|
}
|
|
if err := json.Unmarshal(payload, &claims); err != nil {
|
|
return 0
|
|
}
|
|
return claims.JobID
|
|
}
|
|
|
|
func hasJobSummaryCapability(caps string) bool {
|
|
return slices.Contains(strings.FieldsFunc(caps, func(r rune) bool {
|
|
return r == ',' || unicode.IsSpace(r)
|
|
}), "job-summary")
|
|
}
|
|
|
|
func uploadJobSummary(ctx context.Context, client *http.Client, url, runtimeToken string, body []byte) {
|
|
logger := common.Logger(ctx)
|
|
|
|
var lastStatus int
|
|
var lastErr error
|
|
for attempt := 0; attempt < 2; attempt++ {
|
|
status, err := putJobSummary(ctx, client, url, runtimeToken, body)
|
|
if err == nil && status/100 == 2 {
|
|
return
|
|
}
|
|
lastStatus = status
|
|
lastErr = err
|
|
if attempt == 1 || !isTransientJobSummaryUploadFailure(status, err) {
|
|
break
|
|
}
|
|
timer := time.NewTimer(jobSummaryUploadRetryDelay)
|
|
select {
|
|
case <-ctx.Done():
|
|
timer.Stop()
|
|
lastErr = ctx.Err()
|
|
attempt = 1
|
|
case <-timer.C:
|
|
}
|
|
}
|
|
|
|
// Best-effort only; do not fail job, but log because capability was advertised.
|
|
if lastErr != nil {
|
|
logger.WithError(lastErr).Warn("job summary upload failed")
|
|
return
|
|
}
|
|
logger.Warnf("job summary upload failed: status=%d", lastStatus)
|
|
}
|
|
|
|
func putJobSummary(ctx context.Context, client *http.Client, url, runtimeToken string, body []byte) (int, error) {
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
req.Header.Set("Authorization", "Bearer "+runtimeToken)
|
|
req.Header.Set("Content-Type", "text/markdown; charset=utf-8")
|
|
|
|
resp, err := client.Do(req)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer resp.Body.Close()
|
|
_, _ = io.Copy(io.Discard, resp.Body)
|
|
return resp.StatusCode, nil
|
|
}
|
|
|
|
func isTransientJobSummaryUploadFailure(status int, err error) bool {
|
|
return err != nil || status == http.StatusRequestTimeout || status == http.StatusTooManyRequests || status/100 == 5
|
|
}
|
|
|
|
func readSingleFileFromContainerArchive(ctx context.Context, env container.ExecutionsEnvironment, p string, maxBytes int64) ([]byte, bool) {
|
|
rc, err := env.GetContainerArchive(ctx, p)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
defer rc.Close()
|
|
|
|
tr := tar.NewReader(rc)
|
|
for {
|
|
header, err := tr.Next()
|
|
if err == io.EOF {
|
|
return nil, false
|
|
}
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
if header.Typeflag != tar.TypeReg {
|
|
continue
|
|
}
|
|
if !archiveEntryMatchesPath(header.Name, p) {
|
|
continue
|
|
}
|
|
// Summaries larger than the limit are truncated rather than dropped, so the
|
|
// user still gets the leading content (mirroring how GitHub caps oversized
|
|
// step summaries instead of discarding them). Read one extra byte so an
|
|
// over-limit file is detected from the actual stream rather than trusting
|
|
// header.Size, then cap the returned content at maxBytes.
|
|
b, err := io.ReadAll(io.LimitReader(tr, maxBytes+1))
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
if int64(len(b)) > maxBytes {
|
|
// Reserve room for the marker so the marked-up result still fits in maxBytes.
|
|
marker := []byte(jobSummaryTruncationMarker)
|
|
keep := max(maxBytes-int64(len(marker)), 0)
|
|
b = append(b[:keep], marker...)
|
|
common.Logger(ctx).Warnf("job summary truncated: path=%s max=%d", p, maxBytes)
|
|
}
|
|
return b, true
|
|
}
|
|
}
|
|
|
|
func archiveEntryMatchesPath(entryName, requestedPath string) bool {
|
|
entryName = path.Clean(strings.TrimPrefix(entryName, "/"))
|
|
requestedPath = path.Clean(strings.TrimPrefix(requestedPath, "/"))
|
|
return entryName == requestedPath || entryName == path.Base(requestedPath)
|
|
}
|
|
|
|
func useStepLogger(rc *RunContext, stepModel *model.Step, stage stepStage, executor common.Executor) common.Executor {
|
|
return func(ctx context.Context) error {
|
|
ctx = withStepLogger(ctx, stepModel.Number, stepModel.ID, rc.ExprEval.Interpolate(ctx, stepModel.String()), stage.String())
|
|
|
|
rawLogger := common.Logger(ctx).WithField("raw_output", true)
|
|
logWriter := common.NewLineWriter(rc.commandHandler(ctx), func(s string) bool {
|
|
if rc.Config.LogOutput {
|
|
rawLogger.Infof("%s", s)
|
|
} else {
|
|
rawLogger.Debugf("%s", s)
|
|
}
|
|
return true
|
|
})
|
|
|
|
oldout, olderr := rc.JobContainer.ReplaceLogWriter(logWriter, logWriter)
|
|
defer rc.JobContainer.ReplaceLogWriter(oldout, olderr)
|
|
|
|
// Flush any buffered, not-yet-newline-terminated trailing line once the
|
|
// step has finished, so the final line of the step's output is not lost
|
|
// when it is not newline-terminated.
|
|
defer common.FlushWriter(logWriter)
|
|
|
|
return executor(ctx)
|
|
}
|
|
}
|