Files
baffle-hub/app/jobs/sync_events_to_duckdb_job.rb

90 lines
3.1 KiB
Ruby

# frozen_string_literal: true
# Background job to sync events from PostgreSQL to DuckDB
# Runs every 5 minutes to keep analytics database up-to-date
# Uses watermark tracking to only sync new events
# Copies events from PostgreSQL into DuckDB incrementally.
#
# Each run asks AnalyticsDuckdbService to sync every event from a computed
# start timestamp onwards. The start timestamp comes from a watermark kept in
# Rails.cache, rewound by SYNC_OVERLAP so late-arriving events are re-scanned.
class SyncEventsToDuckdbJob < ApplicationJob
  queue_as :default

  # Key for storing last sync timestamp in Rails cache
  WATERMARK_CACHE_KEY = "duckdb_last_sync_time"
  WATERMARK_TTL = 1.week

  # Overlap window to catch late-arriving events
  SYNC_OVERLAP = 1.minute

  # Runs one sync pass and advances the watermark when events were copied.
  #
  # @return [void]
  # @raise [StandardError] any failure is logged and re-raised
  def perform
    service = AnalyticsDuckdbService.instance

    # Determine where to start syncing
    from_timestamp = determine_sync_start_time(service)

    # Capture the watermark candidate BEFORE the copy starts. Stamping the
    # watermark after the copy finishes would skip events written while a
    # sync longer than SYNC_OVERLAP was still running.
    sync_started_at = Time.current

    Rails.logger.info "[DuckDB Sync] Starting sync from #{from_timestamp}"

    # Sync new events using PostgreSQL cursor + DuckDB Appender
    # (setup_schema is called internally within sync_new_events)
    count = service.sync_new_events(from_timestamp)

    # Update watermark if we synced any events
    if count.positive?
      update_last_sync_time(sync_started_at)
      Rails.logger.info "[DuckDB Sync] Successfully synced #{count} events"
    else
      Rails.logger.info "[DuckDB Sync] No new events to sync"
    end
  rescue StandardError => e
    Rails.logger.error "[DuckDB Sync] Job failed: #{e.message}"
    # Array() guards against an exception that carries no backtrace.
    Rails.logger.error Array(e.backtrace).join("\n")
    raise # Re-raise to mark job as failed in Solid Queue
  end

  private

  # Determine timestamp to start syncing from.
  #
  # Strategy:
  # 1. First run (DuckDB empty): sync from oldest PostgreSQL event
  # 2. Subsequent runs: sync from last watermark with overlap
  #
  # @param service [AnalyticsDuckdbService] must respond to
  #   oldest_event_timestamp and newest_event_timestamp
  # @return [Time] timestamp passed to sync_new_events
  def determine_sync_start_time(service)
    oldest_duckdb = service.oldest_event_timestamp

    # DuckDB is empty - this is the first sync
    return first_sync_start_time if oldest_duckdb.nil?

    resume_sync_start_time(service, oldest_duckdb)
  end

  # Start time for the very first sync: the oldest PostgreSQL event, or a
  # one-day window when PostgreSQL has no events either.
  def first_sync_start_time
    oldest_pg = Event.minimum(:timestamp)

    if oldest_pg.nil?
      # No events in PostgreSQL at all
      Rails.logger.warn "[DuckDB Sync] No events found in PostgreSQL"
      return 1.day.ago # Default to recent window
    end

    Rails.logger.info "[DuckDB Sync] First sync - starting from oldest event: #{oldest_pg}"
    oldest_pg
  end

  # Start time when DuckDB already holds data: the cached watermark minus the
  # overlap, falling back to DuckDB's own newest/oldest event timestamps when
  # the watermark is missing from the cache.
  def resume_sync_start_time(service, oldest_duckdb)
    last_sync = Rails.cache.read(WATERMARK_CACHE_KEY)

    if last_sync.nil?
      # Watermark not in cache (maybe cache expired or restarted)
      # Fall back to newest event in DuckDB
      newest_duckdb = service.newest_event_timestamp
      start_time = newest_duckdb ? newest_duckdb - SYNC_OVERLAP : oldest_duckdb
      Rails.logger.info "[DuckDB Sync] Watermark not found, using newest DuckDB event: #{start_time}"
      return start_time
    end

    # Normal case: use watermark with overlap to catch late arrivals
    Rails.logger.debug "[DuckDB Sync] Using watermark: #{last_sync} (with #{SYNC_OVERLAP}s overlap)"
    last_sync - SYNC_OVERLAP
  end

  # Update last sync watermark in cache.
  #
  # @param synced_at [Time] instant to record; defaults to the current time
  #   so existing zero-argument calls keep working
  def update_last_sync_time(synced_at = Time.current)
    Rails.cache.write(WATERMARK_CACHE_KEY, synced_at, expires_in: WATERMARK_TTL)
    Rails.logger.debug "[DuckDB Sync] Updated watermark to #{synced_at}"
  end
end