ETL & SQL

Execute SQL queries and data operations directly in your workflows.

PostgreSQL and SQLite are built-in SQL step types. DuckDB is available as the official duckdb@v1 action, which keeps the Dagu core binary portable and cgo-free.

Supported Databases

| Database | Action | Description |
|----------|--------|-------------|
| PostgreSQL | postgres.query, postgres.import | Full-featured PostgreSQL support with advisory locks |
| SQLite | sqlite.query, sqlite.import | Lightweight embedded database with file locking |
| DuckDB | duckdb@v1 official action | Embedded analytical database for local OLAP workflows |

Basic Usage

yaml
secrets:
  - name: DB_PASSWORD
    provider: env           # Read from environment variable
    key: POSTGRES_PASSWORD  # Source variable name

steps:
  - id: query_users
    action: postgres.query
    with:
      dsn: "postgres://user:${DB_PASSWORD}@localhost:5432/mydb"
      query: "SELECT id, name, email FROM users WHERE active = true"
    output: USERS  # Capture results to variable

Output Destination

Query results are written to stdout by default. Use output: VAR_NAME to capture small results into an environment variable for use in subsequent steps. For large results, use streaming: true with output_file to write directly to a file. When output_file references DAG_RUN_ARTIFACTS_DIR, artifact storage is auto-enabled and the file appears as a run artifact.

Use postgres.query or sqlite.query for built-in SQL queries. Use postgres.import or sqlite.import to load CSV, TSV, or JSONL files into a table.

Use action: duckdb@v1 for DuckDB. DuckDB imports and exports are expressed with DuckDB SQL such as read_csv_auto, read_json_auto, read_parquet, and COPY.

Secrets

Secrets are automatically masked in logs. Use provider: file for Kubernetes/Docker secrets. See Secrets for details.
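
As a minimal sketch of the file provider, the secret below is read from a mounted file instead of an environment variable. It assumes that key holds the path to the secret file when provider is file; the path /run/secrets/db_password is hypothetical, so check the Secrets reference for the exact semantics.

```yaml
secrets:
  - name: DB_PASSWORD
    provider: file                  # Read from a mounted secret file
    key: /run/secrets/db_password   # Assumed: path to the file (hypothetical path)

steps:
  - id: query_users
    action: postgres.query
    with:
      dsn: "postgres://user:${DB_PASSWORD}@localhost:5432/mydb"
      query: "SELECT id, name, email FROM users WHERE active = true"
```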

Key Features

  • Parameterized queries - Prevent SQL injection with named or positional parameters
  • Transactions - Wrap operations in transactions with configurable isolation levels
  • Data import - Import CSV, TSV, or JSONL files into database tables
  • Output formats - Export results as JSONL, JSON, or CSV
  • Streaming - Handle large result sets by streaming to explicit files
  • Locking - Advisory locks (PostgreSQL) and file locks (SQLite) for distributed workflows

Configuration Reference

Connection

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| dsn | string | required | Database connection string |

Connection Pooling

Connection pooling is not configurable per-step:

  • Non-worker mode: Uses fixed defaults (1 connection per step)
  • Worker mode (shared-nothing): Managed by global pool configuration at the worker level

For distributed workers running multiple concurrent DAGs, configure PostgreSQL connection pooling via worker.postgres_pool to prevent connection exhaustion.

Execution

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| timeout | int | 60 | Query timeout in seconds |
| transaction | bool | false | Wrap in transaction |
| isolation_level | string | - | default, read_committed, repeatable_read, serializable |
| params | map/array | - | Query parameters |

Output

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| output_format | string | jsonl | jsonl, json, csv |
| headers | bool | false | Include headers in CSV |
| null_string | string | null | NULL representation |
| max_rows | int | 0 | Limit rows (0 = unlimited) |
| streaming | bool | false | Stream to file |
| output_file | string | - | Explicit output path for streaming results |
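
The output fields can be combined in a single step. The sketch below caps a CSV preview at 100 rows, writes SQL NULLs as the literal string NULL, and captures the small result into a variable. The step id and the PREVIEW variable name are illustrative, and the orders table is borrowed from the examples later on this page.

```yaml
steps:
  - id: preview_orders
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      output_format: csv
      headers: true          # Emit a header row
      null_string: "NULL"    # Write SQL NULL as the string NULL
      max_rows: 100          # Keep the captured result small
      query: "SELECT * FROM orders"
    output: PREVIEW          # Illustrative variable name
```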

Locking

| Field | Type | Description |
|-------|------|-------------|
| advisory_lock | string | Named lock (PostgreSQL only) |
| file_lock | bool | File locking (SQLite only) |
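
A minimal sketch of both locking options follows. It assumes the lock fields sit under with: next to dsn, like the other fields in this reference, and that the lock is held while the step runs. The lock name, step ids, and queries are hypothetical.

```yaml
steps:
  # PostgreSQL: concurrent runs using the same lock name should not overlap
  - id: nightly_rollup
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      advisory_lock: nightly_rollup   # Hypothetical lock name
      query: "DELETE FROM orders WHERE status = 'expired'"

  # SQLite: guard the database file against concurrent writers
  - id: local_cleanup
    action: sqlite.query
    with:
      dsn: "file:./app.db"
      file_lock: true
      query: "DELETE FROM users WHERE status = 'deleted'"
```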

Data Import

Import data from files into database tables:

yaml
secrets:
  - name: DB_PASSWORD
    provider: env
    key: POSTGRES_PASSWORD

steps:
  - id: import_csv
    action: postgres.import
    with:
      dsn: "postgres://user:${DB_PASSWORD}@localhost:5432/mydb"
      import:
        input_file: /data/users.csv
        table: users
        format: csv
        has_header: true
        batch_size: 1000
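
The same shape should apply to SQLite. The sketch below loads a JSONL file with sqlite.import, assuming it accepts the same import: block as postgres.import. The file path and table name are hypothetical.

```yaml
steps:
  - id: import_events
    action: sqlite.import
    with:
      dsn: "file:./app.db"
      import:
        input_file: /data/events.jsonl   # Hypothetical source file
        table: events                    # Hypothetical target table
        format: jsonl                    # Could also be detected from the extension
```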

Import Configuration

| Field | Type | Default | Description |
|-------|------|---------|-------------|
| input_file | string | required | Source file path |
| table | string | required | Target table name |
| format | string | auto-detect | csv, tsv, jsonl (detected from file extension) |
| has_header | bool | true | First row is header |
| delimiter | string | , | Field delimiter |
| columns | []string | - | Explicit column names |
| null_values | []string | ["", "NULL", "null", "\\N"] | Values treated as NULL |
| batch_size | int | 1000 | Rows per INSERT |
| on_conflict | string | error | error, ignore, replace |
| conflict_target | string | - | Column(s) for conflict detection (PostgreSQL UPSERT) |
| update_columns | []string | - | Columns to update on conflict |
| skip_rows | int | 0 | Skip N data rows |
| max_rows | int | 0 | Limit rows (0 = unlimited) |
| dry_run | bool | false | Validate without importing |
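
The conflict fields can be combined into an upsert. The sketch below assumes the users table has a unique id column and that on_conflict: replace updates the columns listed in update_columns when a row with the same id already exists. The column names are taken from the Basic Usage query and are illustrative.

```yaml
steps:
  - id: upsert_users
    action: postgres.import
    with:
      dsn: "${DATABASE_URL}"
      import:
        input_file: /data/users.csv
        table: users
        on_conflict: replace    # Update existing rows instead of failing
        conflict_target: id     # Assumed unique key column
        update_columns:
          - name
          - email
```

Setting dry_run: true on the same step validates the file without importing it, which can be a useful first pass before running a large import.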

Parameterized Queries

Use named parameters to prevent SQL injection:

yaml
steps:
  - id: safe_query
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      params:
        status: active
        min_age: 18
      query: |
        SELECT * FROM users
        WHERE status = :status AND age >= :min_age

Or positional parameters:

yaml
steps:
  - id: safe_query
    action: sqlite.query
    with:
      dsn: "file:./app.db"
      params:
        - active
        - 18
      query: "SELECT * FROM users WHERE status = ? AND age >= ?"

Transactions

Wrap multiple statements in a transaction:

yaml
steps:
  - id: transfer_funds
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      transaction: true
      isolation_level: serializable
      query: |
        UPDATE accounts SET balance = balance - 100 WHERE id = 1;
        UPDATE accounts SET balance = balance + 100 WHERE id = 2;

Output Formats

JSONL (default)

One JSON object per line, ideal for streaming:

yaml
steps:
  - id: export_jsonl
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      output_format: jsonl
      query: "SELECT * FROM orders"

Output:

{"id":1,"product":"Widget","price":9.99}
{"id":2,"product":"Gadget","price":19.99}

JSON

Array of objects:

yaml
steps:
  - id: export_json
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      output_format: json
      query: "SELECT * FROM orders"

Memory Usage

The json format buffers ALL rows in memory before writing. For large result sets, use jsonl or csv instead to stream rows one at a time. Using json with millions of rows can cause out-of-memory errors.

CSV

Tabular format with optional headers:

yaml
steps:
  - id: export_csv
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      output_format: csv
      headers: true
      query: "SELECT * FROM orders"

Streaming Large Results

For large result sets, stream directly to a file:

yaml
steps:
  - id: export_large_table
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      streaming: true
      output_file: "${DAG_RUN_ARTIFACTS_DIR}/export.jsonl"
      output_format: jsonl    # Use jsonl or csv for streaming
      query: "SELECT * FROM large_table"

Best Practices for Large Results

  • Use output_format: jsonl or csv - these formats stream rows immediately
  • Avoid output_format: json - it buffers all rows in memory before writing
  • Set max_rows as a safety limit for unbounded queries
  • Use streaming: true with output_file to write directly to disk
  • output_file is an explicit target path, and an existing file at that path can be replaced. Prefer run-scoped paths such as ${DAG_RUN_ARTIFACTS_DIR}/export.jsonl; referencing DAG_RUN_ARTIFACTS_DIR also auto-enables artifact storage.
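
Put together, these practices might look like the sketch below, which streams CSV to a run-scoped artifact path and caps the export with max_rows. The limit of 1000000 and the file name are illustrative, and the sketch assumes max_rows also applies when streaming.

```yaml
steps:
  - id: export_orders_csv
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      streaming: true
      output_file: "${DAG_RUN_ARTIFACTS_DIR}/orders.csv"   # Run-scoped path
      output_format: csv      # Streams row by row, unlike json
      headers: true
      max_rows: 1000000       # Illustrative safety limit
      query: "SELECT * FROM orders"
```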

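Error Handling

Combine the query timeout with the step-level retry_policy and continue_on fields to control what happens when a query fails. In the example below, the query times out after 30 seconds, is retried up to 3 times at 5-second intervals, and the workflow continues even if the step ultimately fails: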

yaml
steps:
  - id: query_with_retry
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      timeout: 30
      query: "SELECT * FROM orders"
    retry_policy:
      limit: 3
      interval_sec: 5
    continue_on:
      failure: true

Fan-Out: Running Across Multiple Databases

To run the same query or migration across multiple databases in parallel, use parallel.items with a sub-DAG. Each database gets its own run with separate logs, status, and retry tracking.

yaml
# migrate-tenants.yaml
steps:
  - name: migrate-all-tenants
    parallel:
      items:
        - tenant_a
        - tenant_b
        - tenant_c
      max_concurrent: 2
    action: dag.run
    with:
      dag: run-migration
      params:
        SCHEMA: ${ITEM}
---
name: run-migration

params:
  - SCHEMA: ""

steps:
  - name: migrate
    action: postgres.query
    with:
      dsn: "${DATABASE_URL}"
      transaction: true
      query: |
        SET search_path TO ${SCHEMA};
        ALTER TABLE orders ADD COLUMN IF NOT EXISTS processed_at TIMESTAMPTZ;

For object items (e.g., a different DSN per tenant), use ${ITEM.field} references:

yaml
steps:
  - name: migrate-all-tenants
    parallel:
      items:
        - schema: tenant_a
          dsn: "${TENANT_A_DSN}"
        - schema: tenant_b
          dsn: "${TENANT_B_DSN}"
      max_concurrent: 2
    action: dag.run
    with:
      dag: run-migration
      params:
        SCHEMA: ${ITEM.schema}
        DSN: ${ITEM.dsn}
---
name: run-migration

params:
  - SCHEMA: ""
  - DSN: ""

steps:
  - name: migrate
    action: postgres.query
    with:
      dsn: "${DSN}"
      transaction: true
      query: |
        SET search_path TO ${SCHEMA};
        ALTER TABLE orders ADD COLUMN IF NOT EXISTS processed_at TIMESTAMPTZ;

See parallel.items for full fan-out options.
