Skip to content

Price Watch Walkthrough

A price monitoring system where users register watches on products, the platform polls prices via an HTTP API, detects significant changes, and sends email alerts. The system enforces that alerts only fire for active watches and that snapshot sequences are monotonically ordered.

This walkthrough covers the full JacqOS pipeline with a focus on HTTP effects and recorded replay:

  1. Observations arrive as JSON events (watch registered, price snapshot, alert result, watch deactivated)
  2. Mappers extract semantic atoms from each observation
  3. Rules derive facts like price_current, price_change_pct, and alert_pending
  4. Invariants enforce that each product-email pair has at most one active watch and that alerts only exist for active watches
  5. Intents derive outbound actions (fetch a price, send an alert)
  6. Recorded replay captures HTTP interactions for deterministic re-execution
  7. Fixtures prove the system handles price drops, mid-flow deactivation, and multi-watcher scenarios
jacqos-price-watch/
jacqos.toml
ontology/
schema.dh # 15 relations
rules.dh # 3-stratum lifecycle rules + 3 invariants
intents.dh # Intent derivation
mappings/
inbound.rhai # 4 observation kinds -> atoms
helpers/
normalize.rhai # Pure math helpers
fixtures/
happy-path.jsonl # Price drops, alert sent
contradiction-path.jsonl # Watch deactivated mid-alert
stale-alert-path.jsonl # Two watchers, mixed outcomes
generated/
verification/ # Exported proof and provider-capture evidence
schemas/
watch-config.json
prompts/
watch-system.md

jacqos.toml declares the app identity, HTTP resources with recorded replay, and effect capabilities:

app_id = "jacqos-price-watch"
app_version = "0.1.0"
[paths]
ontology = ["ontology/*.dh"]
mappings = ["mappings/*.rhai"]
fixtures = ["fixtures/*.jsonl"]
helpers = ["helpers/*.rhai"]
[capabilities]
http_clients = ["price_api", "notify_api"]
timers = true
blob_store = true
[capabilities.intents]
"intent.fetch_price" = { capability = "http.fetch", resource = "price_api" }
"intent.send_alert" = { capability = "http.fetch", resource = "notify_api" }
[resources.http.price_api]
base_url = "https://prices.example.invalid"
credential_ref = "PRICE_API_TOKEN"
replay = "record"
[resources.http.notify_api]
base_url = "https://notify.example.invalid"
credential_ref = "NOTIFY_API_TOKEN"
replay = "record"

Two things are new compared to simpler examples:

  • replay = "record" on each HTTP resource tells the shell to persist redacted request/response captures on effect attempts. Switching the resource to replay makes the shell require a matching capture instead of hitting the live API. This makes fixtures fully deterministic even though the app depends on external HTTP services.
  • blob_store = true enables content-addressed storage for large API response payloads, keeping the observation log lean.

ontology/schema.dh declares the full schema. The relations fall into five groups:

-- Price state
relation price_snapshot(obs_seq: int, product_id: text, price: float, currency: text, source: text)
relation price_current(product_id: text, price: float, currency: text, source: text)
relation price_previous(product_id: text, price: float, currency: text, source: text)
-- Watch state
relation watch_active(watch_id: text, product_id: text, email: text, threshold_pct: float)
relation watch_deactivated(watch_id: text)
relation watch_status(watch_id: text, status: text)
-- Change detection
relation price_change_detected(product_id: text, old_price: float, new_price: float)
relation price_change_pct(product_id: text, pct: float)
-- Alert lifecycle
relation alert_pending(watch_id: text, email: text, product_id: text, change_pct: float)
relation alert_sent(watch_id: text)
relation alert_failed(watch_id: text, reason: text)
-- Intents
relation intent.fetch_price(product_id: text, source: text)
relation intent.send_alert(watch_id: text, email: text, product_id: text, change_pct: float)

price_snapshot keeps every observed price with a sequence number. price_current and price_previous are derived — the evaluator computes them from the snapshot history, not from mutable state.

mappings/inbound.rhai translates four observation kinds into typed atoms:

