API Documentation

Integrate PingPulse into your workflows with our simple webhook-based API.

Base URL
https://pingpulsehq.com

Authentication

All API requests require authentication via your personal webhook key. Get your key from Settings → Webhook API Key in the dashboard.

Pass your key via:

GET: ?key=YOUR_KEY
POST: {"key": "YOUR_KEY", ...}

Security: Never expose your API key in client-side code. Use server-side requests or environment variables.
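
One way to follow this advice is to read the key from an environment variable at runtime instead of hard-coding it. The sketch below assumes a variable named PINGPULSE_KEY (a name chosen here for illustration, not one the API mandates) and only builds the request URL; sending it is left to your HTTP client.

```python
import os
from urllib.parse import urlencode

BASE_URL = "https://pingpulsehq.com"


def load_key(env_var="PINGPULSE_KEY"):
    """Read the webhook key from the environment; the variable name is an assumption."""
    key = os.environ.get(env_var)
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return key


def build_url(path, **params):
    """Return a GET URL with the key passed as the ?key= query parameter."""
    query = urlencode({**params, "key": load_key()})
    return f"{BASE_URL}{path}?{query}"
```

With the variable set, build_url("/shk/swf123", stage=1) yields a URL in the same shape as the GET examples in this document.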

SHK

Static Workflows

Predefined workflows with sequential stages (1, 2, 3...). Create workflows in the dashboard, then ping stages from your pipeline.

GET POST /shk/{workflow_id}

Execute Stage

Execute a stage in a static workflow. The first ping (stage=1) creates a new instance.

Parameters

stage   integer   Stage number (1, 2, 3...)
key     string    Your API key (required)
agent   string    Agent identifier for handoff tracking (required if strict mode enabled)

Example

curl "https://pingpulsehq.com/shk/swf123?stage=1&key=YOUR_KEY"

Response

{
  "status": "success",
  "workflow_id": "swf123",
  "instance_id": "sabc456",
  "stage_order": 1,
  "workflow_status": "running"
}
DHK

Dynamic Workflows

Workflows created on-the-fly via webhooks. Supports hierarchical stage paths (1, 2.1, 2.2, 3.4.2...) for branching pipelines.
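
To make the path notation concrete, the sketch below treats each stage path as a dot-separated list of integers, which allows numeric sorting and parent lookup on the client side. The helper names are hypothetical, and how PingPulse orders or nests paths internally is not stated here.

```python
def parse_stage_path(path):
    """Split "3.4.2" into (3, 4, 2) so paths compare numerically."""
    return tuple(int(part) for part in path.split("."))


def parent_path(path):
    """Return the enclosing path ("3.4" for "3.4.2"), or None for a top-level stage."""
    head, sep, _ = path.rpartition(".")
    return head if sep else None


paths = ["2.2", "10", "1", "3.4.2", "2.1"]
ordered = sorted(paths, key=parse_stage_path)
print(ordered)  # ['1', '2.1', '2.2', '3.4.2', '10']
```

Sorting on the parsed tuple rather than the raw string keeps "10" after "3.4.2", which a plain string sort would get wrong.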

GET POST /dhk/{workflow_id}

Execute Stage

Execute a stage in a dynamic workflow. Use start=yes to create a new workflow.

Parameters

stagePath   string   Stage path (e.g., "1", "2.1", "3.4.2")
key         string   Your API key (required)
start       string   "yes" to create new workflow
final       string   "yes" to mark final stage
stagename   string   Optional label for the stage
agent       string   Agent name for handoff tracking (e.g., "gpt", "data-processor") (required if strict mode enabled)

Examples

# Create workflow and start
curl "https://pingpulsehq.com/dhk/ml_pipeline?stagePath=1&start=yes&key=YOUR_KEY"
# Execute branch stage with label
curl "https://pingpulsehq.com/dhk/ml_pipeline?stagePath=2.1&stagename=Train%20GPU&key=YOUR_KEY"
# With agent tracking for handoff insights
curl "https://pingpulsehq.com/dhk/ml_pipeline?stagePath=3&key=YOUR_KEY&agent=model-trainer"
GET /dhk/{workflow_id}/tree

Get Workflow Tree

Get the current state of all stages in a dynamic workflow.

Response

{
  "1": { "status": "healthy", "stage_name": "Ingest" },
  "2.1": { "status": "healthy", "stage_name": "Train GPU A" },
  "2.2": { "status": "pending", "stage_name": "Train GPU B" },
  "3": { "status": "pending" }
}
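
A client can scan this response to find stages that have not yet reported. The sketch below works on the sample response above, already parsed into a dict; the function name is illustrative, and only the "healthy" and "pending" status values seen in the sample are assumed.

```python
def pending_stages(tree):
    """Return paths of stages whose status is "pending", in response order."""
    return [path for path, info in tree.items() if info.get("status") == "pending"]


tree = {
    "1": {"status": "healthy", "stage_name": "Ingest"},
    "2.1": {"status": "healthy", "stage_name": "Train GPU A"},
    "2.2": {"status": "pending", "stage_name": "Train GPU B"},
    "3": {"status": "pending"},
}
print(pending_stages(tree))  # ['2.2', '3']
```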
GET /api/workflows/{workflow_id}/instances

Get Instance History

Get the last 10 workflow instances with their complete stage execution history. Useful for AI agents to analyze past runs, debug failures, or implement retry logic.

Response

{
  "running_instance": {
    "id": "d8x9y0z1",
    "status": "running",
    "started_at": "2025-01-15T10:30:00Z",
    "agent_code": "orchestrator",
    "stage_statuses": {
      "1": { "status": "healthy", "stage_name": "Init", "agent_code": "gpt-4" },
      "2": { "status": "healthy", "stage_name": "Process", "agent_code": "worker" }
    }
  },
  "recent_instances": [
    {
      "id": "d7w8x9y0",
      "status": "completed",
      "started_at": "2025-01-15T09:00:00Z",
      "completed_at": "2025-01-15T09:05:30Z",
      "agent_code": "orchestrator",
      "stage_statuses": { ... }
    },
    {
      "id": "d6v7w8x9",
      "status": "failed",
      "started_at": "2025-01-15T08:00:00Z",
      "failure_reason": "Stage 3 timeout",
      "stage_statuses": { ... }
    }
  ]
}

Python Example

# Get instance history for debugging or retry logic
import requests

response = requests.get(
    "https://pingpulsehq.com/api/workflows/my_pipeline/instances",
    cookies={"access_token": "YOUR_JWT_TOKEN"}  # Or use session auth
)
data = response.json()

# Check if there's a running instance
if data["running_instance"]:
    print(f"Active instance: {data['running_instance']['id']}")

# Analyze recent failures
for inst in data["recent_instances"]:
    if inst["status"] == "failed":
        print(f"Failed: {inst['id']} - {inst.get('failure_reason', 'Unknown')}")
        # Inspect stage_statuses to find where it failed
        for stage_key, stage_info in inst["stage_statuses"].items():
            if stage_info["status"] != "healthy":
                print(f"  Stage {stage_key}: {stage_info['status']}")
PHK

Process Hooks

Track long-running jobs with simple start/finish pings. Create hooks in the dashboard, then call from your job runner.

GET POST /phk/{process_hook_id}/start

Start Process

Signal the start of a process run. Returns a run_id for tracking.

Example (GET)

curl "https://pingpulsehq.com/phk/ph123/start?key=YOUR_KEY"

With Metadata (POST)

curl -X POST "https://pingpulsehq.com/phk/ph123/start" \
  -H "Content-Type: application/json" \
  -d '{"key":"YOUR_KEY","agent":"ci-server","metadata":{"job":"nightly_backup"}}'
GET POST /phk/{process_hook_id}/finish

Finish Process

Signal the completion of a process run. Result metadata can be included in a POST body.

Example (GET)

curl "https://pingpulsehq.com/phk/ph123/finish?key=YOUR_KEY"

With Metadata (POST)

curl -X POST "https://pingpulsehq.com/phk/ph123/finish" \
  -H "Content-Type: application/json" \
  -d '{"key":"YOUR_KEY","agent":"ci-server","metadata":{"status":"success","duration_ms":45000}}'
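
The start and finish pings pair naturally with a context manager, so the finish ping is sent even if the job raises. This is a sketch, not an official client: the send callable, the track_process name, and the "success"/"failed" values placed in the metadata are assumptions, while the endpoints and the key, agent, and metadata fields come from the examples above.

```python
import json
import time
import urllib.request
from contextlib import contextmanager

BASE_URL = "https://pingpulsehq.com"


def post_json(url, body):
    """Default sender: POST a JSON body and return the decoded JSON reply."""
    req = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


@contextmanager
def track_process(hook_id, key, agent=None, send=post_json):
    """Ping /start on entry and /finish on exit, reporting status and duration."""
    base = {"key": key}
    if agent:
        base["agent"] = agent
    send(f"{BASE_URL}/phk/{hook_id}/start", dict(base))
    started = time.monotonic()
    status = "success"
    try:
        yield
    except Exception:
        status = "failed"  # assumed value; the job's exception is re-raised
        raise
    finally:
        duration_ms = int((time.monotonic() - started) * 1000)
        meta = {"status": status, "duration_ms": duration_ms}
        send(f"{BASE_URL}/phk/{hook_id}/finish", {**base, "metadata": meta})
```

Typical use would be a with track_process("ph123", "YOUR_KEY", agent="ci-server"): block around the job body. Passing a different send callable makes the helper easy to exercise without network access.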
HBK

Pulses

Monitor your servers, crons, robots, and IoT devices with periodic pings. Get alerted when pings stop arriving.

GET POST /hbk/{target_id}

Send Pulse

Send a pulse ping for a monitored target. Create targets in the dashboard first.

Example

curl "https://pingpulsehq.com/hbk/cron_backup?key=YOUR_KEY"

With Metadata (POST)

curl -X POST "https://pingpulsehq.com/hbk/cron_backup" \
  -H "Content-Type: application/json" \
  -d '{"key":"YOUR_KEY","status":"ok","rows_processed":1500}'
API

Notifications

Configure notification channels (email, Slack, Discord, PagerDuty, webhooks) for alerts. Managed via the dashboard.

Supported Integrations

📧 Email
💬 Slack
🎮 Discord
🚨 PagerDuty
🔗 Webhook

Configure notification channels in Dashboard → Notification Channels.

HITL

Approvals (Human-in-the-Loop)

Request human approval before proceeding with critical actions. The AI agent creates an approval request, then polls for the result while a human reviews via email or dashboard.

How It Works

  1. Create: Agent sends POST /api/approvals with context
  2. Notify: Email sent to user with Approve/Deny buttons
  3. Poll: Agent polls GET /api/approvals/{id} until result is not null
  4. Act: Agent proceeds (result=1) or aborts (result=0)
POST /api/approvals

Create an approval request. Returns immediately with approval_id for polling.

Request Body

{
  "context": "Publish blog post: Q4 Earnings Report (1,500 words)",
  "timeout_seconds": 300,
  "timeout_action": "deny"
}
context           Required. What needs approval.
timeout_seconds   Default: 300 (5 min). Max: 86400.
timeout_action    "deny" (default) or "approve"

Response (201 Created)

{
  "approval_id": "apr_8k2m4n6p",
  "status": "pending",
  "result": null,
  "expires_at": "2024-01-15T14:05:00Z"
}
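
Before sending the request, a client can check the body against the limits listed above and avoid a round trip for obviously bad input. The sketch below applies the documented default (300), maximum (86400), and allowed timeout_action values; the function name, the decision to raise ValueError rather than clamp, and the assumption that the timeout must be positive are choices made here.

```python
MAX_TIMEOUT_SECONDS = 86400  # documented maximum (24 hours)


def build_approval_body(context, timeout_seconds=300, timeout_action="deny"):
    """Return a request body for POST /api/approvals, or raise ValueError."""
    if not context or not context.strip():
        raise ValueError("context is required")
    if not 0 < timeout_seconds <= MAX_TIMEOUT_SECONDS:
        raise ValueError(f"timeout_seconds must be between 1 and {MAX_TIMEOUT_SECONDS}")
    if timeout_action not in ("deny", "approve"):
        raise ValueError('timeout_action must be "deny" or "approve"')
    return {
        "context": context,
        "timeout_seconds": timeout_seconds,
        "timeout_action": timeout_action,
    }


print(build_approval_body("Publish blog post: Q4 Earnings Report"))
```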
GET /api/approvals/{approval_id}

Poll approval status. Keep polling until result is not null.

Response (Pending)

{
  "approval_id": "apr_8k2m4n6p",
  "status": "pending",
  "result": null,
  "seconds_remaining": 182
}

Response (Decided)

{
  "approval_id": "apr_8k2m4n6p",
  "status": "approved",
  "result": 1,
  "notes": "Looks good, proceed!",
  "decided_by": "user@example.com"
}
Result Values
  • null - Still pending, keep polling
  • 1 - Approved, proceed with action
  • 0 - Denied, do not proceed

Python Example

import requests
import time

PINGPULSE_KEY = "YOUR_KEY"
BASE_URL = "https://pingpulsehq.com"

def request_approval(context, timeout=300, default="deny"):
    """Request human approval. Returns True if approved, False if denied."""

    # Step 1: Create approval request
    resp = requests.post(
        f"{BASE_URL}/api/approvals",
        params={"key": PINGPULSE_KEY},
        json={
            "context": context,
            "timeout_seconds": timeout,
            "timeout_action": default
        }
    )
    approval = resp.json()
    approval_id = approval["approval_id"]
    print(f"Approval requested: {approval_id}")

    # Step 2: Poll until decided
    while True:
        resp = requests.get(
            f"{BASE_URL}/api/approvals/{approval_id}",
            params={"key": PINGPULSE_KEY}
        )
        status = resp.json()

        if status["result"] is not None:
            return status["result"] == 1

        print(f"Waiting... {status['seconds_remaining']}s remaining")
        time.sleep(5)

# Usage
if request_approval("Publish article: Q4 Earnings Report"):
    publish_article()
else:
    save_as_draft()
DEBUG

Time-Travel Replay

Step through workflow executions, fork from any point, and compare runs side-by-side. Perfect for debugging agent behavior and understanding execution flow.

GET /api/instances/{instance_id}/timeline

Get complete execution timeline for an instance with full context at each step.

Response

{
  "instance_id": "d12345678",
  "workflow_id": "dwf_abc123",
  "status": "completed",
  "total_steps": 5,
  "timeline": [
    {
      "step": 0,
      "stage_path": "1",
      "status": "success",
      "executed_at": "2024-01-15T10:00:00Z",
      "response_time_ms": 150,
      "agent_code": "claude-1",
      "model_id": "claude-3-opus",
      "input_tokens": 1500,
      "output_tokens": 500,
      "cost_usd": 0.045,
      "cumulative_cost_usd": 0.045,
      "context_size_bytes": 4096,
      "handoff_reason": "task_complete",
      "payload": {"task": "analyze data"}
    }
  ]
}
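
Because each step carries its own cost, token, and latency fields, a timeline can be summarized locally. The sketch below assumes the response has been parsed into a dict with the field names shown in the sample; the function name is illustrative, and the second step in the sample data is made up for the example.

```python
def summarize_timeline(data):
    """Aggregate cost, tokens, and the slowest step from a timeline response."""
    steps = data.get("timeline", [])
    slowest = max(steps, key=lambda s: s.get("response_time_ms", 0), default=None)
    return {
        "steps": len(steps),
        "total_cost_usd": round(sum(s.get("cost_usd", 0.0) for s in steps), 6),
        "total_tokens": sum(
            s.get("input_tokens", 0) + s.get("output_tokens", 0) for s in steps
        ),
        "slowest_stage": slowest["stage_path"] if slowest else None,
    }


sample = {
    "timeline": [
        # First step mirrors the sample response; the second is hypothetical.
        {"stage_path": "1", "response_time_ms": 150, "cost_usd": 0.045,
         "input_tokens": 1500, "output_tokens": 500},
        {"stage_path": "2", "response_time_ms": 900, "cost_usd": 0.08,
         "input_tokens": 2000, "output_tokens": 1000},
    ]
}
summary = summarize_timeline(sample)
print(summary)
```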
POST /api/instances/{instance_id}/fork

Fork an instance from a specific stage with optional payload modifications. Creates a new instance with executions copied up to the fork point.

Request Body

{
  "fork_from_stage": "2",
  "payload_modifications": {
    "1": {"retry": true},
    "2": {"modified_input": "new value"}
  }
}

Response (201 Created)

{
  "forked_instance_id": "d87654321",
  "session_id": "uuid-here",
  "fork_from_stage": "2",
  "stages_copied": 2
}
POST /api/instances/compare

Compare two instances side-by-side. Returns aggregates, differences, and divergence point.

Request Body

{
  "instance_a": "d12345678",
  "instance_b": "d87654321"
}

Response

{
  "session_id": "uuid-here",
  "comparison": {
    "instance_a": {"id": "d12345678", "aggregates": {...}},
    "instance_b": {"id": "d87654321", "aggregates": {...}},
    "divergence_step": 3,
    "differences": {
      "cost_diff_usd": 0.025,
      "token_diff": 500,
      "time_diff_ms": 1200
    }
  }
}
POST /api/instances/{instance_id}/rerun

Re-run an instance from the beginning with optional payload modifications.

Request Body

{
  "payload_modifications": {
    "1": {"retry_attempt": 2}
  }
}

Response (201 Created)

{
  "new_instance_id": "d98765432",
  "session_id": "uuid-here",
  "message": "Instance created and ready for re-execution"
}
COST

Cost Monitoring & Circuit Breakers

Track LLM costs in real-time, detect anomalies, and automatically terminate runaway workflows. Protect your budget with configurable circuit breakers.

Circuit Breaker Actions

alert       Send notification, continue execution
pause       Pause instance, require manual resume
terminate   Immediately stop instance
GET /api/workflows/{workflow_id}/cost

Get cost summary for a workflow over the past N days.

Query Parameters

days - Number of days to look back (default: 7)

Response

{
  "total_cost_usd": 45.23,
  "today_cost_usd": 8.50,
  "average_cost_per_instance_usd": 0.45,
  "instance_count": 100,
  "most_expensive_instance": {
    "id": "d12345678",
    "cost_usd": 2.30
  },
  "period_days": 7
}
GET /api/instances/{instance_id}/cost

Get detailed cost breakdown for an instance by stage.

Response

{
  "instance_id": "d12345678",
  "total_cost_usd": 0.125,
  "breakdown": [
    {
      "stage_path": "1",
      "model_id": "claude-3-opus",
      "input_tokens": 1500,
      "output_tokens": 500,
      "cost_usd": 0.045,
      "executed_at": "2024-01-15T10:00:00Z"
    }
  ]
}
PUT /api/workflows/{workflow_id}/budget

Configure budget limits and circuit breaker settings for a workflow.

Request Body

{
  "budget_enabled": true,
  "budget_limit_usd": 100.0,
  "budget_alert_threshold_usd": 50.0,
  "budget_daily_limit_usd": 500.0,
  "budget_action": "terminate",
  "cost_anomaly_detection": true,
  "cost_anomaly_multiplier": 3.0
}
budget_limit_usd          Max cost per instance before circuit breaker triggers
budget_action             "alert", "pause", or "terminate"
cost_anomaly_detection    Alert when cost exceeds N times the average
cost_anomaly_multiplier   Threshold multiplier (e.g., 3.0 = 3x average)
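
Read together, these fields describe two independent checks: a hard per-instance limit and a relative anomaly threshold. The sketch below restates that logic on the client side for illustration only. It is not PingPulse's server implementation, and the function name, the "none" return value, and the order of the checks are assumptions.

```python
def evaluate_cost(cost_usd, average_usd, config):
    """Return the action a budget config implies for one instance's cost so far."""
    if not config.get("budget_enabled", False):
        return "none"
    # Hard limit: the configured budget_action applies once the limit is exceeded.
    if cost_usd > config["budget_limit_usd"]:
        return config["budget_action"]
    # Anomaly check: alert when cost exceeds multiplier x the average.
    if config.get("cost_anomaly_detection") and average_usd > 0:
        if cost_usd > config["cost_anomaly_multiplier"] * average_usd:
            return "alert"
    return "none"


config = {
    "budget_enabled": True,
    "budget_limit_usd": 100.0,
    "budget_action": "terminate",
    "cost_anomaly_detection": True,
    "cost_anomaly_multiplier": 3.0,
}
print(evaluate_cost(105.50, 0.45, config))  # terminate
print(evaluate_cost(2.30, 0.45, config))    # alert (2.30 > 3 x 0.45)
print(evaluate_cost(0.50, 0.45, config))    # none
```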
GET /api/workflows/{workflow_id}/cost-alerts

Get cost alerts and circuit breaker events for a workflow.

Query Parameters

limit - Max alerts (default: 50)
include_resolved - Include resolved (default: false)

Response

{
  "alerts": [
    {
      "id": 1,
      "alert_type": "budget_exceeded",
      "instance_id": "d12345678",
      "current_cost_usd": 105.50,
      "threshold_usd": 100.0,
      "action_taken": "terminate",
      "message": "Instance cost $105.50 exceeds budget limit $100.00",
      "created_at": "2024-01-15T10:30:00Z"
    }
  ]
}
POST /api/instances/{instance_id}/resume

Resume an instance that was paused by a circuit breaker. Only works if budget_action was "pause".

Response

{
  "message": "Instance resumed",
  "instance_id": "d12345678",
  "status": "running"
}

Sending Cost Data with Webhooks

Include LLM cost data in your webhook payloads for automatic tracking:

curl -X POST "https://pingpulsehq.com/dhk/dwf_xxx?stagePath=2&key=YOUR_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model_id": "claude-3-opus",
    "usage": {
      "input_tokens": 1500,
      "output_tokens": 500,
      "total_tokens": 2000
    },
    "cost_usd": 0.045,
    "tool_calls": [
      {"tool_name": "web_search", "latency_ms": 250, "success": true}
    ]
  }'

Error Codes

Code   Description
400    Invalid request or missing required fields
401    Invalid API key or authentication failed
403    Access denied (wrong user/workflow)
404    Resource not found
409    Stage executed out of order (strict mode)
429    Rate limit exceeded
500    Internal server error
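
A common way to act on these codes is to retry only the ones that may succeed later. The sketch below treats 429 and 500 as retryable and everything else in the table as permanent; that split, the backoff schedule, and the function names are assumptions, since no retry policy or rate-limit headers are specified here.

```python
RETRYABLE = {429, 500}  # assumed transient; 400/401/403/404/409 need a fix first


def should_retry(status_code):
    """True if the same request might succeed when sent again later."""
    return status_code in RETRYABLE


def backoff_delays(attempts, base=1.0, cap=30.0):
    """Exponential backoff delays in seconds: base, 2*base, 4*base... capped."""
    return [min(cap, base * 2 ** i) for i in range(attempts)]


print(should_retry(429), should_retry(409))  # True False
print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```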

Code Examples

Python

import requests

PINGPULSE_KEY = "YOUR_KEY"
BASE_URL = "https://pingpulsehq.com"

def ping_stage(workflow_id, stage, **metadata):
    """Ping a static workflow stage."""
    url = f"{BASE_URL}/shk/{workflow_id}"
    params = {"stage": stage, "key": PINGPULSE_KEY}
    return requests.get(url, params=params).json()

def ping_dynamic(workflow_id, stage_path, start=False, **kwargs):
    """Ping a dynamic workflow stage."""
    url = f"{BASE_URL}/dhk/{workflow_id}"
    params = {"stagePath": stage_path, "key": PINGPULSE_KEY}
    if start:
        params["start"] = "yes"
    if kwargs.get("stagename"):
        params["stagename"] = kwargs["stagename"]
    return requests.get(url, params=params).json()

# Usage
ping_stage("swf123", 1)
ping_dynamic("ml_pipeline", "1", start=True)
ping_dynamic("ml_pipeline", "2.1", stagename="Training")

Node.js

const axios = require('axios');

const PINGPULSE_KEY = 'YOUR_KEY';
const BASE_URL = 'https://pingpulsehq.com';

async function pingStage(workflowId, stage) {
  const url = `${BASE_URL}/shk/${workflowId}`;
  return axios.get(url, { params: { stage, key: PINGPULSE_KEY } });
}

async function pingDynamic(workflowId, stagePath, options = {}) {
  const url = `${BASE_URL}/dhk/${workflowId}`;
  const params = { stagePath, key: PINGPULSE_KEY };
  if (options.start) params.start = 'yes';
  if (options.stagename) params.stagename = options.stagename;
  return axios.get(url, { params });
}

// Usage (wrapped in an async function: CommonJS modules have no top-level await)
(async () => {
  await pingStage('swf123', 1);
  await pingDynamic('ml_pipeline', '1', { start: true });
})();

cURL

# Static workflow
curl "https://pingpulsehq.com/shk/swf123?stage=1&key=YOUR_KEY"

# Dynamic workflow
curl "https://pingpulsehq.com/dhk/ml_pipeline?stagePath=1&start=yes&key=YOUR_KEY"

# Process hook start
curl "https://pingpulsehq.com/phk/ph123/start?key=YOUR_KEY"

# Heartbeat
curl "https://pingpulsehq.com/hbk/cron_backup?key=YOUR_KEY"

Integrations

PingPulse integrates with any system that can make HTTP requests. Choose the integration method that fits your stack.

AIRFLOW

Apache Airflow Integration

Official Airflow provider for PingPulse. Monitor your DAGs with automatic pings on task success/failure.

Installation

pip install pingpulse-airflow

Then configure your API key in Airflow: Admin → Connections → pingpulse_default

Choose Your Integration Pattern

Pattern     Modifies Code     Visible in DAG      Best For
Operator    No                Yes (extra nodes)   New DAGs, explicit tracking
Decorator   Yes (wrap func)   No                  Clean DAGs, TaskFlow API
Callback    No                No                  Existing DAGs, any operator
Option 1

Operator (Explicit Tasks)

Add dedicated ping tasks to your DAG. Pings are visible as separate nodes in the Airflow UI.

from airflow import DAG
from airflow.operators.python import PythonOperator
from pingpulse_airflow import PingPulseOperator
from datetime import datetime

with DAG('etl_pipeline', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    extract = PythonOperator(task_id='extract', python_callable=extract_data)

    ping_start = PingPulseOperator(
        task_id='ping_start',
        workflow_id='dwf123abc',
        stage_path='1',
        start=True,  # Creates new workflow instance
    )

    transform = PythonOperator(task_id='transform', python_callable=transform_data)

    ping_complete = PingPulseOperator(
        task_id='ping_complete',
        workflow_id='dwf123abc',
        stage_path='2',
        final=True,  # Marks workflow complete
    )

    extract >> ping_start >> transform >> ping_complete
DAG Visualization:
[Extract] → [Ping Start] → [Transform] → [Ping Complete]
Option 2

Decorator (Auto-Ping)

Wrap your Python functions to automatically ping on success/failure. No extra nodes in the DAG.

from airflow import DAG
from pingpulse_airflow import pingpulse_task
from datetime import datetime

with DAG('etl_pipeline', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    @pingpulse_task(workflow_id='dwf123abc', stage_path='1', start=True)
    def extract():
        data = fetch_from_source()
        return {'records': len(data)}

    @pingpulse_task(workflow_id='dwf123abc', stage_path='2')
    def transform(data):
        return process(data)

    @pingpulse_task(workflow_id='dwf123abc', stage_path='3', final=True)
    def load(data):
        save_to_destination(data)

    # Pass each task's output to the next so transform() and load() receive their data argument
    load(transform(extract()))
DAG Visualization:
[Extract] → [Transform] → [Load]
Pings happen automatically inside each task
Option 3

Callbacks (Retrofit Existing DAGs)

Add monitoring to existing DAGs without modifying task code. Attach callbacks to any operator.

from airflow import DAG
from airflow.operators.python import PythonOperator
from pingpulse_airflow import pingpulse_success, pingpulse_failure
from datetime import datetime

with DAG('existing_pipeline', start_date=datetime(2024, 1, 1), schedule='@daily') as dag:

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
        on_success_callback=pingpulse_success('dwf123abc', '1', start=True),
        on_failure_callback=pingpulse_failure('dwf123abc', '1'),
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
        on_success_callback=pingpulse_success('dwf123abc', '2'),
        on_failure_callback=pingpulse_failure('dwf123abc', '2'),
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
        on_success_callback=pingpulse_success('dwf123abc', '3', final=True),
        on_failure_callback=pingpulse_failure('dwf123abc', '3'),
    )

    extract >> transform >> load
Best for existing DAGs
No code changes needed inside your task functions

What Gets Sent to PingPulse

Each ping automatically includes Airflow context:

{
  "status": "success",
  "duration": 12.345,
  "airflow": {
    "dag_id": "etl_pipeline",
    "task_id": "transform",
    "run_id": "scheduled__2024-01-15T00:00:00+00:00",
    "execution_date": "2024-01-15T00:00:00+00:00",
    "try_number": 1
  }
}

Full documentation and source code: pingpulse-airflow on GitHub

GitHub Actions

Add PingPulse monitoring to your GitHub Actions workflows to track CI/CD pipeline execution.

Add to Your Workflow

Add a step after each stage you want to track:

name: CI Pipeline

on: [push]

jobs:
  build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install dependencies
        run: npm install

      - name: Ping PingPulse - Install Complete
        run: |
          curl -X POST "${{ secrets.PINGPULSE_URL }}&stagePath=1&start=yes" \
            -H "Content-Type: application/json" \
            -d '{"stage": "install", "commit": "${{ github.sha }}"}'

      - name: Run tests
        run: npm test

      - name: Ping PingPulse - Tests Complete
        run: |
          curl -X POST "${{ secrets.PINGPULSE_URL }}&stagePath=2" \
            -H "Content-Type: application/json" \
            -d '{"stage": "test", "status": "passed"}'

      - name: Build
        run: npm run build

      - name: Ping PingPulse - Build Complete
        run: |
          curl -X POST "${{ secrets.PINGPULSE_URL }}&stagePath=3&final=yes" \
            -H "Content-Type: application/json" \
            -d '{"stage": "build", "artifact": "dist/"}'

Setup Steps

  1. Create a dynamic workflow in the PingPulse dashboard
  2. Copy your webhook URL: https://pingpulsehq.com/dhk/YOUR_ID?key=YOUR_KEY
  3. Add it as a repository secret: PINGPULSE_URL
  4. Add curl steps after each stage in your workflow

Cron Jobs

Monitor scheduled tasks with heartbeat pings (Pulses). Get alerted if your cron job fails to run on schedule.

Add Ping to Your Cron Job

Append a curl command to ping PingPulse on successful completion:

# In your crontab (crontab -e)

# Option 1: Ping only on success (&&)
0 * * * * /path/to/backup.sh && curl -fsS "https://pingpulsehq.com/hbk/MONITOR_ID?key=API_KEY" > /dev/null

# Option 2: Always ping with exit status
0 2 * * * /path/to/job.sh; curl -fsS "https://pingpulsehq.com/hbk/MONITOR_ID?key=API_KEY&status=$?" > /dev/null

Or wrap your script:

#!/bin/bash
# backup.sh

set -e  # Exit on error

# Your backup logic
pg_dump -h localhost mydb > /backups/mydb_$(date +%Y%m%d).sql
aws s3 cp /backups/mydb_$(date +%Y%m%d).sql s3://my-backups/

# Ping PingPulse on success
curl -fsS "https://pingpulsehq.com/hbk/MONITOR_ID?key=API_KEY" > /dev/null

Setup Heartbeat Monitor

  1. Create a Heartbeat (Pulse) in the dashboard
  2. Set the expected schedule (e.g., every hour, daily at 2am)
  3. Configure grace period (how late before alerting)
  4. Copy the ping URL and add to your cron job

Generic HTTP

Integrate from any programming language or system that can make HTTP requests.

Python
import requests

PINGPULSE_URL = "https://pingpulsehq.com/dhk/YOUR_WORKFLOW_ID"
API_KEY = "your_api_key"

def ping_stage(stage_path, payload=None, start=False, final=False):
    params = {"key": API_KEY, "stagePath": stage_path}
    if start:
        params["start"] = "yes"
    if final:
        params["final"] = "yes"

    response = requests.post(PINGPULSE_URL, params=params, json=payload or {})
    return response.json()

# Usage
ping_stage("1", {"step": "extract"}, start=True)
ping_stage("2", {"records": 1000})
ping_stage("3", {"status": "complete"}, final=True)
JavaScript / Node.js
const PINGPULSE_URL = "https://pingpulsehq.com/dhk/YOUR_WORKFLOW_ID";
const API_KEY = "your_api_key";

async function pingStage(stagePath, payload = {}, { start = false, final = false } = {}) {
  const params = new URLSearchParams({
    key: API_KEY,
    stagePath,
    ...(start && { start: "yes" }),
    ...(final && { final: "yes" }),
  });

  const response = await fetch(`${PINGPULSE_URL}?${params}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(payload),
  });

  return response.json();
}

// Usage
await pingStage("1", { step: "extract" }, { start: true });
await pingStage("2", { records: 1000 });
await pingStage("3", { status: "complete" }, { final: true });
curl / Bash
# Start a new workflow instance
curl -X POST "https://pingpulsehq.com/dhk/YOUR_ID?key=API_KEY&stagePath=1&start=yes" \
  -H "Content-Type: application/json" \
  -d '{"step": "extract"}'

# Send intermediate stage
curl -X POST "https://pingpulsehq.com/dhk/YOUR_ID?key=API_KEY&stagePath=2" \
  -H "Content-Type: application/json" \
  -d '{"records": 1000}'

# Complete the workflow
curl -X POST "https://pingpulsehq.com/dhk/YOUR_ID?key=API_KEY&stagePath=3&final=yes" \
  -H "Content-Type: application/json" \
  -d '{"status": "complete"}'

Webhook Parameters

Parameter   Required   Description
key         Required   Your API key
stagePath   Required   Stage identifier (e.g., "1", "2", "2.1" for branches)
start       Optional   Set to "yes" to start a new workflow instance
final       Optional   Set to "yes" to mark workflow as complete

Workflow Modes Explained

PingPulse supports two validation modes for workflow execution: strict and flexible. The sections below describe how each mode handles stage execution.

Sequential Execution (Normal Flow)

In a typical workflow, stages execute in order. Each webhook ping is tracked and the workflow progresses through each stage sequentially.

[Animation: Normal Execution. Steps 1 to 4: Start → Stage 1 → Stage 2 → Complete]

Stages execute in order, and each ping is tracked.

Strict Mode: Missed Stage Detection

Strict mode enforces sequential execution. If a stage is skipped (e.g., Stage 3 pings before Stage 2), PingPulse detects the missed stage and triggers an alert. Use this mode for critical pipelines where order matters.

[Animation: Strict Mode. Steps 1 to 4: Start → Stage 1 → Stage 2 → Complete]

API Behavior: When a stage is missed, the webhook returns 400 Bad Request with error details. The ping is logged to the failed queue for investigation.
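
The check strict mode performs can be pictured as comparing each incoming stage number with the next expected one. The sketch below is a client-side illustration for integer stages only, with a hypothetical function name; it does not reproduce PingPulse's actual validation or its handling of hierarchical paths.

```python
def find_missed_stages(pings):
    """Return stage numbers skipped by a sequence of pings, in order of detection."""
    missed = []
    expected = 1
    for stage in pings:
        if stage > expected:
            # Everything between the expected stage and this ping was skipped.
            missed.extend(range(expected, stage))
        expected = max(expected, stage + 1)
    return missed


print(find_missed_stages([1, 2, 3]))  # []
print(find_missed_stages([1, 3]))     # [2]
```

Running a recorded ping sequence through a check like this before enabling strict mode can show whether an existing pipeline would trigger missed-stage alerts.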

Flexible Mode: Out-of-Order Execution

Flexible mode accepts stages in any order within a grace period. This is ideal for distributed systems, async processing, or when execution order is non-deterministic. All stages are tracked regardless of arrival order.

[Animation: Flexible Mode. Steps 1 to 4: Start → Stage 1 → Stage 2 → Complete]

API Behavior: All pings return 200 OK regardless of order. Set validation_mode: "flexible" when creating the workflow.

Ready to Integrate?

Get your API key and start monitoring in minutes.
