Job Engine Design
Multi-step job pipelines — define, schedule, and execute file transfer workflows.
The Job Engine is the core runtime of Courier. It is responsible for scheduling, executing, pausing, resuming, and monitoring multi-step file transfer workflows. The engine supports both standalone jobs and ordered execution chains with configurable dependency behavior.
5.1 Core Concepts
Job: A named, versioned pipeline of one or more Steps that execute sequentially. A Job has configuration (connections, paths, encryption keys), a schedule, a failure policy, and an execution history. When a Job definition is edited, previous runs retain their original configuration — new runs use the updated definition.
Step: A single unit of work within a Job. Each Step has a type (e.g., SftpDownload, PgpEncrypt, Zip), its own configuration, a timeout, and independent state tracking. Steps pass data to downstream steps via a shared JobContext.
Job Chain: A named, ordered group of Jobs that must execute in sequence. Chains define execution order and dependency relationships. A Chain can be scheduled or triggered on demand, and it manages the lifecycle of all its member Jobs as a coordinated unit.
Job Context: A key-value dictionary that accumulates outputs as Steps execute. For example, an SftpDownload step writes \{ "downloaded_file": "/tmp/courier/abc123/invoice.pgp" \} and the subsequent PgpDecrypt step reads that path as its input. The context is persisted to the database alongside step state so that resumed jobs have access to all prior outputs.
5.2 Step Type Registry
Each step type is a class implementing the IJobStep interface. This gives the engine a plugin-style architecture: new step types can be added without modifying the engine core.
public interface IJobStep
{
    string TypeKey { get; } // e.g., "sftp.download", "pgp.encrypt"

    Task<StepResult> ExecuteAsync(
        StepConfiguration config,
        JobContext context,
        CancellationToken cancellationToken);

    Task<ValidationResult> ValidateAsync(
        StepConfiguration config);

    // Best-effort cleanup on failure
    Task RollbackAsync(
        StepConfiguration config,
        JobContext context);
}
V1 Step Types:
| Step Type Key | Description |
|---|---|
| sftp.upload | Upload file(s) to an SFTP server |
| sftp.download | Download file(s) from an SFTP server |
| ftp.upload | Upload file(s) to an FTP/FTPS server |
| ftp.download | Download file(s) from an FTP/FTPS server |
| pgp.encrypt | Encrypt file(s) using a PGP public key |
| pgp.decrypt | Decrypt file(s) using a PGP private key |
| file.zip | Compress file(s) into an archive |
| file.unzip | Extract file(s) from an archive |
| file.move | Move file(s) to a destination path |
| file.copy | Copy file(s) to a destination path |
| file.delete | Delete file(s) from a path |
| azure_function.execute | Trigger an Azure Function and poll for completion via Application Insights |
Each step type is registered in a StepTypeRegistry at startup via dependency injection. The registry resolves the correct IJobStep implementation by TypeKey at runtime.
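At its core the registry is a keyed lookup table. A minimal sketch of the idea, in Python for brevity (the real registry is a C# class populated via dependency injection; SftpDownloadStep here is a hypothetical stand-in for a real IJobStep implementation):

```python
class StepTypeRegistry:
    """Resolves a step implementation by its TypeKey (e.g., 'sftp.download')."""

    def __init__(self):
        self._steps = {}

    def register(self, type_key, step_factory):
        # Fail fast on duplicate registrations at startup
        if type_key in self._steps:
            raise ValueError(f"Duplicate step type key: {type_key}")
        self._steps[type_key] = step_factory

    def resolve(self, type_key):
        if type_key not in self._steps:
            raise KeyError(f"Unknown step type: {type_key}")
        return self._steps[type_key]()  # factory call stands in for DI resolution


class SftpDownloadStep:  # hypothetical stand-in for an IJobStep implementation
    type_key = "sftp.download"


registry = StepTypeRegistry()
registry.register(SftpDownloadStep.type_key, SftpDownloadStep)
```

Resolving an unknown TypeKey fails loudly rather than silently skipping the step, which keeps a typo in a job definition from being discovered mid-run.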
5.3 Job State Machine
Jobs follow a strict state machine with defined valid transitions:
                ┌──────────┐
                │ Created  │
                └────┬─────┘
                     │ (enqueue)
                ┌────▼─────┐
        ┌───────┤  Queued  │
        │       └────┬─────┘
        │            │ (slot available)
        │       ┌────▼─────┐              ┌──────────┐
        ├───────┤ Running  │◄──(resume)───┤  Paused  │
        │       └─┬──┬──┬──┘              └────▲─────┘
        │         │  │  └──(pause)─────────────┘
        │         │  │ (failure, policy = stop)
        │         │  │       ┌──────────┐
        │         │  └──────►│  Failed  │
        │         │          └──────────┘
        │         │ (success)
        │    ┌────▼─────┐
        │    │Completed │
        │    └──────────┘
(cancel)│
   ┌────▼─────┐
   │Cancelled │
   └──────────┘
Valid transitions are enforced in code. Any invalid transition throws an InvalidJobStateTransitionException.
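The transition table can be expressed directly in code. An illustrative Python sketch (the engine itself is C#; the exact edge set — e.g., whether a Failed job transitions back to Running on resume — is inferred from sections 5.3 and 5.5 and should be treated as an assumption):

```python
# Allowed job-state transitions, mirroring the diagram above.
VALID_TRANSITIONS = {
    "Created":   {"Queued"},
    "Queued":    {"Running", "Cancelled"},
    "Running":   {"Completed", "Failed", "Paused", "Cancelled"},
    "Paused":    {"Running", "Cancelled"},
    "Failed":    {"Running"},   # resume after failure (see 5.5 Checkpoint & Resume)
    "Completed": set(),         # terminal
    "Cancelled": set(),         # terminal
}


class InvalidJobStateTransitionException(Exception):
    pass


def transition(current, target):
    """Return the new state, or raise if the transition is not allowed."""
    if target not in VALID_TRANSITIONS[current]:
        raise InvalidJobStateTransitionException(f"{current} -> {target}")
    return target
```

Encoding the rules as data rather than scattered if-statements makes the diagram and the enforcement logic trivially comparable in review.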
5.4 Step State Machine
Each step within a job tracks its own state independently:
- Pending → Step has not yet started
- Running → Step is currently executing
- Completed → Step finished successfully (output written to JobContext)
- Failed → Step failed (error details persisted)
- Skipped → Step was skipped due to failure policy configuration
After each step completes (or fails), its state and any context outputs are persisted to the database in a single transaction. This is the foundation of the checkpoint/resume system.
5.5 Checkpoint & Resume
When a Job is paused or fails at Step N, the engine persists:
- The Job's current state (Paused or Failed)
- Each Step's state (Completed, Failed, or Pending)
- The full JobContext dictionary at the point of interruption
- Error details for the failed step (if applicable)
On resume, the engine:
- Loads the Job and its step states from the database
- Restores the JobContext from the persisted snapshot
- Identifies the first step in Pending or Failed state
- Begins execution from that step, skipping all Completed steps
This means a 10-step job that failed at step 7 will resume from step 7 with all outputs from steps 1–6 intact. File outputs from completed steps must still exist on disk — the engine validates this before resuming and fails fast if intermediate files are missing.
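The resume procedure can be sketched as follows (Python, illustrative only; the convention of treating context keys ending in _file or _path as disk outputs is an assumption made for the sketch, not the engine's actual rule):

```python
import os


def plan_resume(step_states, context_snapshot):
    """Return the index of the first step to (re)run, after validating that
    file outputs recorded by completed steps still exist on disk.

    step_states: list of 'Completed' / 'Failed' / 'Pending' strings
    context_snapshot: persisted JobContext dict; keys ending in '_file' or
    '_path' are treated as file outputs (illustrative convention).
    """
    # Fail fast if any intermediate file from a completed step is missing
    for key, value in context_snapshot.items():
        if key.endswith(("_file", "_path")) and not os.path.exists(value):
            raise FileNotFoundError(f"Missing intermediate file: {value}")

    # First Pending or Failed step is the resume point; Completed steps are skipped
    for index, state in enumerate(step_states):
        if state in ("Pending", "Failed"):
            return index
    return None  # every step already completed; nothing to run
```

For the 10-step example above, step_states would hold six Completed entries followed by a Failed entry at index 6, so plan_resume returns 6 (step 7, one-based).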
5.6 Step Context & Data Passing
Steps communicate via the JobContext, a typed dictionary scoped to a single job execution:
public class JobContext
{
    private readonly Dictionary<string, object> _data = new();

    public void Set<T>(string key, T value);
    public T Get<T>(string key);
    public bool TryGet<T>(string key, out T value);
    public IReadOnlyDictionary<string, object> Snapshot(); // For persistence
}
Convention for keys: Steps write outputs using the pattern {stepIndex}.{outputName}, e.g., "0.downloaded_files", "1.decrypted_file". Step configuration references upstream outputs using the same keys, which the engine resolves at runtime before passing config to the step.
The entire context is serialized to JSON and stored in the job_executions table after each step, enabling checkpoint/resume.
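Reference resolution amounts to a substitution pass over the step's config before execution. A Python sketch (illustrative; the context: prefix syntax is borrowed from the control-flow examples in section 5.20):

```python
def resolve_config(config, context_data):
    """Replace 'context:<key>' references in a step's config with the
    corresponding JobContext values before the step executes."""
    resolved = {}
    for key, value in config.items():
        if isinstance(value, str) and value.startswith("context:"):
            ref = value[len("context:"):]
            if ref not in context_data:
                # A dangling reference is a configuration error: fail before running
                raise KeyError(f"Unresolved context reference: {ref}")
            resolved[key] = context_data[ref]
        else:
            resolved[key] = value
    return resolved
```

Resolving eagerly, before the step runs, means a typo in an upstream key surfaces as an immediate configuration error rather than a mysterious mid-step failure.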
5.7 Scheduling
Courier uses Quartz.NET as its scheduling backbone, with schedule definitions stored in PostgreSQL so they survive application restarts and container re-deployments.
Cron scheduling: Jobs and Chains can be assigned a cron expression (e.g., 0 0 3 ? * TUE for every Tuesday at 3:00 AM). Quartz manages trigger firing and misfire handling.
One-shot scheduling: Jobs can be scheduled to run once at a specific future datetime. Implemented as a Quartz SimpleTrigger with a repeat count of zero.
On-demand execution: An API endpoint (POST /api/jobs/{id}/trigger) enqueues a job for immediate execution, bypassing the scheduler. This creates a new JobExecution record and places it in the queue.
All three mechanisms feed into the same execution queue, ensuring consistent concurrency management.
5.8 Concurrency Management
The engine enforces a configurable maximum number of concurrent job executions (default: 5). This is implemented as a persistent semaphore backed by the database rather than an in-memory SemaphoreSlim, so it works correctly across container restarts.
Queue dequeue pattern (runs on a 5-second poll in the Worker host):
-- Atomic dequeue: claim the next queued execution if a slot is available
WITH running AS (
SELECT COUNT(*) AS cnt
FROM job_executions
WHERE state = 'running'
),
next_job AS (
SELECT je.id
FROM job_executions je, running r
WHERE je.state = 'queued'
AND r.cnt < (SELECT value::int FROM system_settings WHERE key = 'job.concurrency_limit')
ORDER BY je.queued_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED -- prevents duplicate pickup across concurrent polls
)
UPDATE job_executions
SET state = 'running', started_at = now()
FROM next_job
WHERE job_executions.id = next_job.id
RETURNING job_executions.id, job_executions.job_id;
FOR UPDATE SKIP LOCKED is critical: if two poll cycles overlap (e.g., Quartz fires a trigger while the queue poll is running), the second query skips the row already locked by the first, preventing double-execution. This pattern is used consistently wherever Courier claims "one worker picks it up" — queue dequeue, monitor event processing, and partition maintenance.
Queue behavior: When all slots are occupied, new job executions enter Queued state and are picked up in FIFO order as slots free up. The queue is polled on a short interval (default: 5 seconds) by a background service.
The concurrency limit is global, not per-job. A single job definition can have multiple concurrent executions if its schedule overlaps and slots are available. If this needs to be prevented, a per-job "max instances" setting can be added (not in V1 scope, but the schema supports it).
5.9 Job Dependencies
Jobs can declare dependencies on other Jobs. A dependency means "do not start this Job until the specified upstream Job's most recent execution has completed."
Dependency configuration per edge:
- required_status: The upstream job must reach this status. Default: Completed. Can be set to Any to allow the downstream job to run even if the upstream failed.
- run_on_failure: Boolean, default false. If true, the downstream job starts even when the upstream job fails. This is configured per-dependency, giving fine-grained control.
Validation: At job creation/edit time, the system performs a topological sort to detect circular dependencies. If a cycle is detected, the save is rejected with a descriptive error.
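Cycle detection is a standard depth-first search over the dependency graph. An illustrative Python sketch (the real validation is C#; the graph shape here is simplified to a dict of job id to upstream ids):

```python
def assert_acyclic(dependencies):
    """Raise ValueError if the job dependency graph contains a cycle.

    dependencies: dict mapping job id -> list of upstream job ids.
    """
    WHITE, GRAY, BLACK = 0, 1, 2        # unvisited / on current path / finished
    color = {job: WHITE for job in dependencies}

    def visit(job):
        color[job] = GRAY
        for upstream in dependencies.get(job, []):
            if color.get(upstream, WHITE) == GRAY:
                # upstream is already on the current DFS path -> cycle
                raise ValueError(f"Circular dependency involving job '{upstream}'")
            if color.get(upstream, WHITE) == WHITE:
                visit(upstream)
        color[job] = BLACK

    for job in list(dependencies):
        if color[job] == WHITE:
            visit(job)
```

Running this at save time keeps a cycle from ever reaching the scheduler, where it would deadlock the dependent jobs forever.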
5.10 Job Chains (Execution Groups)
A Job Chain is a first-class entity that represents an ordered sequence of Jobs:
Chain: "Daily Partner Invoice Processing"
├── Job 1: Download invoices from Partner SFTP
├── Job 2: Decrypt PGP files (depends on Job 1)
├── Job 3: Unzip archives (depends on Job 2)
└── Job 4: Move to processing folder (depends on Job 3)
Chain properties:
- Name & description: Human-readable identification
- Member jobs: Ordered list of Job references with dependency edges
- Schedule: A Chain can have its own cron/one-shot schedules via the separate chain_schedules table. When triggered, it starts the first Job(s) with no upstream dependencies
- Chain-level state: Pending → Running → Completed | Failed | Paused | Cancelled
- Propagation behavior: As each Job completes, the Chain evaluates which downstream Jobs are unblocked and enqueues them
Chain Scheduling: Chains support the same scheduling types as Jobs — cron (recurring via Quartz cron expressions) and one_shot (single future execution that auto-disables after firing). Chain schedules are stored in a separate chain_schedules table (not the job_schedules table) to maintain clean separation. The Worker runs a ChainScheduleManager that registers chain schedules with Quartz.NET under the "courier-chains" group (separate from job schedules in the "courier" group). The ScheduleStartupSync background service syncs both job and chain schedules on a 30-second interval. When a chain schedule fires, QuartzChainAdapter calls ChainExecutionService.TriggerAsync() with triggeredBy: "schedule".
Chain vs. standalone dependencies: Jobs can have dependencies both within a Chain and as standalone relationships. Chains are a convenience for defining and scheduling a related group. Standalone dependencies allow cross-chain relationships.
Chain failure behavior: When a Job within a Chain fails and the dependency is configured with run_on_failure: false, all downstream Jobs in the chain are marked Skipped and the Chain transitions to Failed. If run_on_failure: true, execution continues.
5.11 Failure Handling
Each Job has a configurable failure policy that determines what happens when a Step fails:
| Policy | Behavior |
|---|---|
| stop | Mark the Job as Failed. No further steps execute. |
| retry_step | Retry the failed step up to N times with exponential backoff. If all retries fail, mark Failed. |
| retry_job | Re-run the entire Job from step 1, up to N times. If all retries fail, mark Failed. |
| skip_and_continue | Mark the failed step as Skipped, continue to the next step. |
Retry configuration:
- max_retries: Maximum number of retry attempts (default: 3)
- backoff_base_seconds: Base delay for exponential backoff (default: 1)
- backoff_max_seconds: Ceiling for backoff delay (default: 60)
- Backoff formula: min(backoff_base * 2^attempt, backoff_max)
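Worked through with the defaults, the formula yields delays of 1, 2, 4, 8, 16, 32, then a capped 60 seconds. A one-function sketch (assuming attempt numbering starts at 0, which the formula leaves ambiguous):

```python
def backoff_delay(attempt, base_seconds=1, max_seconds=60):
    """min(backoff_base * 2^attempt, backoff_max) from the retry config above."""
    return min(base_seconds * 2 ** attempt, max_seconds)


# With the defaults, successive retries wait 1, 2, 4, 8, 16, 32, 60, 60, ... seconds.
delays = [backoff_delay(a) for a in range(8)]
```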
The failure policy is set at the Job level. A future enhancement could allow per-step failure policy overrides.
5.12 Idempotency Rules by Step Type
Retries without strict idempotency turn into duplicate uploads, partial overwrites, or "successful but wrong" downstream states. Each step type declares its idempotency strategy, detection mechanism, and rollback behavior:
| Step Type | Idempotency Strategy | Detection Key | Rollback on Retry | Notes |
|---|---|---|---|---|
| sftp.upload | Overwrite — re-upload overwrites the remote file | Remote path (full destination path) | Delete partial remote file before re-upload | Uses atomic rename (upload to .tmp, rename on completion). If retry finds the .tmp file, it deletes and restarts. If it finds the final file from a previous attempt, it overwrites. |
| sftp.download | Resume — continue from last byte offset | Local file path + size | Delete partial local file and restart if resume fails | SSH.NET supports offset-based reads. If the local file exists with size < remote size, resume. If sizes match, skip (already complete). If local > remote, delete and restart. |
| ftp.upload | Overwrite — same as SFTP | Remote path | Delete and re-upload | Atomic rename via FTP RNFR/RNTO. Resume not reliable across FTP servers. |
| ftp.download | Resume — same as SFTP | Local file path + size | Delete and re-download | FluentFTP FtpLocalExists.Resume handles offset. |
| pgp.encrypt | Overwrite — re-encrypt to same output path | Output file path | Delete output file | Re-encryption is not byte-identical (a fresh session key is generated each run), but it is functionally equivalent: same plaintext for the same input + key. |
| pgp.decrypt | Overwrite — re-decrypt to same output path | Output file path | Delete output file | Deterministic given same input + key. |
| file.zip | Overwrite — recreate archive | Output archive path | Delete partial archive | Archive creation is atomic (write to .tmp, rename). |
| file.unzip | Clean and re-extract | Output directory | Delete entire output directory and re-extract | Cannot safely resume partial extraction. Directory is wiped and re-extracted from scratch. |
| file.rename | Check-and-skip | Destination path exists with expected size | No rollback needed | If destination already exists with correct size, skip. If source still exists, perform rename. If neither exists, fail. |
| file.copy | Overwrite | Destination path | Delete destination | Copy is always safe to repeat. |
| file.delete | Check-and-skip | File existence | No rollback | If file is already gone, succeed silently. |
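As a concrete example of these rules, the sftp.download resume decision from the table can be sketched as a pure decision function (Python, illustrative; the real step performs the chosen action via SSH.NET):

```python
def plan_download(local_size, remote_size):
    """Decide how sftp.download handles an existing local file on retry,
    per the idempotency table above.

    local_size: size of the local file in bytes, or None if no local file.
    remote_size: size of the remote file in bytes.
    """
    if local_size is None:
        return "download"   # fresh transfer, nothing on disk yet
    if local_size == remote_size:
        return "skip"       # previous attempt already completed
    if local_size < remote_size:
        return "resume"     # continue from the last byte offset
    return "restart"        # local file larger than remote: delete and redo
```

Separating the decision from the I/O makes the idempotency rules unit-testable without a live SFTP server.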
Retry safety contract: Every step implementation must satisfy this contract:
/// <summary>
/// Step implementations must be safe to retry. Specifically:
/// 1. If the step partially completed, retry must not produce duplicates
/// 2. If the step fully completed, retry must detect this and either
///    skip (no-op) or overwrite with an equivalent result
/// 3. The step must clean up its own partial outputs before retrying
/// </summary>
public interface IJobStep
{
    // Called before retry to clean up partial state from the failed attempt.
    // Default implementation: delete all files this step wrote to the temp directory.
    Task CleanupBeforeRetryAsync(
        StepConfiguration config,
        JobContext context,
        CancellationToken cancellationToken);

    // ...plus TypeKey, ExecuteAsync, ValidateAsync, and RollbackAsync as defined in 5.2
}
Detection on resume: When a paused or failed job resumes from a checkpoint, the engine validates the state before continuing:
- All output files from completed steps must still exist on disk (checked via path + size from the context snapshot)
- The step being resumed has its CleanupBeforeRetryAsync() called to clear any partial state
- The JobContext is restored from the database snapshot, so downstream steps see the same inputs as the original run
Upload duplicate prevention: For sftp.upload and ftp.upload, the atomic rename pattern is the primary defense. The file is uploaded as {filename}.courier-tmp, and only renamed to the final name on successful completion. If a retry finds {filename}.courier-tmp, it knows the previous attempt was incomplete and deletes it before restarting. If it finds {filename} (no .courier-tmp), the previous attempt succeeded and the step can skip or overwrite. Partner systems polling for files never see partial uploads because the .courier-tmp suffix doesn't match their expected patterns.
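The retry decision for an atomic-rename upload can be sketched as follows (Python, illustrative; it returns the planned actions rather than performing them, and it chooses overwrite over skip when the final file already exists — one of the two options the text allows):

```python
def plan_upload(remote_files, filename):
    """Decide what an atomic-rename upload does on retry.

    remote_files: set of file names present in the remote directory.
    filename: final destination name (e.g., 'invoice.pgp').
    Returns the ordered list of planned actions.
    """
    tmp_name = filename + ".courier-tmp"
    actions = []
    if tmp_name in remote_files:
        # A leftover temp file means the previous attempt never finished
        actions.append(f"delete {tmp_name}")
    if filename in remote_files:
        # Previous attempt completed; overwrite per the idempotency strategy
        actions.append(f"overwrite {filename}")
    else:
        actions.append(f"upload {tmp_name} then rename to {filename}")
    return actions
```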
5.13 Step Timeouts
Each Step has a configurable timeout_seconds (default: 300 — 5 minutes). The timeout is enforced via a CancellationTokenSource linked to the step's execution. When the timeout fires:
- The CancellationToken is cancelled
- The step implementation is expected to observe the token and terminate gracefully
- If the step does not terminate within a grace period (10 seconds), the engine forcibly marks it as Failed with a timeout error
- The Job's failure policy then determines what happens next
5.14 Cancellation Support
A CancellationToken is threaded through the entire execution pipeline: Job → Step → underlying library calls. This allows:
- User-initiated cancellation: Via POST /api/jobs/{executionId}/cancel
- Timeout-initiated cancellation: Per-step timeout (see above)
- Application shutdown: Aspire host shutdown triggers graceful cancellation of all running jobs, which then persist their checkpoint state
All step implementations are required to observe the cancellation token at regular intervals (e.g., between file chunks during transfer) and throw OperationCanceledException when triggered.
5.15 Temp File Management
Each Job execution is assigned a unique working directory:
/data/courier/temp/{executionId}/
Steps write all intermediate files to this directory. The JobContext stores relative paths so they resolve correctly. On job completion (success or failure after all retries exhausted), the engine:
- Moves final output files to their configured destinations
- Deletes the temp directory
- Logs total temp disk usage in the execution audit record
For paused or resumable-failed jobs, the temp directory is retained until the job is either resumed and completed, or manually cancelled/cleaned up. A background cleanup service purges orphaned temp directories older than a configurable threshold (default: 7 days).
5.16 Job Audit Trail
Every state transition at both the Job and Step level is recorded in the job_audit_log table:
- Timestamp of the transition
- From state → To state
- Duration of the previous state
- Bytes transferred (for file transfer steps)
- Error details (message, stack trace, retry attempt number)
- User who initiated the action (for manual triggers, cancellations, pauses)
This audit log is append-only and never modified. It serves as the foundation for the V2 metrics dashboard and SLA monitoring system.
5.17 Job Versioning
Job definitions are versioned using a simple incrementing version number. When a Job is edited:
- The current definition is snapshotted as version N
- The updated definition becomes version N+1
- Any in-progress or paused executions continue using the version they were started with
- New executions use the latest version
The job_definitions table stores the current version, and job_definition_versions stores the full configuration for each historical version as a JSON column. This ensures auditability and prevents mid-execution configuration changes.
5.18 Notification Hooks (V2 Preparation)
The engine emits domain events at key lifecycle points:
- JobStarted, JobCompleted, JobFailed, JobPaused, JobResumed, JobCancelled
- StepStarted, StepCompleted, StepFailed, StepSkipped
- ChainStarted, ChainCompleted, ChainFailed
In V1, these events are used only for the audit log. In V2, subscribers can be registered to send email notifications (via SMTP), call webhooks (via REST), or trigger other jobs. The event infrastructure is built in V1 so V2 doesn't require engine changes — only new subscribers.
5.19 Dry Run Mode (Future Consideration)
Not in V1 scope, but the IJobStep interface includes ValidateAsync specifically to support a future dry-run mode. A dry run would execute ValidateAsync on each step (testing connections, verifying paths, checking key availability) without performing actual transfers. This can be implemented in V2 without modifying the step interface.
5.20 Control Flow (If/Else + ForEach)
The job engine supports branching and iteration through four special step types that use the existing flat step model with explicit block markers.
5.20.1 Control Flow Step Types
| Step Type | Purpose | Required Config |
|---|---|---|
| flow.foreach | Iterate over a collection | source — context reference or JSON array |
| flow.if | Conditional branch | left, operator, right (right optional for exists) |
| flow.else | Alternate branch (must follow flow.if body) | none |
| flow.end | Closes a flow.foreach or flow.if block | none |
5.20.2 Block Structure
Control flow steps are regular entries in the flat step list. Blocks are delimited by flow.foreach/flow.if at the start and flow.end at the end. The engine parses the flat list into a tree before execution.
Step 0: sftp.list { "connection_id": "...", "remote_path": "/incoming" }
Step 1: flow.foreach { "source": "context:0.file_list" }
Step 2: sftp.download { "connection_id": "...", "remote_path": "context:loop.current_item.name" }
Step 3: flow.if { "left": "context:loop.current_item.size", "operator": "greater_than", "right": "1048576" }
Step 4: pgp.encrypt { "input_path": "context:2.downloaded_file", "recipient_key_ids": ["..."] }
Step 5: flow.else
Step 6: file.copy { "source_path": "context:2.downloaded_file", "destination_path": "/archive/" }
Step 7: flow.end {} ← closes flow.if
Step 8: flow.end {} ← closes flow.foreach
Step 9: file.delete { "path": "/staging/*" }
The ExecutionPlanParser converts this flat list into a tree:
Root (Sequence)
├── StepNode(sftp.list)
├── ForEachNode(flow.foreach)
│ └── Body:
│ ├── StepNode(sftp.download)
│ └── IfElseNode(flow.if)
│ ├── Then: StepNode(pgp.encrypt)
│ └── Else: StepNode(file.copy)
└── StepNode(file.delete)
Parsing rules:
- Every flow.foreach and flow.if must have a matching flow.end
- flow.else can only appear inside a flow.if block (at most once)
- Blocks can be nested (foreach inside foreach, if inside foreach, etc.)
- Malformed block structure causes the parser to throw before execution begins
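These rules lend themselves to a small recursive-descent parser. An illustrative Python sketch (the real ExecutionPlanParser is C#; node shapes are simplified here to dictionaries with a type key, plus body and else_body lists for block nodes):

```python
def parse_blocks(steps):
    """Parse the flat step list into a nested tree, per the rules above."""

    def parse(i, terminators):
        nodes = []
        while i < len(steps):
            t = steps[i]["type"]
            if t in terminators:
                return nodes, i                      # caller consumes terminator
            if t in ("flow.foreach", "flow.if"):
                body, i = parse(i + 1, {"flow.end", "flow.else"})
                if i >= len(steps):
                    raise ValueError(f"{t} missing matching flow.end")
                node = {"type": t, "body": body}
                if steps[i]["type"] == "flow.else":
                    if t != "flow.if":
                        raise ValueError("flow.else outside flow.if")
                    node["else_body"], i = parse(i + 1, {"flow.end"})
                    if i >= len(steps):
                        raise ValueError("flow.else missing matching flow.end")
                nodes.append(node)
                i += 1                               # consume the flow.end
            elif t in ("flow.end", "flow.else"):
                raise ValueError(f"Unmatched {t} at step {i}")
            else:
                nodes.append(steps[i])               # ordinary step node
                i += 1
        return nodes, i

    tree, _ = parse(0, set())
    return tree
```

Run against the 10-step example above, this yields the same tree shape as the diagram: a root sequence containing sftp.list, a foreach node (whose body holds sftp.download and an if/else node), and file.delete.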
5.20.3 Loop Context Variables
When iterating inside a flow.foreach, the engine injects magic context keys:
| Key | Value |
|---|---|
| loop.current_item | The current item from the collection (JsonElement) |
| loop.current_item.{prop} | Property access on the current item (for objects) |
| loop.index | Zero-based iteration index |
These are injected directly into the JobContext data dictionary, so existing context: resolution works unchanged. Step handlers don't need any modifications to work inside loops.
Nested loops: Inner loops shadow loop.current_item and loop.index. Outer loop values are preserved at loop.{depth}.current_item and loop.{depth}.index (zero-based depth), and restored when the inner loop exits.
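The shadow/restore convention can be sketched as a pair of helpers (Python, illustrative; the stash keys follow the loop.{depth}.* naming above, while the exact moment the engine stashes and restores is an assumption of the sketch):

```python
def enter_loop(context, depth, item, index):
    """Inject the magic loop keys for an iteration at the given nesting
    depth (0 = outermost), stashing shadowed outer values first."""
    if depth > 0:
        # Preserve the outer loop's values under loop.{depth-1}.* before shadowing
        context[f"loop.{depth - 1}.current_item"] = context["loop.current_item"]
        context[f"loop.{depth - 1}.index"] = context["loop.index"]
    context["loop.current_item"] = item
    context["loop.index"] = index


def exit_loop(context, depth):
    """Restore the outer loop's values when an inner loop finishes."""
    if depth > 0:
        context["loop.current_item"] = context.pop(f"loop.{depth - 1}.current_item")
        context["loop.index"] = context.pop(f"loop.{depth - 1}.index")
    else:
        # Outermost loop done: remove the magic keys entirely
        context.pop("loop.current_item", None)
        context.pop("loop.index", None)
```

Because the keys live in the ordinary context dictionary, existing context: resolution sees them with no special casing, exactly as the section states.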
5.20.4 Condition Operators
The flow.if step evaluates left \{operator\} right using string comparison (case-insensitive) with numeric fallback for comparison operators.
| Operator | Behavior |
|---|---|
| equals | Case-insensitive string equality |
| not_equals | Negation of equals |
| contains | Left contains right (case-insensitive) |
| greater_than | Decimal comparison (string fallback if non-numeric) |
| less_than | Decimal comparison (string fallback if non-numeric) |
| exists | True if left is non-null and non-empty (right is ignored) |
| regex | Right is a regex pattern matched against left |
Both left and right support context: references, which are resolved before evaluation.
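The operator semantics condense into one evaluation function. A Python sketch (illustrative; float stands in for the decimal type the engine uses, and .NET and Python regex dialects differ at the margins):

```python
import re


def evaluate(left, operator, right=None):
    """Evaluate a flow.if condition per the operator table above:
    case-insensitive string semantics with a numeric fallback for
    the comparison operators."""
    l = "" if left is None else str(left)
    r = "" if right is None else str(right)
    if operator == "equals":
        return l.lower() == r.lower()
    if operator == "not_equals":
        return l.lower() != r.lower()
    if operator == "contains":
        return r.lower() in l.lower()
    if operator in ("greater_than", "less_than"):
        try:
            a, b = float(l), float(r)       # numeric comparison when both parse
        except ValueError:
            a, b = l, r                     # otherwise fall back to string order
        return a > b if operator == "greater_than" else a < b
    if operator == "exists":
        return left is not None and l != "" # right is ignored
    if operator == "regex":
        return re.search(r, l) is not None
    raise ValueError(f"Unknown operator: {operator}")
```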
5.20.5 Edge Cases
| Scenario | Behavior |
|---|---|
| Empty collection in foreach | Body skipped entirely. Execution continues with next step after flow.end. |
| Step outputs inside loops | Keyed as "{stepOrder}.{key}" — each iteration overwrites previous. Last iteration's values persist after loop. |
| Pause mid-loop | Pause checked before each step including inside loops. On resume, containing foreach restarts from iteration 0 with restored context. |
| Failure + Stop policy | Job fails immediately; abort signal propagates up through the loop. |
| Failure + SkipAndContinue | Failed step skipped; iteration continues with next step in body. |
| Malformed block structure | Parser throws descriptive error; job fails before execution begins. |
5.20.6 Step Executions for Loop Bodies
Steps executed inside a flow.foreach record their iteration_index in the step_executions table. This allows tracking which iteration produced which output or error. Non-loop steps have iteration_index = NULL.