fn map_observation(obs) {
let body = parse_json(obs.payload);
if obs.kind == "price.snapshot" {
return [
atom("snapshot.seq", body.seq),
atom("snapshot.product_id", body.product_id),
atom("snapshot.price", body.price),
atom("snapshot.currency", body.currency),
atom("snapshot.source", body.source),
];
}
if obs.kind == "watch.registered" {
return [
atom("watch.watch_id", body.watch_id),
atom("watch.product_id", body.product_id),
atom("watch.email", body.email),
atom("watch.threshold_pct", body.threshold_pct),
];
}
if obs.kind == "price.alert_result" {
let atoms = [
atom("alert.result", body.result),
atom("alert.watch_id", body.watch_id),
];
if body.contains("reason") {
atoms.push(atom("alert.reason", body.reason));
}
return atoms;
}
if obs.kind == "watch.deactivated" {
return [
atom("watch.deactivated.watch_id", body.watch_id),
];
}
[]
}

The price.alert_result branch conditionally includes a reason atom only when the alert failed. This keeps the atom set minimal — successful alerts carry no reason field.

ontology/rules.dh organizes rules into three strata: observation projections, derived state, and status computation.

The first rules project atoms into typed relations:

rule price_snapshot(seq, pid, price, currency, source) :-
atom(obs, "snapshot.seq", seq),
atom(obs, "snapshot.product_id", pid),
atom(obs, "snapshot.price", price),
atom(obs, "snapshot.currency", currency),
atom(obs, "snapshot.source", source).
rule assert watch_active(wid, pid, email, threshold) :-
atom(obs, "watch.watch_id", wid),
atom(obs, "watch.product_id", pid),
atom(obs, "watch.email", email),
atom(obs, "watch.threshold_pct", threshold).

Watch registration uses assert because watches can be retracted later. Price snapshots are append-only and never retracted.

Deactivation retracts the watch:

rule assert watch_deactivated(wid) :-
atom(obs, "watch.deactivated.watch_id", wid).
rule retract watch_active(wid, pid, email, threshold) :-
watch_active(wid, pid, email, threshold),
watch_deactivated(wid).

Alert results project directly from atoms:

rule assert alert_sent(wid) :-
atom(obs, "alert.result", "sent"),
atom(obs, "alert.watch_id", wid).
rule assert alert_failed(wid, reason) :-
atom(obs, "alert.result", "failed"),
atom(obs, "alert.watch_id", wid),
atom(obs, "alert.reason", reason).

Current and previous prices are derived from the snapshot sequence. price_current is the snapshot with the highest sequence number:

rule price_current(pid, price, currency, source) :-
price_snapshot(seq, pid, price, currency, source),
not price_snapshot(seq2, pid, _, _, _), seq2 > seq.
rule price_previous(pid, price, currency, source) :-
price_snapshot(seq, pid, price, currency, source),
price_snapshot(seq2, pid, _, _, _), seq2 > seq,
not price_snapshot(seq3, pid, _, _, _), seq3 > seq, seq3 < seq2.

Change detection compares current and previous prices using a pure helper for the percentage calculation:

rule price_change_detected(pid, old_price, new_price) :-
price_previous(pid, old_price, _, _),
price_current(pid, new_price, _, _),
old_price != new_price.
rule price_change_pct(pid, pct) :-
price_change_detected(pid, old_price, new_price),
pct = helper.abs_pct_change(old_price, new_price).

An alert becomes pending when the change percentage exceeds the watch threshold:

rule assert alert_pending(wid, email, pid, change_pct) :-
watch_active(wid, pid, email, threshold),
price_change_pct(pid, change_pct),
change_pct >= threshold.

Alert pending is retracted when the alert is sent, fails, or the watch is deactivated:

rule retract alert_pending(wid, email, pid, change_pct) :-
alert_pending(wid, email, pid, change_pct),
alert_sent(wid).
rule retract alert_pending(wid, email, pid, change_pct) :-
alert_pending(wid, email, pid, change_pct),
alert_failed(wid, _).
rule retract alert_pending(wid, email, pid, change_pct) :-
alert_pending(wid, email, pid, change_pct),
watch_deactivated(wid).

The watch_status relation computes the lifecycle position of each watch:

rule watch_status(wid, "active") :-
watch_active(wid, _, _, _),
not watch_deactivated(wid).
rule watch_status(wid, "deactivated") :-
watch_deactivated(wid).
rule watch_status(wid, "alerted") :-
watch_active(wid, _, _, _),
alert_sent(wid),
not watch_deactivated(wid).
rule watch_status(wid, "alert_failed") :-
watch_active(wid, _, _, _),
alert_failed(wid, _),
not alert_sent(wid),
not watch_deactivated(wid).

Three invariants enforce structural correctness:

