mirror of
https://gitea.com/gitea/act_runner.git
synced 2026-06-11 04:31:31 +00:00
fix: deliver cancel ack and reap leftover Windows job processes (#996)
## Summary - When Gitea cancels a job, the reporter cancels its own task context; the final Close() flush then aborted on that same cancelled context and Gitea never received the runner's acknowledgement (missing tail logs and final state). - On Windows the cancelled context also neutralised terminateRunningProcesses, leaving step grandchildren alive in the workspace, holding file handles, so the runner could no longer clean up and pick up new work. - Reporter.Close() now flushes on a detached, bounded context via a new rpcCtx() helper and configurable Runner.ReportCloseTimeout (default 10s). - terminateRunningProcesses now PowerShell-enumerates Win32_Process and taskkill /T /F's every process whose ExecutablePath or CommandLine references the job's workspace directories, on a detached context. - The daemon heartbeat loop still exits on <-r.ctx.Done(): the runner is intentionally seen as offline by Gitea during cleanup so it isn't handed a new task overlapping the in-progress teardown. ## Test plan - [x] go test ./internal/pkg/report/... ./act/container/ -run 'TestReporter_ServerCancelStillFlushesFinal|TestBuildWindowsWorkspaceKillScript' - [x] make fmt && make lint-go - 0 issues - [x] GOOS=windows go build ./... - clean - [x] Manual on a Windows runner: trigger a long-running workflow, cancel from Gitea UI; verify (a) the job ends with tail logs + cancelled state in Gitea, (b) workspace cleans up, (c) the runner picks up a new job without restart. Authored-by: bircni 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: silverwind <me@silverwind.io> Reviewed-on: https://gitea.com/gitea/runner/pulls/996 Reviewed-by: silverwind <2021+silverwind@noreply.gitea.com>
This commit is contained in:
@@ -16,9 +16,7 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -44,9 +42,6 @@ type HostEnvironment struct {
|
|||||||
CleanUp func()
|
CleanUp func()
|
||||||
StdOut io.Writer
|
StdOut io.Writer
|
||||||
AllocatePTY bool // allocate a pseudo-TTY for each step's process
|
AllocatePTY bool // allocate a pseudo-TTY for each step's process
|
||||||
|
|
||||||
mu sync.Mutex
|
|
||||||
runningPIDs map[int]struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *HostEnvironment) Create(_, _ []string) common.Executor {
|
func (e *HostEnvironment) Create(_, _ []string) common.Executor {
|
||||||
@@ -353,24 +348,9 @@ func (e *HostEnvironment) exec(ctx context.Context, command []string, cmdline st
|
|||||||
go copyPtyOutput(writer, ppty, finishLog)
|
go copyPtyOutput(writer, ppty, finishLog)
|
||||||
go writeKeepAlive(ppty)
|
go writeKeepAlive(ppty)
|
||||||
}
|
}
|
||||||
// Split Start/Wait so the PID can be registered before the process can exit;
|
|
||||||
// cmd.Run() would block until exit, by which time the PID may have been reused.
|
|
||||||
if err := cmd.Start(); err != nil {
|
if err := cmd.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if cmd.Process != nil {
|
|
||||||
e.mu.Lock()
|
|
||||||
if e.runningPIDs == nil {
|
|
||||||
e.runningPIDs = map[int]struct{}{}
|
|
||||||
}
|
|
||||||
e.runningPIDs[cmd.Process.Pid] = struct{}{}
|
|
||||||
e.mu.Unlock()
|
|
||||||
defer func(pid int) {
|
|
||||||
e.mu.Lock()
|
|
||||||
delete(e.runningPIDs, pid)
|
|
||||||
e.mu.Unlock()
|
|
||||||
}(cmd.Process.Pid)
|
|
||||||
}
|
|
||||||
err = cmd.Wait()
|
err = cmd.Wait()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var exitErr *exec.ExitError
|
var exitErr *exec.ExitError
|
||||||
@@ -440,30 +420,80 @@ func removePathWithRetry(ctx context.Context, path string) error {
|
|||||||
return lastErr
|
return lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildWindowsWorkspaceKillScript builds a PowerShell command that `taskkill
// /T /F`s every process tree whose ExecutablePath or CommandLine references one
// of the given absolute workspace dirs, releasing file handles for cleanup.
//
// Win32_Process is used because it exposes both ExecutablePath and CommandLine
// (Get-Process doesn't, wmic is deprecated). Both match the dir+separator
// prefix, so a sibling dir sharing a name prefix (job1 vs job10) is spared.
// Ordinal String methods, not -like, so path metacharacters ([ ] ? *) stay
// literal.
//
// Each entry of dirs is normalised before being embedded:
//   - trailing path separators are trimmed, because the script appends its own
//     `\` and a doubled separator would never match a real path;
//   - entries that are empty after trimming are dropped, because an empty
//     entry would produce the prefix `\`, which occurs in almost every command
//     line and would make the script kill unrelated processes;
//   - bare drive designators (e.g. `C:`) are dropped for the same reason: the
//     resulting prefix would cover every process on that drive.
//
// A nil or fully filtered list yields `$paths = @()`, i.e. a script that
// matches nothing.
//
// Pure function so the quote-escaping can be unit-tested without PowerShell.
func buildWindowsWorkspaceKillScript(dirs []string) string {
	quoted := make([]string, 0, len(dirs))
	for _, d := range dirs {
		d = strings.TrimRight(d, `\/`)
		if d == "" || strings.HasSuffix(d, ":") {
			// Too broad to be a workspace; matching it would hit
			// processes that have nothing to do with the job.
			continue
		}
		// Single-quoted PowerShell literal; escape ' by doubling it.
		quoted = append(quoted, "'"+strings.ReplaceAll(d, "'", "''")+"'")
	}

	return `$paths = @(` + strings.Join(quoted, ",") + `)
$selfPid = $PID
Get-CimInstance Win32_Process -ErrorAction SilentlyContinue | Where-Object {
if ($_.ProcessId -eq $selfPid) { return $false }
foreach ($p in $paths) {
$prefix = $p + '\'
if ($_.ExecutablePath -and $_.ExecutablePath.StartsWith($prefix, [System.StringComparison]::OrdinalIgnoreCase)) { return $true }
if ($_.CommandLine -and $_.CommandLine.IndexOf($prefix, [System.StringComparison]::OrdinalIgnoreCase) -ge 0) { return $true }
}
return $false
} | ForEach-Object {
& taskkill.exe /PID $_.ProcessId /T /F 2>$null | Out-Null
}
`
}
|
||||||
|
|
||||||
func (e *HostEnvironment) terminateRunningProcesses(ctx context.Context) {
|
func (e *HostEnvironment) terminateRunningProcesses(ctx context.Context) {
|
||||||
if runtime.GOOS != "windows" {
|
if runtime.GOOS != "windows" {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
e.mu.Lock()
|
|
||||||
pids := make([]int, 0, len(e.runningPIDs))
|
|
||||||
for pid := range e.runningPIDs {
|
|
||||||
pids = append(pids, pid)
|
|
||||||
}
|
|
||||||
e.mu.Unlock()
|
|
||||||
|
|
||||||
if len(pids) == 0 {
|
// Detached: exec.CommandContext won't start on a cancelled ctx, and a
|
||||||
|
// server cancel has already cancelled the parent ctx.
|
||||||
|
killCtx, killCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer killCancel()
|
||||||
|
|
||||||
|
logger := common.Logger(ctx)
|
||||||
|
|
||||||
|
// Workspace dirs we own. Any process running from or referencing one is a
|
||||||
|
// leftover job process. ToolCache is shared across jobs; Workdir only when
|
||||||
|
// we own it (else it's a caller-provided checkout, e.g. act local mode).
|
||||||
|
owned := []string{e.Path, e.TmpDir}
|
||||||
|
if e.CleanWorkdir {
|
||||||
|
owned = append(owned, e.Workdir)
|
||||||
|
}
|
||||||
|
dirs := make([]string, 0, len(owned))
|
||||||
|
for _, d := range owned {
|
||||||
|
if d == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
abs, err := filepath.Abs(d)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dirs = append(dirs, abs)
|
||||||
|
}
|
||||||
|
if len(dirs) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := common.Logger(ctx)
|
script := buildWindowsWorkspaceKillScript(dirs)
|
||||||
for _, pid := range pids {
|
|
||||||
// Best-effort: forcibly terminate process tree to release file handles
|
cmd := exec.CommandContext(killCtx, "powershell.exe", "-NoProfile", "-NonInteractive", "-Command", script)
|
||||||
// so that workspace cleanup can succeed on Windows.
|
out, err := cmd.CombinedOutput()
|
||||||
cmd := exec.CommandContext(ctx, "taskkill", "/PID", strconv.Itoa(pid), "/T", "/F")
|
if err != nil {
|
||||||
out, err := cmd.CombinedOutput()
|
logger.Debugf("workspace process-tree kill via PowerShell failed: %v output=%s", err, strings.TrimSpace(string(out)))
|
||||||
if err != nil {
|
|
||||||
logger.Debugf("taskkill failed for pid=%d: %v output=%s", pid, err, strings.TrimSpace(string(out)))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -477,14 +507,20 @@ func (e *HostEnvironment) Remove() common.Executor {
|
|||||||
if e.CleanUp != nil {
|
if e.CleanUp != nil {
|
||||||
e.CleanUp()
|
e.CleanUp()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Detach: a cancelled ctx would skip removePathWithRetry's retries,
|
||||||
|
// which absorb Windows file-handle release lag after the kill above.
|
||||||
|
rmCtx, rmCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
|
defer rmCancel()
|
||||||
|
|
||||||
logger := common.Logger(ctx)
|
logger := common.Logger(ctx)
|
||||||
var errs []error
|
var errs []error
|
||||||
if err := removePathWithRetry(ctx, e.Path); err != nil {
|
if err := removePathWithRetry(rmCtx, e.Path); err != nil {
|
||||||
logger.Warnf("failed to remove host misc state %s: %v", e.Path, err)
|
logger.Warnf("failed to remove host misc state %s: %v", e.Path, err)
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
if e.CleanWorkdir {
|
if e.CleanWorkdir {
|
||||||
if err := removePathWithRetry(ctx, e.Workdir); err != nil {
|
if err := removePathWithRetry(rmCtx, e.Workdir); err != nil {
|
||||||
logger.Warnf("failed to remove host workspace %s: %v", e.Workdir, err)
|
logger.Warnf("failed to remove host workspace %s: %v", e.Workdir, err)
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -187,3 +187,64 @@ func TestHostEnvironmentRemoveCleansWorkdirWhenOwned(t *testing.T) {
|
|||||||
_, err := os.Stat(workdir)
|
_, err := os.Stat(workdir)
|
||||||
assert.ErrorIs(t, err, os.ErrNotExist)
|
assert.ErrorIs(t, err, os.ErrNotExist)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBuildWindowsWorkspaceKillScript(t *testing.T) {
|
||||||
|
t.Run("single dir", func(t *testing.T) {
|
||||||
|
s := buildWindowsWorkspaceKillScript([]string{`C:\workspace\job1`})
|
||||||
|
assert.Contains(t, s, `$paths = @('C:\workspace\job1')`)
|
||||||
|
// Self-PID guard is essential — without it the script could taskkill
|
||||||
|
// the PowerShell process running it.
|
||||||
|
assert.Contains(t, s, "$selfPid = $PID")
|
||||||
|
assert.Contains(t, s, "$_.ProcessId -eq $selfPid")
|
||||||
|
// Must match both ExecutablePath (binaries from the workspace) and
|
||||||
|
// CommandLine (system binaries invoked with workspace paths in args),
|
||||||
|
// both bounded by dir+separator so a name-prefix sibling is spared.
|
||||||
|
assert.Contains(t, s, `$prefix = $p + '\'`)
|
||||||
|
assert.Contains(t, s, "$_.ExecutablePath.StartsWith($prefix")
|
||||||
|
assert.Contains(t, s, "$_.CommandLine.IndexOf($prefix")
|
||||||
|
// Each matched PID must be tree-killed, not just stopped.
|
||||||
|
assert.Contains(t, s, "taskkill.exe /PID $_.ProcessId /T /F")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("multiple dirs comma-separated", func(t *testing.T) {
|
||||||
|
s := buildWindowsWorkspaceKillScript([]string{
|
||||||
|
`C:\work\path`,
|
||||||
|
`C:\work\workdir`,
|
||||||
|
`C:\Users\runner\AppData\Local\Temp\job-42`,
|
||||||
|
})
|
||||||
|
assert.Contains(t, s, `'C:\work\path'`)
|
||||||
|
assert.Contains(t, s, `'C:\work\workdir'`)
|
||||||
|
assert.Contains(t, s, `'C:\Users\runner\AppData\Local\Temp\job-42'`)
|
||||||
|
// Commas between entries — no trailing comma, no leading comma.
|
||||||
|
assert.Contains(t, s, `'C:\work\path','C:\work\workdir',`)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("path with single quote is escaped", func(t *testing.T) {
|
||||||
|
// In PowerShell single-quoted strings the only special char is the
|
||||||
|
// quote itself, escaped by doubling. A workspace path that ever
|
||||||
|
// contained `'` would inject a command into the script otherwise.
|
||||||
|
s := buildWindowsWorkspaceKillScript([]string{`C:\work\it's\path`})
|
||||||
|
assert.Contains(t, s, `'C:\work\it''s\path'`)
|
||||||
|
// And it must NOT appear unescaped — otherwise the quote would
|
||||||
|
// terminate the literal early.
|
||||||
|
assert.NotContains(t, s, `'C:\work\it's\path'`)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("path with wildcard metacharacters is matched literally", func(t *testing.T) {
|
||||||
|
// A path containing [ ] ? * must be embedded verbatim and matched with
|
||||||
|
// ordinal String methods, not -like, otherwise the metacharacters would
|
||||||
|
// be interpreted as wildcards and the leftover process could escape.
|
||||||
|
s := buildWindowsWorkspaceKillScript([]string{`C:\work\[job]?1`})
|
||||||
|
assert.Contains(t, s, `'C:\work\[job]?1'`)
|
||||||
|
assert.NotContains(t, s, "-like")
|
||||||
|
assert.Contains(t, s, "StartsWith")
|
||||||
|
assert.Contains(t, s, "IndexOf")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("empty dir list still produces a valid script", func(t *testing.T) {
|
||||||
|
s := buildWindowsWorkspaceKillScript(nil)
|
||||||
|
// Empty array literal — script runs, matches nothing, is a no-op.
|
||||||
|
assert.Contains(t, s, "$paths = @()")
|
||||||
|
assert.Contains(t, s, "Get-CimInstance Win32_Process")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -60,6 +60,9 @@ runner:
|
|||||||
# The interval for reporting task state (step status, timing) to the Gitea instance.
|
# The interval for reporting task state (step status, timing) to the Gitea instance.
|
||||||
# State is also reported immediately on step transitions (start/stop).
|
# State is also reported immediately on step transitions (start/stop).
|
||||||
state_report_interval: 5s
|
state_report_interval: 5s
|
||||||
|
# Per-attempt deadline for flushing the final logs and task state when a job
|
||||||
|
# finishes, on a detached context so a server cancel can't block the acknowledgement.
|
||||||
|
report_close_timeout: 10s
|
||||||
# The github_mirror of a runner is used to specify the mirror address of the github that pulls the action repository.
|
# The github_mirror of a runner is used to specify the mirror address of the github that pulls the action repository.
|
||||||
# It works when something like `uses: actions/checkout@v4` is used and DEFAULT_ACTIONS_URL is set to github,
|
# It works when something like `uses: actions/checkout@v4` is used and DEFAULT_ACTIONS_URL is set to github,
|
||||||
# and github_mirror is not empty. In this case,
|
# and github_mirror is not empty. In this case,
|
||||||
|
|||||||
@@ -39,6 +39,7 @@ type Runner struct {
|
|||||||
LogReportMaxLatency time.Duration `yaml:"log_report_max_latency"` // LogReportMaxLatency specifies the max time a log row can wait before being sent.
|
LogReportMaxLatency time.Duration `yaml:"log_report_max_latency"` // LogReportMaxLatency specifies the max time a log row can wait before being sent.
|
||||||
LogReportBatchSize int `yaml:"log_report_batch_size"` // LogReportBatchSize triggers immediate log flush when buffer reaches this size.
|
LogReportBatchSize int `yaml:"log_report_batch_size"` // LogReportBatchSize triggers immediate log flush when buffer reaches this size.
|
||||||
StateReportInterval time.Duration `yaml:"state_report_interval"` // StateReportInterval specifies the interval for state reporting.
|
StateReportInterval time.Duration `yaml:"state_report_interval"` // StateReportInterval specifies the interval for state reporting.
|
||||||
|
ReportCloseTimeout time.Duration `yaml:"report_close_timeout"` // ReportCloseTimeout caps each RPC attempt when flushing the final logs and task state at job completion, on a detached context so a server cancel can't block the acknowledgement.
|
||||||
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
|
Labels []string `yaml:"labels"` // Labels specify the labels of the runner. Labels are declared on each startup
|
||||||
GithubMirror string `yaml:"github_mirror"` // GithubMirror defines what mirrors should be used when using github
|
GithubMirror string `yaml:"github_mirror"` // GithubMirror defines what mirrors should be used when using github
|
||||||
AllocatePTY bool `yaml:"allocate_pty"` // AllocatePTY allocates a pseudo-TTY for each step's process. Default is false, matching GitHub's actions/runner. Enable only for jobs that need an interactive terminal; tools like docker build emit redrawing progress frames into the captured log when a TTY is present. Applies to both host and docker backends.
|
AllocatePTY bool `yaml:"allocate_pty"` // AllocatePTY allocates a pseudo-TTY for each step's process. Default is false, matching GitHub's actions/runner. Enable only for jobs that need an interactive terminal; tools like docker build emit redrawing progress frames into the captured log when a TTY is present. Applies to both host and docker backends.
|
||||||
@@ -183,6 +184,9 @@ func LoadDefault(file string) (*Config, error) {
|
|||||||
if cfg.Runner.StateReportInterval <= 0 {
|
if cfg.Runner.StateReportInterval <= 0 {
|
||||||
cfg.Runner.StateReportInterval = 5 * time.Second
|
cfg.Runner.StateReportInterval = 5 * time.Second
|
||||||
}
|
}
|
||||||
|
if cfg.Runner.ReportCloseTimeout <= 0 {
|
||||||
|
cfg.Runner.ReportCloseTimeout = 10 * time.Second
|
||||||
|
}
|
||||||
if cfg.Metrics.Addr == "" {
|
if cfg.Metrics.Addr == "" {
|
||||||
cfg.Metrics.Addr = "127.0.0.1:9101"
|
cfg.Metrics.Addr = "127.0.0.1:9101"
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,6 +58,9 @@ type Reporter struct {
|
|||||||
logReportMaxLatency time.Duration
|
logReportMaxLatency time.Duration
|
||||||
logBatchSize int
|
logBatchSize int
|
||||||
stateReportInterval time.Duration
|
stateReportInterval time.Duration
|
||||||
|
// closeTimeout bounds each RPC attempt in the final flush, on a context
|
||||||
|
// detached from r.ctx so a server cancel can't abort the acknowledgement.
|
||||||
|
closeTimeout time.Duration
|
||||||
|
|
||||||
// Event notification channels (non-blocking, buffered 1)
|
// Event notification channels (non-blocking, buffered 1)
|
||||||
logNotify chan struct{} // signal: new log rows arrived
|
logNotify chan struct{} // signal: new log rows arrived
|
||||||
@@ -89,6 +92,7 @@ func NewReporter(ctx context.Context, cancel context.CancelFunc, client client.C
|
|||||||
logReportMaxLatency: cfg.Runner.LogReportMaxLatency,
|
logReportMaxLatency: cfg.Runner.LogReportMaxLatency,
|
||||||
logBatchSize: cfg.Runner.LogReportBatchSize,
|
logBatchSize: cfg.Runner.LogReportBatchSize,
|
||||||
stateReportInterval: cfg.Runner.StateReportInterval,
|
stateReportInterval: cfg.Runner.StateReportInterval,
|
||||||
|
closeTimeout: cfg.Runner.ReportCloseTimeout,
|
||||||
logNotify: make(chan struct{}, 1),
|
logNotify: make(chan struct{}, 1),
|
||||||
stateNotify: make(chan struct{}, 1),
|
stateNotify: make(chan struct{}, 1),
|
||||||
state: &runnerv1.TaskState{
|
state: &runnerv1.TaskState{
|
||||||
@@ -329,6 +333,9 @@ func (r *Reporter) runDaemonLoop() {
|
|||||||
_ = r.ReportLog(false)
|
_ = r.ReportLog(false)
|
||||||
|
|
||||||
case <-r.ctx.Done():
|
case <-r.ctx.Done():
|
||||||
|
// Stop heartbeating on cancel so Gitea sees the runner as offline
|
||||||
|
// during cleanup and won't assign an overlapping task. Close() still
|
||||||
|
// delivers the final flush on a detached context (flushFinal).
|
||||||
close(r.daemon)
|
close(r.daemon)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -431,17 +438,43 @@ func (r *Reporter) Close(lastWords string) error {
|
|||||||
}
|
}
|
||||||
r.stateMu.Unlock()
|
r.stateMu.Unlock()
|
||||||
|
|
||||||
// Report the job outcome even when all log upload retry attempts have been exhausted
|
// Separate budgets so a slow ReportLog can't starve the ReportState that
|
||||||
|
// carries the cancel acknowledgement.
|
||||||
return errors.Join(
|
return errors.Join(
|
||||||
retry.New(retry.Context(r.ctx)).Do(func() error {
|
r.flushFinal(func() error { return r.ReportLog(true) }),
|
||||||
return r.ReportLog(true)
|
r.flushFinal(func() error { return r.ReportState(true) }),
|
||||||
}),
|
|
||||||
retry.New(retry.Context(r.ctx)).Do(func() error {
|
|
||||||
return r.ReportState(true)
|
|
||||||
}),
|
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// flushFinal retries fn on a detached, bounded context so a cancelled r.ctx
|
||||||
|
// does not abort the final flush. Each call gets its own fresh budget.
|
||||||
|
func (r *Reporter) flushFinal(fn func() error) error {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 3*r.effectiveCloseTimeout())
|
||||||
|
defer cancel()
|
||||||
|
return retry.New(retry.Context(ctx)).Do(fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// effectiveCloseTimeout returns closeTimeout, or 10s when unset, so a zero
|
||||||
|
// value can't produce an already-expired context for the final flush.
|
||||||
|
func (r *Reporter) effectiveCloseTimeout() time.Duration {
|
||||||
|
if r.closeTimeout <= 0 {
|
||||||
|
return 10 * time.Second
|
||||||
|
}
|
||||||
|
return r.closeTimeout
|
||||||
|
}
|
||||||
|
|
||||||
|
// rpcCtx returns the context for an outbound RPC plus a cancel func. While
|
||||||
|
// r.ctx is alive it's used directly; once cancelled (server RESULT_CANCELLED),
|
||||||
|
// RPCs switch to a fresh bounded context so Close()'s final flush still lands.
|
||||||
|
func (r *Reporter) rpcCtx() (context.Context, context.CancelFunc) {
|
||||||
|
select {
|
||||||
|
case <-r.ctx.Done():
|
||||||
|
return context.WithTimeout(context.Background(), r.effectiveCloseTimeout())
|
||||||
|
default:
|
||||||
|
return r.ctx, func() {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Reporter) ReportLog(noMore bool) error {
|
func (r *Reporter) ReportLog(noMore bool) error {
|
||||||
r.clientM.Lock()
|
r.clientM.Lock()
|
||||||
defer r.clientM.Unlock()
|
defer r.clientM.Unlock()
|
||||||
@@ -454,8 +487,11 @@ func (r *Reporter) ReportLog(noMore bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpcCtx, rpcCancel := r.rpcCtx()
|
||||||
|
defer rpcCancel()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
resp, err := r.client.UpdateLog(r.ctx, connect.NewRequest(&runnerv1.UpdateLogRequest{
|
resp, err := r.client.UpdateLog(rpcCtx, connect.NewRequest(&runnerv1.UpdateLogRequest{
|
||||||
TaskId: r.state.Id,
|
TaskId: r.state.Id,
|
||||||
Index: int64(r.logOffset),
|
Index: int64(r.logOffset),
|
||||||
Rows: rows,
|
Rows: rows,
|
||||||
@@ -526,8 +562,11 @@ func (r *Reporter) ReportState(reportResult bool) error {
|
|||||||
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
|
state.Result = runnerv1.Result_RESULT_UNSPECIFIED
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rpcCtx, rpcCancel := r.rpcCtx()
|
||||||
|
defer rpcCancel()
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
resp, err := r.client.UpdateTask(r.ctx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
resp, err := r.client.UpdateTask(rpcCtx, connect.NewRequest(&runnerv1.UpdateTaskRequest{
|
||||||
State: state,
|
State: state,
|
||||||
Outputs: outputs,
|
Outputs: outputs,
|
||||||
}))
|
}))
|
||||||
|
|||||||
@@ -754,3 +754,86 @@ func TestReporter_StateHeartbeat(t *testing.T) {
|
|||||||
require.NoError(t, reporter.ReportState(false))
|
require.NoError(t, reporter.ReportState(false))
|
||||||
assert.Equal(t, int64(2), updateTaskCalls.Load(), "ReportState must heartbeat after stateReportInterval even with no state change")
|
assert.Equal(t, int64(2), updateTaskCalls.Load(), "ReportState must heartbeat after stateReportInterval even with no state change")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestReporter_ServerCancelStillFlushesFinal asserts that when the Gitea server
|
||||||
|
// returns RESULT_CANCELLED on an in-flight UpdateTask (which causes the
|
||||||
|
// reporter to cancel the task context), Close() still successfully sends the
|
||||||
|
// final UpdateLog{NoMore:true} and the final UpdateTask carrying the populated
|
||||||
|
// final state. Before the fix this final flush used r.ctx, which was just
|
||||||
|
// cancelled, so retry-go aborted on its context check and Gitea never received
|
||||||
|
// the runner's acknowledgement of the cancel.
|
||||||
|
func TestReporter_ServerCancelStillFlushesFinal(t *testing.T) {
|
||||||
|
var (
|
||||||
|
updateTaskCalls atomic.Int64
|
||||||
|
finalLogNoMoreSeen atomic.Bool
|
||||||
|
finalTaskStateSeen atomic.Bool
|
||||||
|
)
|
||||||
|
|
||||||
|
client := mocks.NewClient(t)
|
||||||
|
client.On("UpdateLog", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateLogRequest]) (*connect_go.Response[runnerv1.UpdateLogResponse], error) {
|
||||||
|
if req.Msg.NoMore {
|
||||||
|
finalLogNoMoreSeen.Store(true)
|
||||||
|
}
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateLogResponse{
|
||||||
|
AckIndex: req.Msg.Index + int64(len(req.Msg.Rows)),
|
||||||
|
}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
// The first UpdateTask returns RESULT_CANCELLED — modelling a server-side
|
||||||
|
// cancellation; the reporter must call r.cancel() in response. The final
|
||||||
|
// UpdateTask issued by Close() must still arrive even though r.ctx is now
|
||||||
|
// cancelled.
|
||||||
|
client.On("UpdateTask", mock.Anything, mock.Anything).Return(
|
||||||
|
func(_ context.Context, req *connect_go.Request[runnerv1.UpdateTaskRequest]) (*connect_go.Response[runnerv1.UpdateTaskResponse], error) {
|
||||||
|
n := updateTaskCalls.Add(1)
|
||||||
|
if n == 1 {
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{
|
||||||
|
State: &runnerv1.TaskState{
|
||||||
|
Result: runnerv1.Result_RESULT_CANCELLED,
|
||||||
|
},
|
||||||
|
}), nil
|
||||||
|
}
|
||||||
|
if req.Msg.State != nil && req.Msg.State.Result != runnerv1.Result_RESULT_UNSPECIFIED {
|
||||||
|
finalTaskStateSeen.Store(true)
|
||||||
|
}
|
||||||
|
return connect_go.NewResponse(&runnerv1.UpdateTaskResponse{}), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
taskCtx, err := structpb.NewStruct(map[string]any{})
|
||||||
|
require.NoError(t, err)
|
||||||
|
cfg, _ := config.LoadDefault("")
|
||||||
|
reporter := NewReporter(ctx, cancel, client, &runnerv1.Task{Context: taskCtx}, cfg)
|
||||||
|
reporter.ResetSteps(1)
|
||||||
|
|
||||||
|
// Force the first ReportState to actually call UpdateTask.
|
||||||
|
reporter.stateMu.Lock()
|
||||||
|
reporter.stateChanged = true
|
||||||
|
reporter.stateMu.Unlock()
|
||||||
|
|
||||||
|
// First ReportState — server returns RESULT_CANCELLED, reporter cancels r.ctx.
|
||||||
|
require.NoError(t, reporter.ReportState(false))
|
||||||
|
require.Equal(t, int64(1), updateTaskCalls.Load())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// Expected: reporter called cancel() because the server reported the task as cancelled.
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("expected r.ctx to be cancelled after server returned RESULT_CANCELLED")
|
||||||
|
}
|
||||||
|
|
||||||
|
// The test does not start the daemon goroutine; close(r.daemon) so Close()
|
||||||
|
// proceeds without waiting on its 60s timeout.
|
||||||
|
close(reporter.daemon)
|
||||||
|
|
||||||
|
// Now Close() runs. Before the fix, both final RPCs aborted on the cancelled
|
||||||
|
// r.ctx via retry.Context. After the fix, Close() uses a detached context and
|
||||||
|
// the per-RPC rpcCtx() falls back to a fresh ctx, so both calls succeed.
|
||||||
|
require.NoError(t, reporter.Close("cancelled"))
|
||||||
|
|
||||||
|
assert.True(t, finalLogNoMoreSeen.Load(), "Close() must send a final UpdateLog{NoMore:true} even after server-side cancellation")
|
||||||
|
assert.True(t, finalTaskStateSeen.Load(), "Close() must send a final UpdateTask with the populated final state even after server-side cancellation")
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user