Welcome back! If you’ve made it through Part 2 of our guide (the one where we set up streaming data on Solana and built a live dashboard that alerts you to unusual wallet activity and token-price weirdness), congratulations—you’re ready to level up. In this next chapter, we’re going from “seeing the problem” to “solving the problem.”
In Part 3 we’ll roll up our sleeves and turn our detection engine into an operational powerhouse: we’ll walk through building automated incident-response playbooks, integrating with remediation workflows, and embedding feedback loops that learn and improve over time. Because spotting an anomaly is only half the battle—what matters is acting fast, smart, and automatically.
So grab your favourite dev setup, and let’s transform your real-time observability stack into a full-blown defence system. Let’s go

Begin by installing Node.js (version 16+) and npm. Visit nodejs.org and download the LTS version. Verify installation:
node --version
npm --version2.1) Create a new directory for your project:
mkdir solana-anomaly-detector
cd solana-anomaly-detector
What this does: Creates a new folder for your project and enters it. All your files will be organized here.
2.2) Initialize npm and install dependencies:
npm init -y
npm install dotenv @covalenthq/client-sdk chalkWhat this does:
npm init -y
creates a package.json file (package configuration file) with default settings
npm install dotenv
installs the dotenv package to safely load environment variables from .env file
npm install @covalenthq/client-sdk
installs Covalent's official Goldrush SDK for streaming blockchain data
npm install chalk
installs the chalk package for colored terminal output (makes logs easier to read)
2.3) Create a .env file in the root directory:
GOLDRUSH_KEY=your_api_key_hereWhat this does: Stores your Goldrush Streaming API key securely in a .env file. This file is ignored by Git (never committed) so your credentials stay private.
Obtain your Goldrush API key from goldrush.dev by registering as a developer. This key authenticates your requests to the Goldrush Streaming API.
Organize your project with this directory structure:
solana-anomaly-detector/
├── .env
├── .gitignore
├── package.json
├── package-lock.json
├── security_analytics.js
├── train_security_model.py
├── visualize_results.py
├── predict_anomalies.py
├── requirements.txt
Let’s start the python setup :
In the root folder , your requirements.txt should be there
numpy>=1.21.0
pandas>=1.3.0
scikit-learn>=1.0.0
matplotlib>=3.4.0
seaborn>=0.11.0
joblib>=1.1.0
Next :
Run in the root folder where requirement.txt is presentpip install -r requirements.txt
Now lets figure out the coding part :
In the root , make a file named security_analytics.jsx
1. First let's import the and setup the part we will need :
require('dotenv').config();
const fs = require('fs');
const path = require('path');
const {
GoldRushClient,
StreamingChain,
StreamingInterval,
StreamingTimeframe,
} = require('@covalenthq/client-sdk');
let chalk;
(async () => {
chalk = (await import('chalk')).default;
const apiKey = process.env.GOLDRUSH_KEY;
if (!apiKey) {
console.error("ERROR: GOLDRUSH_KEY not found in .env file");
process.exit(1);
}
const client = new GoldRushClient(apiKey, {}, {
onConnecting: () => console.log(chalk.blue("🔗 Connecting to GoldRush streaming service...")),
onOpened: () => {
console.clear();
displayHeader();
console.log(chalk.green("✓ Connected to streaming service!\n"));
console.log(chalk.yellow("🛡️ Advanced Security Monitoring Active...\n"));
},
onClosed: () => {
console.log(chalk.yellow("✓ Disconnected from streaming service"));
process.exit(0);
},
onError: (error) => {
console.error(chalk.red("✗ Streaming error:"), error);
},
});2. Setting up the configuration
const CONFIG = {
tokenAddress: "So11111111111111111111111111111111111111112", // Wrapped SOL
maxHistory: 100,
// Detection Thresholds
flashLoan: {
minPriceSwing: 0.05, // 5% minimum price swing
maxTimeWindow: 60000, // 60 seconds
minVolumeMultiplier: 10 // 10x volume spike
},
rugPull: {
volumeDropThreshold: 0.5, // 50% volume drop
priceDropThreshold: 0.2, // 20% price drop
observationWindow: 10 // Last 10 candles
},
sandwichAttack: {
minSpikePercent: 0.03, // 3% spike
minDropPercent: 0.025, // 2.5% drop
windowSize: 3 // 3 candles
},
priceManipulation: {
stdDevMultiplier: 2.5, // 2.5 standard deviations
minDataPoints: 20
},
// ML Data Export
exportData: true,
exportPath: './security_data',
exportBatchSize: 100
};3. Now here comes the data storage part
const dataStore = {
priceHistory: [],
volumeHistory: [],
anomalyHistory: [],
networkMetrics: {
blockTimes: [],
gasUsage: [],
failedTxRate: []
},
securityEvents: [],
mlDataBuffer: []
};
let streamStartTime = Date.now();
let dataCount = 0;
let criticalAlertsCount = 0;
let totalAnomaliesDetected = 0;4. Let's set up the utility functions
function displayHeader() {
console.log(chalk.cyan.bold('═══════════════════════════════════════════════════════════════'));
console.log(chalk.cyan.bold(' 🛡️ WEB3 ADVANCED SECURITY ANALYTICS - GOLDRUSH'));
console.log(chalk.cyan.bold('═══════════════════════════════════════════════════════════════'));
console.log(chalk.yellow.bold(' Detection Systems: Flash Loans | Rug Pulls | Sandwich Attacks'));
console.log(chalk.yellow.bold(' ML Ready: Real-time Data Export | Pattern Recognition'));
console.log(chalk.cyan.bold('═══════════════════════════════════════════════════════════════'));
}
function clearScreen() {
console.clear();
displayHeader();
}
function formatPrice(price) {
if (!price || price === 0) return '0.000000';
return parseFloat(price).toFixed(6);
}
function formatVolume(volume) {
if (!volume || volume === 0) return '0.0000e+0';
return parseFloat(volume).toExponential(4);
}
function calculateStatistics(history) {
if (history.length < 2) return null;
const values = history.map(c => parseFloat(c.close) || 0).filter(p => p > 0);
if (values.length < 2) return null;
const mean = values.reduce((a, b) => a + b) / values.length;
const variance = values.reduce((sq, n) => sq + Math.pow(n - mean, 2), 0) / values.length;
const stdDev = Math.sqrt(variance);
const sorted = [...values].sort((a, b) => a - b);
const median = sorted[Math.floor(sorted.length / 2)];
return { mean, median, stdDev, variance, count: values.length };
}5. Now we will setup our Detection Algorithm
a. FLASH LOAN EXPLOIT DETECTION
Detects rapid price manipulation characteristic of flash loan attacks
function detectFlashLoanExploit(candle, history) {
const anomalies = [];
if (history.length < 2) return anomalies;
const prevCandle = history[history.length - 1];
const timeDiff = new Date(candle.timestamp) - new Date(prevCandle.timestamp);
const currentPrice = parseFloat(candle.close) || 0;
const prevPrice = parseFloat(prevCandle.close) || 0;
const currentVolume = parseFloat(candle.volume) || 0;
const prevVolume = parseFloat(prevCandle.volume) || 1;
if (currentPrice <= 0 || prevPrice <= 0) return anomalies;
// Calculate metrics
const priceSwing = Math.abs((currentPrice - prevPrice) / prevPrice);
const volumeRatio = currentVolume / prevVolume;
const priceVelocity = priceSwing / (timeDiff / 1000); // % per second
// Flash loan signature: rapid price movement + massive volume spike
if (timeDiff < CONFIG.flashLoan.maxTimeWindow &&
priceSwing > CONFIG.flashLoan.minPriceSwing &&
volumeRatio > CONFIG.flashLoan.minVolumeMultiplier) {
anomalies.push({
type: 'FLASH_LOAN_EXPLOIT',
severity: 'CRITICAL',
confidence: Math.min(95, 60 + (volumeRatio * 3)),
value: `${(priceSwing*100).toFixed(2)}% in ${(timeDiff/1000).toFixed(1)}s`,
details: `Price velocity: ${(priceVelocity*100).toFixed(2)}%/s, Volume: ${volumeRatio.toFixed(1)}x`,
metadata: {
priceSwing,
timeDiff,
volumeRatio,
priceVelocity,
currentPrice,
prevPrice
}
});
}
// Secondary indicator: Repeated rapid reversals (attack attempts)
if (history.length >= 5) {
const recentSwings = [];
for (let i = history.length - 4; i < history.length; i++) {
const swing = Math.abs((history[i].close - history[i-1].close) / history[i-1].close);
recentSwings.push(swing);
}
const avgSwing = recentSwings.reduce((a,b) => a+b) / recentSwings.length;
if (avgSwing > 0.03) { // Average 3% swings
anomalies.push({
type: 'PRICE_MANIPULATION_PATTERN',
severity: 'HIGH',
confidence: 75,
value: `${(avgSwing*100).toFixed(2)}% avg swing`,
details: 'Repeated rapid price reversals detected',
metadata: { avgSwing, swingCount: recentSwings.length }
});
}
}
return anomalies;
}b. RUG PULL DETECTION
Monitors liquidity drain patterns and suspicious trading restrictions
function detectRugPullSignals(candle, history) {
const signals = [];
if (history.length < CONFIG.rugPull.observationWindow) return signals;
const currentVolume = parseFloat(candle.volume) || 0;
const currentPrice = parseFloat(candle.close) || 0;
// Calculate recent averages
const recentCandles = history.slice(-CONFIG.rugPull.observationWindow);
const recentVolumes = recentCandles.map(c => parseFloat(c.volume) || 0);
const avgRecentVolume = recentVolumes.reduce((a,b) => a+b) / recentVolumes.length;
const oldestPrice = parseFloat(recentCandles[0].close) || 1;
const priceChange = (currentPrice - oldestPrice) / oldestPrice;
// SIGNAL 1: Liquidity Drain (volume drops + price drops)
const volumeDropRatio = currentVolume / avgRecentVolume;
if (volumeDropRatio < CONFIG.rugPull.volumeDropThreshold &&
priceChange < -CONFIG.rugPull.priceDropThreshold) {
signals.push({
type: 'LIQUIDITY_DRAIN',
severity: 'CRITICAL',
confidence: 85,
value: `Vol -${((1-volumeDropRatio)*100).toFixed(0)}%, Price -${Math.abs(priceChange*100).toFixed(1)}%`,
details: 'Potential rug pull: Liquidity draining with price collapse',
metadata: {
volumeDropRatio,
priceChange,
avgRecentVolume,
currentVolume
}
});
}
// SIGNAL 2: Death Spiral (accelerating decline)
if (history.length >= 20) {
const last20Candles = history.slice(-20);
const priceChanges = [];
for (let i = 1; i < last20Candles.length; i++) {
const change = (last20Candles[i].close - last20Candles[i-1].close) / last20Candles[i-1].close;
priceChanges.push(change);
}
const negativeCandles = priceChanges.filter(c => c < 0).length;
const avgDecline = priceChanges.filter(c => c < 0).reduce((a,b) => a+b, 0) / negativeCandles || 0;
if (negativeCandles >= 15 && avgDecline < -0.05) {
signals.push({
type: 'DEATH_SPIRAL',
severity: 'HIGH',
confidence: 80,
value: `${negativeCandles}/20 red candles`,
details: `Average decline: ${(avgDecline*100).toFixed(2)}% per candle`,
metadata: { negativeCandles, avgDecline }
});
}
}
// SIGNAL 3: Volume Evaporation (sudden complete volume loss)
if (currentVolume < avgRecentVolume * 0.1 && avgRecentVolume > 0) {
signals.push({
type: 'VOLUME_EVAPORATION',
severity: 'HIGH',
confidence: 70,
value: `${((1 - currentVolume/avgRecentVolume)*100).toFixed(0)}% volume loss`,
details: 'Trading activity collapsed - possible liquidity removal',
metadata: { currentVolume, avgRecentVolume }
});
}
return signals;
}c. SANDWICH ATTACK DETECTION
Identifies front-running patterns (price spike → victim trade → price drop)
function detectSandwichAttack(candle, history) {
const attacks = [];
if (history.length < CONFIG.sandwichAttack.windowSize) return attacks;
// Analyze last 3 candles for sandwich pattern
const before = history[history.length - 2];
const during = history[history.length - 1];
const after = candle;
const beforePrice = parseFloat(before.close) || 0;
const duringHigh = parseFloat(during.high) || 0;
const afterPrice = parseFloat(after.close) || 0;
if (beforePrice <= 0 || duringHigh <= 0 || afterPrice <= 0) return attacks;
// Pattern: price spikes up, then drops back down
const spike = (duringHigh - beforePrice) / beforePrice;
const normalization = (afterPrice - duringHigh) / duringHigh;
if (spike > CONFIG.sandwichAttack.minSpikePercent &&
normalization < -CONFIG.sandwichAttack.minDropPercent) {
attacks.push({
type: 'SANDWICH_ATTACK',
severity: 'HIGH',
confidence: 75,
value: `+${(spike*100).toFixed(2)}% → ${(normalization*100).toFixed(2)}%`,
details: 'Front-running pattern: Price manipulated around victim transaction',
metadata: {
spike,
normalization,
beforePrice,
duringHigh,
afterPrice,
victimLoss: Math.abs(spike + normalization) * 100
}
});
}
// Extended pattern detection (5-candle window)
if (history.length >= 5) {
for (let i = history.length - 4; i < history.length - 2; i++) { // important line
const c1 = parseFloat(history[i].close) || 0;
const c2 = parseFloat(history[i+1].high) || 0;
const c3 = parseFloat(history[i+2].close) || 0;
if (c1 > 0 && c2 > 0 && c3 > 0) {
const earlySpike = (c2 - c1) / c1;
const laterDrop = (c3 - c2) / c2;
if (earlySpike > 0.02 && laterDrop < -0.015) {
attacks.push({
type: 'MULTI_BLOCK_SANDWICH',
severity: 'MEDIUM',
confidence: 65,
value: `Extended pattern detected`,
details: `Multi-block sandwich across ${i} to ${i+2}`,
metadata: { earlySpike, laterDrop, startIndex: i }
});
break;
}
}
}
}
return attacks;
}d. SMART CONTRACT ANOMALY DETECTION
Statistical analysis of price behavior for contract-level issues
function detectSmartContractAnomalies(candle, history) {
const anomalies = [];
if (history.length < CONFIG.priceManipulation.minDataPoints) return anomalies;
const stats = calculateStatistics(history);
if (!stats) return anomalies;
const currentPrice = parseFloat(candle.close) || 0;
if (currentPrice <= 0) return anomalies;
// Z-score analysis (statistical deviation)
const zScore = (currentPrice - stats.mean) / stats.stdDev;
if (Math.abs(zScore) > CONFIG.priceManipulation.stdDevMultiplier) {
anomalies.push({
type: 'STATISTICAL_ANOMALY',
severity: Math.abs(zScore) > 3.5 ? 'CRITICAL' : 'HIGH',
confidence: Math.min(95, 50 + Math.abs(zScore) * 10),
value: `Z-score: ${zScore.toFixed(2)}σ`,
details: `Price ${zScore > 0 ? 'above' : 'below'} normal by ${Math.abs(zScore).toFixed(2)} std deviations`,
metadata: {
zScore,
currentPrice,
mean: stats.mean,
stdDev: stats.stdDev
}
});
}
// Volatility spike detection
const currentVolatility = (parseFloat(candle.high) - parseFloat(candle.low)) / parseFloat(candle.open);
const recentVolatilities = history.slice(-20).map(c =>
(parseFloat(c.high) - parseFloat(c.low)) / parseFloat(c.open)
);
const avgVolatility = recentVolatilities.reduce((a,b) => a+b) / recentVolatilities.length;
if (currentVolatility > avgVolatility * 3) {
anomalies.push({
type: 'EXTREME_VOLATILITY',
severity: 'MEDIUM',
confidence: 70,
value: `${(currentVolatility*100).toFixed(2)}% range`,
details: `Volatility ${(currentVolatility/avgVolatility).toFixed(1)}x higher than average`,
metadata: { currentVolatility, avgVolatility }
});
}
return anomalies;
}e. ECONOMIC EXPLOIT DETECTION (MEV)
Detects Maximal Extractable Value patterns
function detectEconomicExploits(candle, history) {
const exploits = [];
if (history.length < 10) return exploits;
// Pattern: Repeated micro-profits (MEV bots)
const last10 = history.slice(-10);
let profitablePatterns = 0;
for (let i = 1; i < last10.length; i++) {
const priceMove = (last10[i].close - last10[i-1].close) / last10[i-1].close;
const volumeSpike = last10[i].volume / (last10[i-1].volume || 1);
// Small profitable move with volume spike = MEV
if (Math.abs(priceMove) > 0.005 && Math.abs(priceMove) < 0.02 && volumeSpike > 2) {
profitablePatterns++;
}
}
if (profitablePatterns >= 5) {
exploits.push({
type: 'MEV_BOT_ACTIVITY',
severity: 'MEDIUM',
confidence: 60,
value: `${profitablePatterns}/10 patterns`,
details: 'Repeated micro-arbitrage patterns detected (likely MEV bots)',
metadata: { profitablePatterns }
});
}
return exploits;
}f. NETWORK CONGESTION PREDICTION
Predicts network issues based on transaction patterns
function predictNetworkCongestion(candle, history) {
const predictions = [];
// This would ideally use block-level data, but we can infer from trading patterns
if (history.length < 30) return predictions;
const recent = history.slice(-30);
const timestamps = recent.map(c => new Date(c.timestamp).getTime());
const intervals = [];
for (let i = 1; i < timestamps.length; i++) {
intervals.push(timestamps[i] - timestamps[i-1]);
}
const avgInterval = intervals.reduce((a,b) => a+b) / intervals.length;
const currentInterval = new Date(candle.timestamp).getTime() - timestamps[timestamps.length - 1];
// If candles are arriving slower than usual
if (currentInterval > avgInterval * 1.5) {
predictions.push({
type: 'NETWORK_CONGESTION',
severity: 'MEDIUM',
confidence: 50,
value: `${(currentInterval/1000).toFixed(0)}s delay`,
details: `Data arrival ${(currentInterval/avgInterval).toFixed(1)}x slower than normal`,
metadata: { currentInterval, avgInterval }
});
}
return predictions;
}5. ML DATA EXPORT
Prepare training data for machine learning models
function prepareMLDataPoint(candle, anomalies, history) {
const stats = calculateStatistics(history);
return {
timestamp: candle.timestamp,
features: {
// Price features
open: parseFloat(candle.open) || 0,
high: parseFloat(candle.high) || 0,
low: parseFloat(candle.low) || 0,
close: parseFloat(candle.close) || 0,
// Volume features
volume: parseFloat(candle.volume) || 0,
volumeUsd: parseFloat(candle.volume_usd) || 0,
// Derived features
priceRange: (parseFloat(candle.high) - parseFloat(candle.low)) || 0,
bodySize: Math.abs((parseFloat(candle.close) - parseFloat(candle.open))) || 0,
volatility: ((parseFloat(candle.high) - parseFloat(candle.low)) / parseFloat(candle.open)) || 0,
// Statistical features
zScore: stats ? ((parseFloat(candle.close) - stats.mean) / stats.stdDev) : 0,
distanceFromMean: stats ? Math.abs(parseFloat(candle.close) - stats.mean) : 0,
// Historical features
priceChange1: history.length > 0 ?
((parseFloat(candle.close) - parseFloat(history[history.length-1].close)) / parseFloat(history[history.length-1].close)) : 0,
volumeChange1: history.length > 0 ?
(parseFloat(candle.volume) / (parseFloat(history[history.length-1].volume) || 1)) : 0,
},
labels: {
hasAnomaly: anomalies.length > 0,
anomalyCount: anomalies.length,
maxSeverity: anomalies.length > 0 ? anomalies[0].severity : 'NONE',
anomalyTypes: anomalies.map(a => a.type),
isCritical: anomalies.some(a => a.severity === 'CRITICAL')
},
metadata: {
tokenAddress: CONFIG.tokenAddress,
dataPointIndex: dataCount
}
};
}
function exportMLData(dataPoint) {
if (!CONFIG.exportData) return;
// Add to buffer
dataStore.mlDataBuffer.push(dataPoint);
// Batch write to file
if (dataStore.mlDataBuffer.length >= CONFIG.exportBatchSize) {
const exportDir = CONFIG.exportPath;
// Create directory if it doesn't exist
if (!fs.existsSync(exportDir)) {
fs.mkdirSync(exportDir, { recursive: true });
}
const filename = path.join(exportDir, `security_data_${Date.now()}.jsonl`);
const dataLines = dataStore.mlDataBuffer.map(d => JSON.stringify(d)).join('\n') + '\n';
fs.appendFileSync(filename, dataLines);
console.log(chalk.gray(`\n💾 Exported ${dataStore.mlDataBuffer.length} data points to ${filename}`));
dataStore.mlDataBuffer = [];
}
}6. DISPLAY FUNCTIONS
function displaySecurityDashboard(candle, allAnomalies) {
clearScreen();
const timestamp = new Date(candle.timestamp).toLocaleString();
const uptime = Math.floor((Date.now() - streamStartTime) / 1000);
const uptimeStr = `${Math.floor(uptime / 60)}m ${uptime % 60}s`;
// Header Info
console.log(chalk.yellow.bold(`\n⏰ ${timestamp} | Uptime: ${uptimeStr} | Events: ${dataCount}`));
console.log(chalk.gray('─'.repeat(70)));
// Price Display
console.log(chalk.cyan('\n📊 MARKET DATA'));
console.log(chalk.white(` Open: ${chalk.blue(formatPrice(candle.open))}`));
console.log(chalk.white(` High: ${chalk.green(formatPrice(candle.high))}`));
console.log(chalk.white(` Low: ${chalk.red(formatPrice(candle.low))}`));
console.log(chalk.white(` Close: ${chalk.yellow(formatPrice(candle.close))}`));
console.log(chalk.white(` Volume: ${chalk.magenta(formatVolume(candle.volume))}`));
// Security Status
console.log(chalk.cyan('\n🛡️ SECURITY STATUS'));
if (allAnomalies.length === 0) {
console.log(chalk.green(' ✓ No threats detected - All systems normal'));
} else {
// Group by severity
const critical = allAnomalies.filter(a => a.severity === 'CRITICAL');
const high = allAnomalies.filter(a => a.severity === 'HIGH');
const medium = allAnomalies.filter(a => a.severity === 'MEDIUM');
if (critical.length > 0) {
console.log(chalk.red.bold(` 🚨 CRITICAL THREATS: ${critical.length}`));
critical.forEach(a => {
console.log(chalk.red(` ${a.type}: ${a.value}`));
console.log(chalk.red(` └─ ${a.details}`));
console.log(chalk.gray(` Confidence: ${a.confidence}%`));
});
}
if (high.length > 0) {
console.log(chalk.yellow(` ⚠️ HIGH SEVERITY: ${high.length}`));
high.forEach(a => {
console.log(chalk.yellow(` ${a.type}: ${a.value}`));
console.log(chalk.yellow(` └─ ${a.details}`));
});
}
if (medium.length > 0) {
console.log(chalk.blue(` ℹ️ MEDIUM ALERTS: ${medium.length}`));
medium.forEach(a => {
console.log(chalk.blue(` ${a.type}: ${a.value}`));
});
}
}
// Statistics
console.log(chalk.cyan('\n📈 DETECTION STATISTICS'));
console.log(chalk.white(` Total Anomalies Detected: ${chalk.yellow(totalAnomaliesDetected)}`));
console.log(chalk.white(` Critical Alerts: ${chalk.red(criticalAlertsCount)}`));
console.log(chalk.white(` Data Points Analyzed: ${chalk.blue(dataCount)}`));
console.log(chalk.white(` History Buffer: ${chalk.magenta(dataStore.priceHistory.length)}/${CONFIG.maxHistory}`));
console.log(chalk.gray('\n' + '─'.repeat(70)));
console.log(chalk.gray('Press Ctrl+C to exit and save data'));
}7. Main Detection Pipeline
function runSecurityAnalysis(candle, history) {
const allAnomalies = [];
// Run all detection algorithms
allAnomalies.push(...detectFlashLoanExploit(candle, history));
allAnomalies.push(...detectRugPullSignals(candle, history));
allAnomalies.push(...detectSandwichAttack(candle, history));
allAnomalies.push(...detectSmartContractAnomalies(candle, history));
allAnomalies.push(...detectEconomicExploits(candle, history));
allAnomalies.push(...predictNetworkCongestion(candle, history));
// Update statistics
totalAnomaliesDetected += allAnomalies.length;
criticalAlertsCount += allAnomalies.filter(a => a.severity === 'CRITICAL').length;
// Store security events
if (allAnomalies.length > 0) {
dataStore.securityEvents.push({
timestamp: candle.timestamp,
anomalies: allAnomalies,
candle: {
open: candle.open,
high: candle.high,
low: candle.low,
close: candle.close,
volume: candle.volume
}
});
}
return allAnomalies;
}8. Streaming Subscription
console.log(chalk.blue('🚀 Starting security monitoring...'));
console.log(chalk.gray(`Token: ${CONFIG.tokenAddress}`));
console.log(chalk.gray(`Export: ${CONFIG.exportData ? 'Enabled' : 'Disabled'}`));
console.log('');
const unsubscribe = client.StreamingService.subscribeToOHLCVTokens(
{
chain_name: StreamingChain.SOLANA_MAINNET,
token_addresses: [CONFIG.tokenAddress],
interval: StreamingInterval.ONE_MINUTE,
timeframe: StreamingTimeframe.ONE_HOUR,
},
{
next: (data) => {
// Extract candles
let candles = [];
if (data && typeof data === 'object') {
if (data.ohlcvCandlesForToken) {
candles = Array.isArray(data.ohlcvCandlesForToken)
? data.ohlcvCandlesForToken
: [data.ohlcvCandlesForToken];
} else if (data.data && data.data.ohlcvCandlesForToken) {
candles = Array.isArray(data.data.ohlcvCandlesForToken)
? data.data.ohlcvCandlesForToken
: [data.data.ohlcvCandlesForToken];
} else if (Array.isArray(data)) {
candles = data;
} else if (data.items) {
candles = data.items;
} else {
candles = [data];
}
}
if (candles.length === 0) {
console.log(chalk.yellow('⚠️ No candles in data batch'));
return;
}
// Process each candle
candles.forEach((candle) => {
dataCount++;
// Add to history
dataStore.priceHistory.push(candle);
if (dataStore.priceHistory.length > CONFIG.maxHistory) {
dataStore.priceHistory.shift();
}
// Run security analysis
const anomalies = runSecurityAnalysis(candle, dataStore.priceHistory.slice(0, -1));
// Prepare and export ML data
const mlDataPoint = prepareMLDataPoint(candle, anomalies, dataStore.priceHistory.slice(0, -1));
exportMLData(mlDataPoint);
// Display results
displaySecurityDashboard(candle, anomalies);
// Critical alert notification
const criticalAnomalies = anomalies.filter(a => a.severity === 'CRITICAL');
if (criticalAnomalies.length > 0) {
console.log(chalk.red.bold('\n🚨 IMMEDIATE ACTION REQUIRED 🚨'));
console.log(chalk.red('Critical security threats detected. Review immediately.'));
// Log to separate critical events file
const criticalLogPath = path.join(CONFIG.exportPath, 'critical_events.log');
const logEntry = `${new Date().toISOString()} - ${JSON.stringify(criticalAnomalies)}\n`;
fs.appendFileSync(criticalLogPath, logEntry);
}
});
},
error: (error) => {
console.clear();
displayHeader();
console.log(chalk.red('\n✗ Subscription error:'));
console.log(JSON.stringify(error, null, 2));
console.log(chalk.yellow('\nTroubleshooting:'));
console.log('1. Verify your API key has streaming permissions');
console.log('2. Check token address is valid for Solana');
console.log('3. Contact [email protected] for Beta access');
process.exit(1);
},
complete: () => {
console.log(chalk.green("\n✓ Stream completed"));
saveSessionReport();
process.exit(0);
},
}
);9. Session Reporting
function saveSessionReport() {
const report = {
sessionInfo: {
startTime: new Date(streamStartTime).toISOString(),
endTime: new Date().toISOString(),
duration: Date.now() - streamStartTime,
totalDataPoints: dataCount
},
statistics: {
totalAnomalies: totalAnomaliesDetected,
criticalAlerts: criticalAlertsCount,
securityEvents: dataStore.securityEvents.length
},
configuration: CONFIG,
securityEvents: dataStore.securityEvents
};
const reportPath = path.join(CONFIG.exportPath, `session_report_${Date.now()}.json`);
if (!fs.existsSync(CONFIG.exportPath)) {
fs.mkdirSync(CONFIG.exportPath, { recursive: true });
}
fs.writeFileSync(reportPath, JSON.stringify(report, null, 2));
console.log(chalk.green(`\n📊 Session report saved: ${reportPath}`));
}10 . Graceful Shutdown
process.on('SIGINT', async () => {
console.log("\n" + chalk.yellow("🛑 Shutting down security monitoring..."));
// Flush remaining ML data
if (dataStore.mlDataBuffer.length > 0) {
const filename = path.join(CONFIG.exportPath, `security_data_final_${Date.now()}.jsonl`);
const dataLines = dataStore.mlDataBuffer.map(d => JSON.stringify(d)).join('\n') + '\n';
if (!fs.existsSync(CONFIG.exportPath)) {
fs.mkdirSync(CONFIG.exportPath, { recursive: true });
}
fs.writeFileSync(filename, dataLines);
console.log(chalk.green(`✓ Saved ${dataStore.mlDataBuffer.length} pending data points`));
}
// Save session report
saveSessionReport();
// Display final summary
console.log(chalk.cyan('\n═══════════════════════════════════════════'));
console.log(chalk.cyan(' SESSION SUMMARY'));
console.log(chalk.cyan('═══════════════════════════════════════════'));
console.log(chalk.white(` Duration: ${Math.floor((Date.now() - streamStartTime) / 60000)} minutes`));
console.log(chalk.white(` Data Points: ${dataCount}`));
console.log(chalk.white(` Total Anomalies: ${totalAnomaliesDetected}`));
console.log(chalk.white(` Critical Alerts: ${criticalAlertsCount}`));
console.log(chalk.white(` Security Events: ${dataStore.securityEvents.length}`));
console.log(chalk.cyan('═══════════════════════════════════════════\n'));
unsubscribe();
await client.StreamingService.disconnect();
console.log(chalk.green('✓ Shutdown complete. Data saved.'));
process.exit(0);
});11. At the end adding a Heartbeat to show system is running
setInterval(() => {
const uptime = Math.floor((Date.now() - streamStartTime) / 1000);
if (dataCount === 0 && uptime > 30) {
console.log(chalk.gray(`⏳ Waiting for data... (${uptime}s)`));
}
}, 30000); // Every 30 seconds
})();So after compiling all this in our security_analytics.js file we run :
node security_analytics.js
It will check all recent transaction happening in the wallet given and will show how many anomalies are there . And it will automatically save all transactions in the security_data folder in json format . Which we gonna use for advanced check of anomaly through our