invariant one_active_watch_per_pair(pid, email) :-
count watch_active(_, pid, email, _) <= 1.
invariant no_alert_without_watch(wid) :-
alert_pending(wid, _, pid, _),
watch_active(wid, pid, _, _).
invariant monotonic_snapshot_seq(pid) :-
price_snapshot(seq1, pid, _, _, _),
price_snapshot(seq2, pid, _, _, _),
seq1 != seq2,
seq1 < seq2.

one_active_watch_per_pair prevents duplicate watches for the same product-email combination. no_alert_without_watch guarantees every pending alert traces back to an active watch. monotonic_snapshot_seq ensures price snapshots arrive in order — an out-of-order snapshot is a system invariant violation, not a business logic error.

ontology/intents.dh derives outbound HTTP actions from stable state:

rule intent.fetch_price(pid, source) :-
watch_active(_, pid, _, _),
source = helper.default_price_source(pid),
not price_current(pid, _, _, source).
rule intent.send_alert(wid, email, pid, change_pct) :-
alert_pending(wid, email, pid, change_pct),
not alert_sent(wid),
not alert_failed(wid, _),
not watch_deactivated(wid).

intent.fetch_price fires when a watch exists for a product but no current price is known from that product’s configured provider. In this example, helper.default_price_source(pid) is a small deterministic lookup that binds the provider before negation, keeping the rule safe while still deriving the expected price_api fetch. The response comes back as a price.snapshot observation.

intent.send_alert fires when an alert is pending and hasn’t been sent, failed, or had its watch deactivated. The shell executes this as an HTTP POST to the notify_api resource.

Fixtures define deterministic scenarios with expected world states.

A buyer watches a product with a 5% threshold. The price drops 14.98%, triggering an alert:

{"kind":"watch.registered","payload":{"watch_id":"watch-1","product_id":"prod-100","email":"buyer@example.com","threshold_pct":5.0}}
{"kind":"price.snapshot","payload":{"seq":1,"product_id":"prod-100","price":49.99,"currency":"USD","source":"vendor-a"}}
{"kind":"price.snapshot","payload":{"seq":2,"product_id":"prod-100","price":42.50,"currency":"USD","source":"vendor-a"}}
{"kind":"price.alert_result","payload":{"watch_id":"watch-1","result":"sent"}}

After replay: price_current("prod-100", 42.50, "USD", "vendor-a") is the derived current price. price_change_pct("prod-100", 14.98) exceeds the 5% threshold, so alert_pending was asserted and then retracted when the alert was sent. watch_status("watch-1", "alerted") is the final state.

Contradiction Path: Deactivation Mid-Alert

Section titled “Contradiction Path: Deactivation Mid-Alert”

A watch is deactivated after prices change but before the alert lifecycle completes:

{"kind":"watch.registered","payload":{"watch_id":"watch-2","product_id":"prod-200","email":"shopper@example.com","threshold_pct":3.0}}
{"kind":"price.snapshot","payload":{"seq":1,"product_id":"prod-200","price":100.00,"currency":"EUR","source":"vendor-b"}}
{"kind":"price.snapshot","payload":{"seq":2,"product_id":"prod-200","price":95.00,"currency":"EUR","source":"vendor-b"}}
{"kind":"watch.deactivated","payload":{"watch_id":"watch-2"}}

The price dropped 5% (exceeding the 3% threshold), so alert_pending was asserted. Then the watch was deactivated, causing three retractions: watch_active, alert_pending, and any pending intent.send_alert. These retractions appear in contradictions — the system correctly recorded that facts were believed and then revoked. The final state is watch_status("watch-2", "deactivated") with no pending alerts.

Stale Alert Path: Two Watchers, Mixed Outcomes

Section titled “Stale Alert Path: Two Watchers, Mixed Outcomes”

Two users watch the same product. One alert succeeds; the other bounces:

{"kind":"watch.registered","payload":{"watch_id":"watch-3","product_id":"prod-300","email":"alice@example.com","threshold_pct":2.0}}
{"kind":"watch.registered","payload":{"watch_id":"watch-4","product_id":"prod-300","email":"bob@example.com","threshold_pct":2.0}}
{"kind":"price.snapshot","payload":{"seq":1,"product_id":"prod-300","price":75.00,"currency":"USD","source":"vendor-c"}}
{"kind":"price.snapshot","payload":{"seq":2,"product_id":"prod-300","price":70.00,"currency":"USD","source":"vendor-c"}}
{"kind":"price.alert_result","payload":{"watch_id":"watch-3","result":"sent"}}
{"kind":"price.alert_result","payload":{"watch_id":"watch-4","result":"failed","reason":"email delivery bounced"}}

