Meta description: Add a validation layer to your Monad sniper bot: filter noise with sustained-liquidity + swap checks, enforce allowlists, and output trade-ready candidates.
What you'll build in Part 2:
A validation service that consumes your radar.jsonl feed from Part 1
Liquidity reality checks (threshold + sustained time window)
Swap confirmation using GoldRush walletTxs (because liquidity existing ≠ trading existing)
DEX and quote-token allowlists (fast, enforceable risk constraints)
A clean candidates.jsonl feed to plug into Part 3 execution
Part 1 gave you eyes: a measurable, auditable radar that records pair creation and early-life updates as structured JSON logs. Part 2 gives those eyes judgment.
This matters because most “new pairs” are not opportunities. They’re just events. Some will never receive meaningful liquidity. Some will receive liquidity briefly and then unwind. Some will show “activity” that’s really just a handful of manipulative transactions. And some pairs become tradable, but not tradable for you, because your constraints (slippage tolerance, minimum depth, risk policies) would guarantee bad fills.
Validation is the layer that answers one question reliably:
Is this pair actually tradable under my constraints, or is it just chain noise wearing a ticker?
So in Part 2, we build a validation layer that sits between your radar and your execution engine:
Detection answers: “did a pair appear?”
Validation answers: “is this pair actually tradable under our rules?”
Execution answers: “should we buy, and how do we submit safely?”
You don’t want to discover “this pool had fake liquidity” or “this pair never had swaps” in Part 3 when you’re already signing transactions.
More specifically, Part 2 turns the Part 1 radar into a gated pipeline. When a new pair is detected, it is first filtered quickly on venue and quote-token constraints. Next, we track early-life liquidity and confirm that real swaps actually begin. Only once the pair meets every criterion do we emit a candidate-ready event.
Note that we still don't trade in this phase; we generate the candidate feed that Part 3 will consume for execution.
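The gate sequence can be sketched as a tiny state machine. This is a simplified illustration, not the real implementation; the type and function names are made up for this sketch:

```typescript
// Simplified sketch of the Part 2 gate sequence (illustrative names only).
type Stage = "discovered" | "tracking" | "candidate" | "rejected";

interface PairSnapshot {
  dexAllowed: boolean;         // venue gate
  quoteAllowed: boolean;       // quote-token gate
  liquiditySustainedS: number; // seconds liquidity has stayed above threshold
  swapsSeen: number;           // confirmed swap events
}

function nextStage(s: PairSnapshot, minSustainS: number, minSwaps: number): Stage {
  if (!s.dexAllowed || !s.quoteAllowed) return "rejected"; // cheap, hard gates first
  if (s.liquiditySustainedS >= minSustainS && s.swapsSeen >= minSwaps) return "candidate";
  return "tracking"; // keep watching until the early-life window closes
}
```

The ordering is the point: constraint gates are instant and final, while liquidity and swap evidence accumulate over time.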

Validation isn't one check. It's a gate built from small, explainable checks, each of which removes a different, common failure mode before execution.
In most AMM DEX designs, a “new market” begins when a factory emits a pair-creation event, such as PairCreated(token0, token1, pair, …) in Uniswap-V2-style factories. While this event confirms the existence of a pair, it does not guarantee that the market is usable.
A sniper bot typically validates the market by combining evidence of the DEX lifecycle, which shows what the pool is doing, with persistence checks to ensure that this evidence lasts long enough to be significant. Fortunately, in Uniswap-style AMMs, the essential information is already encoded in a small set of canonical events.
A pool can briefly cross a USD threshold and still be unusable. Validation treats liquidity as a time-evolving state, not a boolean.
What we’ll check:
Liquidity crosses your minimum threshold (e.g., $5,000)
Liquidity stays above it for a short persistence window (e.g., 20–60s)
Liquidity doesn’t follow obvious bait patterns (spike to immediate collapse)
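All three checks reduce to one small piece of state per pair: a timestamp for when liquidity last rose above the threshold. A minimal standalone sketch (threshold and window values are illustrative; the real tracking lives in PairStore later in this part):

```typescript
// Sustained-liquidity check: liquidity must stay >= threshold for a full window.
// A dip below the threshold resets the timer, which also catches spike-and-collapse bait.
interface LiquidityTracker {
  aboveSinceMs?: number; // undefined = currently below threshold
}

function onLiquidityUpdate(t: LiquidityTracker, liquidityUsd: number, thresholdUsd: number, nowMs: number): void {
  if (liquidityUsd >= thresholdUsd) {
    if (t.aboveSinceMs === undefined) t.aboveSinceMs = nowMs; // start the sustain timer
  } else {
    t.aboveSinceMs = undefined; // reset: a spike that collapses never sustains
  }
}

function isSustained(t: LiquidityTracker, windowMs: number, nowMs: number): boolean {
  return t.aboveSinceMs !== undefined && nowMs - t.aboveSinceMs >= windowMs;
}
```

A $8,000 spike at t=0 that collapses to $1,200 at t=10s never passes a 30s window, because the collapse wipes the timer.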
Pair creation + liquidity still isn’t “trading.” A real market has swaps and meaningful activity.
What we’ll check:
At least N swap events in the early-life window
(Optional hook) basic sanity around activity concentration so you’re not treating a single-wallet “ping” as market demand
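That optional concentration hook can be as simple as counting unique senders among the early swaps. A hedged sketch (function names and the 0.5 cutoff are illustrative, and the sender field you feed in depends on your payload shape):

```typescript
// Optional activity-concentration sanity check: N swaps from one wallet are far
// weaker evidence of demand than N swaps from N wallets.
function uniqueSenderRatio(swapSenders: string[]): number {
  if (swapSenders.length === 0) return 0;
  const unique = new Set(swapSenders.map((s) => s.toLowerCase()));
  return unique.size / swapSenders.length;
}

// Example policy: flag early activity as "concentrated" when most swaps
// come from the same wallet.
function looksConcentrated(swapSenders: string[], minRatio = 0.5): boolean {
  return swapSenders.length > 0 && uniqueSenderRatio(swapSenders) < minRatio;
}
```

You might downgrade a concentrated pair to watch rather than reject it outright, since legitimate launches sometimes start with a single market maker.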
Even if a market exists, some tokens are structurally hostile to buyers. Honeypot scams often make buying easy but restrict selling—one method is adding buyers to a blacklist so they can’t sell.
What we’ll check (kept lightweight in Part 2):
Minimums (liquidity, swaps, time since discovery)
Constraint flags that downgrade to watch instead of candidate when conditions are thin or risky
Validation is only as good as its input. If your radar is late, duplicate-heavy, or unstable, your validation output will be wrong—so Part 2 carries forward lag, duplicates, and reconnect signals into decision logs.
Instead of a single true/false, the validator produces an explainable result you can tune:
ignore — not tradable, too noisy, or dead (surfaced as reject in the code)
watch — interesting but not yet validated, e.g., crossed the threshold once but not persistently (surfaced as pending in the code)
candidate — meets the minimum tradability requirements for execution wiring in Part 3
And we'll log the reasons behind each outcome.
This Part 2 tutorial assumes you have already completed Part 1 and you’re working in the same project folder you created there.
You should already have these from Part 1:
src/core/* (config, logger, metrics, pairStore, logSchema)
src/streams/* (client, newPairs, updatePairs)
a working npm run dev that prints JSONL events
We’ll add new validation modules and upgrade a few existing files.
Step 1: Open your Part 1 project.
cd monad-sniper-part1-pair-radar
ls
# you should see: package.json, src/, .env
Step 2: Append the Part 2 validation controls to your .env.
# -----------------------------
# Part 2: Validation controls
# -----------------------------
# Liquidity must stay >= MIN_LIQUIDITY_USD for at least this long to be “real”
LIQUIDITY_SUSTAIN_SECONDS=30
# Optional safety gates (comma-separated). Leave blank to disable.
# Use lowercase addresses for consistency.
QUOTE_TOKEN_ALLOWLIST=
DEX_ALLOWLIST=
# If true, we only count swaps that happen after liquidity is above threshold
REQUIRE_SWAP_AFTER_LIQUIDITY=true
# Output file for trade-ready candidates (1 JSON object per line)
CANDIDATES_FILE=candidates.jsonl
# Safety valve: limit how many concurrent swap-watchers we open
MAX_SWAP_WATCHERS=25
If QUOTE_TOKEN_ALLOWLIST is empty, you'll accept any quote token. The same goes for DEX_ALLOWLIST (names depend on what the stream payload provides).
Step 3: Replace src/core/config.ts with this full version.
// src/core/config.ts
import "dotenv/config";
import { z } from "zod";
function parseCsv(value: string | undefined): string[] {
if (!value) return [];
return value
.split(",")
.map((s) => s.trim().toLowerCase())
.filter(Boolean);
}
const EnvSchema = z.object({
COVALENT_API_KEY: z.string().min(1, "COVALENT_API_KEY is required"),
STREAM_CHAIN: z.string().min(1, "STREAM_CHAIN is required"),
EARLY_LIFE_SECONDS: z.coerce.number().int().positive().default(180),
MIN_LIQUIDITY_USD: z.coerce.number().nonnegative().default(5000),
MIN_SWAP_EVENTS: z.coerce.number().int().nonnegative().default(1),
PRINT_METRICS_EVERY: z.coerce.number().int().positive().default(25),
// Part 2 validation
LIQUIDITY_SUSTAIN_SECONDS: z.coerce.number().int().positive().default(30),
QUOTE_TOKEN_ALLOWLIST: z.string().optional(),
DEX_ALLOWLIST: z.string().optional(),
REQUIRE_SWAP_AFTER_LIQUIDITY: z
.string()
.default("true")
.transform((v) => v.trim().toLowerCase() === "true"), // z.coerce.boolean() would treat the string "false" as true
CANDIDATES_FILE: z.string().default("candidates.jsonl"),
MAX_SWAP_WATCHERS: z.coerce.number().int().positive().default(25),
});
const env = EnvSchema.parse(process.env);
export const CONFIG = {
apiKey: env.COVALENT_API_KEY,
streamChain: env.STREAM_CHAIN,
earlyLifeSeconds: env.EARLY_LIFE_SECONDS,
minLiquidityUsd: env.MIN_LIQUIDITY_USD,
minSwapEvents: env.MIN_SWAP_EVENTS,
printMetricsEvery: env.PRINT_METRICS_EVERY,
// Part 2 validation
liquiditySustainSeconds: env.LIQUIDITY_SUSTAIN_SECONDS,
quoteTokenAllowlist: parseCsv(env.QUOTE_TOKEN_ALLOWLIST),
dexAllowlist: parseCsv(env.DEX_ALLOWLIST),
requireSwapAfterLiquidity: env.REQUIRE_SWAP_AFTER_LIQUIDITY,
candidatesFile: env.CANDIDATES_FILE,
maxSwapWatchers: env.MAX_SWAP_WATCHERS,
};
This exports a typed object (CONFIG) used everywhere else.
Step 4: Open src/core/logSchema.ts and add this new type near the bottom. It creates a single, explicit "truth line" you can search for later:
decision=candidate → safe to send to execution in Part 3
decision=reject → your bot ignored it, with reasons you can audit
Add to src/core/logSchema.ts:
export type ValidationDecisionLog = {
kind: "validation_decision";
observed_at: string;
chain: string;
pair_address: string;
dex_name?: string;
token0?: string;
token1?: string;
decision: "candidate" | "reject";
reasons: string[];
snapshot: {
age_s: number;
liquidity_usd?: number;
liquidity_sustain_s?: number;
swaps_seen: number;
};
};
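For orientation, here is a made-up reject event conforming to that shape (the type is repeated so the snippet stands alone; the address and values are hypothetical). Because index.ts only populates reasons on rejects, grepping your logs for any reason string only ever matches rejected pairs:

```typescript
// Self-contained copy of the log type, plus one hypothetical reject event.
type ValidationDecisionLog = {
  kind: "validation_decision";
  observed_at: string;
  chain: string;
  pair_address: string;
  dex_name?: string;
  token0?: string;
  token1?: string;
  decision: "candidate" | "reject";
  reasons: string[];
  snapshot: {
    age_s: number;
    liquidity_usd?: number;
    liquidity_sustain_s?: number;
    swaps_seen: number;
  };
};

// Hypothetical pair: liquidity crossed once but never sustained, and no swaps landed.
const example: ValidationDecisionLog = {
  kind: "validation_decision",
  observed_at: "2025-01-01T00:00:00.000Z",
  chain: "MONAD_MAINNET",
  pair_address: "0x0000000000000000000000000000000000000001",
  decision: "reject",
  reasons: ["liquidity_not_sustained", "no_swap_confirmation"],
  snapshot: { age_s: 180, liquidity_usd: 900, swaps_seen: 0 },
};
```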
Then find the union type in src/core/logSchema.ts (the part where PairDiscoveredLog, PairEarlyLifeLog, and MetricsSummaryLog are defined):
export type RadarLog = PairDiscoveredLog | PairEarlyLifeLog | MetricsSummaryLog;
Replace it with:
export type RadarLog =
| PairDiscoveredLog
| PairEarlyLifeLog
| MetricsSummaryLog
| ValidationDecisionLog;
Step 5: Create src/core/candidateSink.ts. This file acts like a tiny "export layer" for Part 2: it opens a write stream to candidates.jsonl (append mode) and writes only ValidationDecisionLog events whose decision === "candidate", so you end up with a clean, trade-ready feed that Part 3 can consume without parsing the full console logs.
Create src/core/candidateSink.ts:
// src/core/candidateSink.ts
import fs from "node:fs";
import { CONFIG } from "./config";
import { ValidationDecisionLog } from "./logSchema";
let stream: fs.WriteStream | null = null;
export function initCandidateSink() {
stream = fs.createWriteStream(CONFIG.candidatesFile, { flags: "a" });
}
export function writeCandidate(evt: ValidationDecisionLog) {
if (!stream) return;
if (evt.decision !== "candidate") return;
stream.write(JSON.stringify(evt) + "\n");
}
export function closeCandidateSink() {
stream?.end();
stream = null;
}
Result: candidates.jsonl contains only candidate decisions.
Step 6: Replace src/core/pairStore.ts with the upgraded version below (a drop-in replacement for the Part 1 file). PairStore is the bot's in-memory "truth" for each pair: it tracks sustained liquidity (not just a one-time spike) and swap confirmation (real trading happened), and it locks in a single validation decision (candidate or reject) so later steps can act deterministically.
Replace src/core/pairStore.ts with this upgraded version:
// src/core/pairStore.ts
import { CONFIG } from "./config";
export type PairState = {
pairAddress: string;
firstObservedAtMs: number;
createdAtChainMs?: number;
dexName?: string;
token0?: string;
token1?: string;
lastLiquidityUsd?: number;
// Sustained liquidity tracking
liquidityAboveSinceMs?: number;
// Swap confirmation (from walletTxs watcher)
swapTxsSeen: number;
firstSwapAtMs?: number;
// Validation outcome (set once)
decision?: "candidate" | "reject";
decisionReasons?: string[];
decidedAtMs?: number;
// Subscription lifecycle
trackingEndsAtMs: number;
};
export class PairStore {
private pairs = new Map<string, PairState>();
normalize(addr: string) {
return addr.trim().toLowerCase();
}
has(addr: string) {
return this.pairs.has(this.normalize(addr));
}
upsertDiscovery(input: {
pairAddress: string;
createdAtChainMs?: number;
dexName?: string;
token0?: string;
token1?: string;
}) {
const pair = this.normalize(input.pairAddress);
const existing = this.pairs.get(pair);
if (existing) return existing;
const now = Date.now();
const st: PairState = {
pairAddress: pair,
firstObservedAtMs: now,
createdAtChainMs: input.createdAtChainMs,
dexName: input.dexName?.toLowerCase(),
token0: input.token0?.toLowerCase(),
token1: input.token1?.toLowerCase(),
lastLiquidityUsd: undefined,
liquidityAboveSinceMs: undefined,
swapTxsSeen: 0,
firstSwapAtMs: undefined,
decision: undefined,
decisionReasons: undefined,
decidedAtMs: undefined,
trackingEndsAtMs: now + CONFIG.earlyLifeSeconds * 1000,
};
this.pairs.set(pair, st);
return st;
}
get(addr: string) {
return this.pairs.get(this.normalize(addr));
}
updateLiquidity(st: PairState, liquidityUsd?: number) {
st.lastLiquidityUsd = liquidityUsd;
const now = Date.now();
const above = typeof liquidityUsd === "number" && liquidityUsd >= CONFIG.minLiquidityUsd;
if (above) {
// Start sustain timer if it’s not already running
if (!st.liquidityAboveSinceMs) st.liquidityAboveSinceMs = now;
} else {
// Reset sustain timer if liquidity falls below threshold
st.liquidityAboveSinceMs = undefined;
}
}
recordSwapTx(st: PairState) {
const now = Date.now();
st.swapTxsSeen += 1;
if (!st.firstSwapAtMs) st.firstSwapAtMs = now;
}
decide(st: PairState, decision: "candidate" | "reject", reasons: string[]) {
if (st.decision) return; // fail-closed: never flip decisions
st.decision = decision;
st.decisionReasons = reasons;
st.decidedAtMs = Date.now();
}
shouldStillTrack(st: PairState) {
return Date.now() <= st.trackingEndsAtMs;
}
}
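The decide() lock is worth internalizing: once a pair is decided, nothing can flip it, which keeps every downstream consumer deterministic. The pattern in isolation (a standalone sketch, not the real class):

```typescript
// Fail-closed decision lock: the first decision wins; later calls are no-ops.
type Decision = "candidate" | "reject";

interface Decidable {
  decision?: Decision;
  decisionReasons?: string[];
}

function decideOnce(st: Decidable, decision: Decision, reasons: string[]): boolean {
  if (st.decision) return false; // already decided: refuse to flip
  st.decision = decision;
  st.decisionReasons = reasons;
  return true; // this call set the decision
}
```

This matters because the update stream and the swap watcher both evaluate the same pair concurrently; without the lock, a late reject could overwrite an earlier candidate.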
Step 7: Create the validation module.
mkdir -p src/validation
src/validation/ keeps Part 2's "decision logic" separated from Part 1's core plumbing (streams, metrics, stores). Think of it as the folder where the bot's rules and scoring gates live, so you can iterate on validation without touching the data-plane code.
src/validation/evaluate.ts is the first file in src/validation/, and you'll add or adjust other Part 2 files alongside it (plus small tweaks to existing Part 1 files) as we wire validation into the running radar. The evaluator returns pending while the pair develops, reject with reasons, or candidate.
Create src/validation/evaluate.ts:
// src/validation/evaluate.ts
import { CONFIG } from "../core/config";
import { PairState } from "../core/pairStore";
function includesAny(list: string[], a?: string, b?: string) {
if (!list.length) return true; // allowlist disabled
const aa = (a || "").toLowerCase();
const bb = (b || "").toLowerCase();
return list.includes(aa) || list.includes(bb);
}
export function evaluatePair(st: PairState, opts?: { finalize?: boolean }) {
const now = Date.now();
const ageS = Math.max(0, Math.floor((now - st.firstObservedAtMs) / 1000));
// If already decided, do nothing.
if (st.decision) {
return { status: "decided" as const, ageS };
}
const reasons: string[] = [];
// Fast gates (only if allowlists are configured)
if (CONFIG.dexAllowlist.length) {
const dex = (st.dexName || "").toLowerCase();
if (!dex || !CONFIG.dexAllowlist.includes(dex)) reasons.push("dex_not_allowlisted");
}
if (CONFIG.quoteTokenAllowlist.length) {
const ok = includesAny(CONFIG.quoteTokenAllowlist, st.token0, st.token1);
if (!ok) reasons.push("quote_token_not_allowlisted");
}
// Liquidity sustain check
const sustainMs = CONFIG.liquiditySustainSeconds * 1000;
const hasSustain =
typeof st.liquidityAboveSinceMs === "number" && now - st.liquidityAboveSinceMs >= sustainMs;
if (!hasSustain) reasons.push("liquidity_not_sustained");
// Swap confirmation
const hasSwaps = st.swapTxsSeen >= CONFIG.minSwapEvents;
if (!hasSwaps) reasons.push("no_swap_confirmation");
if (CONFIG.requireSwapAfterLiquidity && hasSwaps && st.firstSwapAtMs && st.liquidityAboveSinceMs) {
if (st.firstSwapAtMs < st.liquidityAboveSinceMs) {
reasons.push("swap_before_liquidity_threshold");
}
}
// Candidate only if no reasons
if (reasons.length === 0) {
return { status: "candidate" as const, ageS };
}
// Reject only when finalizing (end of early-life window) OR when we hit a hard gate
const hardReject =
reasons.includes("dex_not_allowlisted") || reasons.includes("quote_token_not_allowlisted");
if (opts?.finalize || hardReject) {
return { status: "reject" as const, ageS, reasons };
}
return { status: "pending" as const, ageS };
}
Step 8: Create src/streams/walletTxs.ts. It adds a real-time swap confirmation signal: it subscribes to GoldRush's walletTxs stream for a given address and triggers onSwap() whenever a swap-like decoded transaction is observed, so your validator can mark "trading has begun" instead of relying only on pair creation and liquidity.
Create src/streams/walletTxs.ts:
// src/streams/walletTxs.ts
import { makeClient } from "./client";
import { CONFIG } from "../core/config";
function isSwapLike(payload: any): boolean {
const t = String(payload?.decoded_type || payload?.event_type || payload?.tx_type || "")
.toUpperCase()
.trim();
if (t.includes("SWAP")) return true;
// Defensive fallback: some payloads expose protocol action names
const action = String(payload?.action || payload?.category || "").toUpperCase();
return action.includes("SWAP");
}
export async function watchWalletTxs(args: {
address: string;
onSwap: () => void;
onError?: (err: unknown) => void;
}) {
const client = makeClient({
error: (err) => args.onError?.(err),
});
// NOTE:
// SDK method names can vary. If this name differs in your version,
// use the same grep approach from Part 1 Step 5 and swap it here.
const unsubscribe = (client as any).StreamingService.subscribeToWalletTxs(
{
chain_name: CONFIG.streamChain,
wallet_address: args.address,
},
{
next: (payload: any) => {
if (isSwapLike(payload)) args.onSwap();
},
error: (err: any) => {
args.onError?.(err?.message || err);
},
complete: () => {},
}
);
return {
stop: async () => {
unsubscribe?.();
await (client as any).StreamingService.disconnect?.();
},
};
}
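Because decoded payload shapes vary between SDK versions, it pays to probe the swap classifier against payloads you actually capture from the stream. Here is the same heuristic copied standalone so you can test it in isolation:

```typescript
// Standalone copy of the isSwapLike heuristic, for probing captured payloads.
function isSwapLike(payload: any): boolean {
  const t = String(payload?.decoded_type || payload?.event_type || payload?.tx_type || "")
    .toUpperCase()
    .trim();
  if (t.includes("SWAP")) return true;
  // Defensive fallback: some payloads expose protocol action names instead
  const action = String(payload?.action || payload?.category || "").toUpperCase();
  return action.includes("SWAP");
}
```

If real payloads from your stream fail this check, that is troubleshooting item 2 below: the swap watcher silently never fires and candidates never appear.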
Step 9: Upgrade src/streams/updatePairs.ts to trigger validation. updatePairs.ts now does more than log early-life updates: it calls your validator as updates arrive and emits a decision (candidate or reject, with pending in between) once the pair has enough evidence. It also stops tracking early once a decision is reached, so you don't waste subscriptions and the process stays stable during bursts.
Replace trackPairEarlyLife in src/streams/updatePairs.ts with this version:
// src/streams/updatePairs.ts
import { makeClient } from "./client";
import { CONFIG } from "../core/config";
import { Metrics } from "../core/metrics";
import { PairStore } from "../core/pairStore";
import { logEvent, nowIso } from "../core/logger";
import { evaluatePair } from "../validation/evaluate";
function extractLiquidityUsd(payload: any): number | undefined {
const candidates = [
payload?.liquidity_usd,
payload?.liquidityUSD,
payload?.quote_usd,
payload?.tvl_usd,
payload?.reserve_quote_usd,
];
for (const c of candidates) {
if (c === null || c === undefined || c === "") continue; // Number(null) and Number("") are 0, not NaN
const n = Number(c);
if (Number.isFinite(n)) return n;
}
return undefined;
}
export async function trackPairEarlyLife(args: {
pairAddress: string;
metrics: Metrics;
store: PairStore;
// Called once liquidity is meaningful, so we can begin swap confirmation
ensureSwapWatch: (pairAddress: string) => void;
// Called when the pair is decided, so the caller can stop other resources
onDecided: (pairAddress: string) => void;
}) {
const st = args.store.get(args.pairAddress);
if (!st) return;
args.metrics.pairsTracked++;
const client = makeClient({
error: (err) => console.error("[updatePairs] error:", err),
});
const unsubscribe = (client as any).StreamingService.subscribeToUpdatePairs(
{
chain_name: CONFIG.streamChain,
pair_addresses: [args.pairAddress],
},
{
next: (payload: any) => {
const state = args.store.get(args.pairAddress);
if (!state) return;
if (state.decision) {
unsubscribe?.();
return;
}
const liqUsd = extractLiquidityUsd(payload);
args.store.updateLiquidity(state, liqUsd);
// As soon as liquidity is meaningful, start swap watcher (limited by caller)
if (
typeof liqUsd === "number" &&
liqUsd >= CONFIG.minLiquidityUsd
) {
args.ensureSwapWatch(args.pairAddress);
}
// Emit early-life observation (still useful for debugging/tuning)
const ageS = (Date.now() - state.firstObservedAtMs) / 1000;
logEvent({
kind: "pair_early_life",
observed_at: nowIso(),
chain: CONFIG.streamChain,
pair_address: args.pairAddress,
age_s: Math.max(0, Math.round(ageS)),
liquidity_usd: liqUsd,
swap_count_seen: state.swapTxsSeen,
flags: {
crossed_liquidity_threshold:
typeof liqUsd === "number" && liqUsd >= CONFIG.minLiquidityUsd,
became_active: state.swapTxsSeen >= CONFIG.minSwapEvents,
},
});
// Validate after every update
const res = evaluatePair(state);
if (res.status === "candidate") {
args.store.decide(state, "candidate", []);
args.onDecided(args.pairAddress);
unsubscribe?.();
}
// End-of-window finalize
if (!args.store.shouldStillTrack(state)) {
const final = evaluatePair(state, { finalize: true });
if (final.status === "reject") {
args.store.decide(state, "reject", final.reasons);
args.onDecided(args.pairAddress);
}
unsubscribe?.();
}
},
error: (err: any) => {
args.metrics.reconnects++;
console.error("[updatePairs] subscription error:", err?.message || err);
},
complete: () => {},
}
);
// Hard stop fallback
setTimeout(() => unsubscribe?.(), CONFIG.earlyLifeSeconds * 1000 + 2_000);
return {
stop: async () => {
unsubscribe?.();
await (client as any).StreamingService.disconnect?.();
},
};
}
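extractLiquidityUsd is the other spot where payload shapes bite. Probing a standalone version against captured payloads tells you quickly whether the field list matches your stream. This copy carries a guard against null/empty fields, which Number() would otherwise coerce to 0 (i.e., read as "$0 liquidity" instead of "unknown"):

```typescript
// Standalone liquidity extractor, for probing real payloads from your stream.
function extractLiquidityUsd(payload: any): number | undefined {
  const candidates = [
    payload?.liquidity_usd,
    payload?.liquidityUSD,
    payload?.quote_usd,
    payload?.tvl_usd,
    payload?.reserve_quote_usd,
  ];
  for (const c of candidates) {
    if (c === null || c === undefined || c === "") continue; // Number(null/"") is 0, not NaN
    const n = Number(c);
    if (Number.isFinite(n)) return n;
  }
  return undefined; // no usable liquidity field found
}
```

An unknown liquidity reading matters here: returning undefined leaves the sustain timer to be reset by updateLiquidity, whereas a false 0 would do the same while looking like real data in your logs.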
Step 10: Replace src/index.ts. Part 2 changes the app's role from a passive radar (Part 1: "log everything") into a validation controller (Part 2: "log + decide + output candidates"). index.ts writes decisions to the candidates file (via candidateSink) so execution can later consume a clean, explainable queue. It starts walletTxs swap watchers only when needed, caps concurrent watchers, and shuts them down once a pair is decided, so the process stays stable under bursts.
Replace src/index.ts with this Part 2 version:
// src/index.ts
import { CONFIG } from "./core/config";
import { Metrics } from "./core/metrics";
import { PairStore } from "./core/pairStore";
import { logEvent, nowIso } from "./core/logger";
import { startNewPairsStream } from "./streams/newPairs";
import { trackPairEarlyLife } from "./streams/updatePairs";
import { evaluatePair } from "./validation/evaluate";
import { initCandidateSink, writeCandidate, closeCandidateSink } from "./core/candidateSink";
import { watchWalletTxs } from "./streams/walletTxs";
type Stopper = { stop: () => Promise<void> };
async function main() {
console.log("Monad Sniper Bot (Part 2) — Validation Layer");
console.log(`Chain: ${CONFIG.streamChain}`);
console.log(
`Liquidity >= $${CONFIG.minLiquidityUsd} sustained ${CONFIG.liquiditySustainSeconds}s | Swaps >= ${CONFIG.minSwapEvents}`
);
console.log(`Candidates file: ${CONFIG.candidatesFile}\n`);
initCandidateSink();
const metrics = new Metrics();
const store = new PairStore();
// Track swap watchers so we can cap them
const swapWatchers = new Map<string, Stopper>();
function canStartSwapWatcher() {
return swapWatchers.size < CONFIG.maxSwapWatchers;
}
async function ensureSwapWatch(pairAddress: string) {
if (swapWatchers.has(pairAddress)) return;
if (!canStartSwapWatcher()) return;
const watcher = await watchWalletTxs({
address: pairAddress,
onSwap: () => {
const st = store.get(pairAddress);
if (!st || st.decision) return;
store.recordSwapTx(st);
// Re-evaluate whenever swaps arrive
const res = evaluatePair(st);
if (res.status === "candidate") {
store.decide(st, "candidate", []);
emitDecision(pairAddress);
}
},
onError: (err) => {
metrics.reconnects++;
console.error("[walletTxs] error:", err);
},
});
swapWatchers.set(pairAddress, watcher);
}
function stopSwapWatch(pairAddress: string) {
const w = swapWatchers.get(pairAddress);
if (!w) return;
w.stop().catch(() => {});
swapWatchers.delete(pairAddress);
}
function emitDecision(pairAddress: string) {
const st = store.get(pairAddress);
if (!st || !st.decision) return;
const ageS = Math.max(0, Math.floor((Date.now() - st.firstObservedAtMs) / 1000));
const evt = {
kind: "validation_decision" as const,
observed_at: nowIso(),
chain: CONFIG.streamChain,
pair_address: pairAddress,
dex_name: st.dexName,
token0: st.token0,
token1: st.token1,
decision: st.decision,
reasons: st.decision === "reject" ? st.decisionReasons || [] : [],
snapshot: {
age_s: ageS,
liquidity_usd: st.lastLiquidityUsd,
liquidity_sustain_s: st.liquidityAboveSinceMs
? Math.floor((Date.now() - st.liquidityAboveSinceMs) / 1000)
: undefined,
swaps_seen: st.swapTxsSeen,
},
};
logEvent(evt);
writeCandidate(evt);
// Once decided, stop swap watching to avoid wasted subscriptions
stopSwapWatch(pairAddress);
}
const newPairs = await startNewPairsStream({
metrics,
store,
onPairDiscovered: async (pairAddress) => {
const st = store.get(pairAddress);
if (!st || st.decision) return;
// Fast validation at discovery (allowlist rejections happen here)
const first = evaluatePair(st);
if (first.status === "reject") {
store.decide(st, "reject", first.reasons);
emitDecision(pairAddress);
return;
}
// Otherwise track early life
trackPairEarlyLife({
pairAddress,
metrics,
store,
ensureSwapWatch: (addr) => {
ensureSwapWatch(addr).catch((e) => console.error("[ensureSwapWatch] failed:", e));
},
onDecided: (addr) => {
emitDecision(addr);
},
}).catch((e) => console.error("[trackPairEarlyLife] failed:", e));
// Periodic metrics summary (still useful)
if (metrics.pairsSeen % CONFIG.printMetricsEvery === 0) {
const s = metrics.summary();
logEvent({
kind: "metrics_summary",
observed_at: nowIso(),
chain: CONFIG.streamChain,
totals: s.totals,
lag_ms: s.lag_ms,
});
}
},
});
process.on("SIGINT", async () => {
console.log("\n[main] shutting down…");
closeCandidateSink();
// stop streams
await newPairs.stop();
// stop swap watchers
for (const [addr, w] of swapWatchers) {
await w.stop().catch(() => {});
swapWatchers.delete(addr);
}
process.exit(0);
});
}
main().catch((err) => {
console.error("[main] fatal:", err);
process.exit(1);
});
All of this assumes startNewPairsStream (from Part 1) stores dexName/token0/token1 on discovery, so we need a minor upgrade to src/streams/newPairs.ts.
Step 11: Update src/streams/newPairs.ts. Specifically, you're updating the object you pass into opts.store.upsertDiscovery(...) so the radar stores DEX and token addresses at discovery time (needed for allowlists/validation in Part 2).
In src/streams/newPairs.ts, find:
const st = opts.store.upsertDiscovery({
pairAddress: normalized,
createdAtChainMs: createdAtMs,
});
Replace it with:
const st = opts.store.upsertDiscovery({
pairAddress: normalized,
createdAtChainMs: createdAtMs,
dexName: payload?.dex_name || payload?.dex,
token0: payload?.token0_address || payload?.token0,
token1: payload?.token1_address || payload?.token1,
});
You only change the upsertDiscovery(...) call. Everything else can remain the same.
Step 12: Run the bot and watch for validation_decision events.
Run src/index.ts via the dev script. Your package.json script should still look like:
"dev": "tsx src/index.ts"
npm run dev
You'll still see the Part 1 events (pair_discovered, pair_early_life), but now you should also see validation decisions:
{"kind":"pair_discovered","observed_at":"2025-12-23T02:11:04.921Z","chain":"MONAD_MAINNET","pair_address":"0x8b1c...f2a9","dex_name":"uniswapv2","token0":"0xC02a...6Cc2","token1":"0xA0b8...eB48","event_lag_ms":187,"is_duplicate":false}
{"kind":"pair_early_life","observed_at":"2025-12-23T02:11:18.402Z","chain":"MONAD_MAINNET","pair_address":"0x8b1c...f2a9","age_s":13,"liquidity_usd":6420,"swap_count_seen":2,"flags":{"crossed_liquidity_threshold":true,"became_active":true}}
{"kind":"validation_decision","observed_at":"2025-12-23T02:11:38.920Z","chain":"MONAD_MAINNET","pair_address":"0x8b1c...f2a9","dex_name":"uniswapv2","token0":"0xc02a...6cc2","token1":"0xa0b8...eb48","decision":"candidate","reasons":[],"snapshot":{"age_s":34,"liquidity_usd":6420,"liquidity_sustain_s":30,"swaps_seen":2}}
Then confirm the candidate feed is being written:
ls candidates.jsonl
tail -n 5 candidates.jsonl
If you see "decision":"candidate" lines in candidates.jsonl, you're good: that file becomes the execution input for Part 3.
After ~20–30 minutes, you should be able to answer:
Do most pairs get rejected for sensible reasons (noise reduction is working)?
Do candidate decisions correspond to pairs that actually show sustained liquidity + swaps?
Is candidates.jsonl small and high-intent (not a firehose)?
Does tightening allowlists reduce tracking load without missing “real” pairs?
If your candidate feed is huge, your gates are too loose.
If it’s always empty, your gates are too strict—or swap confirmation isn’t wired correctly.
1) subscribeToWalletTxs is not a function
Causes: Your SDK uses a different method name. Use the same approach from Part 1, Step 5:
Search your installed SDK in node_modules
Find the wallet tx subscription method
Replace the call in src/streams/walletTxs.ts
2) Candidates never appear
Most common causes:
QUOTE_TOKEN_ALLOWLIST is set, but doesn’t match the tokens you’re seeing
liquidity never sustains above your threshold long enough (LIQUIDITY_SUSTAIN_SECONDS too high)
swap watcher isn’t receiving swap-like payloads (decoded fields differ)
Quick test:
Temporarily set LIQUIDITY_SUSTAIN_SECONDS=5
Leave allowlists blank
Confirm you see any candidates
Then tighten back up.
3) Too many concurrent watchers
That’s why we added MAX_SWAP_WATCHERS.
If you still hit load issues:
reduce EARLY_LIFE_SECONDS
raise MIN_LIQUIDITY_USD
add allowlists
Those three changes are the cleanest way to lower volume without hacks.
You now have the missing middle of most sniper systems: a validation layer that turns noisy discovery into a small set of auditable candidates.
You’re still not trading—and that’s the point. You’re building the discipline that prevents the execution layer from firing at garbage.
In Part 3, we'll wire up execution from the candidates.jsonl file: deterministic entry rules, slippage and failure handling, and safe transaction-submission patterns.
In Part 4, we'll operationalize the bot: monitoring and alerting, backfills, runbooks, and production guardrails that keep it healthy during bursts and consistent across varied conditions.