# frozen_string_literal: true # Service for managing DuckDB analytics database # Provides fast analytical queries on events data using columnar storage class AnalyticsDuckdbService include Singleton BATCH_SIZE = 10_000 MAX_EVENTS_PER_SYNC = 50_000 # Limit events per job run to prevent OOM # Execute block with DuckDB connection # Always uses in-memory database (no file locks, no conflicts) # Used for writing parquet files and querying parquet files def with_connection db = DuckDB::Database.open(":memory:") conn = db.connect yield conn ensure conn&.close db&.close end # Create events table if it doesn't exist (must be called within with_connection block) def setup_schema(conn) conn.execute(<<~SQL) CREATE TABLE IF NOT EXISTS events ( id BIGINT PRIMARY KEY, timestamp TIMESTAMP NOT NULL, ip_address VARCHAR, network_range_id BIGINT, country VARCHAR, company VARCHAR, asn INTEGER, asn_org VARCHAR, is_datacenter BOOLEAN, is_vpn BOOLEAN, is_proxy BOOLEAN, is_bot BOOLEAN, waf_action INTEGER, request_method INTEGER, response_status INTEGER, rule_id BIGINT, request_path VARCHAR, user_agent VARCHAR, tags VARCHAR[] ) SQL # Create indexes for common query patterns create_indexes(conn) Rails.logger.info "[DuckDB] Schema setup complete" end # Create indexes for fast querying def create_indexes(conn) indexes = [ "CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp DESC)", "CREATE INDEX IF NOT EXISTS idx_events_network_range_id ON events(network_range_id)", "CREATE INDEX IF NOT EXISTS idx_events_ip_address ON events(ip_address)", "CREATE INDEX IF NOT EXISTS idx_events_waf_action ON events(waf_action)", "CREATE INDEX IF NOT EXISTS idx_events_country ON events(country)", "CREATE INDEX IF NOT EXISTS idx_events_company ON events(company)", "CREATE INDEX IF NOT EXISTS idx_events_asn ON events(asn)", "CREATE INDEX IF NOT EXISTS idx_events_rule_id ON events(rule_id)", "CREATE INDEX IF NOT EXISTS idx_events_is_bot ON events(is_bot)", "CREATE INDEX IF NOT EXISTS idx_events_is_datacenter ON events(is_datacenter)", "CREATE INDEX IF NOT EXISTS idx_events_is_vpn ON events(is_vpn)", "CREATE INDEX IF NOT EXISTS idx_events_is_proxy ON events(is_proxy)" ] indexes.each do |index_sql| conn.execute(index_sql) end Rails.logger.info "[DuckDB] Indexes created" rescue StandardError => e Rails.logger.warn "[DuckDB] Index creation warning: #{e.message}" end # Get timestamp of oldest event in DuckDB # Returns nil if table is empty def oldest_event_timestamp with_connection do |conn| result = conn.query("SELECT MIN(timestamp) as oldest FROM events") first_row = result.first first_row&.first # Returns the value or nil end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting oldest timestamp: #{e.message}" raise end # Get timestamp of newest event in DuckDB # Returns nil if table is empty def newest_event_timestamp with_connection do |conn| result = conn.query("SELECT MAX(timestamp) as newest FROM events") first_row = result.first first_row&.first # Returns the value or nil end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting newest timestamp: #{e.message}" raise end # Get maximum event ID already synced to DuckDB def max_synced_id with_connection do |conn| result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events") first_row = result.first first_row&.first || 0 end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting max ID: #{e.message}" raise end # Export new events from PostgreSQL directly to timestamped Parquet file # Uses PostgreSQL cursor for memory-efficient streaming # Writes to minute/YYYYMMDDHHmmss.parquet # @param from_timestamp [Time] Start timestamp to export from # @param max_id [Integer] Maximum event ID already exported (to avoid duplicates) # @return [Hash] { count: Integer, file_path: String, max_id: Integer } def export_new_events_to_parquet(from_timestamp, max_id = 0) ensure_parquet_directories total_exported = 0 exported_max_id = max_id timestamp = Time.current.utc.strftime("%Y%m%d%H%M%S") parquet_file = PARQUET_MINUTE_PATH.join("#{timestamp}.parquet") Rails.logger.info "[Parquet] Exporting events from #{from_timestamp}, max_id=#{max_id} to #{parquet_file}" start_time = Time.current with_connection do |conn| # Create temporary table in memory setup_schema(conn) appender = nil batch_count = 0 begin # Create appender for in-memory table appender = conn.appender("events") # Stream from PostgreSQL cursor and append to DuckDB in-memory table # Limit to MAX_EVENTS_PER_SYNC to prevent OOM on large backlogs Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id) .select( :id, :timestamp, :ip_address, :network_range_id, :country, :company, :asn, :asn_org, :is_datacenter, :is_vpn, :is_proxy, :is_bot, :waf_action, :request_method, :response_status, :rule_id, :request_path, :user_agent, :tags ) .order(:id) .limit(MAX_EVENTS_PER_SYNC) .each_row(block_size: BATCH_SIZE) do |event_data| begin appender.append_row( event_data["id"], event_data["timestamp"], event_data["ip_address"]&.to_s, event_data["network_range_id"], event_data["country"], event_data["company"], event_data["asn"], event_data["asn_org"], event_data["is_datacenter"], event_data["is_vpn"], event_data["is_proxy"], event_data["is_bot"], event_data["waf_action"], event_data["request_method"], event_data["response_status"], event_data["rule_id"], event_data["request_path"], event_data["user_agent"], event_data["tags"] || [] ) # Track maximum exported ID exported_max_id = [exported_max_id, event_data["id"]].max rescue StandardError => e Rails.logger.error "[Parquet] Error appending event #{event_data['id']}: #{e.message}" Rails.logger.error "[Parquet] event_data = #{event_data.inspect}" raise end batch_count += 1 total_exported += 1 # Flush and recreate appender every BATCH_SIZE events to avoid chunk overflow if batch_count % BATCH_SIZE == 0 appender.close appender = conn.appender("events") Rails.logger.info "[Parquet] Loaded batch (total: #{total_exported} events)" end end # Close appender appender&.close # Export in-memory table to parquet file conn.execute(<<~SQL) COPY (SELECT * FROM events ORDER BY timestamp DESC) TO '#{parquet_file}' (FORMAT PARQUET, COMPRESSION ZSTD) SQL duration = Time.current - start_time rate = total_exported / duration if duration > 0 # Log completion and check if there are more events to export if total_exported >= MAX_EVENTS_PER_SYNC Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec) - hit limit, more events may be pending" else Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)" end rescue StandardError => e appender&.close rescue nil # Ensure appender is closed on error Rails.logger.error "[Parquet] Error exporting events: #{e.message}" Rails.logger.error e.backtrace.join("\n") raise # Re-raise to be caught by outer rescue end end { count: total_exported, file_path: parquet_file.to_s, max_id: exported_max_id } rescue StandardError => e Rails.logger.error "[Parquet] Export failed: #{e.message}" raise end # Execute analytical query on DuckDB def query(sql, *params) with_connection do |conn| conn.query(sql, *params) end rescue StandardError => e Rails.logger.error "[DuckDB] Query error: #{e.message}" Rails.logger.error "SQL: #{sql}" raise end # Get event count in DuckDB def event_count with_connection do |conn| result = conn.query("SELECT COUNT(*) as count FROM events") first_row = result.first first_row&.first || 0 end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting event count: #{e.message}" raise end # Analytics query: Total events since timestamp def total_events_since(start_time) with_connection do |conn| result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time) result.first&.first || 0 end end # Analytics query: Event breakdown by WAF action def event_breakdown_by_action(start_time) with_connection do |conn| result = conn.query(<<~SQL, start_time) SELECT waf_action, COUNT(*) as count FROM events WHERE timestamp >= ? GROUP BY waf_action SQL # Convert to hash like PostgreSQL returns # DuckDB returns arrays: [waf_action, count] result.to_a.to_h { |row| [row[0], row[1]] } end end # Analytics query: Top countries def top_countries(start_time, limit = 10) with_connection do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT country, COUNT(*) as count FROM events WHERE timestamp >= ? AND country IS NOT NULL GROUP BY country ORDER BY count DESC LIMIT ? SQL # DuckDB returns arrays: [country, count] result.to_a.map { |row| [row[0], row[1]] } end end # Analytics query: Top blocked IPs def top_blocked_ips(start_time, limit = 10) with_connection do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT ip_address, COUNT(*) as count FROM events WHERE timestamp >= ? AND waf_action = 0 GROUP BY ip_address ORDER BY count DESC LIMIT ? SQL # DuckDB returns arrays: [ip_address, count] result.to_a.map { |row| [row[0], row[1]] } end end # Analytics query: Hourly timeline (events grouped by hour) def hourly_timeline(start_time, end_time) with_connection do |conn| result = conn.query(<<~SQL, start_time, end_time) SELECT DATE_TRUNC('hour', timestamp) as hour, COUNT(*) as count FROM events WHERE timestamp >= ? AND timestamp < ? GROUP BY hour ORDER BY hour SQL # Convert to hash with Time keys like PostgreSQL # DuckDB returns arrays: [hour, count] result.to_a.to_h { |row| [row[0], row[1]] } end end # Close DuckDB connection (for cleanup/testing) def close @connection&.close @connection = nil end # ============================================================================ # PARQUET EXPORT SYSTEM # ============================================================================ PARQUET_BASE_PATH = Rails.root.join("storage", "parquet") PARQUET_MINUTE_PATH = PARQUET_BASE_PATH.join("minute") PARQUET_HOURS_PATH = PARQUET_BASE_PATH.join("hours") PARQUET_DAYS_PATH = PARQUET_BASE_PATH.join("days") PARQUET_WEEKS_PATH = PARQUET_BASE_PATH.join("weeks") WEEK_RETENTION = ENV.fetch("PARQUET_WEEK_RETENTION", 104).to_i # Keep N weeks (default: 104 = 2 years) # One-time export of entire DuckDB to Parquet (bootstrap) # Exports all data and organizes into week files # Memory-efficient: processes one week at a time with new connections def export_all_to_parquet ensure_parquet_directories # Get date range first, then close connection min_time, max_time = with_connection do |conn| result = conn.query("SELECT MIN(timestamp) as min_time, MAX(timestamp) as max_time FROM events") row = result.first return unless row && row[0] && row[1] [Time.parse(row[0].to_s), Time.parse(row[1].to_s)] end Rails.logger.info "[Parquet] Exporting all events from #{min_time} to #{max_time}" # Export week by week with separate connections (more memory efficient) current_week_start = min_time.beginning_of_week weeks_exported = 0 while current_week_start <= max_time week_end = current_week_start.end_of_week year = current_week_start.year week_num = current_week_start.strftime("%U").to_i week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet") # Skip if week file already exists unless File.exist?(week_file) Rails.logger.info "[Parquet] Exporting week #{year}-#{week_num} (#{current_week_start} to #{week_end})" # Use separate connection per week to limit memory usage with_connection do |conn| # COPY directly without ORDER BY to save memory # Parquet files can be sorted during queries if needed conn.execute(<<~SQL) COPY ( SELECT * FROM events WHERE timestamp >= '#{current_week_start.iso8601}' AND timestamp < '#{week_end.iso8601}' ) TO '#{week_file}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000) SQL end weeks_exported += 1 # Force garbage collection after each week to free memory GC.start end current_week_start += 1.week end Rails.logger.info "[Parquet] Bootstrap complete: exported #{weeks_exported} weeks" rescue StandardError => e Rails.logger.error "[Parquet] Error in bootstrap export: #{e.message}" Rails.logger.error e.backtrace.join("\n") raise end # Consolidate completed hour's minute files into hour file # @param time [Time] The hour to consolidate def consolidate_hour_to_day(time) ensure_parquet_directories hour = time.utc.hour day_of_year = time.utc.yday hour_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet") hour_temp_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet.temp") day_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet") day_temp_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet.temp") # Find all minute files from previous hour hour_prefix = time.utc.strftime("%Y%m%d%H") minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("#{hour_prefix}*.parquet")) if minute_files.empty? Rails.logger.info "[Parquet] No minute files found for hour #{hour_prefix}" return end with_connection do |conn| Rails.logger.info "[Parquet] Consolidating #{minute_files.size} minute files from hour #{hour} into day #{day_of_year}" # Merge minute files into hour file using .temp file_list = minute_files.map { |f| "'#{f}'" }.join(", ") conn.execute(<<~SQL) COPY ( SELECT * FROM read_parquet([#{file_list}]) ORDER BY timestamp DESC ) TO '#{hour_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD) SQL # Atomic rename FileUtils.mv(hour_temp_file, hour_file, force: true) # Now merge hour file into day file if File.exist?(day_file) # Merge hour data into existing day file conn.execute(<<~SQL) COPY ( SELECT * FROM read_parquet(['#{day_file}', '#{hour_file}']) ORDER BY timestamp DESC ) TO '#{day_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD) SQL # Replace old day file with merged file FileUtils.mv(day_temp_file, day_file, force: true) # Delete hour file after merging into day File.delete(hour_file) else # First hour of the day - just rename hour file to day file FileUtils.mv(hour_file, day_file) end # Delete the minute files after successful consolidation minute_files.each { |f| File.delete(f) } Rails.logger.info "[Parquet] Consolidated #{minute_files.size} minute files into #{day_file}, deleted source files" end rescue StandardError => e Rails.logger.error "[Parquet] Error consolidating hour: #{e.message}" Rails.logger.error e.backtrace.join("\n") raise end # Consolidate completed week into archive # @param week_start [Time] The start of the week to consolidate def consolidate_days_to_week(week_start) ensure_parquet_directories year = week_start.year week_num = week_start.strftime("%U").to_i # Week number (00-53) week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet") week_temp_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet.temp") # Collect day files for this week (7 days) day_files = (0..6).map do |offset| day = week_start + offset.days day_of_year = day.yday PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet") end.select { |f| File.exist?(f) } return if day_files.empty? with_connection do |conn| Rails.logger.info "[Parquet] Consolidating #{day_files.size} days into week #{year}-#{week_num}" # Merge all day files into week archive using .temp file_list = day_files.map { |f| "'#{f}'" }.join(", ") conn.execute(<<~SQL) COPY ( SELECT * FROM read_parquet([#{file_list}]) ORDER BY timestamp DESC ) TO '#{week_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD) SQL # Atomic rename FileUtils.mv(week_temp_file, week_file, force: true) # Delete day files after successful consolidation day_files.each { |f| File.delete(f) } Rails.logger.info "[Parquet] Consolidated week #{year}-#{week_num}, deleted #{day_files.size} day files" end # Cleanup old weeks cleanup_old_weeks rescue StandardError => e Rails.logger.error "[Parquet] Error consolidating week: #{e.message}" Rails.logger.error e.backtrace.join("\n") raise end # Build list of Parquet files to query for a given time range # @param start_time [Time] Start of query range # @param end_time [Time] End of query range (defaults to now) # @return [Array] List of Parquet file paths def parquet_files_for_range(start_time, end_time = Time.current) files = [] # Add minute files (most recent, not yet consolidated) minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("*.parquet")) files.concat(minute_files) # Add hour files (consolidated but not yet in day files) hour_files = Dir.glob(PARQUET_HOURS_PATH.join("*.parquet")) files.concat(hour_files) # Add relevant day files day_files = Dir.glob(PARQUET_DAYS_PATH.join("*.parquet")) files.concat(day_files) # Add relevant week files based on time range # For simplicity, include all weeks (DuckDB will filter) week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet")) files.concat(week_files) files.sort end # Query Parquet files using in-memory DuckDB (no file locks) # @param block [Block] Block that receives DuckDB connection def with_parquet_connection(&block) # Open in-memory DuckDB database (no file locks) db = DuckDB::Database.open(":memory:") conn = db.connect yield conn ensure conn&.close db&.close end # Cleanup old week archives beyond retention period def cleanup_old_weeks week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet")).sort.reverse if week_files.size > WEEK_RETENTION files_to_delete = week_files[WEEK_RETENTION..-1] files_to_delete.each do |file| File.delete(file) Rails.logger.info "[Parquet] Deleted old week archive: #{file}" end end end private # Ensure Parquet directory structure exists def ensure_parquet_directories [PARQUET_MINUTE_PATH, PARQUET_HOURS_PATH, PARQUET_DAYS_PATH, PARQUET_WEEKS_PATH].each do |path| FileUtils.mkdir_p(path) unless Dir.exist?(path) end end end