diff --git a/app/controllers/network_ranges_controller.rb b/app/controllers/network_ranges_controller.rb
index 5dc13a1..2df997c 100644
--- a/app/controllers/network_ranges_controller.rb
+++ b/app/controllers/network_ranges_controller.rb
@@ -256,8 +256,8 @@ class NetworkRangesController < ApplicationController
 
   def calculate_traffic_stats(network_range)
     if network_range.persisted?
-      # Real network - use cached events_count for total requests (much more performant)
-      if network_range.events_count > 0
+      # Real network - check if network has events using DuckDB for performance
+      if network_range.has_events?
         # Use indexed network_range_id for much better performance instead of expensive CIDR operator
         # Include child network ranges to capture all traffic within this network block
         network_ids = [network_range.id] + network_range.child_ranges.pluck(:id)
diff --git a/app/jobs/bootstrap_parquet_export_job.rb b/app/jobs/bootstrap_parquet_export_job.rb
new file mode 100644
index 0000000..e5925cb
--- /dev/null
+++ b/app/jobs/bootstrap_parquet_export_job.rb
@@ -0,0 +1,52 @@
+# frozen_string_literal: true
+
+# One-time job to bootstrap Parquet export system
+# Exports all existing DuckDB data to weekly Parquet archives
+# Run this once when setting up Parquet exports for the first time
+#
+# Usage:
+#   BootstrapParquetExportJob.perform_now
+#   # or via docker:
+#   docker compose exec jobs bin/rails runner "BootstrapParquetExportJob.perform_now"
+class BootstrapParquetExportJob < ApplicationJob
+  queue_as :default
+
+  def perform
+    service = AnalyticsDuckdbService.instance
+
+    # Check if DuckDB has any data
+    event_count = service.event_count
+    Rails.logger.info "[Parquet Bootstrap] DuckDB event count: #{event_count}"
+
+    if event_count == 0
+      Rails.logger.warn "[Parquet Bootstrap] No events found in DuckDB - nothing to export."
+      return
+    end
+
+    # Check if Parquet files already exist
+    existing_weeks = Dir.glob(AnalyticsDuckdbService::PARQUET_WEEKS_PATH.join("*.parquet")).size
+    if existing_weeks > 0
+      Rails.logger.info "[Parquet Bootstrap] Found #{existing_weeks} existing week archives"
+    end
+
+    Rails.logger.info "[Parquet Bootstrap] Starting export of all DuckDB data to Parquet..."
+
+    start_time = Time.current
+
+    # Run the bootstrap export
+    service.export_all_to_parquet
+
+    duration = Time.current - start_time
+    week_count = Dir.glob(AnalyticsDuckdbService::PARQUET_WEEKS_PATH.join("*.parquet")).size
+
+    Rails.logger.info "[Parquet Bootstrap] Complete!"
+    Rails.logger.info "[Parquet Bootstrap] - Time taken: #{duration.round(2)} seconds"
+    Rails.logger.info "[Parquet Bootstrap] - Week archives: #{week_count}"
+    Rails.logger.info "[Parquet Bootstrap] - Storage: #{AnalyticsDuckdbService::PARQUET_BASE_PATH}"
+    Rails.logger.info "[Parquet Bootstrap] System is ready - jobs will maintain exports automatically"
+  rescue StandardError => e
+    Rails.logger.error "[Parquet Bootstrap] Job failed: #{e.message}"
+    Rails.logger.error e.backtrace.join("\n")
+    raise # Re-raise to mark job as failed
+  end
+end
diff --git a/app/jobs/consolidate_parquet_hourly_job.rb b/app/jobs/consolidate_parquet_hourly_job.rb
new file mode 100644
index 0000000..4a32873
--- /dev/null
+++ b/app/jobs/consolidate_parquet_hourly_job.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+# Background job to consolidate completed hour into day file
+# Runs at :05 past each hour (e.g., 01:05, 02:05, etc.)
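+# Consumes the per-minute Parquet files written by the export job scheduled in config/recurring.yml (ExportEventsToParquetJob)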
+# Merges the previous hour's data into the day file and deletes the hour file +class ConsolidateParquetHourlyJob < ApplicationJob + queue_as :default + + def perform + service = AnalyticsDuckdbService.instance + + # Consolidate the previous hour (not current hour, which is still being written) + previous_hour = 1.hour.ago + + Rails.logger.info "[Parquet Consolidate] Starting hourly consolidation for #{previous_hour.strftime('%Y-%m-%d %H:00')}" + + service.consolidate_hour_to_day(previous_hour) + + Rails.logger.info "[Parquet Consolidate] Hourly consolidation complete" + rescue StandardError => e + Rails.logger.error "[Parquet Consolidate] Hourly job failed: #{e.message}" + Rails.logger.error e.backtrace.join("\n") + raise # Re-raise to mark job as failed in Solid Queue + end +end diff --git a/app/jobs/consolidate_parquet_weekly_job.rb b/app/jobs/consolidate_parquet_weekly_job.rb new file mode 100644 index 0000000..93952dc --- /dev/null +++ b/app/jobs/consolidate_parquet_weekly_job.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +# Background job to consolidate completed week into archive +# Runs Monday at 00:05 (start of new week) +# Merges the previous week's day files into a week archive and deletes day files +class ConsolidateParquetWeeklyJob < ApplicationJob + queue_as :default + + def perform + service = AnalyticsDuckdbService.instance + + # Consolidate the previous week (Monday to Sunday) + previous_week_start = 1.week.ago.beginning_of_week + + Rails.logger.info "[Parquet Consolidate] Starting weekly consolidation for week starting #{previous_week_start.strftime('%Y-%m-%d')}" + + service.consolidate_days_to_week(previous_week_start) + + Rails.logger.info "[Parquet Consolidate] Weekly consolidation complete" + rescue StandardError => e + Rails.logger.error "[Parquet Consolidate] Weekly job failed: #{e.message}" + Rails.logger.error e.backtrace.join("\n") + raise # Re-raise to mark job as failed in Solid Queue + end +end diff --git a/app/jobs/sync_events_to_duckdb_job.rb b/app/jobs/sync_events_to_duckdb_job.rb deleted file mode 100644 index 5741a18..0000000 --- a/app/jobs/sync_events_to_duckdb_job.rb +++ /dev/null @@ -1,89 +0,0 @@ -# frozen_string_literal: true - -# Background job to sync events from PostgreSQL to DuckDB -# Runs every 5 minutes to keep analytics database up-to-date -# Uses watermark tracking to only sync new events -class SyncEventsToDuckdbJob < ApplicationJob - queue_as :default - - # Key for storing last sync timestamp in Rails cache - WATERMARK_CACHE_KEY = "duckdb_last_sync_time" - WATERMARK_TTL = 1.week - - # Overlap window to catch late-arriving events - SYNC_OVERLAP = 1.minute - - def perform - service = AnalyticsDuckdbService.instance - - # Determine where to start syncing - from_timestamp = determine_sync_start_time(service) - - Rails.logger.info "[DuckDB Sync] Starting sync from #{from_timestamp}" - - # Sync new events using PostgreSQL cursor + DuckDB Appender - # (setup_schema is called internally within sync_new_events) - count = service.sync_new_events(from_timestamp) - - # Update watermark if we synced any events - if count > 0 - update_last_sync_time - Rails.logger.info "[DuckDB Sync] Successfully synced #{count} events" - else - Rails.logger.info "[DuckDB Sync] No new events to sync" - end - rescue StandardError => e - Rails.logger.error "[DuckDB Sync] Job failed: #{e.message}" - Rails.logger.error e.backtrace.join("\n") - raise # Re-raise to mark job as failed in Solid Queue - end - - private - - # Determine timestamp to start syncing from - # 
Strategy: - # 1. First run (DuckDB empty): sync from oldest PostgreSQL event - # 2. Subsequent runs: sync from last watermark with overlap - def determine_sync_start_time(service) - oldest_duckdb = service.oldest_event_timestamp - - if oldest_duckdb.nil? - # DuckDB is empty - this is the first sync - # Start from oldest PostgreSQL event (or reasonable cutoff) - oldest_pg = Event.minimum(:timestamp) - - if oldest_pg.nil? - # No events in PostgreSQL at all - Rails.logger.warn "[DuckDB Sync] No events found in PostgreSQL" - 1.day.ago # Default to recent window - else - Rails.logger.info "[DuckDB Sync] First sync - starting from oldest event: #{oldest_pg}" - oldest_pg - end - else - # DuckDB has data - sync from last watermark with overlap - last_sync = Rails.cache.read(WATERMARK_CACHE_KEY) - - if last_sync.nil? - # Watermark not in cache (maybe cache expired or restarted) - # Fall back to newest event in DuckDB - newest_duckdb = service.newest_event_timestamp - start_time = newest_duckdb ? newest_duckdb - SYNC_OVERLAP : oldest_duckdb - Rails.logger.info "[DuckDB Sync] Watermark not found, using newest DuckDB event: #{start_time}" - start_time - else - # Normal case: use watermark with overlap to catch late arrivals - start_time = last_sync - SYNC_OVERLAP - Rails.logger.debug "[DuckDB Sync] Using watermark: #{last_sync} (with #{SYNC_OVERLAP}s overlap)" - start_time - end - end - end - - # Update last sync watermark in cache - def update_last_sync_time - now = Time.current - Rails.cache.write(WATERMARK_CACHE_KEY, now, expires_in: WATERMARK_TTL) - Rails.logger.debug "[DuckDB Sync] Updated watermark to #{now}" - end -end diff --git a/app/models/event_ddb.rb b/app/models/event_ddb.rb index caf8864..68cda06 100644 --- a/app/models/event_ddb.rb +++ b/app/models/event_ddb.rb @@ -31,9 +31,31 @@ class EventDdb AnalyticsDuckdbService.instance end + # Helper to load parquet files into in-memory events view + # This allows all existing queries to work without modification + # Uses glob pattern to read all parquet files (excluding .temp files) + def with_events_from_parquet(&block) + service.with_connection do |conn| + # Create events view from all parquet files using glob pattern + # Pattern matches: minute/*.parquet, hours/*.parquet, days/*.parquet, weeks/*.parquet + # Excludes .temp files automatically (they don't match *.parquet) + parquet_pattern = "#{AnalyticsDuckdbService::PARQUET_BASE_PATH}/**/*.parquet" + + conn.execute(<<~SQL) + CREATE VIEW events AS + SELECT * FROM read_parquet('#{parquet_pattern}') + SQL + + yield conn + end + rescue StandardError => e + Rails.logger.error "[EventDdb] Error loading parquet files: #{e.message}" + nil + end + # Total events since timestamp def count_since(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time) result.first&.first || 0 end @@ -44,7 +66,7 @@ class EventDdb # Event breakdown by WAF action def breakdown_by_action(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time) SELECT waf_action, COUNT(*) as count FROM events @@ -65,7 +87,7 @@ class EventDdb # Top countries with event counts def top_countries(start_time, limit = 10) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT country, COUNT(*) as count FROM events @@ -86,7 +108,7 @@ class EventDdb # Top blocked IPs def 
top_blocked_ips(start_time, limit = 10) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT ip_address, COUNT(*) as count FROM events @@ -106,7 +128,7 @@ class EventDdb # Hourly timeline aggregation def hourly_timeline(start_time, end_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, end_time) SELECT DATE_TRUNC('hour', timestamp) as hour, @@ -129,7 +151,7 @@ class EventDdb # Top networks by traffic volume # Returns array of arrays: [network_range_id, event_count, unique_ips] def top_networks(start_time, limit = 50) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT network_range_id, @@ -152,7 +174,7 @@ class EventDdb # Top companies # Returns array of OpenStruct objects with: company, event_count, unique_ips, network_count def top_companies(start_time, limit = 20) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT company, @@ -184,7 +206,7 @@ class EventDdb # Top ASNs # Returns array of OpenStruct objects with: asn, asn_org, event_count, unique_ips, network_count def top_asns(start_time, limit = 15) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT asn, @@ -218,7 +240,7 @@ class EventDdb # Network type breakdown (datacenter, VPN, proxy, standard) # Returns hash with network_type as key and hash of stats as value def network_type_breakdown(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time) SELECT CASE @@ -255,7 +277,7 @@ class EventDdb # Top countries with detailed stats (event count and unique IPs) # Returns array of OpenStruct objects with: country, event_count, unique_ips def top_countries_with_stats(start_time, limit = 15) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT country, @@ -285,7 +307,7 @@ class EventDdb # Network type stats with formatted output matching controller expectations # Returns hash with type keys containing label, networks, events, unique_ips, percentage def network_type_stats(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| # Get total events for percentage calculation total_result = conn.query("SELECT COUNT(*) as total FROM events WHERE timestamp >= ?", start_time) total_events = total_result.first&.first || 0 @@ -328,7 +350,7 @@ class EventDdb network_range_ids = Array(network_range_ids) return nil if network_range_ids.empty? - service.with_connection do |conn| + with_events_from_parquet do |conn| # Build IN clause with placeholders placeholders = network_range_ids.map { "?" }.join(", ") @@ -363,7 +385,7 @@ class EventDdb network_range_ids = Array(network_range_ids) return nil if network_range_ids.empty? - service.with_connection do |conn| + with_events_from_parquet do |conn| # Build IN clause with placeholders placeholders = network_range_ids.map { "?" }.join(", ") @@ -391,7 +413,7 @@ class EventDdb network_range_ids = Array(network_range_ids) return nil if network_range_ids.empty? - service.with_connection do |conn| + with_events_from_parquet do |conn| # Build IN clause with placeholders placeholders = network_range_ids.map { "?" 
}.join(", ") @@ -414,13 +436,36 @@ class EventDdb nil end + # Count events for network range(s) + # Returns integer count of all events in the network + def network_event_count(network_range_ids) + network_range_ids = Array(network_range_ids) + return nil if network_range_ids.empty? + + with_events_from_parquet do |conn| + # Build IN clause with placeholders + placeholders = network_range_ids.map { "?" }.join(", ") + + result = conn.query(<<~SQL, *network_range_ids) + SELECT COUNT(*) as count + FROM events + WHERE network_range_id IN (#{placeholders}) + SQL + + result.first&.first || 0 + end + rescue StandardError => e + Rails.logger.error "[EventDdb] Error in network_event_count: #{e.message}" + nil + end + # Full user agent tally for network range(s) # Returns hash of user_agent => count for all agents in the network def network_agent_tally(network_range_ids) network_range_ids = Array(network_range_ids) return nil if network_range_ids.empty? - service.with_connection do |conn| + with_events_from_parquet do |conn| # Build IN clause with placeholders placeholders = network_range_ids.map { "?" }.join(", ") @@ -445,7 +490,7 @@ class EventDdb # Suspicious network activity patterns # Detects high-volume networks, high deny rates, and distributed companies def suspicious_patterns(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| # High volume networks (5x average) avg_query = conn.query(<<~SQL, start_time) SELECT @@ -523,7 +568,7 @@ class EventDdb # Bot traffic analysis - breakdown of bot vs human traffic def bot_traffic_breakdown(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time) SELECT is_bot, @@ -553,7 +598,7 @@ class EventDdb # Count human traffic (non-bot) since timestamp def human_traffic_count(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time) SELECT COUNT(*) as count FROM events @@ -569,7 +614,7 @@ class EventDdb # Count bot traffic since timestamp def bot_traffic_count(start_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time) SELECT COUNT(*) as count FROM events @@ -585,7 +630,7 @@ class EventDdb # Top bot user agents def top_bot_user_agents(start_time, limit = 20) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, limit) SELECT user_agent, @@ -614,7 +659,7 @@ class EventDdb # Bot traffic timeline (hourly breakdown) def bot_traffic_timeline(start_time, end_time) - service.with_connection do |conn| + with_events_from_parquet do |conn| result = conn.query(<<~SQL, start_time, end_time) SELECT DATE_TRUNC('hour', timestamp) as hour, @@ -648,7 +693,63 @@ class EventDdb # Returns { total_count:, events:[], page:, per_page: } # Supports filters: ip, waf_action, country, rule_id, company, asn, network_type, network_range_id, exclude_bots def search(filters = {}, page: 1, per_page: 50) - service.with_connection do |conn| + # Get list of Parquet files to query + parquet_files = service.parquet_files_for_range(1.year.ago, Time.current) + + if parquet_files.empty? + Rails.logger.warn "[EventDdb] No Parquet files found, falling back to DuckDB" + return search_duckdb(filters, page, per_page) + end + + # Query Parquet files using in-memory DuckDB (no file locks!) 
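+      # NOTE: Parquet file paths are interpolated into the SQL string; only the filter values from build_where_clause are bound as query parameters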
+ service.with_parquet_connection do |conn| + # Build WHERE clause + where_clause, params = build_where_clause(filters) + + # Build file list for read_parquet + file_list = parquet_files.map { |f| "'#{f}'" }.join(", ") + + # Get total count + count_sql = "SELECT COUNT(*) FROM read_parquet([#{file_list}])#{where_clause}" + count_result = conn.query(count_sql, *params) + total_count = count_result.first&.first || 0 + + # Get paginated results + offset = (page - 1) * per_page + + data_sql = <<~SQL + SELECT + id, timestamp, ip_address, network_range_id, country, company, + asn, asn_org, is_datacenter, is_vpn, is_proxy, is_bot, + waf_action, request_method, response_status, rule_id, + request_path, user_agent, tags + FROM read_parquet([#{file_list}]) + #{where_clause} + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + SQL + + result = conn.query(data_sql, *params, per_page, offset) + + # Convert rows to event-like objects + events = result.to_a.map { |row| row_to_event(row) } + + { + total_count: total_count, + events: events, + page: page, + per_page: per_page + } + end + rescue StandardError => e + Rails.logger.error "[EventDdb] Error in Parquet search: #{e.message}" + Rails.logger.error e.backtrace.join("\n") + nil + end + + # Fallback to querying DuckDB directly (for backward compatibility) + def search_duckdb(filters = {}, page: 1, per_page: 50) + with_events_from_parquet do |conn| # Build WHERE clause where_clause, params = build_where_clause(filters) @@ -685,7 +786,7 @@ class EventDdb } end rescue StandardError => e - Rails.logger.error "[EventDdb] Error in search: #{e.message}" + Rails.logger.error "[EventDdb] Error in DuckDB search: #{e.message}" Rails.logger.error e.backtrace.join("\n") nil end diff --git a/app/models/network_range.rb b/app/models/network_range.rb index f80e281..0c98316 100644 --- a/app/models/network_range.rb +++ b/app/models/network_range.rb @@ -15,6 +15,7 @@ class NetworkRange < ApplicationRecord # Associations has_many :rules, dependent: :destroy + has_many :events, foreign_key: :network_range_id, dependent: :nullify belongs_to :user, optional: true # Validations @@ -36,8 +37,8 @@ class NetworkRange < ApplicationRecord scope :geolite_imported, -> { where(source: ['geolite_asn', 'geolite_country']) } scope :geolite_asn, -> { where(source: 'geolite_asn') } scope :geolite_country, -> { where(source: 'geolite_country') } - scope :with_events, -> { where("events_count > 0") } - scope :most_active, -> { order(events_count: :desc) } + scope :with_events, -> { joins(:events).distinct } + scope :most_active, -> { joins(:events).group('network_ranges.id').order('COUNT(events.id) DESC') } # Callbacks before_validation :set_default_source @@ -241,7 +242,7 @@ class NetworkRange < ApplicationRecord def agent_tally Rails.cache.fetch("#{cache_key}:agent_tally", expires_in: 5.minutes) do # Use DuckDB for fast agent tally instead of loading all events into memory - if persisted? && events_count > 0 + if persisted? && has_events? # Include child network ranges to capture all traffic within this network block network_ids = [id] + child_ranges.pluck(:id) @@ -417,10 +418,16 @@ class NetworkRange < ApplicationRecord cidr.to_s.gsub('/', '_') end - # Analytics methods - events_count is now a counter cache column maintained by database triggers - # This is much more performant than the previous implementation that did complex network queries - def events_count - self[:events_count] || 0 + # Check if network range has any events using DuckDB for performance + def has_events? 
+ return false unless persisted? + + # Include child network ranges to capture all traffic within this network block + network_ids = [id] + child_ranges.pluck(:id) + + # Try DuckDB first for fast event count check + event_count = with_duckdb_fallback { EventDdb.network_event_count(network_ids) } + event_count&.positive? || events.exists? end def events diff --git a/app/services/analytics_duckdb_service.rb b/app/services/analytics_duckdb_service.rb index 3be0b3b..11a9638 100644 --- a/app/services/analytics_duckdb_service.rb +++ b/app/services/analytics_duckdb_service.rb @@ -5,12 +5,14 @@ class AnalyticsDuckdbService include Singleton - DUCKDB_PATH = Rails.root.join("storage", "analytics.duckdb").to_s BATCH_SIZE = 10_000 + MAX_EVENTS_PER_SYNC = 50_000 # Limit events per job run to prevent OOM - # Execute block with connection, ensuring database and connection are closed afterward + # Execute block with DuckDB connection + # Always uses in-memory database (no file locks, no conflicts) + # Used for writing parquet files and querying parquet files def with_connection - db = DuckDB::Database.open(DUCKDB_PATH) + db = DuckDB::Database.open(":memory:") conn = db.connect yield conn ensure @@ -44,9 +46,38 @@ class AnalyticsDuckdbService ) SQL + # Create indexes for common query patterns + create_indexes(conn) + Rails.logger.info "[DuckDB] Schema setup complete" end + # Create indexes for fast querying + def create_indexes(conn) + indexes = [ + "CREATE INDEX IF NOT EXISTS idx_events_timestamp ON events(timestamp DESC)", + "CREATE INDEX IF NOT EXISTS idx_events_network_range_id ON events(network_range_id)", + "CREATE INDEX IF NOT EXISTS idx_events_ip_address ON events(ip_address)", + "CREATE INDEX IF NOT EXISTS idx_events_waf_action ON events(waf_action)", + "CREATE INDEX IF NOT EXISTS idx_events_country ON events(country)", + "CREATE INDEX IF NOT EXISTS idx_events_company ON events(company)", + "CREATE INDEX IF NOT EXISTS idx_events_asn ON events(asn)", + "CREATE INDEX IF NOT EXISTS idx_events_rule_id ON events(rule_id)", + "CREATE INDEX IF NOT EXISTS idx_events_is_bot ON events(is_bot)", + "CREATE INDEX IF NOT EXISTS idx_events_is_datacenter ON events(is_datacenter)", + "CREATE INDEX IF NOT EXISTS idx_events_is_vpn ON events(is_vpn)", + "CREATE INDEX IF NOT EXISTS idx_events_is_proxy ON events(is_proxy)" + ] + + indexes.each do |index_sql| + conn.execute(index_sql) + end + + Rails.logger.info "[DuckDB] Indexes created" + rescue StandardError => e + Rails.logger.warn "[DuckDB] Index creation warning: #{e.message}" + end + # Get timestamp of oldest event in DuckDB # Returns nil if table is empty def oldest_event_timestamp @@ -57,7 +88,7 @@ class AnalyticsDuckdbService end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting oldest timestamp: #{e.message}" - nil + raise end # Get timestamp of newest event in DuckDB @@ -70,7 +101,7 @@ class AnalyticsDuckdbService end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting newest timestamp: #{e.message}" - nil + raise end # Get maximum event ID already synced to DuckDB @@ -82,34 +113,39 @@ class AnalyticsDuckdbService end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting max ID: #{e.message}" - 0 + raise end - # Sync new events from PostgreSQL to DuckDB + # Export new events from PostgreSQL directly to timestamped Parquet file # Uses PostgreSQL cursor for memory-efficient streaming - # Uses Appender API for fast bulk inserts - # Filters by ID to avoid duplicates - def sync_new_events(from_timestamp) - 
total_synced = 0 + # Writes to minute/YYYYMMDDHHmmss.parquet + # @param from_timestamp [Time] Start timestamp to export from + # @param max_id [Integer] Maximum event ID already exported (to avoid duplicates) + # @return [Hash] { count: Integer, file_path: String, max_id: Integer } + def export_new_events_to_parquet(from_timestamp, max_id = 0) + ensure_parquet_directories + + total_exported = 0 + exported_max_id = max_id + timestamp = Time.current.utc.strftime("%Y%m%d%H%M%S") + parquet_file = PARQUET_MINUTE_PATH.join("#{timestamp}.parquet") + + Rails.logger.info "[Parquet] Exporting events from #{from_timestamp}, max_id=#{max_id} to #{parquet_file}" + start_time = Time.current with_connection do |conn| - # Ensure table exists + # Create temporary table in memory setup_schema(conn) - # Get max ID already in DuckDB to avoid duplicates - max_id_result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events") - max_id = max_id_result.first&.first || 0 - Rails.logger.info "[DuckDB] Syncing events from #{from_timestamp}, max_id=#{max_id}" - - start_time = Time.current appender = nil batch_count = 0 begin - # Create initial appender + # Create appender for in-memory table appender = conn.appender("events") - # Use PostgreSQL cursor for memory-efficient streaming + # Stream from PostgreSQL cursor and append to DuckDB in-memory table + # Limit to MAX_EVENTS_PER_SYNC to prevent OOM on large backlogs Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id) .select( :id, @@ -133,8 +169,8 @@ class AnalyticsDuckdbService :tags ) .order(:id) + .limit(MAX_EVENTS_PER_SYNC) .each_row(block_size: BATCH_SIZE) do |event_data| - # Unpack event data from cursor row (Hash from each_row) begin appender.append_row( event_data["id"], @@ -157,41 +193,56 @@ class AnalyticsDuckdbService event_data["user_agent"], event_data["tags"] || [] ) + + # Track maximum exported ID + exported_max_id = [exported_max_id, event_data["id"]].max rescue StandardError => e - Rails.logger.error "[DuckDB] Error appending event #{event_data['id']}: #{e.message}" - Rails.logger.error "[DuckDB] event_data = #{event_data.inspect}" + Rails.logger.error "[Parquet] Error appending event #{event_data['id']}: #{e.message}" + Rails.logger.error "[Parquet] event_data = #{event_data.inspect}" raise end batch_count += 1 - total_synced += 1 + total_exported += 1 # Flush and recreate appender every BATCH_SIZE events to avoid chunk overflow if batch_count % BATCH_SIZE == 0 appender.close appender = conn.appender("events") - Rails.logger.info "[DuckDB] Synced batch (total: #{total_synced} events)" + Rails.logger.info "[Parquet] Loaded batch (total: #{total_exported} events)" end end - # Close final appender + # Close appender appender&.close + # Export in-memory table to parquet file + conn.execute(<<~SQL) + COPY (SELECT * FROM events ORDER BY timestamp DESC) + TO '#{parquet_file}' (FORMAT PARQUET, COMPRESSION ZSTD) + SQL + duration = Time.current - start_time - rate = total_synced / duration if duration > 0 - Rails.logger.info "[DuckDB] Sync complete: #{total_synced} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)" + rate = total_exported / duration if duration > 0 + + # Log completion and check if there are more events to export + if total_exported >= MAX_EVENTS_PER_SYNC + Rails.logger.info "[Parquet] Export complete: #{total_exported} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec) - hit limit, more events may be pending" + else + Rails.logger.info "[Parquet] Export complete: #{total_exported} events in 
#{duration.round(2)}s (~#{rate&.round(0)} events/sec)" + end rescue StandardError => e appender&.close rescue nil # Ensure appender is closed on error - Rails.logger.error "[DuckDB] Error syncing events: #{e.message}" + Rails.logger.error "[Parquet] Error exporting events: #{e.message}" Rails.logger.error e.backtrace.join("\n") raise # Re-raise to be caught by outer rescue end end - total_synced + { count: total_exported, file_path: parquet_file.to_s, max_id: exported_max_id } rescue StandardError => e - Rails.logger.error "[DuckDB] Sync failed: #{e.message}" - 0 + Rails.logger.error "[Parquet] Export failed: #{e.message}" + raise end # Execute analytical query on DuckDB @@ -214,7 +265,7 @@ class AnalyticsDuckdbService end rescue StandardError => e Rails.logger.error "[DuckDB] Error getting event count: #{e.message}" - 0 + raise end # Analytics query: Total events since timestamp @@ -299,4 +350,254 @@ class AnalyticsDuckdbService @connection&.close @connection = nil end + + # ============================================================================ + # PARQUET EXPORT SYSTEM + # ============================================================================ + + PARQUET_BASE_PATH = Rails.root.join("storage", "parquet") + PARQUET_MINUTE_PATH = PARQUET_BASE_PATH.join("minute") + PARQUET_HOURS_PATH = PARQUET_BASE_PATH.join("hours") + PARQUET_DAYS_PATH = PARQUET_BASE_PATH.join("days") + PARQUET_WEEKS_PATH = PARQUET_BASE_PATH.join("weeks") + WEEK_RETENTION = ENV.fetch("PARQUET_WEEK_RETENTION", 104).to_i # Keep N weeks (default: 104 = 2 years) + + # One-time export of entire DuckDB to Parquet (bootstrap) + # Exports all data and organizes into week files + # Memory-efficient: processes one week at a time with new connections + def export_all_to_parquet + ensure_parquet_directories + + # Get date range first, then close connection + min_time, max_time = with_connection do |conn| + result = conn.query("SELECT MIN(timestamp) as min_time, MAX(timestamp) as max_time FROM events") + row = result.first + return unless row && row[0] && row[1] + + [Time.parse(row[0].to_s), Time.parse(row[1].to_s)] + end + + Rails.logger.info "[Parquet] Exporting all events from #{min_time} to #{max_time}" + + # Export week by week with separate connections (more memory efficient) + current_week_start = min_time.beginning_of_week + weeks_exported = 0 + + while current_week_start <= max_time + week_end = current_week_start.end_of_week + + year = current_week_start.year + week_num = current_week_start.strftime("%U").to_i + week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet") + + # Skip if week file already exists + unless File.exist?(week_file) + Rails.logger.info "[Parquet] Exporting week #{year}-#{week_num} (#{current_week_start} to #{week_end})" + + # Use separate connection per week to limit memory usage + with_connection do |conn| + # COPY directly without ORDER BY to save memory + # Parquet files can be sorted during queries if needed + conn.execute(<<~SQL) + COPY ( + SELECT * FROM events + WHERE timestamp >= '#{current_week_start.iso8601}' + AND timestamp < '#{week_end.iso8601}' + ) TO '#{week_file}' (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000) + SQL + end + + weeks_exported += 1 + + # Force garbage collection after each week to free memory + GC.start + end + + current_week_start += 1.week + end + + Rails.logger.info "[Parquet] Bootstrap complete: exported #{weeks_exported} weeks" + rescue StandardError => e + Rails.logger.error "[Parquet] Error in bootstrap export: 
#{e.message}" + Rails.logger.error e.backtrace.join("\n") + raise + end + + + # Consolidate completed hour's minute files into hour file + # @param time [Time] The hour to consolidate + def consolidate_hour_to_day(time) + ensure_parquet_directories + + hour = time.utc.hour + day_of_year = time.utc.yday + hour_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet") + hour_temp_file = PARQUET_HOURS_PATH.join("#{hour.to_s.rjust(2, '0')}.parquet.temp") + day_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet") + day_temp_file = PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet.temp") + + # Find all minute files from previous hour + hour_prefix = time.utc.strftime("%Y%m%d%H") + minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("#{hour_prefix}*.parquet")) + + if minute_files.empty? + Rails.logger.info "[Parquet] No minute files found for hour #{hour_prefix}" + return + end + + with_connection do |conn| + Rails.logger.info "[Parquet] Consolidating #{minute_files.size} minute files from hour #{hour} into day #{day_of_year}" + + # Merge minute files into hour file using .temp + file_list = minute_files.map { |f| "'#{f}'" }.join(", ") + conn.execute(<<~SQL) + COPY ( + SELECT * FROM read_parquet([#{file_list}]) + ORDER BY timestamp DESC + ) TO '#{hour_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD) + SQL + + # Atomic rename + FileUtils.mv(hour_temp_file, hour_file, force: true) + + # Now merge hour file into day file + if File.exist?(day_file) + # Merge hour data into existing day file + conn.execute(<<~SQL) + COPY ( + SELECT * FROM read_parquet(['#{day_file}', '#{hour_file}']) + ORDER BY timestamp DESC + ) TO '#{day_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD) + SQL + + # Replace old day file with merged file + FileUtils.mv(day_temp_file, day_file, force: true) + + # Delete hour file after merging into day + File.delete(hour_file) + else + # First hour of the day - just rename hour file to day file + FileUtils.mv(hour_file, day_file) + end + + # Delete the minute files after successful consolidation + minute_files.each { |f| File.delete(f) } + Rails.logger.info "[Parquet] Consolidated #{minute_files.size} minute files into #{day_file}, deleted source files" + end + rescue StandardError => e + Rails.logger.error "[Parquet] Error consolidating hour: #{e.message}" + Rails.logger.error e.backtrace.join("\n") + raise + end + + # Consolidate completed week into archive + # @param week_start [Time] The start of the week to consolidate + def consolidate_days_to_week(week_start) + ensure_parquet_directories + + year = week_start.year + week_num = week_start.strftime("%U").to_i # Week number (00-53) + week_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet") + week_temp_file = PARQUET_WEEKS_PATH.join("#{year}-#{week_num.to_s.rjust(2, '0')}.parquet.temp") + + # Collect day files for this week (7 days) + day_files = (0..6).map do |offset| + day = week_start + offset.days + day_of_year = day.yday + PARQUET_DAYS_PATH.join("#{day_of_year.to_s.rjust(3, '0')}.parquet") + end.select { |f| File.exist?(f) } + + return if day_files.empty? 
+ + with_connection do |conn| + Rails.logger.info "[Parquet] Consolidating #{day_files.size} days into week #{year}-#{week_num}" + + # Merge all day files into week archive using .temp + file_list = day_files.map { |f| "'#{f}'" }.join(", ") + + conn.execute(<<~SQL) + COPY ( + SELECT * FROM read_parquet([#{file_list}]) + ORDER BY timestamp DESC + ) TO '#{week_temp_file}' (FORMAT PARQUET, COMPRESSION ZSTD) + SQL + + # Atomic rename + FileUtils.mv(week_temp_file, week_file, force: true) + + # Delete day files after successful consolidation + day_files.each { |f| File.delete(f) } + + Rails.logger.info "[Parquet] Consolidated week #{year}-#{week_num}, deleted #{day_files.size} day files" + end + + # Cleanup old weeks + cleanup_old_weeks + rescue StandardError => e + Rails.logger.error "[Parquet] Error consolidating week: #{e.message}" + Rails.logger.error e.backtrace.join("\n") + raise + end + + # Build list of Parquet files to query for a given time range + # @param start_time [Time] Start of query range + # @param end_time [Time] End of query range (defaults to now) + # @return [Array] List of Parquet file paths + def parquet_files_for_range(start_time, end_time = Time.current) + files = [] + + # Add minute files (most recent, not yet consolidated) + minute_files = Dir.glob(PARQUET_MINUTE_PATH.join("*.parquet")) + files.concat(minute_files) + + # Add hour files (consolidated but not yet in day files) + hour_files = Dir.glob(PARQUET_HOURS_PATH.join("*.parquet")) + files.concat(hour_files) + + # Add relevant day files + day_files = Dir.glob(PARQUET_DAYS_PATH.join("*.parquet")) + files.concat(day_files) + + # Add relevant week files based on time range + # For simplicity, include all weeks (DuckDB will filter) + week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet")) + files.concat(week_files) + + files.sort + end + + # Query Parquet files using in-memory DuckDB (no file locks) + # @param block [Block] Block that receives DuckDB connection + def with_parquet_connection(&block) + # Open in-memory DuckDB database (no file locks) + db = DuckDB::Database.open(":memory:") + conn = db.connect + + yield conn + ensure + conn&.close + db&.close + end + + # Cleanup old week archives beyond retention period + def cleanup_old_weeks + week_files = Dir.glob(PARQUET_WEEKS_PATH.join("*.parquet")).sort.reverse + + if week_files.size > WEEK_RETENTION + files_to_delete = week_files[WEEK_RETENTION..-1] + files_to_delete.each do |file| + File.delete(file) + Rails.logger.info "[Parquet] Deleted old week archive: #{file}" + end + end + end + + private + + # Ensure Parquet directory structure exists + def ensure_parquet_directories + [PARQUET_MINUTE_PATH, PARQUET_HOURS_PATH, PARQUET_DAYS_PATH, PARQUET_WEEKS_PATH].each do |path| + FileUtils.mkdir_p(path) unless Dir.exist?(path) + end + end end diff --git a/config/recurring.yml b/config/recurring.yml index 896a13b..d7d3c04 100644 --- a/config/recurring.yml +++ b/config/recurring.yml @@ -30,8 +30,20 @@ cleanup_old_events: queue: background schedule: every hour -# Sync events from PostgreSQL to DuckDB for fast analytics -sync_events_to_duckdb: - class: SyncEventsToDuckdbJob +# Export events from PostgreSQL to Parquet files for fast analytics +export_events_to_parquet: + class: ExportEventsToParquetJob queue: default schedule: every 1 minutes + +# Consolidate completed hours into day files +consolidate_parquet_hourly: + class: ConsolidateParquetHourlyJob + queue: default + schedule: "5 * * * *" # At 5 minutes past every hour + +# Consolidate completed week into archive 
(Monday 00:05) +consolidate_parquet_weekly: + class: ConsolidateParquetWeeklyJob + queue: default + schedule: "5 0 * * 1" # Monday at 00:05 diff --git a/db/migrate/20251202070000_remove_postgres_events_count_from_network_ranges.rb b/db/migrate/20251202070000_remove_postgres_events_count_from_network_ranges.rb new file mode 100644 index 0000000..3c9bbb1 --- /dev/null +++ b/db/migrate/20251202070000_remove_postgres_events_count_from_network_ranges.rb @@ -0,0 +1,67 @@ +# frozen_string_literal: true + +class RemovePostgresEventsCountFromNetworkRanges < ActiveRecord::Migration[8.1] + def up + # Drop triggers first + execute <<-SQL + DROP TRIGGER IF EXISTS update_network_ranges_events_count_after_insert ON events; + DROP TRIGGER IF EXISTS update_network_ranges_events_count_after_delete ON events; + DROP FUNCTION IF EXISTS update_network_range_events_count(); + SQL + + # Remove index and column + remove_index :network_ranges, :events_count + remove_column :network_ranges, :events_count + end + + def down + # Add column back (for rollback) + add_column :network_ranges, :events_count, :integer, null: false, default: 0 + add_index :network_ranges, :events_count + + # Recreate trigger function + execute <<-SQL + CREATE OR REPLACE FUNCTION update_network_range_events_count() + RETURNS TRIGGER AS $$ + BEGIN + -- Update all network ranges that contain IP address + UPDATE network_ranges + SET events_count = events_count + + CASE + WHEN TG_OP = 'INSERT' THEN 1 + WHEN TG_OP = 'DELETE' THEN -1 + ELSE 0 + END + WHERE network >>= NEW.ip_address::inet; + + RETURN COALESCE(NEW, OLD); + END; + $$ LANGUAGE plpgsql; + SQL + + # Recreate triggers + execute <<-SQL + CREATE TRIGGER update_network_ranges_events_count_after_insert + AFTER INSERT ON events + FOR EACH ROW + EXECUTE FUNCTION update_network_range_events_count(); + SQL + + execute <<-SQL + CREATE TRIGGER update_network_ranges_events_count_after_delete + AFTER DELETE ON events + FOR EACH ROW + EXECUTE FUNCTION update_network_range_events_count(); + SQL + + # Backfill existing counts + execute <<-SQL + UPDATE network_ranges + SET events_count = ( + SELECT COUNT(*) + FROM events + WHERE events.ip_address <<= network_ranges.network + ); + SQL + end +end \ No newline at end of file diff --git a/db/schema.rb b/db/schema.rb index 3943be2..2b70e85 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. 
-ActiveRecord::Schema[8.1].define(version: 2025_11_20_003554) do +ActiveRecord::Schema[8.1].define(version: 2025_12_02_070000) do # These are extensions that must be enabled in order to support this database enable_extension "pg_catalog.plpgsql" @@ -128,7 +128,6 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_20_003554) do t.string "country" t.datetime "created_at", null: false t.text "creation_reason" - t.integer "events_count", default: 0, null: false t.boolean "is_datacenter", default: false t.boolean "is_proxy", default: false t.boolean "is_vpn", default: false @@ -143,7 +142,6 @@ ActiveRecord::Schema[8.1].define(version: 2025_11_20_003554) do t.index ["asn_org"], name: "index_network_ranges_on_asn_org" t.index ["company"], name: "index_network_ranges_on_company" t.index ["country"], name: "index_network_ranges_on_country" - t.index ["events_count"], name: "index_network_ranges_on_events_count" t.index ["is_datacenter", "is_proxy", "is_vpn"], name: "idx_network_flags" t.index ["is_datacenter"], name: "index_network_ranges_on_is_datacenter" t.index ["network"], name: "index_network_ranges_on_network", opclass: :inet_ops, using: :gist diff --git a/test/models/network_range_test.rb b/test/models/network_range_test.rb index 2fa0c67..1b67e4f 100644 --- a/test/models/network_range_test.rb +++ b/test/models/network_range_test.rb @@ -561,14 +561,23 @@ class NetworkRangeTest < ActiveSupport::TestCase end # Analytics Methods - test "events_count returns counter cache value" do + test "has_events? correctly detects if network has events" do range = NetworkRange.create!(network: "192.168.1.0/24") - assert_equal 0, range.events_count + assert_equal false, range.has_events? - # Update counter cache manually for testing - range.update_column(:events_count, 5) - assert_equal 5, range.events_count + # Create a test event in this network + Event.create!( + request_id: "test-1", + ip_address: "192.168.1.100", + network_range: range, + waf_action: 1, + request_method: 0, + response_status: 200 + ) + + # Should now detect events exist + assert_equal true, range.has_events? end test "events method finds events within range" do
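Illustrative console sketch (not part of this diff): assuming the job and service classes added above are loaded, the bootstrap and consolidation path can be exercised by hand roughly as follows. ExportEventsToParquetJob is referenced from config/recurring.yml but is not shown in this change.

  # One-time migration of legacy DuckDB data into weekly Parquet archives
  BootstrapParquetExportJob.perform_now

  # Normally driven by config/recurring.yml; safe to invoke manually when testing
  ConsolidateParquetHourlyJob.perform_now   # folds the previous hour's minute files into the day file
  ConsolidateParquetWeeklyJob.perform_now   # folds the previous week's day files into a week archive

  # Spot-check which Parquet files would back a query window
  AnalyticsDuckdbService.instance.parquet_files_for_range(1.week.ago)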