
Agent State Persistence & Recovery

How to persist job state across restarts and recover in-progress jobs after an unexpected shutdown.

The default InMemoryJobStorage loses all job state on process restart. For any production deployment, replace it with a persistent backend before going live.

Overview

Every Masumi agent tracks jobs through a defined lifecycle — from awaiting_payment through running to completed or failed. By default, this state lives in memory and is lost the moment the process exits.

For production agents, you need to:

  1. Persist job state to a durable store (database, file system)
  2. Checkpoint at key lifecycle points so a restart doesn't lose work
  3. Recover on startup by detecting in-progress jobs and resuming or safely failing them
  4. Handle in-flight escrow — Cardano payments have hard deadlines, and a crashed agent may have locked funds that need resolution

This guide walks through all four.
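The lifecycle above can be modeled as a small transition map. This is an illustrative sketch: the state names come from this guide, but the transition table itself is not part of the SDK.

```python
# Illustrative only: the SDK does not expose this map. States are the
# documented lifecycle values; transitions reflect the flow described here.
TERMINAL = {"completed", "failed"}

TRANSITIONS = {
    "awaiting_payment": {"running", "failed"},          # payment confirmed or expired
    "running": {"awaiting_input", "completed", "failed"},
    "awaiting_input": {"running", "failed"},            # HITL input received or timed out
    "completed": set(),
    "failed": set(),
}


def can_transition(current: str, new: str) -> bool:
    """True if moving from `current` to `new` is a valid lifecycle step."""
    return new in TRANSITIONS.get(current, set())
```

A guard like this is useful inside a custom storage backend to reject nonsensical status updates (for example, reviving a completed job).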

What State Needs to Be Persisted

A Masumi job carries two distinct categories of state:

Job execution state

| Field | Description |
| --- | --- |
| `job_id` | Internal UUID assigned by the agent on `/start_job` |
| `identifier_from_purchaser` | Client-supplied identifier echoed back on creation |
| `status` | `awaiting_payment` \| `running` \| `completed` \| `failed` \| `awaiting_input` |
| `input_data` | The original input payload |
| `result` | Output produced by the agent (once completed) |
| `error` | Error message (if failed) |

Payment / escrow state

| Field | Description |
| --- | --- |
| `blockchain_identifier` | On-chain identifier shared in the payment UTxO |
| `pay_by_time` | Unix timestamp — buyer must pay before this |
| `submit_result_time` | Unix timestamp — agent must submit result before this |
| `unlock_time` | Unix timestamp — seller can unlock funds after this |
| `external_dispute_unlock_time` | Unix timestamp — dispute window closes after this |
| `agent_identifier` | Registry identifier for this agent |
| `seller_vkey` | Seller wallet public key |

The payment-related fields are critical for recovery: if your agent restarts mid-job, it needs to know whether escrow is still active and whether the submission deadline has already passed.
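Taken together, a single persisted record covers both categories. Here is a minimal sketch; the field names follow the tables above, but the dataclass itself is illustrative, not an SDK type.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class PersistedJob:
    """Illustrative shape of one persisted job record (not an SDK class)."""
    # Job execution state
    job_id: str
    identifier_from_purchaser: str
    status: str = "awaiting_payment"
    input_data: Dict[str, Any] = field(default_factory=dict)
    result: Optional[str] = None
    error: Optional[str] = None
    # Payment / escrow state
    blockchain_identifier: Optional[str] = None
    pay_by_time: float = 0.0
    submit_result_time: float = 0.0
    unlock_time: float = 0.0
    external_dispute_unlock_time: float = 0.0
    agent_identifier: Optional[str] = None
    seller_vkey: Optional[str] = None
```

Keeping both categories in one row (as the backends below do, via a single JSON blob) means recovery logic never has to join execution state with escrow state.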

Implementing a Custom JobStorage

Swap out InMemoryJobStorage by subclassing the JobStorage abstract base class and passing your implementation to create_masumi_app().

JobStorage interface

from typing import Any, Dict, Optional

# Shape of the base class in masumi.job_manager. In your own code, import it
# (from masumi.job_manager import JobStorage) rather than redefining it.
class JobStorage:
    """Abstract interface — implement all five methods."""

    async def create_job(self, job_id: str, job_data: Dict[str, Any]) -> None: ...
    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: ...
    async def update_job(self, job_id: str, updates: Dict[str, Any]) -> None: ...
    async def delete_job(self, job_id: str) -> None: ...
    async def list_jobs(self, status: Optional[str] = None) -> list: ...

SQLite backend (simple, single-process)

Good for single-instance deployments where you want zero external dependencies.

import json
import sqlite3
import time
from typing import Any, Dict, Optional
from masumi.job_manager import JobStorage


class SQLiteJobStorage(JobStorage):
    """Durable JobStorage backed by a local SQLite file.

    Note: sqlite3 calls block the event loop briefly; this is acceptable
    for low-throughput, single-instance agents.
    """

    def __init__(self, db_path: str = "jobs.db"):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS jobs (
                    job_id TEXT PRIMARY KEY,
                    data   TEXT NOT NULL,
                    status TEXT NOT NULL,
                    updated_at REAL NOT NULL
                )
            """)
            conn.execute("CREATE INDEX IF NOT EXISTS idx_status ON jobs(status)")
            conn.commit()

    async def create_job(self, job_id: str, job_data: Dict[str, Any]) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "INSERT INTO jobs (job_id, data, status, updated_at) VALUES (?, ?, ?, ?)",
                (job_id, json.dumps(job_data), job_data.get("status", "unknown"), time.time())
            )
            conn.commit()

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute(
                "SELECT data FROM jobs WHERE job_id = ?", (job_id,)
            ).fetchone()
        return json.loads(row[0]) if row else None

    async def update_job(self, job_id: str, updates: Dict[str, Any]) -> None:
        # Read-modify-write: safe for a single process, not for concurrent writers.
        job = await self.get_job(job_id)
        if job is None:
            return
        job.update(updates)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                "UPDATE jobs SET data = ?, status = ?, updated_at = ? WHERE job_id = ?",
                (json.dumps(job), job.get("status", "unknown"), time.time(), job_id)
            )
            conn.commit()

    async def delete_job(self, job_id: str) -> None:
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("DELETE FROM jobs WHERE job_id = ?", (job_id,))
            conn.commit()

    async def list_jobs(self, status: Optional[str] = None) -> list:
        with sqlite3.connect(self.db_path) as conn:
            if status:
                rows = conn.execute(
                    "SELECT data FROM jobs WHERE status = ?", (status,)
                ).fetchall()
            else:
                rows = conn.execute("SELECT data FROM jobs").fetchall()
        return [json.loads(r[0]) for r in rows]

PostgreSQL backend (multi-instance deployments)

Use this when you run multiple agent replicas or need concurrent access.

import json
import time
from typing import Any, Dict, Optional
import asyncpg
from masumi.job_manager import JobStorage


class PostgresJobStorage(JobStorage):
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool: Optional[asyncpg.Pool] = None

    async def _get_pool(self) -> asyncpg.Pool:
        if self._pool is None:
            self._pool = await asyncpg.create_pool(self.dsn)
            # Create the schema on first use; without this call the
            # table would never be created.
            await self._ensure_table()
        return self._pool

    async def _ensure_table(self) -> None:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS masumi_jobs (
                    job_id     TEXT PRIMARY KEY,
                    data       JSONB NOT NULL,
                    status     TEXT NOT NULL,
                    updated_at DOUBLE PRECISION NOT NULL
                )
            """)
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS masumi_jobs_status ON masumi_jobs(status)"
            )

    async def create_job(self, job_id: str, job_data: Dict[str, Any]) -> None:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            await conn.execute(
                """INSERT INTO masumi_jobs (job_id, data, status, updated_at)
                   VALUES ($1, $2, $3, $4)""",
                job_id,
                json.dumps(job_data),
                job_data.get("status", "unknown"),
                time.time(),
            )

    async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT data FROM masumi_jobs WHERE job_id = $1", job_id
            )
        return json.loads(row["data"]) if row else None

    async def update_job(self, job_id: str, updates: Dict[str, Any]) -> None:
        # Read-modify-write is racy across replicas; for concurrent writers,
        # prefer a single-statement JSONB merge (SET data = data || $1::jsonb)
        # or SELECT ... FOR UPDATE inside a transaction.
        job = await self.get_job(job_id)
        if job is None:
            return
        job.update(updates)
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            await conn.execute(
                """UPDATE masumi_jobs
                   SET data = $1, status = $2, updated_at = $3
                   WHERE job_id = $4""",
                json.dumps(job),
                job.get("status", "unknown"),
                time.time(),
                job_id,
            )

    async def delete_job(self, job_id: str) -> None:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            await conn.execute(
                "DELETE FROM masumi_jobs WHERE job_id = $1", job_id
            )

    async def list_jobs(self, status: Optional[str] = None) -> list:
        pool = await self._get_pool()
        async with pool.acquire() as conn:
            if status:
                rows = await conn.fetch(
                    "SELECT data FROM masumi_jobs WHERE status = $1", status
                )
            else:
                rows = await conn.fetch("SELECT data FROM masumi_jobs")
        return [json.loads(r["data"]) for r in rows]

Wiring it up

Pass your storage backend to create_masumi_app():

from masumi import create_masumi_app, MasumiConfig

storage = SQLiteJobStorage("production_jobs.db")
# or: storage = PostgresJobStorage("postgresql://user:pass@localhost/agentdb")

app = create_masumi_app(
    config=MasumiConfig(),
    job_storage=storage,
    start_job_handler=process_job,
    input_schema_handler=get_schema,
)

Checkpoint Strategy

The Masumi job lifecycle has four natural checkpoint moments. Persist state at each of them:

/start_job called
      ▼
[CHECKPOINT 1] ── Save job record with status=awaiting_payment
      ▼  payment confirmed (FUNDS_LOCKED)
[CHECKPOINT 2] ── Update status=running, save input_data snapshot
      ▼  process_job() completes
[CHECKPOINT 3] ── Save result, attempt on-chain submission
      ▼  on-chain result submitted
[CHECKPOINT 4] ── Update status=completed

In code, these checkpoints happen automatically when you use JobManager's status methods (set_job_running, set_job_completed, set_job_failed) — as long as your JobStorage backend writes through to durable storage on every update_job call.

For HITL jobs, add a checkpoint when transitioning to awaiting_input and when input is received. The await request_input() call suspends execution — if the process restarts at that moment, you need the input schema stored so you can re-present it.
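A checkpoint payload for that transition might look like the following. This is a sketch: the `pending_input_schema` field name is illustrative, so align it with whatever your JobStorage schema uses.

```python
from typing import Any, Dict


def awaiting_input_checkpoint(job_id: str, schema: Dict[str, Any]) -> Dict[str, Any]:
    """Build the update payload persisted when a job suspends on request_input().

    The `pending_input_schema` key is a hypothetical name; store whatever
    your /provide_input handler needs to re-present the form after a restart.
    """
    return {
        "status": "awaiting_input",
        "pending_input_schema": schema,
    }
```

Passing this dict to `storage.update_job(job_id, ...)` at suspension time means a restarted agent can serve the schema back to the client without re-running any of the job.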

Recovery on Startup

The SDK does not perform automatic recovery. You must implement startup logic to detect and handle interrupted jobs.

Detecting interrupted jobs

On startup, query your storage for jobs left in non-terminal states:

import time
import asyncio
import logging
from masumi.payment import Payment
from masumi import MasumiConfig

logger = logging.getLogger(__name__)

INTERRUPTED_STATUSES = {"running", "awaiting_payment", "awaiting_input"}


async def recover_jobs_on_startup(storage, config: MasumiConfig) -> None:
    """
    Call this before starting the HTTP server.
    Inspects all in-progress jobs and either resumes or fails them.
    """
    now = time.time()
    all_jobs = await storage.list_jobs(status=None)

    jobs_to_recover = [
        j for j in all_jobs
        if j.get("status") in INTERRUPTED_STATUSES
    ]

    if not jobs_to_recover:
        logger.info("No jobs to recover.")
        return

    logger.info(f"Found {len(jobs_to_recover)} job(s) to recover.")

    payment = Payment(
        payment_service_url=config.payment_service_url,
        payment_api_key=config.payment_api_key,
    )

    for job in jobs_to_recover:
        job_id = job["job_id"]
        status = job["status"]
        blockchain_id = job.get("blockchain_identifier")
        submit_deadline = job.get("submit_result_time", 0)

        logger.info(f"Recovering job {job_id} (status={status})")

        try:
            await _recover_single_job(
                job=job,
                storage=storage,
                payment=payment,
                now=now,
                submit_deadline=submit_deadline,
                blockchain_id=blockchain_id,
            )
        except Exception as e:
            logger.error(f"Failed to recover job {job_id}: {e}")
            await storage.update_job(job_id, {"status": "failed", "error": f"Recovery error: {e}"})

Recovery decision logic

async def _recover_single_job(
    job, storage, payment, now, submit_deadline, blockchain_id
) -> None:
    job_id = job["job_id"]
    status = job["status"]

    # 1. Jobs awaiting payment: check if payment arrived or deadline passed
    if status == "awaiting_payment":
        if now > job.get("pay_by_time", 0):
            logger.warning(f"Job {job_id}: payment deadline expired, marking failed.")
            await storage.update_job(job_id, {
                "status": "failed",
                "error": "Payment deadline expired before restart."
            })
            return

        # Payment window still open — re-register monitoring
        # (payment monitoring will transition to running once FUNDS_LOCKED)
        logger.info(f"Job {job_id}: still within payment window, leaving as awaiting_payment.")
        return

    # 2. Jobs that were running: check if submission deadline has passed
    if status == "running":
        if submit_deadline and now > submit_deadline:
            logger.warning(f"Job {job_id}: submit_result_time expired, marking failed.")
            await storage.update_job(job_id, {
                "status": "failed",
                "error": "Submit deadline expired during outage."
            })
            return

        # Deadline not yet passed — query on-chain state before deciding
        if blockchain_id:
            on_chain = await _query_escrow_state(payment, blockchain_id)
            if on_chain == "ResultSubmitted":
                # Result was submitted before crash, mark completed
                logger.info(f"Job {job_id}: on-chain state is ResultSubmitted, marking completed.")
                await storage.update_job(job_id, {"status": "completed"})
                return

        # Safe to retry — re-queue the job
        logger.info(f"Job {job_id}: deadline not expired, re-queuing for execution.")
        await storage.update_job(job_id, {"status": "running"})
        # Trigger re-execution with saved input_data
        # asyncio.create_task(rerun_job(job_id, job["input_data"]))
        return

    # 3. Jobs stuck awaiting input: query escrow, fail if deadline passed
    if status == "awaiting_input":
        if submit_deadline and now > submit_deadline:
            logger.warning(f"Job {job_id}: awaiting_input but deadline expired, marking failed.")
            await storage.update_job(job_id, {
                "status": "failed",
                "error": "Submit deadline expired while awaiting human input."
            })
            return
        # Otherwise leave as-is — client can re-submit input via /provide_input
        logger.info(f"Job {job_id}: still within deadline, leaving as awaiting_input.")

Querying on-chain escrow state

async def _query_escrow_state(payment: Payment, blockchain_identifier: str) -> str:
    """
    Returns the on-chain state string for the given blockchain identifier,
    or 'Unknown' if the payment service cannot resolve it.
    """
    try:
        result = await payment.check_payment_status_by_identifier(blockchain_identifier)
        if result:
            return result.get("onChainState", "Unknown")
    except Exception as e:
        logger.warning(f"Could not query on-chain state for {blockchain_identifier[:12]}…: {e}")
    return "Unknown"

Wiring recovery into startup

from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Run recovery before accepting requests
    await recover_jobs_on_startup(storage, config)
    yield
    # Cleanup on shutdown (optional)

app = create_masumi_app(
    config=config,
    job_storage=storage,
    start_job_handler=process_job,
    input_schema_handler=get_schema,
)
app.router.lifespan_context = lifespan

Masumi-Specific Considerations

Payment deadlines are hard

The payment smart contract enforces three key timestamps — pay_by_time, submit_result_time, and unlock_time. There is no extension mechanism. If your agent restarts and submit_result_time has passed, you cannot submit the result. The buyer will be able to claim a refund.

Mitigation: Save submit_result_time at job creation and check it first in your recovery logic. If the deadline has passed, mark the job failed immediately rather than attempting to re-execute.
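That deadline-first check can be factored into a small helper. A sketch, using the field names from the tables above:

```python
import time
from typing import Optional


def submit_deadline_passed(job: dict, now: Optional[float] = None) -> bool:
    """True if this job's on-chain submit_result_time has already expired.

    A missing or zero deadline is treated as not expired, matching the
    `if submit_deadline and now > submit_deadline` guard used in recovery.
    """
    now = time.time() if now is None else now
    deadline = job.get("submit_result_time", 0)
    return bool(deadline) and now > deadline
```

Calling this before any re-execution attempt keeps the failure path cheap: an expired job is marked failed immediately instead of burning compute on work that can no longer be submitted.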

Re-querying the registry is safe

If your agent restarts and needs to confirm its own registration, querying the registry is idempotent — it won't alter state. Use the Payment Service's /registry/ endpoints to verify your agent's on-chain identity before resuming jobs.

Handling FUNDS_LOCKED on restart

When a job was in awaiting_payment and the agent crashed after funds were locked but before the job was transitioned to running, you need to restart payment monitoring. Store the blockchain_identifier at job creation (Checkpoint 1), and re-attach monitoring on startup:

async def reattach_payment_monitoring(job: dict, payment: Payment, job_manager) -> None:
    """Re-attach payment monitoring for a job stuck in awaiting_payment."""
    blockchain_id = job["blockchain_identifier"]
    job_id = job["job_id"]

    # Check current on-chain state
    on_chain = await _query_escrow_state(payment, blockchain_id)

    if on_chain == "FundsLocked":
        # Funds arrived during the outage — transition directly to running
        logger.info(f"Job {job_id}: funds already locked, transitioning to running.")
        await job_manager.set_job_running(job_id)
        # asyncio.create_task(rerun_job(job_id, job["input_data"]))
    elif on_chain in ("ResultSubmitted", "Completed"):
        await job_manager.update_job_status(job_id, "completed")
    elif on_chain in ("None", "Unknown"):
        # Funds not yet received — re-register for monitoring
        payment.payment_ids.add(blockchain_id)
        logger.info(f"Job {job_id}: re-registered for payment monitoring.")

Idempotent result submission

The complete_payment() call submits the result hash on-chain. If your agent crashes after executing the job but before confirming the submission, the result may or may not have been recorded. Before re-submitting, check onChainState:

  • ResultSubmitted → already done, mark completed locally
  • FundsLocked → result not yet submitted, safe to submit again
  • anything else → inspect further before acting
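The three cases above reduce to a small decision function. The state strings match those used in this guide; the returned action labels are illustrative names, not SDK values.

```python
def submission_action(on_chain_state: str) -> str:
    """Map a queried onChainState to a recovery action for result submission.

    Action labels ("mark_completed", "submit_result", "inspect") are
    hypothetical names for the reader's own recovery routines.
    """
    if on_chain_state == "ResultSubmitted":
        return "mark_completed"   # already recorded on-chain; update local status only
    if on_chain_state == "FundsLocked":
        return "submit_result"    # result not yet submitted; safe to retry
    return "inspect"              # ambiguous state; do not act blindly
```

Keeping the decision pure (no I/O) makes it trivial to unit-test, while the caller handles the actual storage update or complete_payment() retry.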

Summary

| Concern | Solution |
| --- | --- |
| State lost on restart | Implement `JobStorage` with SQLite or PostgreSQL |
| Missed payment deadline | Check `pay_by_time` and `submit_result_time` before re-executing |
| Funds locked before `running` | Re-attach payment monitoring or transition directly |
| Result submitted but status not updated | Query `onChainState` before re-submitting |
| HITL jobs interrupted | Leave as `awaiting_input` if deadline not expired; client re-submits |
