Snowflake pipeline error handling and monitoring with ScriptRunner

Intro

This is part 2 of our series on automating a Jira to Snowflake data pipeline with ScriptRunner. In part 1 we built the working pipeline end to end; you can read it here: https://bluegrid.io/edu/automating-jira-%e2%86%92-snowflake-data-pipeline-using-scriptrunner/. This post turns that pipeline into a production-ready job by adding structured logging, retries with backoff, idempotent loading, alerting, and run-level metrics you can monitor.

Why this matters
A pipeline that works once is not the same as a pipeline you can trust every day. The difference is error handling and monitoring that make failures visible, controllable, and recoverable with minimal human effort.

What you will build
You will add a correlation ID to every run, write structured JSON logs, implement retry with exponential backoff and jitter, load data idempotently into Snowflake using a staging table and a merge, record each run in a run history table, and send actionable alerts to Slack or email only when needed.

Prerequisites

  • ScriptRunner for Jira Cloud
  • Jira Cloud admin privileges
  • Snowflake account with a role allowed to insert into staging and target tables
  • A working copy of the part 1 ScriptRunner job

Architecture recap

Jira Cloud → ScriptRunner scheduled job → Snowflake REST API → Snowflake staging table → Snowflake merge into raw table, plus in this part a logging side channel, a run history table, and a notification path.

Common failure modes

  • Network timeouts to Snowflake REST API
  • Snowflake SQL errors, like payload too large or transient lock conflicts
  • Jira API rate limits and pagination drift
  • Authentication and permission changes in either system
  • Duplicate loads caused by retries after partial writes
  • Clock skew in time windows, yielding overlap or gaps

Design goals

  • Every run is traceable with a correlation ID
  • Transient errors are retried automatically with backoff and jitter
  • Duplicates are prevented with idempotent merges in Snowflake
  • Humans are alerted only with context that speeds resolution
  • SLO-style metrics are queryable for reliability reporting

Step 1: Add correlation IDs and structured logs

Use machine-readable JSON logs so you can ingest them into your log tool or even a Snowflake LOGS table, and always include a correlation ID to tie events together.

import groovy.json.JsonOutput
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.util.concurrent.ThreadLocalRandom

class Obs {
  static DateTimeFormatter ISO = DateTimeFormatter.ISO_INSTANT

  static String now() { ISO.format(Instant.now()) }

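  // Builds an ID like run_20250901T183612_7b2c: UTC timestamp plus 4 random hex digits.
  static String corrId() {
    // Zero-pad the suffix to 4 characters; substring(0,4) on a shorter hex string would throw.
    def rand = String.format('%04x', ThreadLocalRandom.current().nextInt(0x10000))
    "run_${ISO.format(Instant.now()).replaceAll('[:.-]','').substring(0,15)}_${rand}"
  }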

  static void log(Map m) {
    println(JsonOutput.toJson(m))
  }
}
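
As a usage illustration, the sketch below builds one log line with the fields every event shares and reads it back the way a log tool would. The `logLine` helper and the sample values are hypothetical and not part of the part 1 code; it only assumes Groovy's bundled `groovy.json` classes.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.time.Instant

// Hypothetical helper: builds one JSON log line with the fields every event shares.
String logLine(String corrId, String stage, String severity, Map extra = [:]) {
  Map event = [ts: Instant.now().toString(), corr_id: corrId, stage: stage, severity: severity]
  event.putAll(extra)   // caller-supplied fields such as counts or chunk_index
  JsonOutput.toJson(event)
}

String line = logLine('run_20250901T183612_7b2c', 'jira_fetch_done', 'INFO',
                      [counts: [issues_seen: 120, pages: 3]])
println line

// Any JSON-aware tool can parse the line and filter on corr_id.
def parsed = new JsonSlurper().parseText(line)
assert parsed.corr_id == 'run_20250901T183612_7b2c'
assert parsed.counts.pages == 3
```

Because every line carries the same `corr_id`, all events from one run can be pulled with a single filter.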

Step 2: Implement retry with exponential backoff and jitter

Exponential backoff gives a struggling service time to recover, and jitter spreads retries out so that concurrent jobs do not retry in lockstep and make rate limiting worse.

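class Retry {
  // Calls fn(attempt) until it succeeds or maxAttempts is reached, sleeping between attempts.
  static <T> T withBackoff(int maxAttempts = 5, long baseMs = 500, long maxMs = 15000, Closure<T> fn) {
    int attempt = 1
    while (true) {
      try {
        return fn.call(attempt)
      } catch (Exception e) {
        // Retry exceptions only; JVM errors such as OutOfMemoryError propagate immediately.
        if (attempt >= maxAttempts) throw e
        // Exponential delay: baseMs, then 2x, 4x, and so on, capped at maxMs.
        long delay = Math.min(maxMs, (long) (baseMs * Math.pow(2, attempt - 1)))
        // Up to 20% extra random jitter on top of the delay.
        long jitter = (long) (delay * Math.random() * 0.2)
        Thread.sleep(delay + jitter)
        attempt++
      }
    }
  }
}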
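
To make the delay schedule concrete, here is a small sketch that computes the same exponential delays without sleeping. The function names `baseDelayMs` and `jitteredDelayMs` are illustrative, not part of the Retry class; the 500 ms base and 15000 ms cap are its defaults.

```groovy
// Delay that follows a failed attempt (1-based), before jitter is added.
long baseDelayMs(int attempt, long baseMs, long maxMs) {
  Math.min(maxMs, baseMs * (1L << (attempt - 1)))
}

// The same delay with up to 20% random jitter added on top.
long jitteredDelayMs(int attempt, long baseMs, long maxMs, Random rnd = new Random()) {
  long base = baseDelayMs(attempt, baseMs, maxMs)
  base + (long) (base * rnd.nextDouble() * 0.2)
}

def schedule = (1..6).collect { baseDelayMs(it, 500L, 15000L) }
println schedule   // [500, 1000, 2000, 4000, 8000, 15000]
```

With the default of five attempts only the first four delays are ever slept, so a run that exhausts its retries waits roughly 7.5 to 9 seconds in total before giving up.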

Step 3: Create Snowflake staging, raw, and run history tables

Use a staging table keyed by batch_key for idempotency and a run history table for observability.

create table if not exists etl_run_history (
  corr_id string primary key,
  started_at timestamp_tz,
  finished_at timestamp_tz,
  status string,
  issues_seen number,
  rows_inserted number,
  error_class string,
  error_message string,
  latency_ms number
);

create table if not exists jira_issues_stage (
  issue_id string,
  updated_at timestamp_tz,
  payload variant,
  batch_key string
);

create table if not exists jira_issues_raw like jira_issues_stage;

Step 4: Define an idempotent merge to prevent duplicates on retries

Match on issue_id and updated_at so replays of the same window do not create duplicates.

merge into jira_issues_raw t
using (
  select issue_id, updated_at, payload, batch_key
  from jira_issues_stage
  where batch_key = :batch_key
  -- keep one row per key, because a retried chunk can stage the same issue twice
  qualify row_number() over (partition by issue_id, updated_at order by issue_id) = 1
) s
on t.issue_id = s.issue_id and t.updated_at = s.updated_at
when not matched then
  insert (issue_id, updated_at, payload, batch_key)
  values (s.issue_id, s.updated_at, s.payload, s.batch_key);
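
To see why this matching rule makes replays harmless, the following sketch models the merge in memory. It is only an illustration of the semantics, not how Snowflake executes the statement; `mergeBatch` and the sample rows are made up for the example.

```groovy
// In-memory stand-in for jira_issues_raw, keyed by [issue_id, updated_at].
Map<List, Map> raw = [:]

// Inserts staged rows whose key is not present yet and returns how many were inserted.
int mergeBatch(Map<List, Map> target, List<Map> staged) {
  int inserted = 0
  staged.each { row ->
    List key = [row.issue_id, row.updated_at]
    if (!target.containsKey(key)) {   // mirrors "when not matched then insert"
      target.put(key, row)
      inserted++
    }
  }
  inserted
}

List<Map> staged = [
  [issue_id: '10001', updated_at: '2025-09-01T18:00:00Z', payload: [summary: 'A']],
  [issue_id: '10002', updated_at: '2025-09-01T18:05:00Z', payload: [summary: 'B']]
]
println mergeBatch(raw, staged)   // 2: the first load inserts both rows
println mergeBatch(raw, staged)   // 0: replaying the same window inserts nothing
```

An issue that changes later arrives with a new updated_at, so it has a new key and is stored as an additional version rather than overwriting the earlier row.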

Optional stored procedure wrapper:

Encapsulate the merge for cleaner calls from ScriptRunner.

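create or replace procedure merge_jira_stage_to_raw(batch_key string)
returns string
language sql
as
$$
begin
  -- Snowflake Scripting requires the statements to sit inside a begin ... end block
  merge into jira_issues_raw t
  using (
    select issue_id, updated_at, payload, batch_key
    from jira_issues_stage
    where batch_key = :batch_key
    -- keep one row per key, because a retried chunk can stage the same issue twice
    qualify row_number() over (partition by issue_id, updated_at order by issue_id) = 1
  ) s
  on t.issue_id = s.issue_id and t.updated_at = s.updated_at
  when not matched then
    insert (issue_id, updated_at, payload, batch_key)
    values (s.issue_id, s.updated_at, s.payload, s.batch_key);
  return 'ok';
end;
$$;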

Step 5: Implement the ScriptRunner main flow with retries, staging, merge, and run history

This skeleton assumes you already have HTTP helpers from part 1 and shows where to log, retry, and write run history.

// imports and classes Obs and Retry from earlier blocks assumed available

def corrId = Obs.corrId()
def startedAt = System.currentTimeMillis()
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "start", severity: "INFO"])

try {
  // 1 fetch Jira issues with pagination and rate limit handling
  List<Map> allIssues = []
  int pages = 0
  String nextUrl = buildInitialJqlUrl() // from part 1

  while (nextUrl) {
    pages++
    def page = Retry.withBackoff(6, 400, 20000) { attempt ->
      def res = httpGet(nextUrl) // implement httpGet to return [status, body, headers]
      if (res.status == 429) {
        long waitMs = parseRetryAfterMs(res.headers) ?: 5000
        Thread.sleep(waitMs)
        throw new RuntimeException("RateLimited")
      }
      if (res.status >= 500) throw new RuntimeException("Jira5xx")
      if (res.status != 200) throw new RuntimeException("JiraHttp " + res.status)
      return parseIssuesPage(res.body) // returns [issues: [...], next: urlOrNull]
    }
    allIssues.addAll(page.issues)
    nextUrl = page.next
  }

  Obs.log([ts: Obs.now(), corr_id: corrId, stage: "jira_fetch_done", severity: "INFO",
           counts: [issues_seen: allIssues.size(), pages: pages]])

  // 2 chunk and send to Snowflake staging with retry and size control
  int rowsInserted = 0
  int chunkSize = 500
  List<List<Map>> chunks = allIssues.collate(chunkSize)

  chunks.eachWithIndex { chunk, idx ->
    Retry.withBackoff(5, 800, 20000) { attempt ->
      def stmt = buildStageInsertStatement(chunk, corrId) // returns POST body for Snowflake /statements
      def resp = callSnowflakeApi(stmt) // implement REST call from part 1
      if (resp.isSqlError()) {
        if (resp.isPayloadTooLarge()) {
          // split the chunk in half and send each half once (one level of splitting, not recursive)
          // intdiv keeps the size an int; in Groovy, chunk.size() / 2 produces a BigDecimal
          chunk.collate(Math.max(1, chunk.size().intdiv(2))).each { sub ->
            def subStmt = buildStageInsertStatement(sub, corrId)
            def subResp = callSnowflakeApi(subStmt)
            if (subResp.isSqlError()) throw new RuntimeException("SnowflakeSql " + subResp.code + " " + subResp.message)
            rowsInserted += sub.size()
          }
          return
        }
        if (resp.isDuplicateKey()) {
          // defensive only: standard Snowflake tables do not enforce primary or unique keys
          Obs.log([ts: Obs.now(), corr_id: corrId, stage: "dup", severity: "WARN", chunk_index: idx])
          return
        }
        throw new RuntimeException("SnowflakeSql " + resp.code + " " + resp.message)
      }
      rowsInserted += chunk.size()
    }
  }

  // 3 finalize with MERGE for idempotency
  def mergeCall = "call merge_jira_stage_to_raw(?)"
  execSnowflake(mergeCall, [corrId])

  // 4 success bookkeeping
  long latency = System.currentTimeMillis() - startedAt
  // started_at is derived from the measured latency so it is not identical to finished_at
  execSnowflake(
    "insert into etl_run_history(corr_id, started_at, finished_at, status, issues_seen, rows_inserted, latency_ms) values(?, dateadd(millisecond, -1 * ?, current_timestamp()), current_timestamp(), 'success', ?, ?, ?)",
    [corrId, latency, allIssues.size(), rowsInserted, latency]
  )

  Obs.log([ts: Obs.now(), corr_id: corrId, stage: "finish", severity: "INFO",
           result: "success", counts: [issues_seen: allIssues.size(), rows_inserted: rowsInserted], latency_ms: latency])

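} catch (Throwable t) {
  long latency = System.currentTimeMillis() - startedAt
  try {
    // started_at is derived from the measured latency so it is not identical to finished_at
    execSnowflake(
      "insert into etl_run_history(corr_id, started_at, finished_at, status, error_class, error_message, latency_ms) values(?, dateadd(millisecond, -1 * ?, current_timestamp()), current_timestamp(), 'failed', ?, ?, ?)",
      [corrId, latency, t.class.name, (t.message ?: '').take(500), latency]
    )
  } catch (Exception historyError) {
    // Snowflake may be the reason the run failed; keep going so the alert still fires and the original error is rethrown
    Obs.log([ts: Obs.now(), corr_id: corrId, stage: "run_history_write", severity: "ERROR",
             error: [class: historyError.class.name, message: historyError.message]])
  }
  Obs.log([ts: Obs.now(), corr_id: corrId, stage: "finish", severity: "ERROR",
           result: "failed", error: [class: t.class.name, message: t.message], latency_ms: latency])
  notifyOnFailure(corrId, t)
  throw t
}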

// helper stubs you should implement or reuse from part 1

String buildInitialJqlUrl() { /* return URL with your JQL and fields */ }
Map httpGet(String url) { /* return [status:int, headers:Map, body:String] */ }
long parseRetryAfterMs(Map headers) { /* parse Retry-After seconds → ms */ }
Map parseIssuesPage(String body) { /* parse JSON → [issues: List<Map>, next: StringOrNull] */ }
String buildStageInsertStatement(List<Map> issues, String batchKey) { /* construct SQL to insert into jira_issues_stage via /statements */ }
Map callSnowflakeApi(String sqlOrPayload) { /* perform REST call and return wrapper with helpers like isSqlError, isPayloadTooLarge, isDuplicateKey */ }
void execSnowflake(String sql, List params) { /* simple REST call wrapper for statements with bindings */ }
void notifyOnFailure(String corrId, Throwable t) { /* call Slack or email, see below */ }
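
As one possible way to fill in the buildStageInsertStatement stub, the sketch below sends a whole chunk as a single JSON text binding and expands it on the Snowflake side. Several things here are assumptions rather than the part 1 implementation: that each issue map has `id` and `fields.updated` as in Jira's search response, that your endpoint accepts a `statement` plus numbered `bindings` body, and that `updated_at` arrives in a timestamp format Snowflake can cast directly (otherwise use to_timestamp_tz with an explicit format).

```groovy
import groovy.json.JsonOutput

// Sketch only: returns the JSON POST body for one chunk of issues.
String buildStageInsertStatement(List<Map> issues, String batchKey) {
  // Reduce each issue to the staging columns; the full issue is kept as the payload.
  def rows = issues.collect { issue ->
    [issue_id: issue.id as String, updated_at: issue.fields?.updated, payload: issue]
  }
  // The first ? is the batch key, the second ? is the JSON array of rows.
  String sql = '''\
insert into jira_issues_stage (issue_id, updated_at, payload, batch_key)
select value:issue_id::string, value:updated_at::timestamp_tz, value:payload, ?
from table(flatten(input => parse_json(?)))'''
  JsonOutput.toJson([
    statement: sql,
    bindings : [
      '1': [type: 'TEXT', value: batchKey],
      '2': [type: 'TEXT', value: JsonOutput.toJson(rows)]
    ]
  ])
}

println buildStageInsertStatement(
  [[id: '10001', fields: [updated: '2025-09-01T18:36:12.000+0000']]],
  'run_20250901T183612_7b2c')
```

Passing the rows as a binding instead of concatenating them into the SQL text keeps issue content out of the statement itself, which avoids quoting problems with free-text fields.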

Step 6: Send alerts with context to Slack or email

Alert only on failure or on SLO breaches, and include the correlation ID and a query hint.

{
  "text": ":rotating_light: Jira → Snowflake pipeline run failed",
  "blocks": [
    { "type": "section", "text": { "type": "mrkdwn", "text": "*Status:* failed\n*Run:* run_20250901T1836Z_7b2c\n*Stage:* load_to_snowflake\n*Error:* SnowflakeSql 100072 payload too large" } },
    { "type": "context", "elements": [ { "type": "mrkdwn", "text": "Snowflake query hint: `select * from etl_run_history where corr_id = 'run_20250901T1836Z_7b2c'`" } ] }
  ]
}
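
A message of this shape could be assembled inside notifyOnFailure along the following lines. `buildSlackPayload` is a hypothetical helper, and posting the result to your Slack webhook is left to the HTTP helpers from part 1.

```groovy
import groovy.json.JsonOutput

// Hypothetical helper: builds the Slack message body for a failed run.
String buildSlackPayload(String corrId, String stage, Throwable t) {
  // Truncate the error text so a long stack message cannot flood the channel.
  String error = (t.message ?: t.class.simpleName).take(300)
  String summary = "*Status:* failed\n*Run:* ${corrId}\n*Stage:* ${stage}\n*Error:* ${error}"
  String hint = "Snowflake query hint: `select * from etl_run_history where corr_id = '${corrId}'`"
  JsonOutput.toJson([
    text  : ':rotating_light: Jira → Snowflake pipeline run failed',
    blocks: [
      [type: 'section', text: [type: 'mrkdwn', text: summary]],
      [type: 'context', elements: [[type: 'mrkdwn', text: hint]]]
    ]
  ])
}

println buildSlackPayload('run_20250901T183612_7b2c', 'load_to_snowflake',
                          new RuntimeException('SnowflakeSql 100072 payload too large'))
```

Putting the ready-to-run query in the alert means whoever picks it up can go straight from Slack to the run history row.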

Optional email fallback text:

Subject: Jira → Snowflake run failed run_20250901T1836Z_7b2c
Body:
Status: failed
Stage: load_to_snowflake
Error: SnowflakeSql 100072 payload too large
Open in Snowflake: select * from etl_run_history where corr_id = 'run_20250901T1836Z_7b2c'

Step 7: Track reliability with SLO-style queries

Measure success rate, latency, and failure patterns and build a simple dashboard in your BI tool.

-- last 30 days success rate
select
  date_trunc('day', finished_at) as day,
  count_if(status = 'success') * 100.0 / nullif(count(*), 0) as success_rate_pct
from etl_run_history
where finished_at >= dateadd(day, -30, current_timestamp())
group by 1
order by 1;

-- latency p50 and p95 if you capture latency_ms
select
  date_trunc('day', finished_at) as day,
  approx_percentile(latency_ms, 0.5) as p50_ms,
  approx_percentile(latency_ms, 0.95) as p95_ms
from etl_run_history
where finished_at >= dateadd(day, -30, current_timestamp())
group by 1
order by 1;

-- recent failures
select corr_id, started_at, finished_at, error_class, error_message
from etl_run_history
where status = 'failed'
order by finished_at desc
limit 50;
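
If you want the job itself to raise an alert on an SLO breach, the same two measures can be computed in Groovy from rows you have already fetched from etl_run_history. This is a rough sketch: the thresholds and the function names are placeholders, and the percentile is an exact nearest-rank value, so it can differ slightly from approx_percentile.

```groovy
// Placeholder SLO targets; pick values that match your own reliability goals.
double minSuccessPct = 99.0d
long maxP95Ms = 60000L

// Success rate in percent over the given runs (0 when there are no runs).
double successRatePct(List<Map> runs) {
  runs ? runs.count { it.status == 'success' } * 100.0d / runs.size() : 0.0d
}

// Nearest-rank percentile of latency_ms (0 when there are no runs).
long percentileMs(List<Map> runs, double p) {
  if (!runs) return 0L
  List<Long> sorted = runs.collect { it.latency_ms as long }.sort()
  int rank = Math.max(1, (int) Math.ceil(p * sorted.size()))
  sorted[rank - 1]
}

def runs = [
  [status: 'success', latency_ms: 1200],
  [status: 'success', latency_ms: 1500],
  [status: 'failed',  latency_ms: 20000],
  [status: 'success', latency_ms: 1300]
]
boolean breached = successRatePct(runs) < minSuccessPct || percentileMs(runs, 0.95) > maxP95Ms
println "success=${successRatePct(runs)}% p95=${percentileMs(runs, 0.95)}ms breached=${breached}"
```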

Testing failure modes

  • Force a 429 by lowering your rate limit configuration, and confirm the script reads the Retry-After header correctly.
  • Break Snowflake credentials to verify fail-fast behavior and human alerting.
  • Inject an oversized payload to confirm splitting logic works and no duplicates are created.
  • Kill the job mid-run and rerun to validate that the merge prevents duplicate rows.
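
For the first test, it helps to have a parseRetryAfterMs you can exercise without calling Jira. The sketch below is one possible implementation of that stub, not the part 1 code: it accepts either a number of seconds or an HTTP date, and returns 0 when the header is missing or unreadable so the caller's 5000 ms default applies. The optional `now` parameter is an addition that exists only to make the date case testable.

```groovy
import java.time.Duration
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

// Sketch only: converts a Retry-After header into milliseconds to wait.
long parseRetryAfterMs(Map headers, Instant now = Instant.now()) {
  // Header names are case-insensitive, so match the key without regard to case.
  def entry = headers?.find { k, v -> k?.toString()?.equalsIgnoreCase('Retry-After') }
  String value = entry?.value?.toString()?.trim()
  if (!value) return 0L
  // Form 1: a whole number of seconds.
  if (value ==~ /\d+/) return value.toLong() * 1000L
  // Form 2: an HTTP date such as "Wed, 21 Oct 2015 07:28:10 GMT".
  try {
    Instant when = ZonedDateTime.parse(value, DateTimeFormatter.RFC_1123_DATE_TIME).toInstant()
    return Math.max(0L, Duration.between(now, when).toMillis())
  } catch (Exception ignored) {
    return 0L
  }
}

assert parseRetryAfterMs(['Retry-After': '7']) == 7000L
assert parseRetryAfterMs(['retry-after': ' 2 ']) == 2000L
assert parseRetryAfterMs([:]) == 0L
assert parseRetryAfterMs(['Retry-After': 'Wed, 21 Oct 2015 07:28:10 GMT'],
                         Instant.parse('2015-10-21T07:28:00Z')) == 10000L
```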

Security best practices

  • Store Jira and Snowflake secrets in ScriptRunner secure fields.
  • Use least privilege roles in Snowflake.
  • Mask PII or avoid sending PII fields entirely.
  • Keep logs free of sensitive data so you can ship them to central logging safely.
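
One way to keep sensitive values out of logs is to mask them just before an event is serialized. The sketch below is a minimal illustration; the deny-list and the `redact` function are hypothetical and should be adapted to the field names that matter in your data.

```groovy
import groovy.json.JsonOutput

// Hypothetical deny-list of field names, compared in lower case.
Set<String> sensitive = ['password', 'token', 'authorization', 'email', 'api_key'] as Set

// Returns a copy with sensitive values masked, recursing into nested maps and lists.
def redact(def value, Set<String> deny) {
  if (value instanceof Map) {
    return value.collectEntries { k, v ->
      [(k): (deny.contains(k.toString().toLowerCase()) ? '***' : redact(v, deny))]
    }
  }
  if (value instanceof List) return value.collect { redact(it, deny) }
  value   // scalars are returned unchanged
}

def event = [corr_id: 'run_20250901T183612_7b2c', stage: 'jira_fetch',
             request: [url: 'https://example.atlassian.net', Authorization: 'Basic abc123']]
println JsonOutput.toJson(redact(event, sensitive))
```

A deny-list only catches the names you thought of, so it complements rather than replaces the habit of not logging request bodies and credentials in the first place.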

Runbook quick reference

  • Symptom: consistent HTTP 429 from Jira. Action: add sleeps, respect Retry-After, reduce page size.
  • Symptom: SnowflakeSql 100072 payload too large. Action: reduce chunk size and verify splitting logic.
  • Symptom: duplicate rows detected. Action: check merge keys and ensure staging is merged only by batch_key.
  • Symptom: frequent network timeouts. Action: increase backoff ceilings and consider using a smaller warehouse for more predictable spin-up.

With correlation IDs, structured logs, retries with backoff and jitter, idempotent Snowflake merges, run history, and actionable alerts, your ScriptRunner-based Jira to Snowflake pipeline becomes resilient and observable. In part 3, we will cover data modeling and dashboarding to turn your Jira data into operational and executive-level insights.

BlueGrid.io Content Team

BlueGrid.io Team is an editorial collective of engineers, practitioners, and contributors sharing insights across technology, operations, company culture, and the people behind the systems. Content is created through interviews, hands-on experience, internal collaboration, and editorial review, reflecting both how systems are built and how teams work together in real-world environments.