require 'csv'
require 'ipaddr'
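
# Imports GeoLite2 ASN CSV data into NetworkRange records. Accepts either a
# raw CSV file or a GeoLite2 zip archive (detected by magic bytes, not
# extension) and reports per-row progress through the supplied data import
# object.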
class GeoliteAsnImporter
  BATCH_SIZE = 1000

  def initialize(file_path, data_import:)
    @file_path = file_path
    @data_import = data_import
    @total_records = 0
    @processed_records = 0
    @failed_records = 0
    @errors = []
  end
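
  # Runs the import and returns a summary hash with total, processed, and
  # failed record counts plus the collected error messages.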
  def import
    Rails.logger.info "Starting import for file: #{@file_path}"
    Rails.logger.info "File exists: #{File.exist?(@file_path)}"
    Rails.logger.info "File size: #{File.size(@file_path)} bytes" if File.exist?(@file_path)

    # Check whether the file is actually a zip by reading the magic bytes
    is_zip_file = check_if_zip_file
    Rails.logger.info "File is zip: #{is_zip_file}"

    if is_zip_file
      import_from_zip
    else
      import_csv_file(@file_path)
    end

    {
      total_records: @total_records,
      processed_records: @processed_records,
      failed_records: @failed_records,
      errors: @errors
    }
  end

  private

  # Check whether the file starts with the ZIP magic bytes (PK\x03\x04)
  def check_if_zip_file
    File.open(@file_path, 'rb') do |file|
      header = file.read(4)
      return header == "PK\x03\x04"
    end
  rescue => e
    Rails.logger.error "Error checking if file is zip: #{e.message}"
    false
  end
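
  # Reads the zip into memory and imports every "*Blocks*.csv" entry it
  # contains (the GeoLite2 ASN archive ships IPv4 and IPv6 block files).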
  def import_from_zip
    require 'zip'
    require 'stringio'

    Rails.logger.info "Processing zip file directly: #{@file_path}"

    # Read the entire ZIP file content into memory first
    zip_content = File.binread(@file_path)

    Zip::File.open_buffer(StringIO.new(zip_content)) do |zip_file|
      zip_file.each do |entry|
        if entry.name.include?('Blocks') && entry.name.end_with?('.csv')
          Rails.logger.info "Processing ASN block file from zip: #{entry.name}"
          process_csv_from_zip(zip_file, entry)
        end
      end
    end
  rescue => e
    Rails.logger.error "Error processing ZIP file: #{e.message}"
    Rails.logger.error e.backtrace.join("\n")
    raise
  end
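
  # Streams one CSV entry out of the open zip archive and imports it row by
  # row, tracking successes and failures.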
|
|
|
|
def process_csv_from_zip(zip_file, entry)
|
|
zip_file.get_input_stream(entry) do |io|
|
|
# Read the entire content from the stream
|
|
content = io.read
|
|
|
|
CSV.parse(content, headers: true, header_converters: :symbol, encoding: 'UTF-8') do |row|
|
|
@total_records += 1
|
|
|
|
begin
|
|
import_record(row)
|
|
@processed_records += 1
|
|
rescue => e
|
|
@failed_records += 1
|
|
@errors << "Row #{@total_records}: #{e.message} - Data: #{row.to_h}"
|
|
end
|
|
|
|
update_progress_if_needed
|
|
end
|
|
end
|
|
end
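
  # Locates the Blocks CSVs extracted alongside the zip (see
  # extract_if_zipfile), falling back to the file itself for a plain CSV.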
  def csv_files
    if @file_path.end_with?('.zip')
      # Look for extracted CSV files in the same directory
      base_dir = File.dirname(@file_path)
      base_name = File.basename(@file_path, '.zip')

      [
        File.join(base_dir, "#{base_name}-Blocks-IPv4.csv"),
        File.join(base_dir, "#{base_name}-Blocks-IPv6.csv")
      ].select { |file| File.exist?(file) }
    else
      [@file_path]
    end
  end
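
  # Imports a CSV file from disk using the same per-row accounting as the
  # zip path.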
  def import_csv_file(csv_file)
    CSV.foreach(csv_file, headers: true, header_converters: :symbol, encoding: 'UTF-8') do |row|
      @total_records += 1

      begin
        import_record(row)
        @processed_records += 1
      rescue => e
        @failed_records += 1
        @errors << "Row #{@total_records}: #{e.message} - Data: #{row.to_h}"
      end

      # Update progress every 100 records
      update_progress_if_needed
    end
  end
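
  # Validates a single GeoLite ASN row and upserts it into NetworkRange,
  # merging its data into any existing network_data JSONB payload.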
  def import_record(row)
    network = row[:network]
    # .presence turns blank strings into nil so they fail the check below
    asn = row[:autonomous_system_number].presence&.to_i
    asn_org = row[:autonomous_system_organization]&.strip&.presence

    unless network && asn && asn_org
      raise "Missing required fields: network=#{network}, asn=#{asn}, asn_org=#{asn_org}"
    end

    # Validate network format
    IPAddr.new(network) # This will raise if invalid

    # Store raw GeoLite ASN data in network_data
    geolite_asn_data = {
      asn: {
        autonomous_system_number: asn,
        autonomous_system_organization: asn_org
      }
    }

    # Use upsert with a JSONB merge:
    # - COALESCE handles the case where network_data might be NULL
    # - || is PostgreSQL's JSONB concatenation/merge operator
    # - jsonb_build_object merges the nested geolite data
    NetworkRange.upsert(
      {
        network: network,
        asn: asn,
        asn_org: asn_org,
        source: 'geolite_asn',
        network_data: { geolite: geolite_asn_data },
        updated_at: Time.current
      },
      unique_by: :index_network_ranges_on_network_unique,
      on_duplicate: Arel.sql("
        asn = EXCLUDED.asn,
        asn_org = EXCLUDED.asn_org,
        network_data = COALESCE(network_ranges.network_data, '{}'::jsonb) ||
          jsonb_build_object('geolite',
            COALESCE(network_ranges.network_data->'geolite', '{}'::jsonb) ||
            EXCLUDED.network_data->'geolite'
          ),
        updated_at = EXCLUDED.updated_at
      ")
    )
  end
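
  # Pushes progress to the data import every 100 processed-or-failed
  # records, including a rolling window of recent errors.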
  def update_progress_if_needed
    if (@processed_records + @failed_records) % 100 == 0
      @data_import.update_progress(
        processed: @processed_records,
        failed: @failed_records,
        total_records: @total_records,
        stats: {
          total_records: @total_records,
          current_file: File.basename(@file_path),
          recent_errors: @errors.last(5)
        }
      )
    end
  end
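
  # Extracts every CSV entry from the zip into the archive's directory, for
  # the disk-based import path (see csv_files).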
  def extract_if_zipfile
    return unless @file_path.end_with?('.zip')

    require 'zip'

    Zip::File.open(@file_path) do |zip_file|
      zip_file.each do |entry|
        next unless entry.name.end_with?('.csv')

        # Flatten the entry name so archive subdirectories (and any "../"
        # segments) cannot escape the target directory
        extract_path = File.join(File.dirname(@file_path), File.basename(entry.name))
        entry.extract(extract_path) { true } # overwrite any previous extraction
      end
    end
  end
end
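
# Example usage (a sketch; assumes a DataImport-style record that responds
# to #update_progress with the keywords this class passes — the lookup and
# file name below are illustrative):
#
#   data_import = DataImport.find(id)
#   importer = GeoliteAsnImporter.new(
#     Rails.root.join('tmp', 'GeoLite2-ASN-CSV.zip').to_s,
#     data_import: data_import
#   )
#   result = importer.import
#   Rails.logger.info(
#     "Imported #{result[:processed_records]}/#{result[:total_records]} " \
#     "(#{result[:failed_records]} failed)"
#   )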