MODEL
What this security_analytics.js does :
Initializes the Goldrush client
Connects to Solana’s live OHLCV (price and volume) stream via Goldrush SDK for high-frequency data access.
Maintains rolling historical data
Stores recent candle data (price, volume) in buffers to detect patterns and compute moving statistics.
Performs real-time anomaly detection
Continuously analyzes live data to detect:
Generates multi-level alerts
Flags anomalies as LOW, MEDIUM, or CRITICAL severity based on deviation thresholds.
Tracks and counts security events
Maintains a counter of critical alerts and logs all detected anomalies in structured form.
Prepares machine learning data points
Converts analyzed candle + anomaly data into ML-friendly format for predictive model training.
Exports ML dataset
Automatically writes real-time analytical data to a local or cloud file (for training anomaly-prediction models).
Displays live security dashboard
Prints color-coded (via chalk) terminal summaries of:
Handles streaming errors gracefully
Captures and logs API errors or connection drops for reliability.
Supports graceful shutdown
Listens for process termination (SIGINT) to safely close the stream and persist session data.
Now, Let’s jump on our anomaly segregation using ML detection part :
#!/usr/bin/env python3
"""
Security Analytics ML Training Script
Trains machine learning models to detect blockchain security anomalies
"""
import json
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
classification_report, confusion_matrix,
roc_auc_score, roc_curve, precision_recall_curve,
accuracy_score, f1_score
)
import joblib
import warnings
warnings.filterwarnings('ignore')
# Set style for plots
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 8)
class SecurityMLTrainer:
"""Train and evaluate ML models for security anomaly detection"""
def __init__(self, data_path):
self.data_path = data_path
self.df = None
self.X_train = None
self.X_test = None
self.y_train = None
self.y_test = None
self.scaler = StandardScaler()
self.models = {}
self.best_model = None
self.feature_names = None
def load_data(self):
"""Load data from JSONL file"""
print("📂 Loading data from", self.data_path)
data = []
with open(self.data_path, 'r') as f:
for line in f:
if line.strip():
data.append(json.loads(line))
print(f"✓ Loaded {len(data)} data points")
# Extract features and labels
features_list = []
labels_list = []
for item in data:
features_list.append(item['features'])
labels_list.append(item['labels']['hasAnomaly'])
self.df = pd.DataFrame(features_list)
self.df['hasAnomaly'] = labels_list
print(f"\n📊 Dataset Overview:")
print(f" Total samples: {len(self.df)}")
print(f" Features: {len(self.df.columns) - 1}")
print(f" Anomalies: {self.df['hasAnomaly'].sum()} ({self.df['hasAnomaly'].mean()*100:.1f}%)")
print(f" Normal: {(~self.df['hasAnomaly']).sum()} ({(~self.df['hasAnomaly']).mean()*100:.1f}%)")
return self.df
def prepare_features(self, test_size=0.25, random_state=42):
"""Prepare features for training"""
print("\n🔧 Preparing features...")
# Separate features and target
X = self.df.drop('hasAnomaly', axis=1)
y = self.df['hasAnomaly']
# Store feature names
self.feature_names = X.columns.tolist()
# Handle any infinite or missing values
X = X.replace([np.inf, -np.inf], np.nan)
X = X.fillna(0)
# Split data
self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
X, y, test_size=test_size, random_state=random_state, stratify=y
)
# Scale features
self.X_train_scaled = self.scaler.fit_transform(self.X_train)
self.X_test_scaled = self.scaler.transform(self.X_test)
print(f"✓ Train set: {len(self.X_train)} samples")
print(f"✓ Test set: {len(self.X_test)} samples")
print(f"✓ Features scaled using StandardScaler")
def train_models(self):
"""Train multiple ML models"""
print("\n🤖 Training models...")
# Define models
models_config = {
'Random Forest': RandomForestClassifier(
n_estimators=100,
max_depth=10,
min_samples_split=5,
random_state=42,
class_weight='balanced'
),
'Gradient Boosting': GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=5,
random_state=42
),
'Logistic Regression': LogisticRegression(
max_iter=1000,
random_state=42,
class_weight='balanced'
)
}
results = []
for name, model in models_config.items():
print(f"\n Training {name}...")
# Use scaled features for Logistic Regression, raw for tree-based
if name == 'Logistic Regression':
X_train_use = self.X_train_scaled
X_test_use = self.X_test_scaled
else:
X_train_use = self.X_train
X_test_use = self.X_test
# Train model
model.fit(X_train_use, self.y_train)
# Predictions
y_pred = model.predict(X_test_use)
y_pred_proba = model.predict_proba(X_test_use)[:, 1]
# Calculate metrics
accuracy = accuracy_score(self.y_test, y_pred)
f1 = f1_score(self.y_test, y_pred)
# Handle case where only one class is present
try:
roc_auc = roc_auc_score(self.y_test, y_pred_proba)
except:
roc_auc = 0.0
# Cross-validation score
cv_scores = cross_val_score(model, X_train_use, self.y_train, cv=3, scoring='f1')
results.append({
'Model': name,
'Accuracy': accuracy,
'F1 Score': f1,
'ROC AUC': roc_auc,
'CV F1 Mean': cv_scores.mean(),
'CV F1 Std': cv_scores.std()
})
# Store model
self.models[name] = {
'model': model,
'scaled': name == 'Logistic Regression',
'predictions': y_pred,
'probabilities': y_pred_proba
}
print(f" ✓ Accuracy: {accuracy:.3f}")
print(f" ✓ F1 Score: {f1:.3f}")
print(f" ✓ ROC AUC: {roc_auc:.3f}")
print(f" ✓ CV F1: {cv_scores.mean():.3f} (+/- {cv_scores.std():.3f})")
# Results summary
results_df = pd.DataFrame(results)
print("\n📊 Model Comparison:")
print(results_df.to_string(index=False))
# Select best model based on F1 score
best_idx = results_df['F1 Score'].idxmax()
best_model_name = results_df.iloc[best_idx]['Model']
self.best_model = self.models[best_model_name]
self.best_model['name'] = best_model_name
print(f"\n🏆 Best Model: {best_model_name}")
return results_df
def evaluate_best_model(self):
"""Detailed evaluation of the best model"""
print(f"\n📈 Detailed Evaluation - {self.best_model['name']}")
# Classification Report
print("\n" + "="*60)
print("CLASSIFICATION REPORT")
print("="*60)
print(classification_report(
self.y_test,
self.best_model['predictions'],
target_names=['Normal', 'Anomaly']
))
# Confusion Matrix
cm = confusion_matrix(self.y_test, self.best_model['predictions'])
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
xticklabels=['Normal', 'Anomaly'],
yticklabels=['Normal', 'Anomaly'])
plt.title(f'Confusion Matrix - {self.best_model["name"]}')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.tight_layout()
plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight')
print("✓ Confusion matrix saved to confusion_matrix.png")
def plot_feature_importance(self):
"""Plot feature importance for tree-based models"""
model = self.best_model['model']
if hasattr(model, 'feature_importances_'):
print("\n📊 Feature Importance Analysis")
importance_df = pd.DataFrame({
'Feature': self.feature_names,
'Importance': model.feature_importances_
}).sort_values('Importance', ascending=False)
print("\nTop 10 Most Important Features:")
print(importance_df.head(10).to_string(index=False))
# Plot
plt.figure(figsize=(10, 8))
top_features = importance_df.head(15)
plt.barh(range(len(top_features)), top_features['Importance'])
plt.yticks(range(len(top_features)), top_features['Feature'])
plt.xlabel('Importance')
plt.title(f'Top 15 Feature Importances - {self.best_model["name"]}')
plt.gca().invert_yaxis()
plt.tight_layout()
plt.savefig('feature_importance.png', dpi=300, bbox_inches='tight')
print("✓ Feature importance plot saved to feature_importance.png")
def plot_roc_curve(self):
"""Plot ROC curve"""
try:
fpr, tpr, _ = roc_curve(self.y_test, self.best_model['probabilities'])
roc_auc = roc_auc_score(self.y_test, self.best_model['probabilities'])
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, color='darkorange', lw=2,
label=f'ROC curve (AUC = {roc_auc:.3f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title(f'ROC Curve - {self.best_model["name"]}')
plt.legend(loc="lower right")
plt.tight_layout()
plt.savefig('roc_curve.png', dpi=300, bbox_inches='tight')
print("✓ ROC curve saved to roc_curve.png")
except:
print("⚠ Could not generate ROC curve (insufficient data)")
def plot_precision_recall_curve(self):
"""Plot Precision-Recall curve"""
try:
precision, recall, _ = precision_recall_curve(
self.y_test,
self.best_model['probabilities']
)
plt.figure(figsize=(8, 6))
plt.plot(recall, precision, color='blue', lw=2)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title(f'Precision-Recall Curve - {self.best_model["name"]}')
plt.tight_layout()
plt.savefig('precision_recall_curve.png', dpi=300, bbox_inches='tight')
print("✓ Precision-Recall curve saved to precision_recall_curve.png")
except:
print("⚠ Could not generate Precision-Recall curve")
def analyze_predictions(self):
"""Analyze prediction patterns"""
print("\n🔍 Prediction Analysis")
# Create analysis dataframe
analysis_df = self.X_test.copy()
analysis_df['true_label'] = self.y_test.values
analysis_df['predicted_label'] = self.best_model['predictions']
analysis_df['prediction_prob'] = self.best_model['probabilities']
# Identify false positives and false negatives
fp = analysis_df[(analysis_df['true_label'] == False) &
(analysis_df['predicted_label'] == True)]
fn = analysis_df[(analysis_df['true_label'] == True) &
(analysis_df['predicted_label'] == False)]
print(f"\n False Positives: {len(fp)}")
print(f" False Negatives: {len(fn)}")
if len(fp) > 0:
print("\n High-confidence False Positives (top 3):")
print(fp.nlargest(3, 'prediction_prob')[['prediction_prob']].to_string())
if len(fn) > 0:
print("\n High-confidence False Negatives (top 3):")
print(fn.nsmallest(3, 'prediction_prob')[['prediction_prob']].to_string())
def save_model(self, output_dir='models'):
"""Save the trained model and scaler"""
print(f"\n💾 Saving model...")
# Create output directory
Path(output_dir).mkdir(exist_ok=True)
# Save model
model_path = Path(output_dir) / 'security_model.pkl'
joblib.dump(self.best_model['model'], model_path)
print(f"✓ Model saved to {model_path}")
# Save scaler
scaler_path = Path(output_dir) / 'scaler.pkl'
joblib.dump(self.scaler, scaler_path)
print(f"✓ Scaler saved to {scaler_path}")
# Save feature names
feature_path = Path(output_dir) / 'feature_names.json'
with open(feature_path, 'w') as f:
json.dump({
'features': self.feature_names,
'model_name': self.best_model['name'],
'trained_date': datetime.now().isoformat(),
'use_scaler': self.best_model['scaled']
}, f, indent=2)
print(f"✓ Feature names saved to {feature_path}")
# Save model metadata
metadata = {
'model_name': self.best_model['name'],
'training_date': datetime.now().isoformat(),
'n_samples': len(self.df),
'n_features': len(self.feature_names),
'n_anomalies': int(self.df['hasAnomaly'].sum()),
'test_accuracy': float(accuracy_score(self.y_test, self.best_model['predictions'])),
'test_f1': float(f1_score(self.y_test, self.best_model['predictions'])),
'feature_names': self.feature_names
}
metadata_path = Path(output_dir) / 'model_metadata.json'
with open(metadata_path, 'w') as f:
json.dump(metadata, f, indent=2)
print(f"✓ Metadata saved to {metadata_path}")
print(f"\n✅ Model training complete! All files saved to '{output_dir}/' directory")
def main():
"""Main training pipeline"""
print("="*70)
print("🛡️ BLOCKCHAIN SECURITY ANOMALY DETECTION - ML TRAINING")
print("="*70)
# Find the most recent data file
data_dir = Path('security_data')
if not data_dir.exists():
print("❌ Error: security_data directory not found!")
return
jsonl_files = list(data_dir.glob('*.jsonl'))
if not jsonl_files:
print("❌ Error: No .jsonl files found in security_data directory!")
return
# Use the most recent file
data_file = max(jsonl_files, key=lambda p: p.stat().st_mtime)
print(f"\n📁 Using data file: {data_file}")
# Initialize trainer
trainer = SecurityMLTrainer(data_file)
# Training pipeline
try:
# Load data
trainer.load_data()
# Prepare features
trainer.prepare_features()
# Train models
results = trainer.train_models()
# Evaluate best model
trainer.evaluate_best_model()
# Generate visualizations
trainer.plot_feature_importance()
trainer.plot_roc_curve()
trainer.plot_precision_recall_curve()
# Analyze predictions
trainer.analyze_predictions()
# Save model
trainer.save_model()
print("\n" + "="*70)
print("✅ TRAINING COMPLETED SUCCESSFULLY!")
print("="*70)
print("\n📦 Generated Files:")
print(" - models/security_model.pkl")
print(" - models/scaler.pkl")
print(" - models/feature_names.json")
print(" - models/model_metadata.json")
print(" - confusion_matrix.png")
print(" - feature_importance.png")
print(" - roc_curve.png")
print(" - precision_recall_curve.png")
except Exception as e:
print(f"\n❌ Error during training: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()After this once , we run : python train_security_model.py
We will get the following output :
This Python script is basically the brain-training phase of your Goldrush security analytics system.
While the earlier JavaScript code was all about detecting anomalies in real time, this script teaches an ML model how to recognize those patterns intelligently in the future — like training an AI analyst for blockchain data.
1. Data Loading & Preparation
The script starts by scanning your security_data folder for the latest .jsonl file (this is where your live monitoring system stores exported analytics data).
It then loads that data into a pandas DataFrame.
Each row represents a time snapshot (like a minute of on-chain activity), with features such as price change, volume, volatility, etc.
The label (hasAnomaly) tells the model whether that data point represented a security anomaly or normal behavior.
Think of it as: “Here’s what normal Solana activity looks like — and here’s what an exploit looks like.”
2. Feature Scaling & Data Splitting
It cleans up the data by replacing missing or infinite values.
Then it splits the dataset into a training set (75%) and a test set (25%).
The features are standardized (scaled) so models like Logistic Regression can perform better.
Basically, this is setting up clean, balanced data before throwing it into the machine learning cauldron.
3. Training Multiple Models
Three models are trained in parallel:
Random Forest → Great for nonlinear, high-variance blockchain data.
Gradient Boosting → Captures subtle interactions and trends.
Logistic Regression → A fast, linear baseline model.
Each model is trained and then evaluated on the test data.
It computes Accuracy, F1 Score, and ROC-AUC, along with cross-validation F1 scores.
The script automatically picks the best performing model based on F1 score (which balances false positives & negatives).
4. Model Evaluation & Visualization
Once the best model is chosen, it generates visual reports:
Classification report → Shows how well it predicts anomalies vs normal cases.
Confusion Matrix → Visual heatmap of correct vs incorrect predictions.
Feature Importance → Ranks which metrics (price deviation, volume spikes, etc.) most influenced anomaly decisions.
ROC & Precision-Recall Curves → Shows model confidence and trade-offs between recall and precision.
This turns raw metrics into visual, explainable results — super helpful for audits or reports.
5. Prediction Error Analysis
The script identifies false positives (flagged normal data) and false negatives (missed anomalies).
It even prints out the top 3 highest-confidence errors in both categories.
This helps you understand where the model is over-reacting or missing subtle exploit patterns.
6. Model Saving & Metadata
After evaluation, the best model is saved neatly inside a /models folder:
security_model.pkl → The trained ML model
scaler.pkl → The StandardScaler object for consistent input normalization
feature_names.json → Metadata of which features were used
model_metadata.json → Training info (accuracy, dataset stats, date, etc.)
So, next time, you can directly load this trained model and start detecting anomalies instantly — no retraining needed
#!/usr/bin/env python3
"""
Anomaly Prediction Script
Uses the trained ML model to predict anomalies on new data
"""
import json
import pandas as pd
import numpy as np
import joblib
from pathlib import Path
import sys
class AnomalyPredictor:
"""Predict security anomalies using trained ML model"""
def __init__(self, model_dir='models'):
self.model_dir = Path(model_dir)
self.model = None
self.scaler = None
self.feature_names = None
self.metadata = None
self.load_model()
def load_model(self):
"""Load the trained model and associated files"""
print("📦 Loading trained model...")
# Load model
model_path = self.model_dir / 'security_model.pkl'
if not model_path.exists():
raise FileNotFoundError(f"Model not found at {model_path}")
self.model = joblib.load(model_path)
# Load scaler
scaler_path = self.model_dir / 'scaler.pkl'
if scaler_path.exists():
self.scaler = joblib.load(scaler_path)
# Load feature names
feature_path = self.model_dir / 'feature_names.json'
if feature_path.exists():
with open(feature_path, 'r') as f:
feature_config = json.load(f)
self.feature_names = feature_config['features']
# Load metadata
metadata_path = self.model_dir / 'model_metadata.json'
if metadata_path.exists():
with open(metadata_path, 'r') as f:
self.metadata = json.load(f)
print(f"✓ Model loaded: {self.metadata.get('model_name', 'Unknown')}")
print(f"✓ Training date: {self.metadata.get('training_date', 'Unknown')}")
print(f"✓ Test accuracy: {self.metadata.get('test_accuracy', 0):.3f}")
print(f"✓ Test F1 score: {self.metadata.get('test_f1', 0):.3f}")
def predict_from_features(self, features_dict):
"""
Predict anomaly from a features dictionary
Args:
features_dict: Dictionary containing feature values
Returns:
dict: Prediction result with probability and label
"""
# Create DataFrame with correct feature order
df = pd.DataFrame([features_dict])[self.feature_names]
# Handle missing/infinite values
df = df.replace([np.inf, -np.inf], np.nan)
df = df.fillna(0)
# Make prediction
prediction = self.model.predict(df)[0]
probability = self.model.predict_proba(df)[0]
return {
'is_anomaly': bool(prediction),
'anomaly_probability': float(probability[1]),
'normal_probability': float(probability[0]),
'confidence': float(max(probability)),
'prediction_label': 'ANOMALY' if prediction else 'NORMAL'
}
def predict_from_jsonl(self, jsonl_path):
"""
Predict anomalies for all records in a JSONL file
Args:
jsonl_path: Path to JSONL file
Returns:
list: List of predictions for each record
"""
print(f"\n🔍 Analyzing data from {jsonl_path}...")
predictions = []
with open(jsonl_path, 'r') as f:
for idx, line in enumerate(f, 1):
if not line.strip():
continue
data = json.loads(line)
features = data['features']
# Make prediction
result = self.predict_from_features(features)
# Add metadata
result['timestamp'] = data.get('timestamp')
result['data_index'] = idx
# Add actual label if available
if 'labels' in data:
result['actual_anomaly'] = data['labels'].get('hasAnomaly')
result['actual_types'] = data['labels'].get('anomalyTypes', [])
predictions.append(result)
return predictions
def print_predictions(self, predictions):
"""Print prediction results in a formatted way"""
anomalies = [p for p in predictions if p['is_anomaly']]
normal = [p for p in predictions if not p['is_anomaly']]
print(f"\n📊 Prediction Results:")
print(f" Total records: {len(predictions)}")
print(f" Predicted anomalies: {len(anomalies)}")
print(f" Predicted normal: {len(normal)}")
if anomalies:
print(f"\n⚠️ Detected Anomalies:")
for pred in anomalies:
print(f"\n 📍 Record #{pred['data_index']} - {pred.get('timestamp', 'N/A')}")
print(f" Anomaly Probability: {pred['anomaly_probability']:.1%}")
print(f" Confidence: {pred['confidence']:.1%}")
if 'actual_anomaly' in pred:
correct = pred['actual_anomaly'] == pred['is_anomaly']
status = "✓ CORRECT" if correct else "✗ INCORRECT"
print(f" Actual Label: {pred['actual_anomaly']} {status}")
if pred.get('actual_types'):
print(f" Actual Types: {', '.join(pred['actual_types'])}")
# Calculate accuracy if actual labels available
if 'actual_anomaly' in predictions[0]:
correct = sum(1 for p in predictions if p['is_anomaly'] == p['actual_anomaly'])
accuracy = correct / len(predictions)
print(f"\n📈 Model Performance on this data:")
print(f" Accuracy: {accuracy:.1%}")
def save_predictions(self, predictions, output_path):
"""Save predictions to a JSON file"""
with open(output_path, 'w') as f:
json.dump(predictions, f, indent=2)
print(f"\n💾 Predictions saved to {output_path}")
def main():
"""Main prediction pipeline"""
print("="*70)
print("🔮 BLOCKCHAIN SECURITY ANOMALY PREDICTION")
print("="*70)
# Initialize predictor
try:
predictor = AnomalyPredictor()
except FileNotFoundError as e:
print(f"\n❌ Error: {e}")
print(" Please train the model first by running: python train_security_model.py")
return
# Check for data file
data_dir = Path('security_data')
if len(sys.argv) > 1:
# Use file specified in command line
data_file = Path(sys.argv[1])
else:
# Use the most recent JSONL file
jsonl_files = list(data_dir.glob('*.jsonl'))
if not jsonl_files:
print("\n❌ No data files found!")
print(" Usage: python predict_anomalies.py [path/to/data.jsonl]")
return
data_file = max(jsonl_files, key=lambda p: p.stat().st_mtime)
if not data_file.exists():
print(f"\n❌ File not found: {data_file}")
return
# Make predictions
predictions = predictor.predict_from_jsonl(data_file)
# Print results
predictor.print_predictions(predictions)
# Save predictions
output_file = f"predictions_{data_file.stem}.json"
predictor.save_predictions(predictions, output_file)
print("\n" + "="*70)
print("✅ PREDICTION COMPLETED!")
print("="*70)
if __name__ == "__main__":
main()Now , after creating this file we have to run : python predict_anomalies.py
While the script is running , it will create the following :
Loads a Trained Model for Security Prediction
The script starts by loading your pre-trained anomaly detection model, along with its associated scaler, feature names, and metadata. Think of it as powering up your AI assistant that’s ready to spot suspicious blockchain activity.
Handles All the Setup Automatically
No manual hassle — it automatically finds the right model files, checks their paths, and gives you a quick summary of when and how the model was trained (accuracy, F1 score, etc.).
Prepares the Data for Prediction
Before running predictions, it cleans and formats incoming data — removing infinities, filling missing values, and scaling everything just like it did during training. This ensures consistent and reliable results every time.
Makes Smart Predictions on New Data
Whether you feed it a single transaction or a batch of new blockchain events, it runs them through the trained model to figure out if they’re normal or anomalous (potentially malicious).
Outputs Confidence Scores & Probabilities
The predictions aren’t just binary; they come with detailed confidence levels and probability scores — helping you understand how “sure” the model is about each call.
Supports Real-time Monitoring Pipelines
This code can easily plug into your live observability stack. Think of it as the intelligence layer that turns streaming blockchain data into actionable insights for security teams.
Logs Everything Clearly for Developers
Every step prints clear, developer-friendly logs, so you always know what’s being loaded, predicted, or skipped
And at the end it will show something like this in the terminal :
Perfect! The model is working great with 97% accuracy!
Atlast , We have :
#!/usr/bin/env python3
"""
Visualization Dashboard for Security Analytics
Creates comprehensive visualizations of training data and predictions
"""
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime
# Set style
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (14, 10)
class SecurityVisualizationDashboard:
"""Create visualizations for security analytics"""
def __init__(self, data_path):
self.data_path = data_path
self.df = None
self.load_data()
def load_data(self):
"""Load data from JSONL file"""
print(f"📂 Loading data from {self.data_path}...")
data = []
with open(self.data_path, 'r') as f:
for line in f:
if line.strip():
data.append(json.loads(line))
# Parse data
features_list = []
for item in data:
row = item['features'].copy()
row['timestamp'] = item['timestamp']
row['hasAnomaly'] = item['labels']['hasAnomaly']
row['anomalyTypes'] = ','.join(item['labels']['anomalyTypes'])
row['severity'] = item['labels']['maxSeverity']
features_list.append(row)
self.df = pd.DataFrame(features_list)
self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])
print(f"✓ Loaded {len(self.df)} records")
print(f" Anomalies: {self.df['hasAnomaly'].sum()}")
print(f" Normal: {(~self.df['hasAnomaly']).sum()}")
def create_dashboard(self, output_file='dashboard.png'):
"""Create comprehensive dashboard"""
print("\n📊 Creating visualization dashboard...")
fig = plt.figure(figsize=(20, 12))
gs = fig.add_gridspec(4, 3, hspace=0.3, wspace=0.3)
# 1. Price and Volume Over Time
ax1 = fig.add_subplot(gs[0, :])
self.plot_price_volume_timeline(ax1)
# 2. Anomaly Distribution
ax2 = fig.add_subplot(gs[1, 0])
self.plot_anomaly_distribution(ax2)
# 3. Anomaly Types
ax3 = fig.add_subplot(gs[1, 1])
self.plot_anomaly_types(ax3)
# 4. Volume Distribution
ax4 = fig.add_subplot(gs[1, 2])
self.plot_volume_distribution(ax4)
# 5. Volatility Analysis
ax5 = fig.add_subplot(gs[2, 0])
self.plot_volatility_analysis(ax5)
# 6. Price Range Analysis
ax6 = fig.add_subplot(gs[2, 1])
self.plot_price_range_analysis(ax6)
# 7. Volume Changes
ax7 = fig.add_subplot(gs[2, 2])
self.plot_volume_changes(ax7)
# 8. Feature Correlation Heatmap
ax8 = fig.add_subplot(gs[3, :])
self.plot_correlation_heatmap(ax8)
plt.suptitle('🛡️ Blockchain Security Analytics Dashboard',
fontsize=20, fontweight='bold', y=0.995)
plt.savefig(output_file, dpi=300, bbox_inches='tight')
print(f"✓ Dashboard saved to {output_file}")
plt.close()
def plot_price_volume_timeline(self, ax):
"""Plot price and volume over time with anomalies highlighted"""
ax2 = ax.twinx()
# Plot closing price
ax.plot(self.df['timestamp'], self.df['close'],
label='Close Price', color='blue', linewidth=2)
# Plot volume
ax2.plot(self.df['timestamp'], self.df['volumeUsd'],
label='Volume USD', color='green', alpha=0.6, linewidth=1.5)
# Highlight anomalies
anomalies = self.df[self.df['hasAnomaly']]
if len(anomalies) > 0:
ax.scatter(anomalies['timestamp'], anomalies['close'],
color='red', s=100, marker='x', linewidths=3,
label='Anomalies', zorder=5)
ax.set_xlabel('Time')
ax.set_ylabel('Close Price', color='blue')
ax2.set_ylabel('Volume (USD)', color='green')
ax.tick_params(axis='y', labelcolor='blue')
ax2.tick_params(axis='y', labelcolor='green')
ax.legend(loc='upper left')
ax2.legend(loc='upper right')
ax.set_title('Price & Volume Timeline with Anomalies', fontweight='bold')
ax.grid(True, alpha=0.3)
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
def plot_anomaly_distribution(self, ax):
"""Plot anomaly vs normal distribution"""
counts = self.df['hasAnomaly'].value_counts()
colors = ['#2ecc71', '#e74c3c']
labels = ['Normal', 'Anomaly']
wedges, texts, autotexts = ax.pie(counts, labels=labels, autopct='%1.1f%%',
colors=colors, startangle=90,
textprops={'fontsize': 12, 'weight': 'bold'})
ax.set_title('Anomaly Distribution', fontweight='bold', pad=20)
def plot_anomaly_types(self, ax):
"""Plot distribution of anomaly types"""
anomaly_types = []
for types_str in self.df[self.df['hasAnomaly']]['anomalyTypes']:
if types_str:
anomaly_types.extend(types_str.split(','))
if anomaly_types:
type_counts = pd.Series(anomaly_types).value_counts()
colors = sns.color_palette('Set2', len(type_counts))
type_counts.plot(kind='barh', ax=ax, color=colors)
ax.set_xlabel('Count')
ax.set_title('Anomaly Types Distribution', fontweight='bold')
else:
ax.text(0.5, 0.5, 'No anomaly types recorded',
ha='center', va='center', transform=ax.transAxes)
ax.set_title('Anomaly Types Distribution', fontweight='bold')
def plot_volume_distribution(self, ax):
"""Plot volume distribution"""
normal = self.df[~self.df['hasAnomaly']]['volumeUsd']
anomaly = self.df[self.df['hasAnomaly']]['volumeUsd']
ax.hist(normal, bins=20, alpha=0.6, label='Normal', color='green')
ax.hist(anomaly, bins=20, alpha=0.6, label='Anomaly', color='red')
ax.set_xlabel('Volume (USD)')
ax.set_ylabel('Frequency')
ax.set_title('Volume Distribution', fontweight='bold')
ax.legend()
ax.set_yscale('log')
def plot_volatility_analysis(self, ax):
"""Plot volatility comparison"""
data_to_plot = [
self.df[~self.df['hasAnomaly']]['volatility'],
self.df[self.df['hasAnomaly']]['volatility']
]
bp = ax.boxplot(data_to_plot, labels=['Normal', 'Anomaly'],
patch_artist=True, showmeans=True)
colors = ['lightgreen', 'lightcoral']
for patch, color in zip(bp['boxes'], colors):
patch.set_facecolor(color)
ax.set_ylabel('Volatility')
ax.set_title('Volatility Comparison', fontweight='bold')
ax.grid(True, alpha=0.3)
def plot_price_range_analysis(self, ax):
"""Plot price range analysis"""
data_to_plot = [
self.df[~self.df['hasAnomaly']]['priceRange'],
self.df[self.df['hasAnomaly']]['priceRange']
]
bp = ax.boxplot(data_to_plot, labels=['Normal', 'Anomaly'],
patch_artist=True, showmeans=True)
colors = ['lightgreen', 'lightcoral']
for patch, color in zip(bp['boxes'], colors):
patch.set_facecolor(color)
ax.set_ylabel('Price Range')
ax.set_title('Price Range Comparison', fontweight='bold')
ax.grid(True, alpha=0.3)
def plot_volume_changes(self, ax):
"""Plot volume changes over time"""
ax.plot(self.df['timestamp'], self.df['volumeChange1'],
label='Volume Change', color='purple', linewidth=1.5)
# Add zero line
ax.axhline(y=0, color='black', linestyle='--', alpha=0.3)
# Highlight anomalies
anomalies = self.df[self.df['hasAnomaly']]
if len(anomalies) > 0:
ax.scatter(anomalies['timestamp'], anomalies['volumeChange1'],
color='red', s=50, marker='x', linewidths=2,
label='Anomalies', zorder=5)
ax.set_xlabel('Time')
ax.set_ylabel('Volume Change Ratio')
ax.set_title('Volume Changes Over Time', fontweight='bold')
ax.legend()
ax.grid(True, alpha=0.3)
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
def plot_correlation_heatmap(self, ax):
"""Plot feature correlation heatmap"""
# Select numeric features
numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'volumeUsd',
'priceRange', 'bodySize', 'volatility', 'zScore',
'distanceFromMean', 'priceChange1', 'volumeChange1']
corr = self.df[numeric_cols].corr()
sns.heatmap(corr, annot=False, cmap='coolwarm', center=0,
square=True, linewidths=0.5, ax=ax, cbar_kws={"shrink": 0.8})
ax.set_title('Feature Correlation Heatmap', fontweight='bold', pad=10)
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
plt.setp(ax.yaxis.get_majorticklabels(), rotation=0)
def create_anomaly_timeline(self, output_file='anomaly_timeline.png'):
"""Create detailed anomaly timeline"""
print("\n📈 Creating anomaly timeline...")
fig, axes = plt.subplots(3, 1, figsize=(16, 10), sharex=True)
# Price with anomalies
axes[0].plot(self.df['timestamp'], self.df['close'],
label='Close Price', color='blue', linewidth=2)
anomalies = self.df[self.df['hasAnomaly']]
if len(anomalies) > 0:
axes[0].scatter(anomalies['timestamp'], anomalies['close'],
color='red', s=150, marker='X', linewidths=2,
label='Anomalies', zorder=5, edgecolors='black')
axes[0].set_ylabel('Price')
axes[0].set_title('Price Timeline', fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
# Volume with anomalies
axes[1].fill_between(self.df['timestamp'], self.df['volumeUsd'],
color='green', alpha=0.5, label='Volume USD')
if len(anomalies) > 0:
axes[1].scatter(anomalies['timestamp'], anomalies['volumeUsd'],
color='red', s=150, marker='X', linewidths=2,
label='Anomalies', zorder=5, edgecolors='black')
axes[1].set_ylabel('Volume (USD)')
axes[1].set_title('Volume Timeline', fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# Volatility with anomalies
axes[2].plot(self.df['timestamp'], self.df['volatility'],
color='orange', linewidth=2, label='Volatility')
if len(anomalies) > 0:
axes[2].scatter(anomalies['timestamp'], anomalies['volatility'],
color='red', s=150, marker='X', linewidths=2,
label='Anomalies', zorder=5, edgecolors='black')
axes[2].set_xlabel('Time')
axes[2].set_ylabel('Volatility')
axes[2].set_title('Volatility Timeline', fontweight='bold')
axes[2].legend()
axes[2].grid(True, alpha=0.3)
plt.suptitle('🔍 Detailed Anomaly Timeline Analysis',
fontsize=18, fontweight='bold', y=0.995)
for ax in axes:
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
plt.tight_layout()
plt.savefig(output_file, dpi=300, bbox_inches='tight')
print(f"✓ Timeline saved to {output_file}")
plt.close()
def create_statistics_report(self, output_file='statistics_report.txt'):
"""Generate detailed statistics report"""
print("\n📝 Generating statistics report...")
report = []
report.append("="*70)
report.append("BLOCKCHAIN SECURITY ANALYTICS - STATISTICS REPORT")
report.append("="*70)
report.append(f"\nReport Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"\nData File: {self.data_path}")
report.append(f"Total Records: {len(self.df)}")
report.append(f"Time Range: {self.df['timestamp'].min()} to {self.df['timestamp'].max()}")
report.append("\n" + "-"*70)
report.append("ANOMALY SUMMARY")
report.append("-"*70)
report.append(f"Total Anomalies: {self.df['hasAnomaly'].sum()}")
report.append(f"Anomaly Rate: {self.df['hasAnomaly'].mean()*100:.2f}%")
report.append(f"Normal Records: {(~self.df['hasAnomaly']).sum()}")
# Anomaly types
anomaly_types = []
for types_str in self.df[self.df['hasAnomaly']]['anomalyTypes']:
if types_str:
anomaly_types.extend(types_str.split(','))
if anomaly_types:
report.append("\nAnomaly Types:")
type_counts = pd.Series(anomaly_types).value_counts()
for atype, count in type_counts.items():
report.append(f" - {atype}: {count}")
report.append("\n" + "-"*70)
report.append("PRICE STATISTICS")
report.append("-"*70)
report.append(f"Mean Close Price: {self.df['close'].mean():.6f}")
report.append(f"Std Close Price: {self.df['close'].std():.6f}")
report.append(f"Min Close Price: {self.df['close'].min():.6f}")
report.append(f"Max Close Price: {self.df['close'].max():.6f}")
report.append("\n" + "-"*70)
report.append("VOLUME STATISTICS")
report.append("-"*70)
report.append(f"Mean Volume (USD): ${self.df['volumeUsd'].mean():,.2f}")
report.append(f"Std Volume (USD): ${self.df['volumeUsd'].std():,.2f}")
report.append(f"Min Volume (USD): ${self.df['volumeUsd'].min():,.2f}")
report.append(f"Max Volume (USD): ${self.df['volumeUsd'].max():,.2f}")
report.append(f"Total Volume (USD): ${self.df['volumeUsd'].sum():,.2f}")
report.append("\n" + "-"*70)
report.append("VOLATILITY STATISTICS")
report.append("-"*70)
report.append(f"Mean Volatility: {self.df['volatility'].mean():.6f}")
report.append(f"Max Volatility: {self.df['volatility'].max():.6f}")
report.append(f"Volatility Std: {self.df['volatility'].std():.6f}")
# Comparison: Normal vs Anomaly
report.append("\n" + "-"*70)
report.append("COMPARATIVE ANALYSIS (Normal vs Anomaly)")
report.append("-"*70)
normal_df = self.df[~self.df['hasAnomaly']]
anomaly_df = self.df[self.df['hasAnomaly']]
if len(anomaly_df) > 0:
report.append(f"\nVolume (USD):")
report.append(f" Normal Mean: ${normal_df['volumeUsd'].mean():,.2f}")
report.append(f" Anomaly Mean: ${anomaly_df['volumeUsd'].mean():,.2f}")
report.append(f"\nVolatility:")
report.append(f" Normal Mean: {normal_df['volatility'].mean():.6f}")
report.append(f" Anomaly Mean: {anomaly_df['volatility'].mean():.6f}")
report.append(f"\nPrice Range:")
report.append(f" Normal Mean: {normal_df['priceRange'].mean():.6f}")
report.append(f" Anomaly Mean: {anomaly_df['priceRange'].mean():.6f}")
report.append("\n" + "="*70)
report.append("END OF REPORT")
report.append("="*70)
# Save report
with open(output_file, 'w') as f:
f.write('\n'.join(report))
print(f"✓ Statistics report saved to {output_file}")
# Also print to console
print('\n'.join(report))
def main():
"""Main visualization pipeline"""
print("="*70)
print("📊 BLOCKCHAIN SECURITY ANALYTICS - VISUALIZATION DASHBOARD")
print("="*70)
# Find most recent data file
data_dir = Path('security_data')
if not data_dir.exists():
print("\n❌ Error: security_data directory not found!")
return
jsonl_files = list(data_dir.glob('*.jsonl'))
if not jsonl_files:
print("\n❌ Error: No .jsonl files found!")
return
data_file = max(jsonl_files, key=lambda p: p.stat().st_mtime)
print(f"\n📁 Using data file: {data_file}")
try:
# Create dashboard
dashboard = SecurityVisualizationDashboard(data_file)
# Generate all visualizations
dashboard.create_dashboard('security_dashboard.png')
dashboard.create_anomaly_timeline('anomaly_timeline.png')
dashboard.create_statistics_report('statistics_report.txt')
print("\n" + "="*70)
print("✅ VISUALIZATION COMPLETE!")
print("="*70)
print("\n📦 Generated Files:")
print(" - security_dashboard.png")
print(" - anomaly_timeline.png")
print(" - statistics_report.txt")
except Exception as e:
print(f"\n❌ Error: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()
After running the script ,
It will :
Builds a Security Analytics Visualization Dashboard for blockchain anomaly detection data.
Loads, analyzes, and visualizes data from JSONL files containing price, volume, and anomaly information.
Produces a dashboard, anomaly timeline, and statistical report summarizing the dataset.
1. Imports & Configuration
Imports standard libraries (json, pandas, numpy, matplotlib, seaborn, datetime, pathlib) for data processing and plotting.
Sets Seaborn style and figure size for consistent, clean visualization.
2. SecurityVisualizationDashboard Class
This class handles the entire data pipeline — from loading data to generating visualizations and reports.
a. __init__
Initializes with a JSONL data file path.
Calls load_data() automatically upon creation.
b. load_data()
Reads JSONL file line-by-line and parses each record.
Extracts and flattens relevant fields:
timestamp, hasAnomaly, anomalyTypes, severity, and features (price/volume metrics).
Converts timestamp to datetime format.
Displays:
Total records loaded
Number of anomalies vs. normal data points.
3. create_dashboard()
Generates a multi-plot dashboard (8 visual panels in one figure):
Price & Volume Timeline with anomalies highlighted.
Anomaly Distribution Pie Chart.
Anomaly Type Distribution.
Volume Distribution (Normal vs. Anomaly).
Volatility Comparison (Boxplot).
Price Range Comparison (Boxplot).
Volume Changes Over Time.
Correlation Heatmap of numerical features.
Exports the dashboard as dashboard.png.

4. Plotting Functions
Each method handles one part of the visualization dashboard:
plot_price_volume_timeline() → Plots close price and volume over time, marking anomalies in red.
plot_anomaly_distribution() → Pie chart showing anomaly vs normal data percentage.
plot_anomaly_types() → Bar chart showing which anomaly types occurred and their counts.
plot_volume_distribution() → Histogram comparing volume distributions for normal vs anomalous data.
plot_volatility_analysis() → Boxplot comparing volatility in normal vs anomalous records.
plot_price_range_analysis() → Boxplot comparing price ranges.
plot_volume_changes() → Time-series of volume change ratios with anomalies highlighted.
plot_correlation_heatmap() → Heatmap showing correlation among numeric features (e.g., open, close, volume, volatility).

5. create_anomaly_timeline()
Builds a 3-panel time-series visualization for:
Close price
Volume in USD
Volatility
Highlights anomalies in red across all timelines.
Saves output as anomaly_timeline.png.

6. create_statistics_report()
Generates a text-based analytics report containing:
Data summary (file name, total records, time range)
Anomaly counts and rates
Detailed anomaly type counts
Descriptive statistics for price, volume, and volatility
Comparative metrics (Normal vs. Anomaly)
Saves the report to statistics_report.txt.
7. main() Function
Searches the security_data/ directory for the most recent .jsonl data file.
Initializes the dashboard class using that file.
Runs the full visualization pipeline:
Creates dashboard image
Creates anomaly timeline image
Creates statistics report
Prints all output paths and completion status.

Output Files
When executed, this script generates:

security_dashboard.png → full visualization dashboard

anomaly_timeline.png → anomaly-focused time analysis
statistics_report.txt → text summary of all metrics
And there we have it — from raw blockchain noise to real-time intelligence, we’ve just turned Goldrush into your very own Web3 guardian angel.
What started as a simple anomaly detector is now a full-blown incident response machine that doesn’t just see the chaos… it predicts it, classifies it, and sometimes even laughs at it before fixing it. (Okay, maybe not the last part — but we’re getting there.)
With Goldrush’s high-frequency data streams, ML-powered anomaly detection, and lightning-fast performance benchmarking, you’re not just monitoring your network — you’re building the early warning system of decentralized finance.
Because in Web3, milliseconds matter, and prevention always beats panic.
So whether you’re protecting validators on Solana, sniffing out rug pulls before they happen, or just flexing your data wizardry — remember:
Goldrush isn’t just observability. It’s foresight — on-chain.
Stay curious, stay safe, and may your nodes always stay stable