Anomaly Detection Dashboard on Solana Using the Goldrush Streaming API (Part 3)

Rajdeep Singha
Content Writer
Intelligent Observability for Web3: How Goldrush Powers Real-Time Security and Reliability on Solana - Part 3

Welcome back! If you’ve made it through Part 2 of our guide (the one where we set up streaming data on Solana and built a live dashboard that alerts you to unusual wallet activity and token-price weirdness), congratulations—you’re ready to level up. In this next chapter, we’re going from “seeing the problem” to “solving the problem.”

In Part 3 we’ll roll up our sleeves and turn our detection engine into an operational powerhouse: we’ll walk through building automated incident-response playbooks, integrating with remediation workflows, and embedding feedback loops that learn and improve over time. Because spotting an anomaly is only half the battle—what matters is acting fast, smart, and automatically.

So grab your favourite dev setup, and let’s transform your real-time observability stack into a full-blown defence system. Let’s go!

Tutorial: Real-Time Security and Reliability on Solana

Step 1: Environment Setup

Begin by installing Node.js (version 16+) and npm. Visit nodejs.org and download the LTS version. Verify installation:

node --version
npm --version

Step 2: Project Initialization

2.1) Create a new directory for your project:

mkdir solana-anomaly-detector

cd solana-anomaly-detector

What this does: Creates a new folder for your project and enters it. All your files will be organized here.

2.2) Initialize npm and install dependencies:

npm init -y
npm install dotenv @covalenthq/client-sdk chalk

What this does:

npm init -y
creates a package.json file (package configuration file) with default settings

npm install dotenv
installs the dotenv package to safely load environment variables from .env file

npm install @covalenthq/client-sdk
installs Covalent's official Goldrush SDK for streaming blockchain data

npm install chalk
installs the chalk package for colored terminal output (makes logs easier to read)

2.3) Create a .env file in the root directory:

GOLDRUSH_KEY=your_api_key_here

What this does: Stores your Goldrush Streaming API key securely in a .env file. This file is ignored by Git (never committed) so your credentials stay private.

Obtain your Goldrush API key from goldrush.dev by registering as a developer. This key authenticates your requests to the Goldrush Streaming API.

Step 3: Project Structure

Organize your project with this directory structure:

solana-anomaly-detector/
├── .env
├── .gitignore
├── package.json
├── package-lock.json
├── security_analytics.js
├── train_security_model.py
├── visualize_results.py
├── predict_anomalies.py
├── requirements.txt

Let’s start the Python setup.

In the root folder, create a requirements.txt file with the following contents:

numpy>=1.21.0
pandas>=1.3.0
scikit-learn>=1.0.0
matplotlib>=3.4.0
seaborn>=0.11.0
joblib>=1.1.0


Next, run the following command in the root folder where requirements.txt is present:

pip install -r requirements.txt

Now let’s move on to the coding part.


Step 4: Writing security_analytics.js


In the root, create a file named security_analytics.js.
1. First, let’s import the packages and set up the streaming client we will need:

require('dotenv').config();
const fs = require('fs');
const path = require('path');
const {
  GoldRushClient,
  StreamingChain,
  StreamingInterval,
  StreamingTimeframe,
} = require('@covalenthq/client-sdk');

let chalk;

(async () => {
  chalk = (await import('chalk')).default;

  const apiKey = process.env.GOLDRUSH_KEY;
  if (!apiKey) {
    console.error("ERROR: GOLDRUSH_KEY not found in .env file");
    process.exit(1);
  }

  const client = new GoldRushClient(apiKey, {}, {
    onConnecting: () => console.log(chalk.blue("🔗 Connecting to GoldRush streaming service...")),
    onOpened: () => {
      console.clear();
      displayHeader();
      console.log(chalk.green("✓ Connected to streaming service!\n"));
      console.log(chalk.yellow("🛡️ Advanced Security Monitoring Active...\n"));
    },
    onClosed: () => {
      console.log(chalk.yellow("✓ Disconnected from streaming service"));
      process.exit(0);
    },
    onError: (error) => {
      console.error(chalk.red("✗ Streaming error:"), error);
    },
  });

2. Setting up the configuration

const CONFIG = {
  tokenAddress: "So11111111111111111111111111111111111111112", // Wrapped SOL
  maxHistory: 100,

  // Detection Thresholds
  flashLoan: {
    minPriceSwing: 0.05,       // 5% minimum price swing
    maxTimeWindow: 60000,      // 60 seconds
    minVolumeMultiplier: 10    // 10x volume spike
  },
  rugPull: {
    volumeDropThreshold: 0.5,  // 50% volume drop
    priceDropThreshold: 0.2,   // 20% price drop
    observationWindow: 10      // Last 10 candles
  },
  sandwichAttack: {
    minSpikePercent: 0.03,     // 3% spike
    minDropPercent: 0.025,     // 2.5% drop
    windowSize: 3              // 3 candles
  },
  priceManipulation: {
    stdDevMultiplier: 2.5,     // 2.5 standard deviations
    minDataPoints: 20
  },

  // ML Data Export
  exportData: true,
  exportPath: './security_data',
  exportBatchSize: 100
};

3. Now here comes the data storage part

const dataStore = {
  priceHistory: [],
  volumeHistory: [],
  anomalyHistory: [],
  networkMetrics: {
    blockTimes: [],
    gasUsage: [],
    failedTxRate: []
  },
  securityEvents: [],
  mlDataBuffer: []
};

let streamStartTime = Date.now();
let dataCount = 0;
let criticalAlertsCount = 0;
let totalAnomaliesDetected = 0;

4. Let's set up the utility functions

function displayHeader() { console.log(chalk.cyan.bold('═══════════════════════════════════════════════════════════════')); console.log(chalk.cyan.bold(' 🛡️ WEB3 ADVANCED SECURITY ANALYTICS - GOLDRUSH')); console.log(chalk.cyan.bold('═══════════════════════════════════════════════════════════════')); console.log(chalk.yellow.bold(' Detection Systems: Flash Loans | Rug Pulls | Sandwich Attacks')); console.log(chalk.yellow.bold(' ML Ready: Real-time Data Export | Pattern Recognition')); console.log(chalk.cyan.bold('═══════════════════════════════════════════════════════════════')); } function clearScreen() { console.clear(); displayHeader(); } function formatPrice(price) { if (!price || price === 0) return '0.000000'; return parseFloat(price).toFixed(6); } function formatVolume(volume) { if (!volume || volume === 0) return '0.0000e+0'; return parseFloat(volume).toExponential(4); } function calculateStatistics(history) { if (history.length < 2) return null; const values = history.map(c => parseFloat(c.close) || 0).filter(p => p > 0); if (values.length < 2) return null; const mean = values.reduce((a, b) => a + b) / values.length; const variance = values.reduce((sq, n) => sq + Math.pow(n - mean, 2), 0) / values.length; const stdDev = Math.sqrt(variance); const sorted = [...values].sort((a, b) => a - b); const median = sorted[Math.floor(sorted.length / 2)]; return { mean, median, stdDev, variance, count: values.length }; }

5. Now we will set up our detection algorithms

a. FLASH LOAN EXPLOIT DETECTION   
Detects rapid price manipulation characteristic of flash loan attacks 

function detectFlashLoanExploit(candle, history) { const anomalies = []; if (history.length < 2) return anomalies; const prevCandle = history[history.length - 1]; const timeDiff = new Date(candle.timestamp) - new Date(prevCandle.timestamp); const currentPrice = parseFloat(candle.close) || 0; const prevPrice = parseFloat(prevCandle.close) || 0; const currentVolume = parseFloat(candle.volume) || 0; const prevVolume = parseFloat(prevCandle.volume) || 1; if (currentPrice <= 0 || prevPrice <= 0) return anomalies; // Calculate metrics const priceSwing = Math.abs((currentPrice - prevPrice) / prevPrice); const volumeRatio = currentVolume / prevVolume; const priceVelocity = priceSwing / (timeDiff / 1000); // % per second // Flash loan signature: rapid price movement + massive volume spike if (timeDiff < CONFIG.flashLoan.maxTimeWindow && priceSwing > CONFIG.flashLoan.minPriceSwing && volumeRatio > CONFIG.flashLoan.minVolumeMultiplier) { anomalies.push({ type: 'FLASH_LOAN_EXPLOIT', severity: 'CRITICAL', confidence: Math.min(95, 60 + (volumeRatio * 3)), value: `${(priceSwing*100).toFixed(2)}% in ${(timeDiff/1000).toFixed(1)}s`, details: `Price velocity: ${(priceVelocity*100).toFixed(2)}%/s, Volume: ${volumeRatio.toFixed(1)}x`, metadata: { priceSwing, timeDiff, volumeRatio, priceVelocity, currentPrice, prevPrice } }); } // Secondary indicator: Repeated rapid reversals (attack attempts) if (history.length >= 5) { const recentSwings = []; for (let i = history.length - 4; i < history.length; i++) { const swing = Math.abs((history[i].close - history[i-1].close) / history[i-1].close); recentSwings.push(swing); } const avgSwing = recentSwings.reduce((a,b) => a+b) / recentSwings.length; if (avgSwing > 0.03) { // Average 3% swings anomalies.push({ type: 'PRICE_MANIPULATION_PATTERN', severity: 'HIGH', confidence: 75, value: `${(avgSwing*100).toFixed(2)}% avg swing`, details: 'Repeated rapid price reversals detected', metadata: { avgSwing, swingCount: recentSwings.length } }); } } return anomalies; }

b. RUG PULL DETECTION   
Monitors liquidity drain patterns and suspicious trading restrictions 

function detectRugPullSignals(candle, history) { const signals = []; if (history.length < CONFIG.rugPull.observationWindow) return signals; const currentVolume = parseFloat(candle.volume) || 0; const currentPrice = parseFloat(candle.close) || 0; // Calculate recent averages const recentCandles = history.slice(-CONFIG.rugPull.observationWindow); const recentVolumes = recentCandles.map(c => parseFloat(c.volume) || 0); const avgRecentVolume = recentVolumes.reduce((a,b) => a+b) / recentVolumes.length; const oldestPrice = parseFloat(recentCandles[0].close) || 1; const priceChange = (currentPrice - oldestPrice) / oldestPrice; // SIGNAL 1: Liquidity Drain (volume drops + price drops) const volumeDropRatio = currentVolume / avgRecentVolume; if (volumeDropRatio < CONFIG.rugPull.volumeDropThreshold && priceChange < -CONFIG.rugPull.priceDropThreshold) { signals.push({ type: 'LIQUIDITY_DRAIN', severity: 'CRITICAL', confidence: 85, value: `Vol -${((1-volumeDropRatio)*100).toFixed(0)}%, Price -${Math.abs(priceChange*100).toFixed(1)}%`, details: 'Potential rug pull: Liquidity draining with price collapse', metadata: { volumeDropRatio, priceChange, avgRecentVolume, currentVolume } }); } // SIGNAL 2: Death Spiral (accelerating decline) if (history.length >= 20) { const last20Candles = history.slice(-20); const priceChanges = []; for (let i = 1; i < last20Candles.length; i++) { const change = (last20Candles[i].close - last20Candles[i-1].close) / last20Candles[i-1].close; priceChanges.push(change); } const negativeCandles = priceChanges.filter(c => c < 0).length; const avgDecline = priceChanges.filter(c => c < 0).reduce((a,b) => a+b, 0) / negativeCandles || 0; if (negativeCandles >= 15 && avgDecline < -0.05) { signals.push({ type: 'DEATH_SPIRAL', severity: 'HIGH', confidence: 80, value: `${negativeCandles}/20 red candles`, details: `Average decline: ${(avgDecline*100).toFixed(2)}% per candle`, metadata: { negativeCandles, avgDecline } }); } } // SIGNAL 3: Volume Evaporation (sudden complete volume loss) if (currentVolume < avgRecentVolume * 0.1 && avgRecentVolume > 0) { signals.push({ type: 'VOLUME_EVAPORATION', severity: 'HIGH', confidence: 70, value: `${((1 - currentVolume/avgRecentVolume)*100).toFixed(0)}% volume loss`, details: 'Trading activity collapsed - possible liquidity removal', metadata: { currentVolume, avgRecentVolume } }); } return signals; }

c. SANDWICH ATTACK DETECTION   
Identifies front-running patterns (price spike → victim trade → price drop)

function detectSandwichAttack(candle, history) { const attacks = []; if (history.length < CONFIG.sandwichAttack.windowSize) return attacks; // Analyze last 3 candles for sandwich pattern const before = history[history.length - 2]; const during = history[history.length - 1]; const after = candle; const beforePrice = parseFloat(before.close) || 0; const duringHigh = parseFloat(during.high) || 0; const afterPrice = parseFloat(after.close) || 0; if (beforePrice <= 0 || duringHigh <= 0 || afterPrice <= 0) return attacks; // Pattern: price spikes up, then drops back down const spike = (duringHigh - beforePrice) / beforePrice; const normalization = (afterPrice - duringHigh) / duringHigh; if (spike > CONFIG.sandwichAttack.minSpikePercent && normalization < -CONFIG.sandwichAttack.minDropPercent) { attacks.push({ type: 'SANDWICH_ATTACK', severity: 'HIGH', confidence: 75, value: `+${(spike*100).toFixed(2)}% → ${(normalization*100).toFixed(2)}%`, details: 'Front-running pattern: Price manipulated around victim transaction', metadata: { spike, normalization, beforePrice, duringHigh, afterPrice, victimLoss: Math.abs(spike + normalization) * 100 } }); } // Extended pattern detection (5-candle window) if (history.length >= 5) { for (let i = history.length - 4; i < history.length - 2; i++) { // important line const c1 = parseFloat(history[i].close) || 0; const c2 = parseFloat(history[i+1].high) || 0; const c3 = parseFloat(history[i+2].close) || 0; if (c1 > 0 && c2 > 0 && c3 > 0) { const earlySpike = (c2 - c1) / c1; const laterDrop = (c3 - c2) / c2; if (earlySpike > 0.02 && laterDrop < -0.015) { attacks.push({ type: 'MULTI_BLOCK_SANDWICH', severity: 'MEDIUM', confidence: 65, value: `Extended pattern detected`, details: `Multi-block sandwich across ${i} to ${i+2}`, metadata: { earlySpike, laterDrop, startIndex: i } }); break; } } } } return attacks; }

d. SMART CONTRACT ANOMALY DETECTION   
Statistical analysis of price behavior for contract-level issues

function detectSmartContractAnomalies(candle, history) { const anomalies = []; if (history.length < CONFIG.priceManipulation.minDataPoints) return anomalies; const stats = calculateStatistics(history); if (!stats) return anomalies; const currentPrice = parseFloat(candle.close) || 0; if (currentPrice <= 0) return anomalies; // Z-score analysis (statistical deviation) const zScore = (currentPrice - stats.mean) / stats.stdDev; if (Math.abs(zScore) > CONFIG.priceManipulation.stdDevMultiplier) { anomalies.push({ type: 'STATISTICAL_ANOMALY', severity: Math.abs(zScore) > 3.5 ? 'CRITICAL' : 'HIGH', confidence: Math.min(95, 50 + Math.abs(zScore) * 10), value: `Z-score: ${zScore.toFixed(2)}σ`, details: `Price ${zScore > 0 ? 'above' : 'below'} normal by ${Math.abs(zScore).toFixed(2)} std deviations`, metadata: { zScore, currentPrice, mean: stats.mean, stdDev: stats.stdDev } }); } // Volatility spike detection const currentVolatility = (parseFloat(candle.high) - parseFloat(candle.low)) / parseFloat(candle.open); const recentVolatilities = history.slice(-20).map(c => (parseFloat(c.high) - parseFloat(c.low)) / parseFloat(c.open) ); const avgVolatility = recentVolatilities.reduce((a,b) => a+b) / recentVolatilities.length; if (currentVolatility > avgVolatility * 3) { anomalies.push({ type: 'EXTREME_VOLATILITY', severity: 'MEDIUM', confidence: 70, value: `${(currentVolatility*100).toFixed(2)}% range`, details: `Volatility ${(currentVolatility/avgVolatility).toFixed(1)}x higher than average`, metadata: { currentVolatility, avgVolatility } }); } return anomalies; }

e.   ECONOMIC EXPLOIT DETECTION (MEV)   
Detects Maximal Extractable Value patterns

function detectEconomicExploits(candle, history) { const exploits = []; if (history.length < 10) return exploits; // Pattern: Repeated micro-profits (MEV bots) const last10 = history.slice(-10); let profitablePatterns = 0; for (let i = 1; i < last10.length; i++) { const priceMove = (last10[i].close - last10[i-1].close) / last10[i-1].close; const volumeSpike = last10[i].volume / (last10[i-1].volume || 1); // Small profitable move with volume spike = MEV if (Math.abs(priceMove) > 0.005 && Math.abs(priceMove) < 0.02 && volumeSpike > 2) { profitablePatterns++; } } if (profitablePatterns >= 5) { exploits.push({ type: 'MEV_BOT_ACTIVITY', severity: 'MEDIUM', confidence: 60, value: `${profitablePatterns}/10 patterns`, details: 'Repeated micro-arbitrage patterns detected (likely MEV bots)', metadata: { profitablePatterns } }); } return exploits; }

f. NETWORK CONGESTION PREDICTION   
Predicts network issues based on transaction patterns

function predictNetworkCongestion(candle, history) { const predictions = []; // This would ideally use block-level data, but we can infer from trading patterns if (history.length < 30) return predictions; const recent = history.slice(-30); const timestamps = recent.map(c => new Date(c.timestamp).getTime()); const intervals = []; for (let i = 1; i < timestamps.length; i++) { intervals.push(timestamps[i] - timestamps[i-1]); } const avgInterval = intervals.reduce((a,b) => a+b) / intervals.length; const currentInterval = new Date(candle.timestamp).getTime() - timestamps[timestamps.length - 1]; // If candles are arriving slower than usual if (currentInterval > avgInterval * 1.5) { predictions.push({ type: 'NETWORK_CONGESTION', severity: 'MEDIUM', confidence: 50, value: `${(currentInterval/1000).toFixed(0)}s delay`, details: `Data arrival ${(currentInterval/avgInterval).toFixed(1)}x slower than normal`, metadata: { currentInterval, avgInterval } }); } return predictions; }

6. ML DATA EXPORT

Prepare training data for machine learning models

function prepareMLDataPoint(candle, anomalies, history) { const stats = calculateStatistics(history); return { timestamp: candle.timestamp, features: { // Price features open: parseFloat(candle.open) || 0, high: parseFloat(candle.high) || 0, low: parseFloat(candle.low) || 0, close: parseFloat(candle.close) || 0, // Volume features volume: parseFloat(candle.volume) || 0, volumeUsd: parseFloat(candle.volume_usd) || 0, // Derived features priceRange: (parseFloat(candle.high) - parseFloat(candle.low)) || 0, bodySize: Math.abs((parseFloat(candle.close) - parseFloat(candle.open))) || 0, volatility: ((parseFloat(candle.high) - parseFloat(candle.low)) / parseFloat(candle.open)) || 0, // Statistical features zScore: stats ? ((parseFloat(candle.close) - stats.mean) / stats.stdDev) : 0, distanceFromMean: stats ? Math.abs(parseFloat(candle.close) - stats.mean) : 0, // Historical features priceChange1: history.length > 0 ? ((parseFloat(candle.close) - parseFloat(history[history.length-1].close)) / parseFloat(history[history.length-1].close)) : 0, volumeChange1: history.length > 0 ? (parseFloat(candle.volume) / (parseFloat(history[history.length-1].volume) || 1)) : 0, }, labels: { hasAnomaly: anomalies.length > 0, anomalyCount: anomalies.length, maxSeverity: anomalies.length > 0 ? anomalies[0].severity : 'NONE', anomalyTypes: anomalies.map(a => a.type), isCritical: anomalies.some(a => a.severity === 'CRITICAL') }, metadata: { tokenAddress: CONFIG.tokenAddress, dataPointIndex: dataCount } }; } function exportMLData(dataPoint) { if (!CONFIG.exportData) return; // Add to buffer dataStore.mlDataBuffer.push(dataPoint); // Batch write to file if (dataStore.mlDataBuffer.length >= CONFIG.exportBatchSize) { const exportDir = CONFIG.exportPath; // Create directory if it doesn't exist if (!fs.existsSync(exportDir)) { fs.mkdirSync(exportDir, { recursive: true }); } const filename = path.join(exportDir, `security_data_${Date.now()}.jsonl`); const dataLines = dataStore.mlDataBuffer.map(d => JSON.stringify(d)).join('\n') + '\n'; fs.appendFileSync(filename, dataLines); console.log(chalk.gray(`\n💾 Exported ${dataStore.mlDataBuffer.length} data points to ${filename}`)); dataStore.mlDataBuffer = []; } }

7. DISPLAY FUNCTIONS

function displaySecurityDashboard(candle, allAnomalies) { clearScreen(); const timestamp = new Date(candle.timestamp).toLocaleString(); const uptime = Math.floor((Date.now() - streamStartTime) / 1000); const uptimeStr = `${Math.floor(uptime / 60)}m ${uptime % 60}s`; // Header Info console.log(chalk.yellow.bold(`\n⏰ ${timestamp} | Uptime: ${uptimeStr} | Events: ${dataCount}`)); console.log(chalk.gray('─'.repeat(70))); // Price Display console.log(chalk.cyan('\n📊 MARKET DATA')); console.log(chalk.white(` Open: ${chalk.blue(formatPrice(candle.open))}`)); console.log(chalk.white(` High: ${chalk.green(formatPrice(candle.high))}`)); console.log(chalk.white(` Low: ${chalk.red(formatPrice(candle.low))}`)); console.log(chalk.white(` Close: ${chalk.yellow(formatPrice(candle.close))}`)); console.log(chalk.white(` Volume: ${chalk.magenta(formatVolume(candle.volume))}`)); // Security Status console.log(chalk.cyan('\n🛡️ SECURITY STATUS')); if (allAnomalies.length === 0) { console.log(chalk.green(' ✓ No threats detected - All systems normal')); } else { // Group by severity const critical = allAnomalies.filter(a => a.severity === 'CRITICAL'); const high = allAnomalies.filter(a => a.severity === 'HIGH'); const medium = allAnomalies.filter(a => a.severity === 'MEDIUM'); if (critical.length > 0) { console.log(chalk.red.bold(` 🚨 CRITICAL THREATS: ${critical.length}`)); critical.forEach(a => { console.log(chalk.red(` ${a.type}: ${a.value}`)); console.log(chalk.red(` └─ ${a.details}`)); console.log(chalk.gray(` Confidence: ${a.confidence}%`)); }); } if (high.length > 0) { console.log(chalk.yellow(` ⚠️ HIGH SEVERITY: ${high.length}`)); high.forEach(a => { console.log(chalk.yellow(` ${a.type}: ${a.value}`)); console.log(chalk.yellow(` └─ ${a.details}`)); }); } if (medium.length > 0) { console.log(chalk.blue(` ℹ️ MEDIUM ALERTS: ${medium.length}`)); medium.forEach(a => { console.log(chalk.blue(` ${a.type}: ${a.value}`)); }); } } // Statistics console.log(chalk.cyan('\n📈 DETECTION STATISTICS')); console.log(chalk.white(` Total Anomalies Detected: ${chalk.yellow(totalAnomaliesDetected)}`)); console.log(chalk.white(` Critical Alerts: ${chalk.red(criticalAlertsCount)}`)); console.log(chalk.white(` Data Points Analyzed: ${chalk.blue(dataCount)}`)); console.log(chalk.white(` History Buffer: ${chalk.magenta(dataStore.priceHistory.length)}/${CONFIG.maxHistory}`)); console.log(chalk.gray('\n' + '─'.repeat(70))); console.log(chalk.gray('Press Ctrl+C to exit and save data')); }

8. Main Detection Pipeline

function runSecurityAnalysis(candle, history) { const allAnomalies = []; // Run all detection algorithms allAnomalies.push(...detectFlashLoanExploit(candle, history)); allAnomalies.push(...detectRugPullSignals(candle, history)); allAnomalies.push(...detectSandwichAttack(candle, history)); allAnomalies.push(...detectSmartContractAnomalies(candle, history)); allAnomalies.push(...detectEconomicExploits(candle, history)); allAnomalies.push(...predictNetworkCongestion(candle, history)); // Update statistics totalAnomaliesDetected += allAnomalies.length; criticalAlertsCount += allAnomalies.filter(a => a.severity === 'CRITICAL').length; // Store security events if (allAnomalies.length > 0) { dataStore.securityEvents.push({ timestamp: candle.timestamp, anomalies: allAnomalies, candle: { open: candle.open, high: candle.high, low: candle.low, close: candle.close, volume: candle.volume } }); } return allAnomalies; }

9. Streaming Subscription

console.log(chalk.blue('🚀 Starting security monitoring...')); console.log(chalk.gray(`Token: ${CONFIG.tokenAddress}`)); console.log(chalk.gray(`Export: ${CONFIG.exportData ? 'Enabled' : 'Disabled'}`)); console.log(''); const unsubscribe = client.StreamingService.subscribeToOHLCVTokens( { chain_name: StreamingChain.SOLANA_MAINNET, token_addresses: [CONFIG.tokenAddress], interval: StreamingInterval.ONE_MINUTE, timeframe: StreamingTimeframe.ONE_HOUR, }, { next: (data) => { // Extract candles let candles = []; if (data && typeof data === 'object') { if (data.ohlcvCandlesForToken) { candles = Array.isArray(data.ohlcvCandlesForToken) ? data.ohlcvCandlesForToken : [data.ohlcvCandlesForToken]; } else if (data.data && data.data.ohlcvCandlesForToken) { candles = Array.isArray(data.data.ohlcvCandlesForToken) ? data.data.ohlcvCandlesForToken : [data.data.ohlcvCandlesForToken]; } else if (Array.isArray(data)) { candles = data; } else if (data.items) { candles = data.items; } else { candles = [data]; } } if (candles.length === 0) { console.log(chalk.yellow('⚠️ No candles in data batch')); return; } // Process each candle candles.forEach((candle) => { dataCount++; // Add to history dataStore.priceHistory.push(candle); if (dataStore.priceHistory.length > CONFIG.maxHistory) { dataStore.priceHistory.shift(); } // Run security analysis const anomalies = runSecurityAnalysis(candle, dataStore.priceHistory.slice(0, -1)); // Prepare and export ML data const mlDataPoint = prepareMLDataPoint(candle, anomalies, dataStore.priceHistory.slice(0, -1)); exportMLData(mlDataPoint); // Display results displaySecurityDashboard(candle, anomalies); // Critical alert notification const criticalAnomalies = anomalies.filter(a => a.severity === 'CRITICAL'); if (criticalAnomalies.length > 0) { console.log(chalk.red.bold('\n🚨 IMMEDIATE ACTION REQUIRED 🚨')); console.log(chalk.red('Critical security threats detected. Review immediately.')); // Log to separate critical events file const criticalLogPath = path.join(CONFIG.exportPath, 'critical_events.log'); const logEntry = `${new Date().toISOString()} - ${JSON.stringify(criticalAnomalies)}\n`; fs.appendFileSync(criticalLogPath, logEntry); } }); }, error: (error) => { console.clear(); displayHeader(); console.log(chalk.red('\n✗ Subscription error:')); console.log(JSON.stringify(error, null, 2)); console.log(chalk.yellow('\nTroubleshooting:')); console.log('1. Verify your API key has streaming permissions'); console.log('2. Check token address is valid for Solana'); console.log('3. Contact [email protected] for Beta access'); process.exit(1); }, complete: () => { console.log(chalk.green("\n✓ Stream completed")); saveSessionReport(); process.exit(0); }, } );

10. Session Reporting

function saveSessionReport() { const report = { sessionInfo: { startTime: new Date(streamStartTime).toISOString(), endTime: new Date().toISOString(), duration: Date.now() - streamStartTime, totalDataPoints: dataCount }, statistics: { totalAnomalies: totalAnomaliesDetected, criticalAlerts: criticalAlertsCount, securityEvents: dataStore.securityEvents.length }, configuration: CONFIG, securityEvents: dataStore.securityEvents }; const reportPath = path.join(CONFIG.exportPath, `session_report_${Date.now()}.json`); if (!fs.existsSync(CONFIG.exportPath)) { fs.mkdirSync(CONFIG.exportPath, { recursive: true }); } fs.writeFileSync(reportPath, JSON.stringify(report, null, 2)); console.log(chalk.green(`\n📊 Session report saved: ${reportPath}`)); }

11. Graceful Shutdown

process.on('SIGINT', async () => { console.log("\n" + chalk.yellow("🛑 Shutting down security monitoring...")); // Flush remaining ML data if (dataStore.mlDataBuffer.length > 0) { const filename = path.join(CONFIG.exportPath, `security_data_final_${Date.now()}.jsonl`); const dataLines = dataStore.mlDataBuffer.map(d => JSON.stringify(d)).join('\n') + '\n'; if (!fs.existsSync(CONFIG.exportPath)) { fs.mkdirSync(CONFIG.exportPath, { recursive: true }); } fs.writeFileSync(filename, dataLines); console.log(chalk.green(`✓ Saved ${dataStore.mlDataBuffer.length} pending data points`)); } // Save session report saveSessionReport(); // Display final summary console.log(chalk.cyan('\n═══════════════════════════════════════════')); console.log(chalk.cyan(' SESSION SUMMARY')); console.log(chalk.cyan('═══════════════════════════════════════════')); console.log(chalk.white(` Duration: ${Math.floor((Date.now() - streamStartTime) / 60000)} minutes`)); console.log(chalk.white(` Data Points: ${dataCount}`)); console.log(chalk.white(` Total Anomalies: ${totalAnomaliesDetected}`)); console.log(chalk.white(` Critical Alerts: ${criticalAlertsCount}`)); console.log(chalk.white(` Security Events: ${dataStore.securityEvents.length}`)); console.log(chalk.cyan('═══════════════════════════════════════════\n')); unsubscribe(); await client.StreamingService.disconnect(); console.log(chalk.green('✓ Shutdown complete. Data saved.')); process.exit(0); });

12. Finally, add a heartbeat to show the system is running

setInterval(() => {
  const uptime = Math.floor((Date.now() - streamStartTime) / 1000);
  if (dataCount === 0 && uptime > 30) {
    console.log(chalk.gray(`⏳ Waiting for data... (${uptime}s)`));
  }
}, 30000); // Every 30 seconds

})();

After assembling all of this in our security_analytics.js file, we run:
node security_analytics.js


The script streams live OHLCV data for the configured token, reports how many anomalies it detects, and automatically saves every analyzed data point to the security_data folder in JSONL format. We will use this exported data for a deeper anomaly check with our ML model.

What security_analytics.js does:

  • Initializes the Goldrush client
    Connects to Solana’s live OHLCV (price and volume) stream via Goldrush SDK for high-frequency data access.

  • Maintains rolling historical data
    Stores recent candle data (price, volume) in buffers to detect patterns and compute moving statistics.

  • Performs real-time anomaly detection
    Continuously analyzes live data to detect flash loan exploits, rug pull signals, sandwich attacks, statistical price anomalies, MEV bot activity, and network congestion.

  • Generates multi-level alerts
    Flags anomalies as MEDIUM, HIGH, or CRITICAL severity based on detection thresholds and confidence scores.

  • Tracks and counts security events
    Maintains a counter of critical alerts and logs all detected anomalies in structured form.

  • Prepares machine learning data points
    Converts analyzed candle + anomaly data into ML-friendly format for predictive model training.

  • Exports an ML dataset
    Automatically writes batches of analyzed data points to local JSONL files in security_data/ for training anomaly-prediction models (a sample record is shown right after this list).

  • Displays a live security dashboard
    Prints color-coded (via chalk) terminal summaries of market data, active threats grouped by severity, and running detection statistics.

  • Handles streaming errors gracefully
    Captures and logs API errors or connection drops for reliability.

  • Supports graceful shutdown
    Listens for process termination (SIGINT) to safely close the stream and persist session data.
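
For reference, here is a quick way to peek at one of those exported records. This is a minimal Python sketch, assuming you have already run security_analytics.js and at least one .jsonl file exists under security_data/:

import json
from pathlib import Path

# Grab the most recent export written by security_analytics.js
latest = max(Path("security_data").glob("*.jsonl"), key=lambda p: p.stat().st_mtime)
record = json.loads(latest.open().readline())

print("features:", list(record["features"].keys()))   # open, high, low, close, volume, ...
print("labels:  ", record["labels"])                   # hasAnomaly, anomalyCount, maxSeverity, ...
print("metadata:", record["metadata"])                 # tokenAddress, dataPointIndex

Each line of the export is one candle’s features plus the labels assigned by the detection algorithms above, which is exactly the shape the Python training script expects.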

Now, let’s jump into the ML-based anomaly detection part:

Step 5: Writing train_security_model.py

#!/usr/bin/env python3 """ Security Analytics ML Training Script Trains machine learning models to detect blockchain security anomalies """ import json import pandas as pd import numpy as np from pathlib import Path from datetime import datetime import matplotlib.pyplot as plt import seaborn as sns from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier from sklearn.linear_model import LogisticRegression from sklearn.preprocessing import StandardScaler from sklearn.metrics import ( classification_report, confusion_matrix, roc_auc_score, roc_curve, precision_recall_curve, accuracy_score, f1_score ) import joblib import warnings warnings.filterwarnings('ignore') # Set style for plots sns.set_style('whitegrid') plt.rcParams['figure.figsize'] = (12, 8) class SecurityMLTrainer: """Train and evaluate ML models for security anomaly detection""" def __init__(self, data_path): self.data_path = data_path self.df = None self.X_train = None self.X_test = None self.y_train = None self.y_test = None self.scaler = StandardScaler() self.models = {} self.best_model = None self.feature_names = None def load_data(self): """Load data from JSONL file""" print("📂 Loading data from", self.data_path) data = [] with open(self.data_path, 'r') as f: for line in f: if line.strip(): data.append(json.loads(line)) print(f"✓ Loaded {len(data)} data points") # Extract features and labels features_list = [] labels_list = [] for item in data: features_list.append(item['features']) labels_list.append(item['labels']['hasAnomaly']) self.df = pd.DataFrame(features_list) self.df['hasAnomaly'] = labels_list print(f"\n📊 Dataset Overview:") print(f" Total samples: {len(self.df)}") print(f" Features: {len(self.df.columns) - 1}") print(f" Anomalies: {self.df['hasAnomaly'].sum()} ({self.df['hasAnomaly'].mean()*100:.1f}%)") print(f" Normal: {(~self.df['hasAnomaly']).sum()} ({(~self.df['hasAnomaly']).mean()*100:.1f}%)") return self.df def prepare_features(self, test_size=0.25, random_state=42): """Prepare features for training""" print("\n🔧 Preparing features...") # Separate features and target X = self.df.drop('hasAnomaly', axis=1) y = self.df['hasAnomaly'] # Store feature names self.feature_names = X.columns.tolist() # Handle any infinite or missing values X = X.replace([np.inf, -np.inf], np.nan) X = X.fillna(0) # Split data self.X_train, self.X_test, self.y_train, self.y_test = train_test_split( X, y, test_size=test_size, random_state=random_state, stratify=y ) # Scale features self.X_train_scaled = self.scaler.fit_transform(self.X_train) self.X_test_scaled = self.scaler.transform(self.X_test) print(f"✓ Train set: {len(self.X_train)} samples") print(f"✓ Test set: {len(self.X_test)} samples") print(f"✓ Features scaled using StandardScaler") def train_models(self): """Train multiple ML models""" print("\n🤖 Training models...") # Define models models_config = { 'Random Forest': RandomForestClassifier( n_estimators=100, max_depth=10, min_samples_split=5, random_state=42, class_weight='balanced' ), 'Gradient Boosting': GradientBoostingClassifier( n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42 ), 'Logistic Regression': LogisticRegression( max_iter=1000, random_state=42, class_weight='balanced' ) } results = [] for name, model in models_config.items(): print(f"\n Training {name}...") # Use scaled features for Logistic Regression, raw for tree-based if name == 'Logistic Regression': X_train_use = self.X_train_scaled 
X_test_use = self.X_test_scaled else: X_train_use = self.X_train X_test_use = self.X_test # Train model model.fit(X_train_use, self.y_train) # Predictions y_pred = model.predict(X_test_use) y_pred_proba = model.predict_proba(X_test_use)[:, 1] # Calculate metrics accuracy = accuracy_score(self.y_test, y_pred) f1 = f1_score(self.y_test, y_pred) # Handle case where only one class is present try: roc_auc = roc_auc_score(self.y_test, y_pred_proba) except: roc_auc = 0.0 # Cross-validation score cv_scores = cross_val_score(model, X_train_use, self.y_train, cv=3, scoring='f1') results.append({ 'Model': name, 'Accuracy': accuracy, 'F1 Score': f1, 'ROC AUC': roc_auc, 'CV F1 Mean': cv_scores.mean(), 'CV F1 Std': cv_scores.std() }) # Store model self.models[name] = { 'model': model, 'scaled': name == 'Logistic Regression', 'predictions': y_pred, 'probabilities': y_pred_proba } print(f" ✓ Accuracy: {accuracy:.3f}") print(f" ✓ F1 Score: {f1:.3f}") print(f" ✓ ROC AUC: {roc_auc:.3f}") print(f" ✓ CV F1: {cv_scores.mean():.3f} (+/- {cv_scores.std():.3f})") # Results summary results_df = pd.DataFrame(results) print("\n📊 Model Comparison:") print(results_df.to_string(index=False)) # Select best model based on F1 score best_idx = results_df['F1 Score'].idxmax() best_model_name = results_df.iloc[best_idx]['Model'] self.best_model = self.models[best_model_name] self.best_model['name'] = best_model_name print(f"\n🏆 Best Model: {best_model_name}") return results_df def evaluate_best_model(self): """Detailed evaluation of the best model""" print(f"\n📈 Detailed Evaluation - {self.best_model['name']}") # Classification Report print("\n" + "="*60) print("CLASSIFICATION REPORT") print("="*60) print(classification_report( self.y_test, self.best_model['predictions'], target_names=['Normal', 'Anomaly'] )) # Confusion Matrix cm = confusion_matrix(self.y_test, self.best_model['predictions']) plt.figure(figsize=(8, 6)) sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Normal', 'Anomaly'], yticklabels=['Normal', 'Anomaly']) plt.title(f'Confusion Matrix - {self.best_model["name"]}') plt.ylabel('True Label') plt.xlabel('Predicted Label') plt.tight_layout() plt.savefig('confusion_matrix.png', dpi=300, bbox_inches='tight') print("✓ Confusion matrix saved to confusion_matrix.png") def plot_feature_importance(self): """Plot feature importance for tree-based models""" model = self.best_model['model'] if hasattr(model, 'feature_importances_'): print("\n📊 Feature Importance Analysis") importance_df = pd.DataFrame({ 'Feature': self.feature_names, 'Importance': model.feature_importances_ }).sort_values('Importance', ascending=False) print("\nTop 10 Most Important Features:") print(importance_df.head(10).to_string(index=False)) # Plot plt.figure(figsize=(10, 8)) top_features = importance_df.head(15) plt.barh(range(len(top_features)), top_features['Importance']) plt.yticks(range(len(top_features)), top_features['Feature']) plt.xlabel('Importance') plt.title(f'Top 15 Feature Importances - {self.best_model["name"]}') plt.gca().invert_yaxis() plt.tight_layout() plt.savefig('feature_importance.png', dpi=300, bbox_inches='tight') print("✓ Feature importance plot saved to feature_importance.png") def plot_roc_curve(self): """Plot ROC curve""" try: fpr, tpr, _ = roc_curve(self.y_test, self.best_model['probabilities']) roc_auc = roc_auc_score(self.y_test, self.best_model['probabilities']) plt.figure(figsize=(8, 6)) plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (AUC = {roc_auc:.3f})') plt.plot([0, 1], [0, 1], 
color='navy', lw=2, linestyle='--') plt.xlim([0.0, 1.0]) plt.ylim([0.0, 1.05]) plt.xlabel('False Positive Rate') plt.ylabel('True Positive Rate') plt.title(f'ROC Curve - {self.best_model["name"]}') plt.legend(loc="lower right") plt.tight_layout() plt.savefig('roc_curve.png', dpi=300, bbox_inches='tight') print("✓ ROC curve saved to roc_curve.png") except: print("⚠ Could not generate ROC curve (insufficient data)") def plot_precision_recall_curve(self): """Plot Precision-Recall curve""" try: precision, recall, _ = precision_recall_curve( self.y_test, self.best_model['probabilities'] ) plt.figure(figsize=(8, 6)) plt.plot(recall, precision, color='blue', lw=2) plt.xlabel('Recall') plt.ylabel('Precision') plt.title(f'Precision-Recall Curve - {self.best_model["name"]}') plt.tight_layout() plt.savefig('precision_recall_curve.png', dpi=300, bbox_inches='tight') print("✓ Precision-Recall curve saved to precision_recall_curve.png") except: print("⚠ Could not generate Precision-Recall curve") def analyze_predictions(self): """Analyze prediction patterns""" print("\n🔍 Prediction Analysis") # Create analysis dataframe analysis_df = self.X_test.copy() analysis_df['true_label'] = self.y_test.values analysis_df['predicted_label'] = self.best_model['predictions'] analysis_df['prediction_prob'] = self.best_model['probabilities'] # Identify false positives and false negatives fp = analysis_df[(analysis_df['true_label'] == False) & (analysis_df['predicted_label'] == True)] fn = analysis_df[(analysis_df['true_label'] == True) & (analysis_df['predicted_label'] == False)] print(f"\n False Positives: {len(fp)}") print(f" False Negatives: {len(fn)}") if len(fp) > 0: print("\n High-confidence False Positives (top 3):") print(fp.nlargest(3, 'prediction_prob')[['prediction_prob']].to_string()) if len(fn) > 0: print("\n High-confidence False Negatives (top 3):") print(fn.nsmallest(3, 'prediction_prob')[['prediction_prob']].to_string()) def save_model(self, output_dir='models'): """Save the trained model and scaler""" print(f"\n💾 Saving model...") # Create output directory Path(output_dir).mkdir(exist_ok=True) # Save model model_path = Path(output_dir) / 'security_model.pkl' joblib.dump(self.best_model['model'], model_path) print(f"✓ Model saved to {model_path}") # Save scaler scaler_path = Path(output_dir) / 'scaler.pkl' joblib.dump(self.scaler, scaler_path) print(f"✓ Scaler saved to {scaler_path}") # Save feature names feature_path = Path(output_dir) / 'feature_names.json' with open(feature_path, 'w') as f: json.dump({ 'features': self.feature_names, 'model_name': self.best_model['name'], 'trained_date': datetime.now().isoformat(), 'use_scaler': self.best_model['scaled'] }, f, indent=2) print(f"✓ Feature names saved to {feature_path}") # Save model metadata metadata = { 'model_name': self.best_model['name'], 'training_date': datetime.now().isoformat(), 'n_samples': len(self.df), 'n_features': len(self.feature_names), 'n_anomalies': int(self.df['hasAnomaly'].sum()), 'test_accuracy': float(accuracy_score(self.y_test, self.best_model['predictions'])), 'test_f1': float(f1_score(self.y_test, self.best_model['predictions'])), 'feature_names': self.feature_names } metadata_path = Path(output_dir) / 'model_metadata.json' with open(metadata_path, 'w') as f: json.dump(metadata, f, indent=2) print(f"✓ Metadata saved to {metadata_path}") print(f"\n✅ Model training complete! 
All files saved to '{output_dir}/' directory") def main(): """Main training pipeline""" print("="*70) print("🛡️ BLOCKCHAIN SECURITY ANOMALY DETECTION - ML TRAINING") print("="*70) # Find the most recent data file data_dir = Path('security_data') if not data_dir.exists(): print("❌ Error: security_data directory not found!") return jsonl_files = list(data_dir.glob('*.jsonl')) if not jsonl_files: print("❌ Error: No .jsonl files found in security_data directory!") return # Use the most recent file data_file = max(jsonl_files, key=lambda p: p.stat().st_mtime) print(f"\n📁 Using data file: {data_file}") # Initialize trainer trainer = SecurityMLTrainer(data_file) # Training pipeline try: # Load data trainer.load_data() # Prepare features trainer.prepare_features() # Train models results = trainer.train_models() # Evaluate best model trainer.evaluate_best_model() # Generate visualizations trainer.plot_feature_importance() trainer.plot_roc_curve() trainer.plot_precision_recall_curve() # Analyze predictions trainer.analyze_predictions() # Save model trainer.save_model() print("\n" + "="*70) print("✅ TRAINING COMPLETED SUCCESSFULLY!") print("="*70) print("\n📦 Generated Files:") print(" - models/security_model.pkl") print(" - models/scaler.pkl") print(" - models/feature_names.json") print(" - models/model_metadata.json") print(" - confusion_matrix.png") print(" - feature_importance.png") print(" - roc_curve.png") print(" - precision_recall_curve.png") except Exception as e: print(f"\n❌ Error during training: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()

After this, we run:

python train_security_model.py

Running it prints a training summary and evaluation results in the terminal, along with the generated plots.

This Python script is basically the brain-training phase of your Goldrush security analytics system.
While the earlier JavaScript code was all about detecting anomalies in real time, this script teaches an ML model how to recognize those patterns intelligently in the future — like training an AI analyst for blockchain data.


 Step-by-Step Breakdown

1. Data Loading & Preparation

  • The script starts by scanning your security_data folder for the latest .jsonl file (this is where your live monitoring system stores exported analytics data).

  • It then loads that data into a pandas DataFrame.

  • Each row represents a time snapshot (like a minute of on-chain activity), with features such as price change, volume, volatility, etc.

  • The label (hasAnomaly) tells the model whether that data point represented a security anomaly or normal behavior.

Think of it as: “Here’s what normal Solana activity looks like — and here’s what an exploit looks like.”
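
As a minimal sketch of that loading step (the file name below is illustrative; the JSONL layout is the one produced by security_analytics.js), it boils down to:

import json
import pandas as pd

with open("security_data/security_data_example.jsonl") as f:   # illustrative file name
    records = [json.loads(line) for line in f if line.strip()]

df = pd.DataFrame([r["features"] for r in records])
df["hasAnomaly"] = [r["labels"]["hasAnomaly"] for r in records]

# A quick look at how normal vs anomalous rows differ on a few features
print(df.groupby("hasAnomaly")[["volatility", "zScore", "volumeChange1"]].mean())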


2. Feature Scaling & Data Splitting

  • It cleans up the data by replacing missing or infinite values.

  • Then it splits the dataset into a training set (75%) and a test set (25%).

  • The features are standardized (scaled) so models like Logistic Regression can perform better.

 Basically, this is setting up clean, balanced data before throwing it into the machine learning cauldron.
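
In code, that split-and-scale step looks roughly like this. A sketch reusing the df built in the previous snippet, with the same parameters as the training script (75/25 stratified split, StandardScaler fitted on the training set only):

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# df is the DataFrame built in the loading sketch above
X = df.drop("hasAnomaly", axis=1).replace([np.inf, -np.inf], np.nan).fillna(0)
y = df["hasAnomaly"]

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=42, stratify=y
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # fit on training data only
X_test_scaled = scaler.transform(X_test)        # apply the same scaling to the test set

Fitting the scaler on the training set only avoids leaking information from the test set into the model.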


3. Training Multiple Models

  • Three models are trained and compared:

  • Random Forest → Great for nonlinear, high-variance blockchain data.

  • Gradient Boosting → Captures subtle interactions and trends.

  • Logistic Regression → A fast, linear baseline model.

  • Each model is trained and then evaluated on the test data.

  • It computes Accuracy, F1 Score, and ROC-AUC, along with cross-validation F1 scores.

 The script automatically picks the best performing model based on F1 score (which balances false positives & negatives).
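
A stripped-down version of that comparison loop, reusing the split from the previous sketch (hyperparameters mirror the training script):

from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score

candidates = {
    "Random Forest": RandomForestClassifier(
        n_estimators=100, max_depth=10, class_weight="balanced", random_state=42),
    "Gradient Boosting": GradientBoostingClassifier(
        n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42),
    "Logistic Regression": LogisticRegression(
        max_iter=1000, class_weight="balanced", random_state=42),
}

scores = {}
for name, model in candidates.items():
    # Logistic Regression gets the scaled features; the tree models use the raw ones
    X_tr, X_te = (X_train_scaled, X_test_scaled) if name == "Logistic Regression" else (X_train, X_test)
    model.fit(X_tr, y_train)
    scores[name] = f1_score(y_test, model.predict(X_te))

best_name = max(scores, key=scores.get)
print(f"Best model by F1: {best_name} ({scores[best_name]:.3f})")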


4. Model Evaluation & Visualization

Once the best model is chosen, it generates visual reports:

  • Classification report → Shows how well it predicts anomalies vs normal cases.

  • Confusion Matrix → Visual heatmap of correct vs incorrect predictions.

  • Feature Importance → Ranks which metrics (price deviation, volume spikes, etc.) most influenced anomaly decisions.

  • ROC & Precision-Recall Curves → Shows model confidence and trade-offs between recall and precision.

 This turns raw metrics into visual, explainable results — super helpful for audits or reports.
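
For instance, the confusion matrix heatmap boils down to a few lines (a sketch reusing the best model selected in the previous snippet):

import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix

best_model = candidates[best_name]
X_eval = X_test_scaled if best_name == "Logistic Regression" else X_test
y_pred = best_model.predict(X_eval)

cm = confusion_matrix(y_test, y_pred)
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues",
            xticklabels=["Normal", "Anomaly"], yticklabels=["Normal", "Anomaly"])
plt.xlabel("Predicted label")
plt.ylabel("True label")
plt.savefig("confusion_matrix_sketch.png", dpi=150, bbox_inches="tight")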


5. Prediction Error Analysis

  • The script identifies false positives (flagged normal data) and false negatives (missed anomalies).

  • It even prints out the top 3 highest-confidence errors in both categories.

 This helps you understand where the model is over-reacting or missing subtle exploit patterns.
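
Under the hood this is just two boolean masks over the test set; a sketch reusing y_pred from the snippet above:

analysis = X_test.copy()
analysis["true_label"] = y_test.values
analysis["predicted_label"] = y_pred

false_positives = analysis[(analysis["true_label"] == False) & (analysis["predicted_label"] == True)]
false_negatives = analysis[(analysis["true_label"] == True) & (analysis["predicted_label"] == False)]
print(f"False positives: {len(false_positives)}, false negatives: {len(false_negatives)}")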


6. Model Saving & Metadata

After evaluation, the best model is saved neatly inside a /models folder:

  • security_model.pkl → The trained ML model

  • scaler.pkl → The StandardScaler object for consistent input normalization

  • feature_names.json → Metadata of which features were used

  • model_metadata.json → Training info (accuracy, dataset stats, date, etc.)

 So, next time, you can directly load this trained model and start detecting anomalies instantly, with no retraining needed (see the sketch below).
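
Reloading those artifacts later takes only a few lines. A sketch with made-up placeholder values; note that feature_names.json records whether the saved model expects scaled input (use_scaler), in which case scaler.pkl should be applied first:

import json
import joblib
import pandas as pd

model = joblib.load("models/security_model.pkl")
config = json.load(open("models/feature_names.json"))

# Build a one-row frame in the exact training feature order (placeholder values)
sample = pd.DataFrame([{name: 0.0 for name in config["features"]}])
if config.get("use_scaler"):
    scaler = joblib.load("models/scaler.pkl")
    sample = pd.DataFrame(scaler.transform(sample), columns=config["features"])

print("anomaly probability:", model.predict_proba(sample)[0][1])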

Step 6: Writing predict_anomalies.py

#!/usr/bin/env python3 """ Anomaly Prediction Script Uses the trained ML model to predict anomalies on new data """ import json import pandas as pd import numpy as np import joblib from pathlib import Path import sys class AnomalyPredictor: """Predict security anomalies using trained ML model""" def __init__(self, model_dir='models'): self.model_dir = Path(model_dir) self.model = None self.scaler = None self.feature_names = None self.metadata = None self.load_model() def load_model(self): """Load the trained model and associated files""" print("📦 Loading trained model...") # Load model model_path = self.model_dir / 'security_model.pkl' if not model_path.exists(): raise FileNotFoundError(f"Model not found at {model_path}") self.model = joblib.load(model_path) # Load scaler scaler_path = self.model_dir / 'scaler.pkl' if scaler_path.exists(): self.scaler = joblib.load(scaler_path) # Load feature names feature_path = self.model_dir / 'feature_names.json' if feature_path.exists(): with open(feature_path, 'r') as f: feature_config = json.load(f) self.feature_names = feature_config['features'] # Load metadata metadata_path = self.model_dir / 'model_metadata.json' if metadata_path.exists(): with open(metadata_path, 'r') as f: self.metadata = json.load(f) print(f"✓ Model loaded: {self.metadata.get('model_name', 'Unknown')}") print(f"✓ Training date: {self.metadata.get('training_date', 'Unknown')}") print(f"✓ Test accuracy: {self.metadata.get('test_accuracy', 0):.3f}") print(f"✓ Test F1 score: {self.metadata.get('test_f1', 0):.3f}") def predict_from_features(self, features_dict): """ Predict anomaly from a features dictionary Args: features_dict: Dictionary containing feature values Returns: dict: Prediction result with probability and label """ # Create DataFrame with correct feature order df = pd.DataFrame([features_dict])[self.feature_names] # Handle missing/infinite values df = df.replace([np.inf, -np.inf], np.nan) df = df.fillna(0) # Make prediction prediction = self.model.predict(df)[0] probability = self.model.predict_proba(df)[0] return { 'is_anomaly': bool(prediction), 'anomaly_probability': float(probability[1]), 'normal_probability': float(probability[0]), 'confidence': float(max(probability)), 'prediction_label': 'ANOMALY' if prediction else 'NORMAL' } def predict_from_jsonl(self, jsonl_path): """ Predict anomalies for all records in a JSONL file Args: jsonl_path: Path to JSONL file Returns: list: List of predictions for each record """ print(f"\n🔍 Analyzing data from {jsonl_path}...") predictions = [] with open(jsonl_path, 'r') as f: for idx, line in enumerate(f, 1): if not line.strip(): continue data = json.loads(line) features = data['features'] # Make prediction result = self.predict_from_features(features) # Add metadata result['timestamp'] = data.get('timestamp') result['data_index'] = idx # Add actual label if available if 'labels' in data: result['actual_anomaly'] = data['labels'].get('hasAnomaly') result['actual_types'] = data['labels'].get('anomalyTypes', []) predictions.append(result) return predictions def print_predictions(self, predictions): """Print prediction results in a formatted way""" anomalies = [p for p in predictions if p['is_anomaly']] normal = [p for p in predictions if not p['is_anomaly']] print(f"\n📊 Prediction Results:") print(f" Total records: {len(predictions)}") print(f" Predicted anomalies: {len(anomalies)}") print(f" Predicted normal: {len(normal)}") if anomalies: print(f"\n⚠️ Detected Anomalies:") for pred in anomalies: print(f"\n 📍 Record 
#{pred['data_index']} - {pred.get('timestamp', 'N/A')}") print(f" Anomaly Probability: {pred['anomaly_probability']:.1%}") print(f" Confidence: {pred['confidence']:.1%}") if 'actual_anomaly' in pred: correct = pred['actual_anomaly'] == pred['is_anomaly'] status = "✓ CORRECT" if correct else "✗ INCORRECT" print(f" Actual Label: {pred['actual_anomaly']} {status}") if pred.get('actual_types'): print(f" Actual Types: {', '.join(pred['actual_types'])}") # Calculate accuracy if actual labels available if 'actual_anomaly' in predictions[0]: correct = sum(1 for p in predictions if p['is_anomaly'] == p['actual_anomaly']) accuracy = correct / len(predictions) print(f"\n📈 Model Performance on this data:") print(f" Accuracy: {accuracy:.1%}") def save_predictions(self, predictions, output_path): """Save predictions to a JSON file""" with open(output_path, 'w') as f: json.dump(predictions, f, indent=2) print(f"\n💾 Predictions saved to {output_path}") def main(): """Main prediction pipeline""" print("="*70) print("🔮 BLOCKCHAIN SECURITY ANOMALY PREDICTION") print("="*70) # Initialize predictor try: predictor = AnomalyPredictor() except FileNotFoundError as e: print(f"\n❌ Error: {e}") print(" Please train the model first by running: python train_security_model.py") return # Check for data file data_dir = Path('security_data') if len(sys.argv) > 1: # Use file specified in command line data_file = Path(sys.argv[1]) else: # Use the most recent JSONL file jsonl_files = list(data_dir.glob('*.jsonl')) if not jsonl_files: print("\n❌ No data files found!") print(" Usage: python predict_anomalies.py [path/to/data.jsonl]") return data_file = max(jsonl_files, key=lambda p: p.stat().st_mtime) if not data_file.exists(): print(f"\n❌ File not found: {data_file}") return # Make predictions predictions = predictor.predict_from_jsonl(data_file) # Print results predictor.print_predictions(predictions) # Save predictions output_file = f"predictions_{data_file.stem}.json" predictor.save_predictions(predictions, output_file) print("\n" + "="*70) print("✅ PREDICTION COMPLETED!") print("="*70) if __name__ == "__main__": main()

Now, after creating this file, we run:

python predict_anomalies.py

While the script runs, it does the following:

  • Loads a Trained Model for Security Prediction
    The script starts by loading your pre-trained anomaly detection model, along with its associated scaler, feature names, and metadata. Think of it as powering up your AI assistant that’s ready to spot suspicious blockchain activity.

  • Handles All the Setup Automatically
    No manual hassle — it automatically finds the right model files, checks their paths, and gives you a quick summary of when and how the model was trained (accuracy, F1 score, etc.).

  • Prepares the Data for Prediction
    Before running predictions, it cleans and formats incoming data — removing infinities, filling missing values, and scaling everything just like it did during training. This ensures consistent and reliable results every time.

  • Makes Smart Predictions on New Data
    Whether you feed it a single transaction or a batch of new blockchain events, it runs them through the trained model to figure out if they’re normal or anomalous (potentially malicious).

  • Outputs Confidence Scores & Probabilities
    The predictions aren’t just binary; each comes with probability and confidence scores that tell you how sure the model is about a given call (see the sketch after this list).

  • Supports Real-time Monitoring Pipelines
    This code can easily plug into your live observability stack. Think of it as the intelligence layer that turns streaming blockchain data into actionable insights for security teams.

  • Logs Everything Clearly for Developers
    Every step prints clear, developer-friendly logs, so you always know what’s being loaded, predicted, or skipped.
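
You can also import the predictor and score a single candle’s features directly. A minimal sketch (the feature values below are made up for illustration; the keys match the features exported by security_analytics.js):

from predict_anomalies import AnomalyPredictor

predictor = AnomalyPredictor(model_dir="models")
result = predictor.predict_from_features({
    "open": 1.02, "high": 1.10, "low": 1.01, "close": 1.08,
    "volume": 5.2e6, "volumeUsd": 5.6e6,
    "priceRange": 0.09, "bodySize": 0.06, "volatility": 0.088,
    "zScore": 2.7, "distanceFromMean": 0.07,
    "priceChange1": 0.058, "volumeChange1": 3.4,
})
print(result["prediction_label"], f"({result['anomaly_probability']:.1%} anomaly probability)")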

And at the end it will show something like this in the terminal:
Perfect! The model is working great with 97% accuracy!

At last, we have:


Step 7: Writing visualize_results.py

#!/usr/bin/env python3
"""
Visualization Dashboard for Security Analytics
Creates comprehensive visualizations of training data and predictions
"""

import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime

# Set style
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (14, 10)


class SecurityVisualizationDashboard:
    """Create visualizations for security analytics"""

    def __init__(self, data_path):
        self.data_path = data_path
        self.df = None
        self.load_data()

    def load_data(self):
        """Load data from JSONL file"""
        print(f"📂 Loading data from {self.data_path}...")
        data = []
        with open(self.data_path, 'r') as f:
            for line in f:
                if line.strip():
                    data.append(json.loads(line))

        # Parse data: flatten the feature dict and attach the labels
        features_list = []
        for item in data:
            row = item['features'].copy()
            row['timestamp'] = item['timestamp']
            row['hasAnomaly'] = item['labels']['hasAnomaly']
            row['anomalyTypes'] = ','.join(item['labels']['anomalyTypes'])
            row['severity'] = item['labels']['maxSeverity']
            features_list.append(row)

        self.df = pd.DataFrame(features_list)
        self.df['timestamp'] = pd.to_datetime(self.df['timestamp'])

        print(f"✓ Loaded {len(self.df)} records")
        print(f"  Anomalies: {self.df['hasAnomaly'].sum()}")
        print(f"  Normal: {(~self.df['hasAnomaly']).sum()}")

    def create_dashboard(self, output_file='dashboard.png'):
        """Create comprehensive dashboard"""
        print("\n📊 Creating visualization dashboard...")

        fig = plt.figure(figsize=(20, 12))
        gs = fig.add_gridspec(4, 3, hspace=0.3, wspace=0.3)

        # 1. Price and Volume Over Time
        ax1 = fig.add_subplot(gs[0, :])
        self.plot_price_volume_timeline(ax1)

        # 2. Anomaly Distribution
        ax2 = fig.add_subplot(gs[1, 0])
        self.plot_anomaly_distribution(ax2)

        # 3. Anomaly Types
        ax3 = fig.add_subplot(gs[1, 1])
        self.plot_anomaly_types(ax3)

        # 4. Volume Distribution
        ax4 = fig.add_subplot(gs[1, 2])
        self.plot_volume_distribution(ax4)

        # 5. Volatility Analysis
        ax5 = fig.add_subplot(gs[2, 0])
        self.plot_volatility_analysis(ax5)

        # 6. Price Range Analysis
        ax6 = fig.add_subplot(gs[2, 1])
        self.plot_price_range_analysis(ax6)

        # 7. Volume Changes
        ax7 = fig.add_subplot(gs[2, 2])
        self.plot_volume_changes(ax7)

        # 8. Feature Correlation Heatmap
        ax8 = fig.add_subplot(gs[3, :])
        self.plot_correlation_heatmap(ax8)

        plt.suptitle('🛡️ Blockchain Security Analytics Dashboard',
                     fontsize=20, fontweight='bold', y=0.995)
        plt.savefig(output_file, dpi=300, bbox_inches='tight')
        print(f"✓ Dashboard saved to {output_file}")
        plt.close()

    def plot_price_volume_timeline(self, ax):
        """Plot price and volume over time with anomalies highlighted"""
        ax2 = ax.twinx()

        # Plot closing price
        ax.plot(self.df['timestamp'], self.df['close'],
                label='Close Price', color='blue', linewidth=2)

        # Plot volume
        ax2.plot(self.df['timestamp'], self.df['volumeUsd'],
                 label='Volume USD', color='green', alpha=0.6, linewidth=1.5)

        # Highlight anomalies
        anomalies = self.df[self.df['hasAnomaly']]
        if len(anomalies) > 0:
            ax.scatter(anomalies['timestamp'], anomalies['close'],
                       color='red', s=100, marker='x', linewidths=3,
                       label='Anomalies', zorder=5)

        ax.set_xlabel('Time')
        ax.set_ylabel('Close Price', color='blue')
        ax2.set_ylabel('Volume (USD)', color='green')
        ax.tick_params(axis='y', labelcolor='blue')
        ax2.tick_params(axis='y', labelcolor='green')
        ax.legend(loc='upper left')
        ax2.legend(loc='upper right')
        ax.set_title('Price & Volume Timeline with Anomalies', fontweight='bold')
        ax.grid(True, alpha=0.3)
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')

    def plot_anomaly_distribution(self, ax):
        """Plot anomaly vs normal distribution"""
        # Reindex so slice order always matches the Normal/Anomaly labels,
        # even if anomalous records outnumber normal ones
        counts = self.df['hasAnomaly'].value_counts().reindex([False, True], fill_value=0)
        colors = ['#2ecc71', '#e74c3c']
        labels = ['Normal', 'Anomaly']
        wedges, texts, autotexts = ax.pie(counts, labels=labels, autopct='%1.1f%%',
                                          colors=colors, startangle=90,
                                          textprops={'fontsize': 12, 'weight': 'bold'})
        ax.set_title('Anomaly Distribution', fontweight='bold', pad=20)

    def plot_anomaly_types(self, ax):
        """Plot distribution of anomaly types"""
        anomaly_types = []
        for types_str in self.df[self.df['hasAnomaly']]['anomalyTypes']:
            if types_str:
                anomaly_types.extend(types_str.split(','))

        if anomaly_types:
            type_counts = pd.Series(anomaly_types).value_counts()
            colors = sns.color_palette('Set2', len(type_counts))
            type_counts.plot(kind='barh', ax=ax, color=colors)
            ax.set_xlabel('Count')
            ax.set_title('Anomaly Types Distribution', fontweight='bold')
        else:
            ax.text(0.5, 0.5, 'No anomaly types recorded',
                    ha='center', va='center', transform=ax.transAxes)
            ax.set_title('Anomaly Types Distribution', fontweight='bold')

    def plot_volume_distribution(self, ax):
        """Plot volume distribution"""
        normal = self.df[~self.df['hasAnomaly']]['volumeUsd']
        anomaly = self.df[self.df['hasAnomaly']]['volumeUsd']

        ax.hist(normal, bins=20, alpha=0.6, label='Normal', color='green')
        ax.hist(anomaly, bins=20, alpha=0.6, label='Anomaly', color='red')
        ax.set_xlabel('Volume (USD)')
        ax.set_ylabel('Frequency')
        ax.set_title('Volume Distribution', fontweight='bold')
        ax.legend()
        ax.set_yscale('log')

    def plot_volatility_analysis(self, ax):
        """Plot volatility comparison"""
        data_to_plot = [
            self.df[~self.df['hasAnomaly']]['volatility'],
            self.df[self.df['hasAnomaly']]['volatility']
        ]
        bp = ax.boxplot(data_to_plot, labels=['Normal', 'Anomaly'],
                        patch_artist=True, showmeans=True)
        colors = ['lightgreen', 'lightcoral']
        for patch, color in zip(bp['boxes'], colors):
            patch.set_facecolor(color)
        ax.set_ylabel('Volatility')
        ax.set_title('Volatility Comparison', fontweight='bold')
        ax.grid(True, alpha=0.3)

    def plot_price_range_analysis(self, ax):
        """Plot price range analysis"""
        data_to_plot = [
            self.df[~self.df['hasAnomaly']]['priceRange'],
            self.df[self.df['hasAnomaly']]['priceRange']
        ]
        bp = ax.boxplot(data_to_plot, labels=['Normal', 'Anomaly'],
                        patch_artist=True, showmeans=True)
        colors = ['lightgreen', 'lightcoral']
        for patch, color in zip(bp['boxes'], colors):
            patch.set_facecolor(color)
        ax.set_ylabel('Price Range')
        ax.set_title('Price Range Comparison', fontweight='bold')
        ax.grid(True, alpha=0.3)

    def plot_volume_changes(self, ax):
        """Plot volume changes over time"""
        ax.plot(self.df['timestamp'], self.df['volumeChange1'],
                label='Volume Change', color='purple', linewidth=1.5)

        # Add zero line
        ax.axhline(y=0, color='black', linestyle='--', alpha=0.3)

        # Highlight anomalies
        anomalies = self.df[self.df['hasAnomaly']]
        if len(anomalies) > 0:
            ax.scatter(anomalies['timestamp'], anomalies['volumeChange1'],
                       color='red', s=50, marker='x', linewidths=2,
                       label='Anomalies', zorder=5)

        ax.set_xlabel('Time')
        ax.set_ylabel('Volume Change Ratio')
        ax.set_title('Volume Changes Over Time', fontweight='bold')
        ax.legend()
        ax.grid(True, alpha=0.3)
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')

    def plot_correlation_heatmap(self, ax):
        """Plot feature correlation heatmap"""
        # Select numeric features
        numeric_cols = ['open', 'high', 'low', 'close', 'volume', 'volumeUsd',
                        'priceRange', 'bodySize', 'volatility', 'zScore',
                        'distanceFromMean', 'priceChange1', 'volumeChange1']
        corr = self.df[numeric_cols].corr()

        sns.heatmap(corr, annot=False, cmap='coolwarm', center=0,
                    square=True, linewidths=0.5, ax=ax,
                    cbar_kws={"shrink": 0.8})
        ax.set_title('Feature Correlation Heatmap', fontweight='bold', pad=10)
        plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
        plt.setp(ax.yaxis.get_majorticklabels(), rotation=0)

    def create_anomaly_timeline(self, output_file='anomaly_timeline.png'):
        """Create detailed anomaly timeline"""
        print("\n📈 Creating anomaly timeline...")

        fig, axes = plt.subplots(3, 1, figsize=(16, 10), sharex=True)

        # Price with anomalies
        axes[0].plot(self.df['timestamp'], self.df['close'],
                     label='Close Price', color='blue', linewidth=2)
        anomalies = self.df[self.df['hasAnomaly']]
        if len(anomalies) > 0:
            axes[0].scatter(anomalies['timestamp'], anomalies['close'],
                            color='red', s=150, marker='X', linewidths=2,
                            label='Anomalies', zorder=5, edgecolors='black')
        axes[0].set_ylabel('Price')
        axes[0].set_title('Price Timeline', fontweight='bold')
        axes[0].legend()
        axes[0].grid(True, alpha=0.3)

        # Volume with anomalies
        axes[1].fill_between(self.df['timestamp'], self.df['volumeUsd'],
                             color='green', alpha=0.5, label='Volume USD')
        if len(anomalies) > 0:
            axes[1].scatter(anomalies['timestamp'], anomalies['volumeUsd'],
                            color='red', s=150, marker='X', linewidths=2,
                            label='Anomalies', zorder=5, edgecolors='black')
        axes[1].set_ylabel('Volume (USD)')
        axes[1].set_title('Volume Timeline', fontweight='bold')
        axes[1].legend()
        axes[1].grid(True, alpha=0.3)

        # Volatility with anomalies
        axes[2].plot(self.df['timestamp'], self.df['volatility'],
                     color='orange', linewidth=2, label='Volatility')
        if len(anomalies) > 0:
            axes[2].scatter(anomalies['timestamp'], anomalies['volatility'],
                            color='red', s=150, marker='X', linewidths=2,
                            label='Anomalies', zorder=5, edgecolors='black')
        axes[2].set_xlabel('Time')
        axes[2].set_ylabel('Volatility')
        axes[2].set_title('Volatility Timeline', fontweight='bold')
        axes[2].legend()
        axes[2].grid(True, alpha=0.3)

        plt.suptitle('🔍 Detailed Anomaly Timeline Analysis',
                     fontsize=18, fontweight='bold', y=0.995)
        for ax in axes:
            plt.setp(ax.xaxis.get_majorticklabels(), rotation=45, ha='right')
        plt.tight_layout()
        plt.savefig(output_file, dpi=300, bbox_inches='tight')
        print(f"✓ Timeline saved to {output_file}")
        plt.close()

    def create_statistics_report(self, output_file='statistics_report.txt'):
        """Generate detailed statistics report"""
        print("\n📝 Generating statistics report...")

        report = []
        report.append("="*70)
        report.append("BLOCKCHAIN SECURITY ANALYTICS - STATISTICS REPORT")
        report.append("="*70)
        report.append(f"\nReport Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"\nData File: {self.data_path}")
        report.append(f"Total Records: {len(self.df)}")
        report.append(f"Time Range: {self.df['timestamp'].min()} to {self.df['timestamp'].max()}")

        report.append("\n" + "-"*70)
        report.append("ANOMALY SUMMARY")
        report.append("-"*70)
        report.append(f"Total Anomalies: {self.df['hasAnomaly'].sum()}")
        report.append(f"Anomaly Rate: {self.df['hasAnomaly'].mean()*100:.2f}%")
        report.append(f"Normal Records: {(~self.df['hasAnomaly']).sum()}")

        # Anomaly types
        anomaly_types = []
        for types_str in self.df[self.df['hasAnomaly']]['anomalyTypes']:
            if types_str:
                anomaly_types.extend(types_str.split(','))
        if anomaly_types:
            report.append("\nAnomaly Types:")
            type_counts = pd.Series(anomaly_types).value_counts()
            for atype, count in type_counts.items():
                report.append(f"  - {atype}: {count}")

        report.append("\n" + "-"*70)
        report.append("PRICE STATISTICS")
        report.append("-"*70)
        report.append(f"Mean Close Price: {self.df['close'].mean():.6f}")
        report.append(f"Std Close Price: {self.df['close'].std():.6f}")
        report.append(f"Min Close Price: {self.df['close'].min():.6f}")
        report.append(f"Max Close Price: {self.df['close'].max():.6f}")

        report.append("\n" + "-"*70)
        report.append("VOLUME STATISTICS")
        report.append("-"*70)
        report.append(f"Mean Volume (USD): ${self.df['volumeUsd'].mean():,.2f}")
        report.append(f"Std Volume (USD): ${self.df['volumeUsd'].std():,.2f}")
        report.append(f"Min Volume (USD): ${self.df['volumeUsd'].min():,.2f}")
        report.append(f"Max Volume (USD): ${self.df['volumeUsd'].max():,.2f}")
        report.append(f"Total Volume (USD): ${self.df['volumeUsd'].sum():,.2f}")

        report.append("\n" + "-"*70)
        report.append("VOLATILITY STATISTICS")
        report.append("-"*70)
        report.append(f"Mean Volatility: {self.df['volatility'].mean():.6f}")
        report.append(f"Max Volatility: {self.df['volatility'].max():.6f}")
        report.append(f"Volatility Std: {self.df['volatility'].std():.6f}")

        # Comparison: Normal vs Anomaly
        report.append("\n" + "-"*70)
        report.append("COMPARATIVE ANALYSIS (Normal vs Anomaly)")
        report.append("-"*70)
        normal_df = self.df[~self.df['hasAnomaly']]
        anomaly_df = self.df[self.df['hasAnomaly']]
        if len(anomaly_df) > 0:
            report.append("\nVolume (USD):")
            report.append(f"  Normal Mean: ${normal_df['volumeUsd'].mean():,.2f}")
            report.append(f"  Anomaly Mean: ${anomaly_df['volumeUsd'].mean():,.2f}")
            report.append("\nVolatility:")
            report.append(f"  Normal Mean: {normal_df['volatility'].mean():.6f}")
            report.append(f"  Anomaly Mean: {anomaly_df['volatility'].mean():.6f}")
            report.append("\nPrice Range:")
            report.append(f"  Normal Mean: {normal_df['priceRange'].mean():.6f}")
            report.append(f"  Anomaly Mean: {anomaly_df['priceRange'].mean():.6f}")

        report.append("\n" + "="*70)
        report.append("END OF REPORT")
        report.append("="*70)

        # Save report
        with open(output_file, 'w') as f:
            f.write('\n'.join(report))
        print(f"✓ Statistics report saved to {output_file}")

        # Also print to console
        print('\n'.join(report))


def main():
    """Main visualization pipeline"""
    print("="*70)
    print("📊 BLOCKCHAIN SECURITY ANALYTICS - VISUALIZATION DASHBOARD")
    print("="*70)

    # Find most recent data file
    data_dir = Path('security_data')
    if not data_dir.exists():
        print("\n❌ Error: security_data directory not found!")
        return

    jsonl_files = list(data_dir.glob('*.jsonl'))
    if not jsonl_files:
        print("\n❌ Error: No .jsonl files found!")
        return

    data_file = max(jsonl_files, key=lambda p: p.stat().st_mtime)
    print(f"\n📁 Using data file: {data_file}")

    try:
        # Create dashboard
        dashboard = SecurityVisualizationDashboard(data_file)

        # Generate all visualizations
        dashboard.create_dashboard('security_dashboard.png')
        dashboard.create_anomaly_timeline('anomaly_timeline.png')
        dashboard.create_statistics_report('statistics_report.txt')

        print("\n" + "="*70)
        print("✅ VISUALIZATION COMPLETE!")
        print("="*70)
        print("\n📦 Generated Files:")
        print("  - security_dashboard.png")
        print("  - anomaly_timeline.png")
        print("  - statistics_report.txt")
    except Exception as e:
        print(f"\n❌ Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()

After running the script, it will:

  • Build a security analytics visualization dashboard for blockchain anomaly-detection data, loading, analyzing, and visualizing records from JSONL files that contain price, volume, and anomaly information.

  • Produce a dashboard image, an anomaly timeline, and a statistics report summarizing the dataset.


 Core Components

1. Imports & Configuration

  • Imports standard libraries (json, pandas, numpy, matplotlib, seaborn, datetime, pathlib) for data processing and plotting.

  • Sets Seaborn style and figure size for consistent, clean visualization.


2. SecurityVisualizationDashboard Class

This class handles the entire data pipeline — from loading data to generating visualizations and reports.

a. __init__

  • Initializes with a JSONL data file path.

  • Calls load_data() automatically upon creation.

b. load_data()

  • Reads JSONL file line-by-line and parses each record.

  • Extracts and flattens relevant fields:

  • timestamp, hasAnomaly, anomalyTypes, severity, and features (price/volume metrics).

  • Converts timestamp to datetime format.

  • Displays:

  • Total records loaded

  • Number of anomalies vs. normal data points.
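
For reference, a single line of the .jsonl input has roughly the shape sketched below. The top-level keys (timestamp, features, labels.hasAnomaly, labels.anomalyTypes, labels.maxSeverity) come straight from the parsing code above, and the feature names match the columns used in the correlation heatmap; the concrete values, anomaly type, and severity shown here are made-up placeholders, not real Goldrush output:

import json

# Illustrative only: one record in the format load_data() parses.
# Values, anomaly type, and severity below are placeholders.
record = {
    "timestamp": "2024-05-01T12:00:00Z",
    "features": {
        "open": 1.02, "high": 1.05, "low": 1.01, "close": 1.04,
        "volume": 15000, "volumeUsd": 15600.0,
        "priceRange": 0.04, "bodySize": 0.02, "volatility": 0.013,
        "zScore": 2.8, "distanceFromMean": 0.03,
        "priceChange1": 0.019, "volumeChange1": 1.7
    },
    "labels": {
        "hasAnomaly": True,
        "anomalyTypes": ["volume_spike"],
        "maxSeverity": "high"
    }
}

print(json.dumps(record))  # each record occupies exactly one line in the file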


3. create_dashboard()

  • Generates a multi-plot dashboard (8 visual panels in one figure):

  • Price & Volume Timeline with anomalies highlighted.

  • Anomaly Distribution Pie Chart.

  • Anomaly Type Distribution.

  • Volume Distribution (Normal vs. Anomaly).

  • Volatility Comparison (Boxplot).

  • Price Range Comparison (Boxplot).

  • Volume Changes Over Time.

  • Correlation Heatmap of numerical features.

  • Exports the figure as a PNG (security_dashboard.png when run through main(); the method's default filename is dashboard.png).


4. Plotting Functions

Each method handles one part of the visualization dashboard:

  • plot_price_volume_timeline() → Plots close price and volume over time, marking anomalies in red.

  • plot_anomaly_distribution() → Pie chart showing anomaly vs normal data percentage.

  • plot_anomaly_types() → Bar chart showing which anomaly types occurred and their counts.

  • plot_volume_distribution() → Histogram comparing volume distributions for normal vs anomalous data.

  • plot_volatility_analysis() → Boxplot comparing volatility in normal vs anomalous records.

  • plot_price_range_analysis() → Boxplot comparing price ranges.

  • plot_volume_changes() → Time-series of volume change ratios with anomalies highlighted.

  • plot_correlation_heatmap() → Heatmap showing correlation among numeric features (e.g., open, close, volume, volatility).


5. create_anomaly_timeline()

  • Builds a 3-panel time-series visualization for:

  • Close price

  • Volume in USD

  • Volatility

  • Highlights anomalies in red across all timelines.

  • Saves output as anomaly_timeline.png.


6. create_statistics_report()

  • Generates a text-based analytics report containing:

  • Data summary (file name, total records, time range)

  • Anomaly counts and rates

  • Detailed anomaly type counts

  • Descriptive statistics for price, volume, and volatility

  • Comparative metrics (Normal vs. Anomaly)

  • Saves the report to statistics_report.txt.


7. main() Function

  • Searches the security_data/ directory for the most recent .jsonl data file.

  • Initializes the dashboard class using that file.

  • Runs the full visualization pipeline:

  • Creates dashboard image

  • Creates anomaly timeline image

  • Creates statistics report

  • Prints all output paths and completion status.
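
If you would rather point the visualizer at a specific capture instead of letting main() pick the newest file, a minimal sketch looks like this (it assumes the script is saved as visualize_results.py, and the file path is a hypothetical example you should replace with one of your own captures):

from visualize_results import SecurityVisualizationDashboard

# Hypothetical path; substitute an actual capture from security_data/
dashboard = SecurityVisualizationDashboard('security_data/capture_2024-05-01.jsonl')

# Generate the same three artifacts main() produces
dashboard.create_dashboard('security_dashboard.png')
dashboard.create_anomaly_timeline('anomaly_timeline.png')
dashboard.create_statistics_report('statistics_report.txt')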

Output Files

When executed, this script generates:

  • security_dashboard.png → full visualization dashboard

  • anomaly_timeline.png → anomaly-focused time analysis

  • statistics_report.txt → text summary of all metrics

Wrapping It Up: When Data Gets Superpowers

And there we have it — from raw blockchain noise to real-time intelligence, we’ve just turned Goldrush into your very own Web3 guardian angel. 

What started as a simple anomaly detector is now a full-blown incident response machine that doesn’t just see the chaos… it predicts it, classifies it, and sometimes even laughs at it before fixing it. (Okay, maybe not the last part — but we’re getting there.)

With Goldrush’s high-frequency data streams, ML-powered anomaly detection, and lightning-fast performance benchmarking, you’re not just monitoring your network — you’re building the early warning system of decentralized finance.

Because in Web3, milliseconds matter, and prevention always beats panic.

So whether you’re protecting validators on Solana, sniffing out rug pulls before they happen, or just flexing your data wizardry — remember:

Goldrush isn’t just observability. It’s foresight — on-chain.

Stay curious, stay safe, and may your nodes always stay stable.
