# frozen_string_literal: true

# Service for managing the DuckDB analytics database
# Provides fast analytical queries on events data using columnar storage
class AnalyticsDuckdbService
  include Singleton

  DUCKDB_PATH = Rails.root.join("storage", "analytics.duckdb").to_s
  BATCH_SIZE = 10_000

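  # Typical usage (a sketch; callers and schedules are assumptions outside
  # this file):
  #
  #   service = AnalyticsDuckdbService.instance
  #   service.sync_new_events(30.days.ago)
  #   service.total_events_since(24.hours.ago)
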
  # Execute block with connection, ensuring database and connection are closed afterward
  def with_connection
    db = DuckDB::Database.open(DUCKDB_PATH)
    conn = db.connect
    yield conn
  ensure
    conn&.close
    db&.close
  end

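  # Ad-hoc use (a sketch; result rows iterate as arrays, per the query
  # helpers below):
  #
  #   AnalyticsDuckdbService.instance.with_connection do |conn|
  #     conn.query("SELECT COUNT(*) FROM events").first&.first
  #   end
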
  # Create events table if it doesn't exist (must be called within with_connection block)
  def setup_schema(conn)
    conn.execute(<<~SQL)
      CREATE TABLE IF NOT EXISTS events (
        id BIGINT PRIMARY KEY,
        timestamp TIMESTAMP NOT NULL,
        ip_address VARCHAR,
        network_range_id BIGINT,
        country VARCHAR,
        company VARCHAR,
        asn INTEGER,
        asn_org VARCHAR,
        is_datacenter BOOLEAN,
        is_vpn BOOLEAN,
        is_proxy BOOLEAN,
        is_bot BOOLEAN,
        waf_action INTEGER,
        request_method INTEGER,
        response_status INTEGER,
        rule_id BIGINT,
        request_path VARCHAR,
        user_agent VARCHAR,
        tags VARCHAR[]
      )
    SQL

    Rails.logger.info "[DuckDB] Schema setup complete"
  end

  # Get timestamp of oldest event in DuckDB
  # Returns nil if table is empty
  def oldest_event_timestamp
    with_connection do |conn|
      result = conn.query("SELECT MIN(timestamp) as oldest FROM events")
      first_row = result.first
      first_row&.first # Returns the value or nil
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting oldest timestamp: #{e.message}"
    nil
  end

  # Get timestamp of newest event in DuckDB
  # Returns nil if table is empty
  def newest_event_timestamp
    with_connection do |conn|
      result = conn.query("SELECT MAX(timestamp) as newest FROM events")
      first_row = result.first
      first_row&.first # Returns the value or nil
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting newest timestamp: #{e.message}"
    nil
  end

  # Get maximum event ID already synced to DuckDB
  def max_synced_id
    with_connection do |conn|
      result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events")
      first_row = result.first
      first_row&.first || 0
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting max ID: #{e.message}"
    0
  end

  # Sync new events from PostgreSQL to DuckDB
  # Uses PostgreSQL cursor for memory-efficient streaming
  # Uses Appender API for fast bulk inserts
  # Filters by ID to avoid duplicates
  def sync_new_events(from_timestamp)
    total_synced = 0

    with_connection do |conn|
      # Ensure table exists
      setup_schema(conn)

      # Get max ID already in DuckDB to avoid duplicates
      max_id_result = conn.query("SELECT COALESCE(MAX(id), 0) as max_id FROM events")
      max_id = max_id_result.first&.first || 0
      Rails.logger.info "[DuckDB] Syncing events from #{from_timestamp}, max_id=#{max_id}"

      start_time = Time.current
      appender = nil
      batch_count = 0

      begin
        # Create initial appender
        appender = conn.appender("events")

        # Use PostgreSQL cursor for memory-efficient streaming
        Event.where("timestamp >= ? AND id > ?", from_timestamp, max_id)
             .select(
               :id,
               :timestamp,
               :ip_address,
               :network_range_id,
               :country,
               :company,
               :asn,
               :asn_org,
               :is_datacenter,
               :is_vpn,
               :is_proxy,
               :is_bot,
               :waf_action,
               :request_method,
               :response_status,
               :rule_id,
               :request_path,
               :user_agent,
               :tags
             )
             .order(:id)
             .each_row(block_size: BATCH_SIZE) do |event_data|
          # Unpack event data from cursor row (Hash from each_row)
          begin
            appender.append_row(
              event_data["id"],
              event_data["timestamp"],
              event_data["ip_address"]&.to_s,
              event_data["network_range_id"],
              event_data["country"],
              event_data["company"],
              event_data["asn"],
              event_data["asn_org"],
              event_data["is_datacenter"],
              event_data["is_vpn"],
              event_data["is_proxy"],
              event_data["is_bot"],
              event_data["waf_action"],
              event_data["request_method"],
              event_data["response_status"],
              event_data["rule_id"],
              event_data["request_path"],
              event_data["user_agent"],
              event_data["tags"] || []
            )
          rescue StandardError => e
            Rails.logger.error "[DuckDB] Error appending event #{event_data['id']}: #{e.message}"
            Rails.logger.error "[DuckDB] event_data = #{event_data.inspect}"
            raise
          end

          batch_count += 1
          total_synced += 1

          # Flush and recreate appender every BATCH_SIZE events to avoid chunk overflow
          if batch_count % BATCH_SIZE == 0
            appender.close
            appender = conn.appender("events")
            Rails.logger.info "[DuckDB] Synced batch (total: #{total_synced} events)"
          end
        end

        # Close final appender
        appender&.close

        duration = Time.current - start_time
        rate = total_synced / duration if duration > 0
        Rails.logger.info "[DuckDB] Sync complete: #{total_synced} events in #{duration.round(2)}s (~#{rate&.round(0)} events/sec)"
      rescue StandardError => e
        appender&.close rescue nil # Ensure appender is closed on error
        Rails.logger.error "[DuckDB] Error syncing events: #{e.message}"
        Rails.logger.error e.backtrace.join("\n")
        raise # Re-raise to be caught by outer rescue
      end
    end

    total_synced
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Sync failed: #{e.message}"
    0
  end

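  # A recurring job can drive incremental sync by using the newest synced
  # timestamp as the watermark (a sketch; AnalyticsSyncJob and the 30-day
  # fallback are hypothetical, not part of this service):
  #
  #   class AnalyticsSyncJob < ApplicationJob
  #     def perform
  #       service = AnalyticsDuckdbService.instance
  #       from = service.newest_event_timestamp || 30.days.ago
  #       service.sync_new_events(from)
  #     end
  #   end
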
  # Execute analytical query on DuckDB
  def query(sql, *params)
    with_connection do |conn|
      conn.query(sql, *params)
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Query error: #{e.message}"
    Rails.logger.error "SQL: #{sql}"
    raise
  end

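  # Example (a sketch; DuckDB result rows iterate as arrays in SELECT-list order):
  #
  #   rows = AnalyticsDuckdbService.instance.query(
  #     "SELECT country, COUNT(*) FROM events WHERE timestamp >= ? GROUP BY country",
  #     1.day.ago
  #   ).to_a
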
  # Get event count in DuckDB
  def event_count
    with_connection do |conn|
      result = conn.query("SELECT COUNT(*) as count FROM events")
      first_row = result.first
      first_row&.first || 0
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB] Error getting event count: #{e.message}"
    0
  end

  # Analytics query: Total events since timestamp
  def total_events_since(start_time)
    with_connection do |conn|
      result = conn.query("SELECT COUNT(*) as count FROM events WHERE timestamp >= ?", start_time)
      result.first&.first || 0
    end
  end

  # Analytics query: Event breakdown by WAF action
  def event_breakdown_by_action(start_time)
    with_connection do |conn|
      result = conn.query(<<~SQL, start_time)
        SELECT waf_action, COUNT(*) as count
        FROM events
        WHERE timestamp >= ?
        GROUP BY waf_action
      SQL

      # Convert to hash like PostgreSQL returns
      # DuckDB returns arrays: [waf_action, count]
      result.to_a.to_h { |row| [row[0], row[1]] }
    end
  end

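  # waf_action is stored as an integer (see the schema above); a caller might
  # map it back to labels (the Event.waf_actions enum here is a hypothetical
  # example, not defined in this file):
  #
  #   breakdown = AnalyticsDuckdbService.instance.event_breakdown_by_action(7.days.ago)
  #   breakdown.transform_keys { |action| Event.waf_actions.key(action) }
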
  # Analytics query: Top countries
  def top_countries(start_time, limit = 10)
    with_connection do |conn|
      result = conn.query(<<~SQL, start_time, limit)
        SELECT country, COUNT(*) as count
        FROM events
        WHERE timestamp >= ? AND country IS NOT NULL
        GROUP BY country
        ORDER BY count DESC
        LIMIT ?
      SQL

      # DuckDB returns arrays: [country, count]
      result.to_a.map { |row| [row[0], row[1]] }
    end
  end

  # Analytics query: Top blocked IPs (events with waf_action = 0)
  def top_blocked_ips(start_time, limit = 10)
    with_connection do |conn|
      result = conn.query(<<~SQL, start_time, limit)
        SELECT ip_address, COUNT(*) as count
        FROM events
        WHERE timestamp >= ? AND waf_action = 0
        GROUP BY ip_address
        ORDER BY count DESC
        LIMIT ?
      SQL

      # DuckDB returns arrays: [ip_address, count]
      result.to_a.map { |row| [row[0], row[1]] }
    end
  end

  # Analytics query: Hourly timeline (events grouped by hour)
  def hourly_timeline(start_time, end_time)
    with_connection do |conn|
      result = conn.query(<<~SQL, start_time, end_time)
        SELECT
          DATE_TRUNC('hour', timestamp) as hour,
          COUNT(*) as count
        FROM events
        WHERE timestamp >= ? AND timestamp < ?
        GROUP BY hour
        ORDER BY hour
      SQL

      # Convert to hash with Time keys like PostgreSQL
      # DuckDB returns arrays: [hour, count]
      result.to_a.to_h { |row| [row[0], row[1]] }
    end
  end

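  # Example (a sketch): hours with no events have no key, so a dashboard may
  # need to densify the series before charting:
  #
  #   timeline = AnalyticsDuckdbService.instance.hourly_timeline(24.hours.ago, Time.current)
  #   timeline.each { |hour, count| puts "#{hour}: #{count}" }
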
  # Cleanup/testing hook. Connections are opened and closed per call inside
  # with_connection, and no @connection is ever assigned in this class, so
  # this is effectively a safe no-op.
  def close
    @connection&.close
    @connection = nil
  end
end