// buildWindowsWorkspaceKillScript builds a PowerShell command that `taskkill
// /T /F`s every process tree whose ExecutablePath or CommandLine references one
// of the given absolute workspace dirs, releasing file handles for cleanup.
//
// Win32_Process is used because it exposes both ExecutablePath and CommandLine
// (Get-Process doesn't, wmic is deprecated). Both match the dir+separator
// prefix, so a sibling dir sharing a name prefix (job1 vs job10) is spared.
// Ordinal String methods, not -like, so path metacharacters ([ ] ? *) stay
// literal.
//
// Entries are normalised before being embedded:
//   - trailing separators are trimmed, so the script's `$p + '\'` always yields
//     exactly one separator (a doubled one would never match anything);
//   - entries that are empty or a bare drive root ("C:", "C:\") are dropped,
//     because their prefix would match nearly every process on the machine.
//
// Well-formed input (non-empty, non-root, no trailing separator) produces the
// same script as before. A nil or fully-filtered list yields `$paths = @()`,
// which makes the script a no-op.
//
// Pure function so the quote-escaping can be unit-tested without PowerShell.
func buildWindowsWorkspaceKillScript(dirs []string) string {
	quoted := make([]string, 0, len(dirs))
	for _, d := range dirs {
		d = strings.TrimRight(d, `\/`)
		// After trimming, "" covers empty input and pure-separator input;
		// a two-byte "X:" is a drive root.
		if d == "" || (len(d) == 2 && d[1] == ':') {
			continue
		}
		// Single-quoted PowerShell literal; escape ' by doubling it.
		quoted = append(quoted, "'"+strings.ReplaceAll(d, "'", "''")+"'")
	}

	return `$paths = @(` + strings.Join(quoted, ",") + `)
$selfPid = $PID
Get-CimInstance Win32_Process -ErrorAction SilentlyContinue | Where-Object {
    if ($_.ProcessId -eq $selfPid) { return $false }
    foreach ($p in $paths) {
        $prefix = $p + '\'
        if ($_.ExecutablePath -and $_.ExecutablePath.StartsWith($prefix, [System.StringComparison]::OrdinalIgnoreCase)) { return $true }
        if ($_.CommandLine -and $_.CommandLine.IndexOf($prefix, [System.StringComparison]::OrdinalIgnoreCase) -ge 0) { return $true }
    }
    return $false
} | ForEach-Object {
    & taskkill.exe /PID $_.ProcessId /T /F 2>$null | Out-Null
}
`
}
+ owned := []string{e.Path, e.TmpDir} + if e.CleanWorkdir { + owned = append(owned, e.Workdir) + } + dirs := make([]string, 0, len(owned)) + for _, d := range owned { + if d == "" { + continue + } + abs, err := filepath.Abs(d) + if err != nil { + continue + } + dirs = append(dirs, abs) + } + if len(dirs) == 0 { return } - logger := common.Logger(ctx) - for _, pid := range pids { - // Best-effort: forcibly terminate process tree to release file handles - // so that workspace cleanup can succeed on Windows. - cmd := exec.CommandContext(ctx, "taskkill", "/PID", strconv.Itoa(pid), "/T", "/F") - out, err := cmd.CombinedOutput() - if err != nil { - logger.Debugf("taskkill failed for pid=%d: %v output=%s", pid, err, strings.TrimSpace(string(out))) - } + script := buildWindowsWorkspaceKillScript(dirs) + + cmd := exec.CommandContext(killCtx, "powershell.exe", "-NoProfile", "-NonInteractive", "-Command", script) + out, err := cmd.CombinedOutput() + if err != nil { + logger.Debugf("workspace process-tree kill via PowerShell failed: %v output=%s", err, strings.TrimSpace(string(out))) } } @@ -477,14 +507,20 @@ func (e *HostEnvironment) Remove() common.Executor { if e.CleanUp != nil { e.CleanUp() } + + // Detach: a cancelled ctx would skip removePathWithRetry's retries, + // which absorb Windows file-handle release lag after the kill above. 
+ rmCtx, rmCancel := context.WithTimeout(context.Background(), 30*time.Second) + defer rmCancel() + logger := common.Logger(ctx) var errs []error - if err := removePathWithRetry(ctx, e.Path); err != nil { + if err := removePathWithRetry(rmCtx, e.Path); err != nil { logger.Warnf("failed to remove host misc state %s: %v", e.Path, err) errs = append(errs, err) } if e.CleanWorkdir { - if err := removePathWithRetry(ctx, e.Workdir); err != nil { + if err := removePathWithRetry(rmCtx, e.Workdir); err != nil { logger.Warnf("failed to remove host workspace %s: %v", e.Workdir, err) errs = append(errs, err) } diff --git a/act/container/host_environment_test.go b/act/container/host_environment_test.go index 945685c9..983cffd6 100644 --- a/act/container/host_environment_test.go +++ b/act/container/host_environment_test.go @@ -187,3 +187,64 @@ func TestHostEnvironmentRemoveCleansWorkdirWhenOwned(t *testing.T) { _, err := os.Stat(workdir) assert.ErrorIs(t, err, os.ErrNotExist) } + +func TestBuildWindowsWorkspaceKillScript(t *testing.T) { + t.Run("single dir", func(t *testing.T) { + s := buildWindowsWorkspaceKillScript([]string{`C:\workspace\job1`}) + assert.Contains(t, s, `$paths = @('C:\workspace\job1')`) + // Self-PID guard is essential — without it the script could taskkill + // the PowerShell process running it. + assert.Contains(t, s, "$selfPid = $PID") + assert.Contains(t, s, "$_.ProcessId -eq $selfPid") + // Must match both ExecutablePath (binaries from the workspace) and + // CommandLine (system binaries invoked with workspace paths in args), + // both bounded by dir+separator so a name-prefix sibling is spared. + assert.Contains(t, s, `$prefix = $p + '\'`) + assert.Contains(t, s, "$_.ExecutablePath.StartsWith($prefix") + assert.Contains(t, s, "$_.CommandLine.IndexOf($prefix") + // Each matched PID must be tree-killed, not just stopped. 
+ assert.Contains(t, s, "taskkill.exe /PID $_.ProcessId /T /F") + }) + + t.Run("multiple dirs comma-separated", func(t *testing.T) { + s := buildWindowsWorkspaceKillScript([]string{ + `C:\work\path`, + `C:\work\workdir`, + `C:\Users\runner\AppData\Local\Temp\job-42`, + }) + assert.Contains(t, s, `'C:\work\path'`) + assert.Contains(t, s, `'C:\work\workdir'`) + assert.Contains(t, s, `'C:\Users\runner\AppData\Local\Temp\job-42'`) + // Commas between entries — no trailing comma, no leading comma. + assert.Contains(t, s, `'C:\work\path','C:\work\workdir',`) + }) + + t.Run("path with single quote is escaped", func(t *testing.T) { + // In PowerShell single-quoted strings the only special char is the + // quote itself, escaped by doubling. A workspace path that ever + // contained `'` would inject a command into the script otherwise. + s := buildWindowsWorkspaceKillScript([]string{`C:\work\it's\path`}) + assert.Contains(t, s, `'C:\work\it''s\path'`) + // And it must NOT appear unescaped — otherwise the quote would + // terminate the literal early. + assert.NotContains(t, s, `'C:\work\it's\path'`) + }) + + t.Run("path with wildcard metacharacters is matched literally", func(t *testing.T) { + // A path containing [ ] ? * must be embedded verbatim and matched with + // ordinal String methods, not -like, otherwise the metacharacters would + // be interpreted as wildcards and the leftover process could escape. + s := buildWindowsWorkspaceKillScript([]string{`C:\work\[job]?1`}) + assert.Contains(t, s, `'C:\work\[job]?1'`) + assert.NotContains(t, s, "-like") + assert.Contains(t, s, "StartsWith") + assert.Contains(t, s, "IndexOf") + }) + + t.Run("empty dir list still produces a valid script", func(t *testing.T) { + s := buildWindowsWorkspaceKillScript(nil) + // Empty array literal — script runs, matches nothing, is a no-op. 
+ assert.Contains(t, s, "$paths = @()") + assert.Contains(t, s, "Get-CimInstance Win32_Process") + }) +} diff --git a/internal/pkg/config/config.example.yaml b/internal/pkg/config/config.example.yaml index 53d136a0..7fb081f9 100644 --- a/internal/pkg/config/config.example.yaml +++ b/internal/pkg/config/config.example.yaml @@ -60,6 +60,9 @@ runner: # The interval for reporting task state (step status, timing) to the Gitea instance. # State is also reported immediately on step transitions (start/stop). state_report_interval: 5s + # Per-attempt deadline for flushing the final logs and task state when a job + # finishes, on a detached context so a server cancel can't block the acknowledgement. + report_close_timeout: 10s # The github_mirror of a runner is used to specify the mirror address of the github that pulls the action repository. # It works when something like `uses: actions/checkout@v4` is used and DEFAULT_ACTIONS_URL is set to github, # and github_mirror is not empty. In this case, diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go index 5f573ff7..f620c0e9 100644 --- a/internal/pkg/config/config.go +++ b/internal/pkg/config/config.go @@ -39,6 +39,7 @@ type Runner struct { LogReportMaxLatency time.Duration `yaml:"log_report_max_latency"` // LogReportMaxLatency specifies the max time a log row can wait before being sent. LogReportBatchSize int `yaml:"log_report_batch_size"` // LogReportBatchSize triggers immediate log flush when buffer reaches this size. StateReportInterval time.Duration `yaml:"state_report_interval"` // StateReportInterval specifies the interval for state reporting. + ReportCloseTimeout time.Duration `yaml:"report_close_timeout"` // ReportCloseTimeout caps each RPC attempt when flushing the final logs and task state at job completion, on a detached context so a server cancel can't block the acknowledgement. Labels []string `yaml:"labels"` // Labels specify the labels of the runner. 
Labels are declared on each startup GithubMirror string `yaml:"github_mirror"` // GithubMirror defines what mirrors should be used when using github AllocatePTY bool `yaml:"allocate_pty"` // AllocatePTY allocates a pseudo-TTY for each step's process. Default is false, matching GitHub's actions/runner. Enable only for jobs that need an interactive terminal; tools like docker build emit redrawing progress frames into the captured log when a TTY is present. Applies to both host and docker backends. @@ -183,6 +184,9 @@ func LoadDefault(file string) (*Config, error) { if cfg.Runner.StateReportInterval <= 0 { cfg.Runner.StateReportInterval = 5 * time.Second } + if cfg.Runner.ReportCloseTimeout <= 0 { + cfg.Runner.ReportCloseTimeout = 10 * time.Second + } if cfg.Metrics.Addr == "" { cfg.Metrics.Addr = "127.0.0.1:9101" } diff --git a/internal/pkg/report/reporter.go b/internal/pkg/report/reporter.go index 1a2767b4..5dbea735 100644 --- a/internal/pkg/report/reporter.go +++ b/internal/pkg/report/reporter.go @@ -58,6 +58,9 @@ type Reporter struct { logReportMaxLatency time.Duration logBatchSize int stateReportInterval time.Duration + // closeTimeout bounds each RPC attempt in the final flush, on a context + // detached from r.ctx so a server cancel can't abort the acknowledgement. 
+ closeTimeout time.Duration // Event notification channels (non-blocking, buffered 1) logNotify chan struct{} // signal: new log rows arrived @@ -89,6 +92,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C logReportMaxLatency: cfg.Runner.LogReportMaxLatency, logBatchSize: cfg.Runner.LogReportBatchSize, stateReportInterval: cfg.Runner.StateReportInterval, + closeTimeout: cfg.Runner.ReportCloseTimeout, logNotify: make(chan struct{}, 1), stateNotify: make(chan struct{}, 1), state: &runnerv1.TaskState{ @@ -329,6 +333,9 @@ func (r *Reporter) runDaemonLoop() { _ = r.ReportLog(false) case <-r.ctx.Done(): + // Stop heartbeating on cancel so Gitea sees the runner as offline + // during cleanup and won't assign an overlapping task. Close() still + // delivers the final flush on a detached context (flushFinal). close(r.daemon) return } @@ -431,17 +438,43 @@ func (r *Reporter) Close(lastWords string) error { } r.stateMu.Unlock() - // Report the job outcome even when all log upload retry attempts have been exhausted + // Separate budgets so a slow ReportLog can't starve the ReportState that + // carries the cancel acknowledgement. return errors.Join( - retry.New(retry.Context(r.ctx)).Do(func() error { - return r.ReportLog(true) - }), - retry.New(retry.Context(r.ctx)).Do(func() error { - return r.ReportState(true) - }), + r.flushFinal(func() error { return r.ReportLog(true) }), + r.flushFinal(func() error { return r.ReportState(true) }), ) } +// flushFinal retries fn on a detached, bounded context so a cancelled r.ctx +// does not abort the final flush. Each call gets its own fresh budget. +func (r *Reporter) flushFinal(fn func() error) error { + ctx, cancel := context.WithTimeout(context.Background(), 3*r.effectiveCloseTimeout()) + defer cancel() + return retry.New(retry.Context(ctx)).Do(fn) +} + +// effectiveCloseTimeout returns closeTimeout, or 10s when unset, so a zero +// value can't produce an already-expired context for the final flush. 
+func (r *Reporter) effectiveCloseTimeout() time.Duration { + if r.closeTimeout <= 0 { + return 10 * time.Second + } + return r.closeTimeout +} + +// rpcCtx returns the context for an outbound RPC plus a cancel func. While +// r.ctx is alive it's used directly; once cancelled (server RESULT_CANCELLED), +// RPCs switch to a fresh bounded context so Close()'s final flush still lands. +func (r *Reporter) rpcCtx() (context.Context, context.CancelFunc) { + select { + case <-r.ctx.Done(): + return context.WithTimeout(context.Background(), r.effectiveCloseTimeout()) + default: + return r.ctx, func() {} + } +} + func (r *Reporter) ReportLog(noMore bool) error { r.clientM.Lock() defer r.clientM.Unlock() @@ -454,8 +487,11 @@ func (r *Reporter) ReportLog(noMore bool) error { return nil } + rpcCtx, rpcCancel := r.rpcCtx() + defer rpcCancel() + start := time.Now() - resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{ + resp, err := r.client.UpdateLog(rpcCtx, connect.NewRequest(&runnerv1.UpdateLogRequest{ TaskId: r.state.Id, Index: int64(r.logOffset), Rows: rows, @@ -526,8 +562,11 @@ func (r *Reporter) ReportState(reportResult bool) error { state.Result = runnerv1.Result_RESULT_UNSPECIFIED } + rpcCtx, rpcCancel := r.rpcCtx() + defer rpcCancel() + start := time.Now() - resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ + resp, err := r.client.UpdateTask(rpcCtx, connect.NewRequest(&runnerv1.UpdateTaskRequest{ State: state, Outputs: outputs, })) diff --git a/internal/pkg/report/reporter_test.go b/internal/pkg/report/reporter_test.go index b0c0280d..3d76ea1f 100644 --- a/internal/pkg/report/reporter_test.go +++ b/internal/pkg/report/reporter_test.go @@ -754,3 +754,86 @@ func TestReporter_StateHeartbeat(t *testing.T) { require.NoError(t, reporter.ReportState(false)) assert.Equal(t, int64(2), updateTaskCalls.Load(), "ReportState must heartbeat after stateReportInterval even with no state change") } + +// 
TestReporter_ServerCancelStillFlushesFinal asserts that when the Gitea server +// returns RESULT_CANCELLED on an in-flight UpdateTask (which causes the +// reporter to cancel the task context), Close() still successfully sends the +// final UpdateLog{NoMore:true} and the final UpdateTask carrying the populated +// final state. Before the fix this final flush used r.ctx, which was just +// cancelled, so retry-go aborted on its context check and Gitea never received +// the runner's acknowledgement of the cancel. +func TestReporter_ServerCancelStillFlushesFinal(t *testing.T) { + var ( + updateTaskCalls atomic.Int64 + finalLogNoMoreSeen atomic.Bool + finalTaskStateSeen atomic.Bool + ) + + client := mocks.NewClient(t) + client.On("UpdateLog", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) { + if req.Msg.NoMore { + finalLogNoMoreSeen.Store(true) + } + return connect_go.NewResponse(&runnerv1.UpdateLogResponse{ + AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)), + }), nil + }, + ) + // The first UpdateTask returns RESULT_CANCELLED — modelling a server-side + // cancellation; the reporter must call r.cancel() in response. The final + // UpdateTask issued by Close() must still arrive even though r.ctx is now + // cancelled. 
+ client.On("UpdateTask", mock.Anything, mock.Anything).Return( + func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) { + n := updateTaskCalls.Add(1) + if n == 1 { + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{ + State: &runnerv1.TaskState{ + Result: runnerv1.Result_RESULT_CANCELLED, + }, + }), nil + } + if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED { + finalTaskStateSeen.Store(true) + } + return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + taskCtx, err := structpb.NewStruct(map[string]any{}) + require.NoError(t, err) + cfg, _ := config.LoadDefault("") + reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg) + reporter.ResetSteps(1) + + // Force the first ReportState to actually call UpdateTask. + reporter.stateMu.Lock() + reporter.stateChanged = true + reporter.stateMu.Unlock() + + // First ReportState — server returns RESULT_CANCELLED, reporter cancels r.ctx. + require.NoError(t, reporter.ReportState(false)) + require.Equal(t, int64(1), updateTaskCalls.Load()) + + select { + case <-ctx.Done(): + // Expected: reporter called cancel() because the server reported the task as cancelled. + case <-time.After(time.Second): + t.Fatal("expected r.ctx to be cancelled after server returned RESULT_CANCELLED") + } + + // The test does not start the daemon goroutine; close(r.daemon) so Close() + // proceeds without waiting on its 60s timeout. + close(reporter.daemon) + + // Now Close() runs. Before the fix, both final RPCs aborted on the cancelled + // r.ctx via retry.Context. After the fix, Close() uses a detached context and + // the per-RPC rpcCtx() falls back to a fresh ctx, so both calls succeed. 
+ require.NoError(t, reporter.Close("cancelled")) + + assert.True(t, finalLogNoMoreSeen.Load(), "Close() must send a final UpdateLog{NoMore:true} even after server-side cancellation") + assert.True(t, finalTaskStateSeen.Load(), "Close() must send a final UpdateTask with the populated final state even after server-side cancellation") +}