153 lines
4.8 KiB
Ruby
153 lines
4.8 KiB
Ruby
#!/usr/bin/env ruby
|
|
# frozen_string_literal: true
|
|
|
|
# One-time backfill script that populates the new columns (request_method,
# response_status, rule_id) on existing DuckDB events using data from PostgreSQL.
# It relies on DuckDB's JOIN-based UPDATE (UPDATE ... FROM) for maximum performance.
|
|
|
|
require 'csv'
|
|
require 'tempfile'
|
|
|
|
# Start-up banner: tells the operator what this script is about to do.
puts <<~BANNER
  DuckDB Column Backfill Script (JOIN-based UPDATE)
  #{'=' * 60}
  This will update existing DuckDB events with data from PostgreSQL
  using a fast JOIN-based approach

BANNER

# Width, in event IDs, of each PostgreSQL export window (not a row count:
# gaps in the ID sequence make a window contain fewer rows).
BATCH_SIZE = 50_000
|
|
|
|
# Backfills request_method, response_status and rule_id on DuckDB events:
#   1. count the DuckDB rows that still need data,
#   2. export the matching PostgreSQL rows to a temporary CSV file,
#   3. COPY the CSV into a staging table (events_updates),
#   4. apply one UPDATE ... FROM join against the staging table,
#   5. drop the staging table,
#   6. report how many rows are populated / still NULL.
# The staging table and the CSV file are removed even when a step fails.
AnalyticsDuckdbService.instance.with_connection do |conn|
  # Runs a query expected to yield a single value; returns 0 when no row comes back.
  scalar = ->(sql) { conn.query(sql).first&.first || 0 }

  puts "Step 1: Counting events to backfill..."
  total_to_backfill = scalar.call("SELECT COUNT(*) FROM events WHERE request_method IS NULL")
  total_events = scalar.call("SELECT COUNT(*) FROM events")

  puts " Total events in DuckDB: #{total_events}"
  puts " Events needing backfill: #{total_to_backfill}"

  if total_to_backfill.zero?
    puts "\n✓ All events already have new columns populated!"
    # Ends the whole script here, so the closing summary below is not printed.
    exit 0
  end

  # ID bounds of the rows that still need data; both are non-nil because at
  # least one such row exists at this point.
  min_id, max_id = conn.query("SELECT MIN(id), MAX(id) FROM events WHERE request_method IS NULL").first
  puts " ID range to backfill: #{min_id} to #{max_id}"

  puts "\nStep 2: Exporting PostgreSQL data in batches..."

  # Temporary CSV file used to move the data from PostgreSQL into DuckDB.
  temp_csv = Tempfile.new(['events_backfill', '.csv'])
  # Tracks whether events_updates currently exists so `ensure` can remove it
  # if a later step raises.
  staging_table_created = false

  begin
    CSV.open(temp_csv.path, 'w') do |csv|
      # Header
      csv << %w[id request_method response_status rule_id]

      batch_num = 0
      current_id = min_id

      # Walk the ID range in fixed-width windows so progress can be reported.
      while current_id <= max_id
        batch_num += 1
        batch_end_id = [current_id + BATCH_SIZE - 1, max_id].min

        print " Batch #{batch_num}: Exporting IDs #{current_id}-#{batch_end_id}..."

        # Fetch from PostgreSQL, loading only the columns being backfilled.
        pg_events = Event.where(id: current_id..batch_end_id)
                         .select(:id, :request_method, :response_status, :rule_id)

        count = 0
        pg_events.find_each do |event|
          # nil attributes become empty CSV fields, which COPY reads back as NULL.
          csv << [event.id, event.request_method, event.response_status, event.rule_id]
          count += 1
        end

        puts " #{count} events"
        current_id = batch_end_id + 1
      end
    end

    temp_csv.close

    puts "\n✓ Exported to temporary CSV: #{temp_csv.path}"
    puts " File size: #{(File.size(temp_csv.path) / 1024.0 / 1024.0).round(2)} MB"

    puts "\nStep 3: Loading CSV into temporary DuckDB table..."
    # Remove any staging table left behind by an earlier, interrupted run.
    conn.execute("DROP TABLE IF EXISTS events_updates")
    conn.execute(<<~SQL)
      CREATE TABLE events_updates (
        id BIGINT,
        request_method INTEGER,
        response_status INTEGER,
        rule_id BIGINT
      )
    SQL
    staging_table_created = true

    # The path is embedded in a SQL string literal, so double any single quote
    # (the temp directory name is environment-dependent).
    csv_path_sql = temp_csv.path.gsub("'", "''")
    conn.execute(<<~SQL)
      COPY events_updates FROM '#{csv_path_sql}' (FORMAT CSV, HEADER TRUE, NULL '')
    SQL

    loaded_count = scalar.call("SELECT COUNT(*) FROM events_updates")
    puts "✓ Loaded #{loaded_count} rows into temporary table"

    puts "\nStep 4: Performing bulk UPDATE via JOIN..."
    # Monotonic clock: the measured duration is unaffected by wall-clock adjustments.
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

    # DuckDB's efficient UPDATE...FROM syntax
    conn.execute(<<~SQL)
      UPDATE events
      SET
        request_method = events_updates.request_method,
        response_status = events_updates.response_status,
        rule_id = events_updates.rule_id
      FROM events_updates
      WHERE events.id = events_updates.id
    SQL

    duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
    puts "✓ Bulk update complete in #{duration.round(2)}s!"

    puts "\nStep 5: Cleaning up temporary table..."
    conn.execute("DROP TABLE events_updates")
    staging_table_created = false
    puts "✓ Temporary table dropped"
  ensure
    # A failed COPY/UPDATE must not leave the staging table behind. Cleanup
    # errors are reported but not raised, so they cannot mask the original failure.
    if staging_table_created
      begin
        conn.execute("DROP TABLE IF EXISTS events_updates")
      rescue StandardError => e
        warn "⚠ Could not drop temporary table events_updates: #{e.message}"
      end
    end

    # Close the handle (if still open) and delete the temp file.
    temp_csv.close!
  end

  puts "\nStep 6: Verifying backfill..."
  # NOTE(review): Step 1 treats "request_method IS NULL" as needing backfill,
  # while this check only counts rows where all three columns are NULL —
  # confirm that this difference is intended.
  filled_count = scalar.call(
    "SELECT COUNT(*) FROM events WHERE request_method IS NOT NULL OR response_status IS NOT NULL OR rule_id IS NOT NULL"
  )
  still_null_count = scalar.call(
    "SELECT COUNT(*) FROM events WHERE request_method IS NULL AND response_status IS NULL AND rule_id IS NULL"
  )

  puts " Events with new columns populated: #{filled_count}"
  puts " Events still with NULL columns: #{still_null_count}"

  if still_null_count > 0
    puts "\n⚠ Note: #{still_null_count} events still have NULL values."
    puts " This is normal if those events don't exist in PostgreSQL anymore"
    puts " (they may have been cleaned up due to retention policy)"
  else
    puts "\n✓ Backfill complete! All events have new columns populated."
  end
end
|
|
|
|
# Closing summary with the manual follow-up steps for the operator.
puts <<~SUMMARY

  #{'=' * 60}
  Backfill complete!

  Next steps:
  1. Test the events index page to verify everything works
  2. Monitor performance improvements from DuckDB queries
SUMMARY
|