90 lines
3.1 KiB
Ruby
90 lines
3.1 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
# Background job that syncs events from PostgreSQL into DuckDB for analytics.
#
# Intended to run periodically (every 5 minutes per the original notes; the
# schedule itself is configured outside this file) to keep the analytics
# database up to date. A watermark stored in Rails.cache records when the
# last successful sync started, so each run only copies recent events.
class SyncEventsToDuckdbJob < ApplicationJob
  queue_as :default

  # Rails.cache key holding the watermark: the start time of the last sync
  # that copied at least one event.
  WATERMARK_CACHE_KEY = "duckdb_last_sync_time"

  # How long the watermark survives in the cache. If it expires (or the cache
  # is cleared), the next run falls back to the newest event already in DuckDB.
  WATERMARK_TTL = 1.week

  # Each run re-reads this much time before the watermark so events whose
  # timestamp is slightly older than the watermark (late commits, small clock
  # differences between app servers and event producers) are still picked up.
  # NOTE(review): events inside the overlap window are passed to the service
  # again on every run; confirm AnalyticsDuckdbService#sync_new_events
  # de-duplicates them, otherwise DuckDB will accumulate duplicates.
  SYNC_OVERLAP = 1.minute

  # Copies events newer than the computed start time into DuckDB and, when
  # any were copied, advances the watermark.
  #
  # @return [void]
  # @raise [StandardError] any error during the sync is logged and re-raised
  #   so the queue backend records the job as failed.
  def perform
    service = AnalyticsDuckdbService.instance
    from_timestamp = determine_sync_start_time(service)

    Rails.logger.info "[DuckDB Sync] Starting sync from #{from_timestamp}"

    # Capture the watermark BEFORE reading from PostgreSQL. Recording the time
    # the sync *finished* instead would skip events created while a long sync
    # (longer than SYNC_OVERLAP) was running: their timestamps would fall
    # before the next run's start time.
    sync_started_at = Time.current

    # Sync new events using PostgreSQL cursor + DuckDB Appender
    # (per the original notes, setup_schema is called inside sync_new_events)
    count = service.sync_new_events(from_timestamp)

    # Only advance the watermark when something was copied; leaving it in
    # place after an empty run is conservative (the next run re-checks the
    # same, empty, window).
    if count.positive?
      update_last_sync_time(sync_started_at)
      Rails.logger.info "[DuckDB Sync] Successfully synced #{count} events"
    else
      Rails.logger.info "[DuckDB Sync] No new events to sync"
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB Sync] Job failed: #{e.message}"
    Rails.logger.error Array(e.backtrace).join("\n")
    raise # Re-raise to mark job as failed in Solid Queue
  end

  private

  # Chooses the timestamp to start syncing from.
  #
  # 1. DuckDB is empty (first sync): start from the oldest PostgreSQL event,
  #    or one day ago if PostgreSQL has no events either.
  # 2. DuckDB has data: start SYNC_OVERLAP before the cached watermark, or
  #    fall back to the newest DuckDB event if the watermark is missing.
  #
  # @param service [AnalyticsDuckdbService] provides DuckDB min/max timestamps
  # @return [Time] lower bound handed to sync_new_events
  def determine_sync_start_time(service)
    oldest_duckdb = service.oldest_event_timestamp
    return first_sync_start_time if oldest_duckdb.nil?

    last_sync = Rails.cache.read(WATERMARK_CACHE_KEY)
    return fallback_start_time(service, oldest_duckdb) if last_sync.nil?

    # Normal case: use watermark with overlap to catch late arrivals
    Rails.logger.debug { "[DuckDB Sync] Using watermark: #{last_sync} (with #{SYNC_OVERLAP.to_i}s overlap)" }
    last_sync - SYNC_OVERLAP
  end

  # Start time for the very first sync, when DuckDB holds no events yet.
  #
  # @return [Time] oldest PostgreSQL event timestamp, or 1.day.ago if none
  def first_sync_start_time
    oldest_pg = Event.minimum(:timestamp)

    if oldest_pg.nil?
      Rails.logger.warn "[DuckDB Sync] No events found in PostgreSQL"
      1.day.ago # Default to recent window
    else
      Rails.logger.info "[DuckDB Sync] First sync - starting from oldest event: #{oldest_pg}"
      oldest_pg
    end
  end

  # Start time used when DuckDB has data but the cached watermark is gone
  # (expired after WATERMARK_TTL, or the cache was cleared/restarted).
  #
  # @param service [AnalyticsDuckdbService] provides the newest DuckDB timestamp
  # @param oldest_duckdb [Time] used only if DuckDB reports no newest timestamp
  # @return [Time] newest DuckDB event minus SYNC_OVERLAP, else oldest_duckdb
  def fallback_start_time(service, oldest_duckdb)
    newest_duckdb = service.newest_event_timestamp
    start_time = newest_duckdb ? newest_duckdb - SYNC_OVERLAP : oldest_duckdb
    Rails.logger.info "[DuckDB Sync] Watermark not found, using newest DuckDB event: #{start_time}"
    start_time
  end

  # Stores the sync watermark in Rails.cache for WATERMARK_TTL.
  #
  # @param synced_at [Time] watermark to store; should be the time the sync
  #   started. Defaults to now for backward compatibility.
  # @return [void]
  def update_last_sync_time(synced_at = Time.current)
    Rails.cache.write(WATERMARK_CACHE_KEY, synced_at, expires_in: WATERMARK_TTL)
    Rails.logger.debug "[DuckDB Sync] Updated watermark to #{synced_at}"
  end
end
|