# frozen_string_literal: true # Service for managing DuckDB analytics database # Provides fast analytical queries on events data using columnar storage class AnalyticsDuckdbService include Singleton DUCKDB_PATH = Rails.root.join("storage", "analytics.duckdb").to_s BATCH_SIZE = 10_000 # Execute block with connection, ensuring database and connection are closed afterward def with_connection db = DuckDB::Database.open(DUCKDB_PATH) conn = db.connect yield conn ensure conn&.close db&.close end # Create events table if it doesn't exist (must be called within with_connection block) def setup_schema(conn) conn.execute(<<~SQL) CREATE TABLE IF NOT EXISTS events ( id BIGINT PRIMARY KEY, timestamp TIMESTAMP NOT NULL, ip_address VARCHAR, network_range_id BIGINT, country VARCHAR, company VARCHAR, asn INTEGER, asn_org VARCHAR, is_datacenter BOOLEAN, is_vpn BOOLEAN, is_proxy BOOLEAN, waf_action INTEGER, request_path VARCHAR, user_agent VARCHAR ) SQL Rails.logger.info "[DuckDB] Schema setup complete" end # Get timestamp of oldest event in DuckDB # Returns nil if table is empty def oldest_event_timestamp with_connection do |conn| result = conn.query("SELECT MIN(timestamp) as oldest FROM events") first_row = result.first first_row&.first # Returns the value or nil end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting oldest timestamp: #{e.message}" nil end # Get timestamp of newest event in DuckDB # Returns nil if table is empty def newest_event_timestamp with_connection do |conn| result = conn.query("SELECT MAX(timestamp) as newest FROM events") first_row = result.first first_row&.first # Returns the value or nil end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting newest timestamp: #{e.message}" nil end # Get maximum event ID already synced to DuckDB def max_synced_id with_connection do |conn| result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events") first_row = result.first first_row&.first || 0 end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting max ID: #{e.message}" 0 end # Sync new events from PostgreSQL to DuckDB # Uses PostgreSQL cursor for memory-efficient streaming # Uses Appender API for fast bulk inserts # Filters by ID to avoid duplicates def sync_new_events(from_timestamp) total_synced = 0 with_connection do |conn| # Ensure table exists setup_schema(conn) # Get max ID already in DuckDB to avoid duplicates max_id_result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events") max_id = max_id_result.first&.first || 0 Rails.logger.info "[DuckDB] Syncing events from #{from_timestamp}, max_id=#{max_id}" start_time = Time.current appender = nil batch_count = 0 begin # Use PostgreSQL cursor for memory-efficient streaming Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id) .select( :id, :timestamp, :ip_address, :network_range_id, :country, :company, :asn, :asn_org, :is_datacenter, :is_vpn, :is_proxy, :waf_action, :request_path, :user_agent ) .order(:id) .each_row(block_size: BATCH_SIZE) do |event_data| # Create new appender for each batch if batch_count % BATCH_SIZE == 0 appender&.close # Close previous appender appender = conn.appender("events") end # Unpack event data from cursor row (Hash from each_row) begin appender.append_row( event_data["id"], event_data["timestamp"], event_data["ip_address"]&.to_s, event_data["network_range_id"], event_data["country"], event_data["company"], event_data["asn"], event_data["asn_org"], event_data["is_datacenter"], event_data["is_vpn"], event_data["is_proxy"], event_data["waf_action"], event_data["request_path"], event_data["user_agent"] ) rescue StandardError => e Rails.logger.error "[DuckDB] Error appending event #{event_data['id']}: #{e.message}" Rails.logger.error "[DuckDB] event_data = #{event_data.inspect}" raise end batch_count += 1 total_synced += 1 # Log progress every BATCH_SIZE events if batch_count % BATCH_SIZE == 0 Rails.logger.info "[DuckDB] Synced batch (total: #{total_synced} events)" end end # Close final appender appender&.close duration = Time.current - start_time rate = total_synced / duration if duration > 0 Rails.logger.info "[DuckDB] Sync complete: #{total_synced} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)" rescue StandardError => e appender&.close rescue nil # Ensure appender is closed on error Rails.logger.error "[DuckDB] Error syncing events: #{e.message}" Rails.logger.error e.backtrace.join("\n") raise # Re-raise to be caught by outer rescue end end total_synced rescue StandardError => e Rails.logger.error "[DuckDB] Sync failed: #{e.message}" 0 end # Execute analytical query on DuckDB def query(sql, *params) with_connection do |conn| conn.query(sql, *params) end rescue StandardError => e Rails.logger.error "[DuckDB] Query error: #{e.message}" Rails.logger.error "SQL: #{sql}" raise end # Get event count in DuckDB def event_count with_connection do |conn| result = conn.query("SELECT COUNT(*) as count FROM events") first_row = result.first first_row&.first || 0 end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting event count: #{e.message}" 0 end # Analytics query: Total events since timestamp def total_events_since(start_time) with_connection do |conn| result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time) result.first&.first || 0 end end # Analytics query: Event breakdown by WAF action def event_breakdown_by_action(start_time) with_connection do |conn| result = conn.query(<<~SQL, start_time) SELECT waf_action, COUNT(*) as count FROM events WHERE timestamp >= ? GROUP BY waf_action SQL # Convert to hash like PostgreSQL returns result.to_a.to_h { |row| [row["waf_action"], row["count"]] } end end # Analytics query: Top countries def top_countries(start_time, limit = 10) with_connection do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT country, COUNT(*) as count FROM events WHERE timestamp >= ? AND country IS NOT NULL GROUP BY country ORDER BY count DESC LIMIT ? SQL result.to_a.map { |row| [row["country"], row["count"]] } end end # Analytics query: Top blocked IPs def top_blocked_ips(start_time, limit = 10) with_connection do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT ip_address, COUNT(*) as count FROM events WHERE timestamp >= ? AND waf_action = 0 GROUP BY ip_address ORDER BY count DESC LIMIT ? SQL result.to_a.map { |row| [row["ip_address"], row["count"]] } end end # Analytics query: Hourly timeline (events grouped by hour) def hourly_timeline(start_time, end_time) with_connection do |conn| result = conn.query(<<~SQL, start_time, end_time) SELECT DATE_TRUNC('hour', timestamp) as hour, COUNT(*) as count FROM events WHERE timestamp >= ? AND timestamp < ? GROUP BY hour ORDER BY hour SQL # Convert to hash with Time keys like PostgreSQL result.to_a.to_h { |row| [row["hour"], row["count"]] } end end # Close DuckDB connection (for cleanup/testing) def close @connection&.close @connection = nil end end