Intro
This is part 2 of our series on automating a Jira to Snowflake data pipeline with ScriptRunner. In part 1 we built the working pipeline end to end; you can read it here: https://bluegrid.io/edu/automating-jira-%e2%86%92-snowflake-data-pipeline-using-scriptrunner/. This post turns that pipeline into a production-ready job by adding structured logging, retries with backoff, idempotent loading, alerting, and run-level metrics you can monitor.
Why this matters
A pipeline that works once is not the same as a pipeline you can trust every day. The difference is error handling and monitoring that make failures visible, controllable, and recoverable with minimal human effort.
What you will build
You will add a correlation id to every run, write structured JSON logs, implement retry with exponential backoff and jitter, load data idempotently into Snowflake using a staging table and a merge, record each run in a run history table, and send actionable alerts to Slack or email only when needed.
Prerequisites
- ScriptRunner for Jira Cloud
- Jira Cloud admin privileges
- Snowflake account with a role allowed to insert into staging and target tables
- A working copy of the part 1 ScriptRunner job
Architecture recap
Jira Cloud → ScriptRunner scheduled job → Snowflake REST API → Snowflake staging table → Snowflake merge into raw table, plus in this part a logging side channel, a run history table, and a notification path.
Common failure modes
- Network timeouts to Snowflake REST API
- Snowflake SQL errors, like payload too large or transient lock conflicts
- Jira API rate limits and pagination drift
- Authentication and permission changes in either system
- Duplicate loads caused by retries after partial writes
- Clock skew in time windows, yielding overlap or gaps
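The failure modes above fall into two groups: transient ones that are worth retrying (timeouts, 429s, 5xx responses) and permanent ones that should fail fast (authentication and permission errors). The sketch below shows one way to make that split explicit. The helper names isRetryableStatus and isRetryableError are hypothetical and not part of the part 1 code, and the exact status codes you treat as transient are an assumption you should adjust to what you observe.

```groovy
// Hypothetical helpers: decide whether a failure is worth retrying.
// Intended to be called only for non-success responses.
boolean isRetryableStatus(int status) {
    if (status == 429) return true                    // rate limited: wait, then retry
    if (status >= 500 && status <= 599) return true   // server-side errors are usually transient
    return false                                      // 400, 401, 403, 404: a retry will not help
}

boolean isRetryableError(Throwable t) {
    // Network-level timeouts and refused connections are treated as transient
    return t instanceof java.net.SocketTimeoutException ||
           t instanceof java.net.ConnectException
}

println isRetryableStatus(429)
println isRetryableStatus(401)
```

Keeping this decision in one place makes it easier to fail fast on broken credentials instead of burning through every retry attempt.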
Design goals
- Every run is traceable with a correlation id
- Transient errors are retried automatically with backoff and jitter
- Duplicates are prevented with idempotent merges in Snowflake
- Humans are alerted only with context that speeds resolution
- SLO-style metrics are queryable for reliability reporting
Step 1: Add correlation IDs and structured logs
Use machine-readable JSON logs so you can ingest them into your log tool or even a Snowflake LOGS table, and always include a correlation ID to tie events together.
import groovy.json.JsonOutput
import java.time.Instant
import java.time.format.DateTimeFormatter
import java.util.concurrent.ThreadLocalRandom
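class Obs {
    static final DateTimeFormatter ISO = DateTimeFormatter.ISO_INSTANT

    // Current UTC time as an ISO-8601 string, used for the ts field of every event
    static String now() { ISO.format(Instant.now()) }

    // Correlation id shaped like run_20250901T183612_7b2c
    static String corrId() {
        // Zero-pad to 8 hex digits so substring(0, 4) cannot run past the end of a short value
        def rand = String.format('%08x', ThreadLocalRandom.current().nextInt())
        "run_${ISO.format(Instant.now()).replaceAll('[:.-]', '').substring(0, 15)}_${rand.substring(0, 4)}"
    }

    // Emit one JSON object per line so log tools can parse events without custom rules
    static void log(Map m) {
        println(JsonOutput.toJson(m))
    }
}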
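To show what a structured event looks like in practice, here is a minimal, self-contained sketch. The logEvent helper is hypothetical (it is not part of the Obs class above); it only illustrates keeping the same core fields (ts, corr_id, stage, severity) on every line so a consumer can filter by run.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.time.Instant

// Hypothetical helper: build one log event with the same core fields every time.
String logEvent(String corrId, String stage, String severity, Map extra = [:]) {
    Map event = [ts: Instant.now().toString(), corr_id: corrId, stage: stage, severity: severity] + extra
    return JsonOutput.toJson(event)
}

String line = logEvent('run_20250901T183612_7b2c', 'jira_fetch_done', 'INFO',
        [counts: [issues_seen: 120, pages: 3]])
println line

// Any consumer can parse the line back and filter on corr_id or stage.
def parsed = new JsonSlurper().parseText(line)
println parsed.corr_id
```

Because every event carries corr_id, a single search for that value returns the full story of one run.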
Step 2: Implement retry with exponential backoff and jitter
Exponential backoff gives a struggling service time to recover instead of hammering it, and jitter spreads retries out so that clients do not retry in lockstep and make rate limiting worse.
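To make the delay schedule concrete, the sketch below computes the base delay per attempt using the same formula as the Retry helper in this step, min(maxMs, baseMs * 2^(attempt - 1)), with its default base of 500 ms and cap of 15000 ms. The function names backoffDelayMs and withJitter are illustrative only.

```groovy
// Base delay before the next try: min(maxMs, baseMs * 2^(attempt - 1)).
long backoffDelayMs(int attempt, long baseMs, long maxMs) {
    long exponential = baseMs * (1L << (attempt - 1))
    return Math.min(maxMs, exponential)
}

// Adds up to 20% random jitter on top of the base delay.
long withJitter(long delayMs, Random rng) {
    return delayMs + (long) (delayMs * rng.nextDouble() * 0.2d)
}

def schedule = (1..6).collect { backoffDelayMs(it, 500L, 15000L) }
println schedule   // [500, 1000, 2000, 4000, 8000, 15000]
```

With five attempts, the job sleeps at most four times, so the worst-case added wait before giving up is roughly 7.5 to 9 seconds depending on jitter.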
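class Retry {
    // Calls fn(attempt) until it succeeds or maxAttempts is reached, sleeping between tries
    static <T> T withBackoff(int maxAttempts = 5, long baseMs = 500, long maxMs = 15000, Closure<T> fn) {
        int attempt = 1
        while (true) {
            try {
                return fn.call(attempt)
            } catch (Exception e) {
                // Catch Exception rather than Throwable so JVM errors such as OutOfMemoryError are not retried
                if (attempt >= maxAttempts) throw e
                // Exponential delay capped at maxMs, plus up to 20% random jitter
                long delay = Math.min(maxMs, (long) (baseMs * Math.pow(2, attempt - 1)))
                long jitter = (long) (delay * Math.random() * 0.2)
                Thread.sleep(delay + jitter)
                attempt++
            }
        }
    }
}
Step 3: Create Snowflake staging, raw, and run history tables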
Use a staging table keyed by batch_key for idempotency and a run history table for observability.
create table if not exists etl_run_history (
corr_id string primary key,
started_at timestamp_tz,
finished_at timestamp_tz,
status string,
issues_seen number,
rows_inserted number,
error_class string,
error_message string,
latency_ms number
);
create table if not exists jira_issues_stage (
issue_id string,
updated_at timestamp_tz,
payload variant,
batch_key string
);
create table if not exists jira_issues_raw like jira_issues_stage;
Step 4: Define an idempotent merge to prevent duplicates on retries
Match on issue_id and updated_at so replays of the same window do not create duplicates.
merge into jira_issues_raw t
using (
  select issue_id, updated_at, payload, batch_key
  from jira_issues_stage
  where batch_key = :batch_key
  -- a retried chunk can stage the same row twice; keep one row per merge key
  qualify row_number() over (partition by issue_id, updated_at order by updated_at) = 1
) s
on t.issue_id = s.issue_id and t.updated_at = s.updated_at
when not matched then
  insert (issue_id, updated_at, payload, batch_key)
  values (s.issue_id, s.updated_at, s.payload, s.batch_key);
Optional stored procedure wrapper:
Encapsulate the merge for cleaner calls from ScriptRunner.
create or replace procedure merge_jira_stage_to_raw(batch_key string)
returns string
language sql
as
$$
begin
  merge into jira_issues_raw t
  using (
    select issue_id, updated_at, payload, batch_key
    from jira_issues_stage
    where batch_key = :batch_key
    -- a retried chunk can stage the same row twice; keep one row per merge key
    qualify row_number() over (partition by issue_id, updated_at order by updated_at) = 1
  ) s
  on t.issue_id = s.issue_id and t.updated_at = s.updated_at
  when not matched then
    insert (issue_id, updated_at, payload, batch_key)
    values (s.issue_id, s.updated_at, s.payload, s.batch_key);
  return 'ok';
end;
$$;
Step 5: Implement the ScriptRunner main flow with retries, staging, merge, and run history
This skeleton assumes you already have HTTP helpers from part 1 and shows where to log, retry, and write run history.
// imports and classes Obs and Retry from earlier blocks assumed available
def corrId = Obs.corrId()
def startedAt = System.currentTimeMillis()
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "start", severity: "INFO"])
try {
// 1 fetch Jira issues with pagination and rate limit handling
List<Map> allIssues = []
int pages = 0
String nextUrl = buildInitialJqlUrl() // from part 1
while (nextUrl) {
pages++
def page = Retry.withBackoff(6, 400, 20000) { attempt ->
def res = httpGet(nextUrl) // implement httpGet to return [status, body, headers]
if (res.status == 429) {
long waitMs = parseRetryAfterMs(res.headers) ?: 5000
Thread.sleep(waitMs)
throw new RuntimeException("RateLimited")
}
if (res.status >= 500) throw new RuntimeException("Jira5xx")
if (res.status != 200) throw new RuntimeException("JiraHttp " + res.status)
return parseIssuesPage(res.body) // returns [issues: [...], next: urlOrNull]
}
allIssues.addAll(page.issues)
nextUrl = page.next
}
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "jira_fetch_done", severity: "INFO",
counts: [issues_seen: allIssues.size(), pages: pages]])
// 2 chunk and send to Snowflake staging with retry and size control
int rowsInserted = 0
int chunkSize = 500
List<List<Map>> chunks = allIssues.collate(chunkSize)
chunks.eachWithIndex { chunk, idx ->
Retry.withBackoff(5, 800, 20000) { attempt ->
def stmt = buildStageInsertStatement(chunk, corrId) // returns POST body for Snowflake /statements
def resp = callSnowflakeApi(stmt) // implement REST call from part 1
if (resp.isSqlError()) {
if (resp.isPayloadTooLarge()) {
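// split the chunk into halves and insert each half once (a single level of splitting)
// intdiv keeps the size an int; plain / yields a BigDecimal in Groovy, which collate() does not accept
chunk.collate(Math.max(1, chunk.size().intdiv(2))).each { sub ->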
def subStmt = buildStageInsertStatement(sub, corrId)
def subResp = callSnowflakeApi(subStmt)
if (subResp.isSqlError()) throw new RuntimeException("SnowflakeSql " + subResp.code + " " + subResp.message)
rowsInserted += sub.size()
}
return
}
if (resp.isDuplicateKey()) {
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "dup", severity: "WARN", chunk_index: idx])
return
}
throw new RuntimeException("SnowflakeSql " + resp.code + " " + resp.message)
}
rowsInserted += chunk.size()
}
}
// 3 finalize with MERGE for idempotency
def mergeCall = "call merge_jira_stage_to_raw(?)"
execSnowflake(mergeCall, [corrId])
// 4 success bookkeeping
long latency = System.currentTimeMillis() - startedAt
// started_at is back-computed from the measured latency so it differs from finished_at
execSnowflake(
"insert into etl_run_history(corr_id, started_at, finished_at, status, issues_seen, rows_inserted, latency_ms) select ?, dateadd(millisecond, -1 * ?, current_timestamp()), current_timestamp(), 'success', ?, ?, ?",
[corrId, latency, allIssues.size(), rowsInserted, latency]
)
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "finish", severity: "INFO",
result: "success", counts: [issues_seen: allIssues.size(), rows_inserted: rowsInserted], latency_ms: latency])
} catch (Throwable t) {
long latency = System.currentTimeMillis() - startedAt
try {
execSnowflake(
"insert into etl_run_history(corr_id, started_at, finished_at, status, error_class, error_message, latency_ms) select ?, dateadd(millisecond, -1 * ?, current_timestamp()), current_timestamp(), 'failed', ?, ?, ?",
[corrId, latency, t.class.name, (t.message ?: '').take(500), latency]
)
} catch (Exception historyError) {
// Snowflake may be the failing dependency; keep the original error and still alert
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "run_history_write_failed", severity: "ERROR",
error: [class: historyError.class.name, message: historyError.message]])
}
Obs.log([ts: Obs.now(), corr_id: corrId, stage: "finish", severity: "ERROR",
result: "failed", error: [class: t.class.name, message: t.message], latency_ms: latency])
notifyOnFailure(corrId, t)
throw t
}
// helper stubs you should implement or reuse from part 1
String buildInitialJqlUrl() { /* return URL with your JQL and fields */ }
Map httpGet(String url) { /* return [status:int, headers:Map, body:String] */ }
Long parseRetryAfterMs(Map headers) { /* parse Retry-After seconds → ms, or return null when the header is absent so the caller's ?: default applies */ }
Map parseIssuesPage(String body) { /* parse JSON → [issues: List<Map>, next: StringOrNull] */ }
String buildStageInsertStatement(List<Map> issues, String batchKey) { /* construct SQL to insert into jira_issues_stage via /statements */ }
Map callSnowflakeApi(String sqlOrPayload) { /* perform REST call and return wrapper with helpers like isSqlError, isPayloadTooLarge, isDuplicateKey */ }
void execSnowflake(String sql, List params) { /* simple REST call wrapper for statements with bindings */ }
void notifyOnFailure(String corrId, Throwable t) { /* call Slack or email, see below */ }
Step 6: Send alerts with context to Slack or email
Alert only on failure or on SLO breaches and include the correlation id and a query hint.
{
"text": ":rotating_light: Jira → Snowflake pipeline run failed",
"blocks": [
{ "type": "section", "text": { "type": "mrkdwn", "text": "*Status:* failed\n*Run:* run_20250901T1836Z_7b2c\n*Stage:* load_to_snowflake\n*Error:* SnowflakeSql 100072 payload too large" } },
{ "type": "context", "elements": [ { "type": "mrkdwn", "text": "Snowflake query hint: `select * from etl_run_history where corr_id = 'run_20250901T1836Z_7b2c'`" } ] }
]
}
Optional email fallback text:
Subject
Jira → Snowflake run failed run_20250901T1836Z_7b2c
Body
Status failed
Stage load_to_snowflake
Error SnowflakeSql 100072 payload too large
Open in Snowflake select * from etl_run_history where corr_id = 'run_20250901T1836Z_7b2c'
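The Slack payload shown in this step can be assembled from the run's correlation id and the caught error. The sketch below is one possible way to do it. The function name buildSlackAlert and its parameters are hypothetical, and posting the resulting JSON to your Slack webhook is left to whatever HTTP helper you already use from part 1.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical builder for the failure alert; field names follow the example payload in this step.
String buildSlackAlert(String corrId, String stage, String errorSummary) {
    String hint = "select * from etl_run_history where corr_id = '${corrId}'"
    Map payload = [
        text  : ':rotating_light: Jira → Snowflake pipeline run failed',
        blocks: [
            [type: 'section',
             text: [type: 'mrkdwn',
                    // Truncate the error so a long stack message does not flood the channel
                    text: "*Status:* failed\n*Run:* ${corrId}\n*Stage:* ${stage}\n*Error:* ${errorSummary.take(300)}"]],
            [type: 'context',
             elements: [[type: 'mrkdwn', text: "Snowflake query hint: `${hint}`"]]]
        ]
    ]
    return JsonOutput.toJson(payload)
}

String json = buildSlackAlert('run_20250901T1836Z_7b2c', 'load_to_snowflake',
        'SnowflakeSql 100072 payload too large')
println json
```

Building the payload from a map rather than by string concatenation means quotes and newlines in the error message are escaped for you.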
Step 7: Track reliability with SLO-style queries
Measure success rate, latency, and failure patterns and build a simple dashboard in your BI tool.
-- last 30 days success rate
select
date_trunc('day', finished_at) as day,
count_if(status = 'success') * 100.0 / nullif(count(*), 0) as success_rate_pct
from etl_run_history
where finished_at >= dateadd(day, -30, current_timestamp())
group by 1
order by 1;
-- latency p50 and p95 if you capture latency_ms
select
date_trunc('day', finished_at) as day,
approx_percentile(latency_ms, 0.5) as p50_ms,
approx_percentile(latency_ms, 0.95) as p95_ms
from etl_run_history
where finished_at >= dateadd(day, -30, current_timestamp())
group by 1
order by 1;
-- recent failures
select corr_id, started_at, finished_at, error_class, error_message
from etl_run_history
where status = 'failed'
order by finished_at desc
limit 50;
Testing failure modes
- Force a 429 by lowering your rate limit configuration, or by temporarily stubbing httpGet to return one, and confirm the script reads the Retry-After header correctly.
- Break Snowflake credentials to verify fail-fast behavior and human alerting.
- Inject an oversized payload to confirm splitting logic works and no duplicates are created.
- Kill the job mid-run and rerun to validate that the merge prevents duplicate rows.
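For the first test above, it helps to have a concrete parseRetryAfterMs to check against. The following is a sketch of one possible implementation of that stub, not the version from part 1. It assumes headers is a Map of header name to a single String value and that Retry-After carries a number of seconds; the HTTP-date form of the header is deliberately not handled and yields null.

```groovy
// Hypothetical implementation of the parseRetryAfterMs stub.
// Returns the wait in milliseconds, or null when the header is missing or not numeric.
Long parseRetryAfterMs(Map headers) {
    // Header names are case-insensitive, so compare in lower case
    def entry = headers?.find { k, v -> k?.toString()?.toLowerCase() == 'retry-after' }
    if (entry == null) return null
    String raw = entry.value?.toString()?.trim()
    if (!raw || !raw.isNumber()) return null
    long ms = (new BigDecimal(raw) * 1000).longValue()
    return Math.max(0L, ms)
}

println parseRetryAfterMs(['Retry-After': '7'])   // 7000
println parseRetryAfterMs([:])                    // null
```

Returning null for a missing header lets the caller's `?: 5000` fallback in the main flow apply its default wait.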
Security best practices
- Store Jira and Snowflake secrets in ScriptRunner secure fields.
- Use least privilege roles in Snowflake.
- Mask PII or avoid sending PII fields entirely.
- Keep logs free of sensitive data so you can ship them to central logging safely.
Runbook quick reference
- Symptom: consistent HTTP 429 from Jira. Action: add sleeps, respect Retry-After, reduce page size.
- Symptom: SnowflakeSql 100072 payload too large. Action: reduce chunk size and verify splitting logic.
- Symptom: duplicate rows detected. Action: check merge keys and ensure staging is merged only by batch_key.
- Symptom: frequent network timeouts. Action: increase backoff ceilings and consider using a smaller warehouse for more predictable spin up.
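For the payload-too-large entry above, one way to verify splitting logic is to isolate it in a small function you can test without Snowflake. The sketch below is a hypothetical recursive variant, not the code from step 5. insertFn is a stand-in for the staging insert and is assumed to return true on success and false when the payload is rejected as too large.

```groovy
// Hypothetical recursive splitter: halve the batch until each piece is accepted.
// Returns the number of rows inserted.
int insertWithSplit(List rows, Closure<Boolean> insertFn) {
    if (rows.isEmpty()) return 0
    if (insertFn(rows)) return rows.size()
    if (rows.size() == 1) {
        // Splitting cannot help any further; surface the problem instead of looping
        throw new IllegalStateException('A single row exceeds the payload limit')
    }
    int mid = rows.size().intdiv(2)
    // The two halves do not overlap, so no row is sent twice within one pass
    return insertWithSplit(rows.subList(0, mid), insertFn) +
           insertWithSplit(rows.subList(mid, rows.size()), insertFn)
}

// Usage with a fake insert that accepts at most 3 rows per call
List<Integer> accepted = []
def fakeInsert = { List batch ->
    if (batch.size() > 3) return false
    accepted.addAll(batch)
    return true
}
int inserted = insertWithSplit((1..10).toList(), fakeInsert)
println inserted   // 10
```

If an outer retry re-runs a chunk after some halves were already staged, the staging table can still hold repeated rows, which is why the merge on issue_id and updated_at remains the final safeguard.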
With correlation IDs, structured logs, retries with backoff and jitter, idempotent Snowflake merges, run history, and actionable alerts, your ScriptRunner-based Jira to Snowflake pipeline becomes resilient and observable. In part 3, we will cover data modeling and dashboarding to turn your Jira data into operational and executive-level insights.