Both watches fire because the 6.67% drop exceeds both 2% thresholds. After replay, watch_status("watch-3", "alerted") and watch_status("watch-4", "alert_failed"). The alert_failed relation captures the bounce reason for inspection. Both alert_pending facts are retracted — one by success, one by failure.

Run all fixtures and invariants:

Terminal window
$ jacqos verify
Replaying fixtures...
happy-path.jsonl PASS (4 observations, 8 facts)
contradiction-path.jsonl PASS (4 observations, 6 facts)
stale-alert-path.jsonl PASS (6 observations, 12 facts)
Checking invariants...
one_active_watch_per_pair PASS
no_alert_without_watch PASS
monotonic_snapshot_seq PASS
All checks passed.

Every fixture replays from scratch on a clean database. The output is deterministic — same observations, same evaluator, same facts every time.

The replay = "record" setting on HTTP resources is what makes this example fully deterministic despite depending on external APIs.

During live execution (jacqos dev), the shell captures each outbound HTTP request and response as a redacted effect receipt and outcome observation. The capture is keyed by the canonical request digest, including the client reference, method, resolved URL, headers before secret injection, body, and idempotency key.

During fixture replay (jacqos replay or jacqos verify), replay-only resources use matching captures instead of making live requests. This gives you:

  • Deterministic fixtures that produce identical facts regardless of network availability
  • Digest stability — the evaluator digest stays the same across clean rebuilds because the same inputs always produce the same outputs
  • Fast iteration — no network latency during development, ontology changes hot-reload in <250ms

If replay = "replay" is set and a matching capture is missing, dispatch fails explicitly rather than silently hitting the live API.

Here is the complete flow for the happy path:

Observation: watch.registered {watch_id: "watch-1", product_id: "prod-100", ...}
-> Atoms: watch.watch_id="watch-1", watch.product_id="prod-100",
watch.email="buyer@example.com", watch.threshold_pct=5.0
-> Assert: watch_active("watch-1", "prod-100", "buyer@example.com", 5.0)
-> Fact: watch_status("watch-1", "active")
-> Intent: intent.fetch_price("prod-100", "vendor-a") [no current price]
Observation: price.snapshot {seq: 1, product_id: "prod-100", price: 49.99, ...}
-> Atoms: snapshot.seq=1, snapshot.product_id="prod-100", snapshot.price=49.99, ...
-> Fact: price_snapshot(1, "prod-100", 49.99, "USD", "vendor-a")
-> Fact: price_current("prod-100", 49.99, "USD", "vendor-a")
Observation: price.snapshot {seq: 2, product_id: "prod-100", price: 42.50, ...}
-> Atoms: snapshot.seq=2, snapshot.product_id="prod-100", snapshot.price=42.50, ...
-> Fact: price_snapshot(2, "prod-100", 42.50, "USD", "vendor-a")
-> Fact: price_current("prod-100", 42.50, "USD", "vendor-a") [replaces seq 1]
-> Fact: price_previous("prod-100", 49.99, "USD", "vendor-a")
-> Fact: price_change_detected("prod-100", 49.99, 42.50)
-> Fact: price_change_pct("prod-100", 14.98)
-> Assert: alert_pending("watch-1", "buyer@example.com", "prod-100", 14.98)
-> Intent: intent.send_alert("watch-1", "buyer@example.com", "prod-100", 14.98)
Observation: price.alert_result {watch_id: "watch-1", result: "sent"}
-> Atoms: alert.result="sent", alert.watch_id="watch-1"
-> Assert: alert_sent("watch-1")
-> Retract: alert_pending("watch-1", "buyer@example.com", "prod-100", 14.98)
-> Fact: watch_status("watch-1", "alerted")

Every fact traces back to specific observations. Open Studio to follow any provenance edge visually.

Recorded replay makes HTTP effects deterministic. External API calls are captured once and replayed during fixtures. You get the guarantees of golden fixtures without sacrificing the ability to test real HTTP effects end-to-end.

Derived state replaces mutable state. price_current and price_previous are not stored — they are recomputed from the snapshot history every evaluation. This means you can replay any observation log from scratch and get identical results.

Multiple retractions handle complex lifecycles. alert_pending has three retraction rules — sent, failed, and watch deactivated. Each represents a different exit path from the pending state. The evaluator picks whichever applies; the contradiction log records the transition.