Accept incoming WAF payloads and correctly parse them into events. GeoLite2 integration complete.
This commit is contained in:
@@ -88,40 +88,63 @@ class Event < ApplicationRecord
|
||||
after_validation :normalize_event_fields, if: :should_normalize?
|
||||
|
||||
# Create a WAF request Event from an incoming payload.
#
# The payload's request/response header keys are downcased once here so that
# all later header lookups are case-stable.
#
# @param event_id [String] external identifier for the event
# @param payload [Hash] raw WAF payload as received
# @param project [Project] owning project
# @return [Event] the persisted event
# @raise [ActiveRecord::RecordInvalid] when validations fail
def self.create_from_waf_payload!(event_id, payload, project)
  # Normalize headers in payload during import phase
  normalized_payload = normalize_payload_headers(payload)

  create!(
    project: project,
    event_id: event_id,
    timestamp: parse_timestamp(normalized_payload["timestamp"]),
    payload: normalized_payload,

    # WAF-specific fields
    ip_address: normalized_payload.dig("request", "ip"),
    # Headers were downcased above; fall back to original casing defensively.
    user_agent: normalized_payload.dig("request", "headers", "user-agent") ||
                normalized_payload.dig("request", "headers", "User-Agent"),
    # request_method and request_protocol are set later by the
    # extract_fields_from_payload + normalize_event_fields callbacks.
    request_path: normalized_payload.dig("request", "path"),
    request_url: normalized_payload.dig("request", "url"),
    response_status: normalized_payload.dig("response", "status_code"),
    response_time_ms: normalized_payload.dig("response", "duration_ms"),
    waf_action: normalize_action(normalized_payload["waf_action"]), # Normalize incoming action values
    rule_matched: normalized_payload["rule_matched"],
    blocked_reason: normalized_payload["blocked_reason"],

    # Server/Environment info
    server_name: normalized_payload["server_name"],
    environment: normalized_payload["environment"],

    # Geographic data
    country_code: normalized_payload.dig("geo", "country_code"),
    city: normalized_payload.dig("geo", "city"),

    # WAF agent info
    agent_version: normalized_payload.dig("agent", "version"),
    agent_name: normalized_payload.dig("agent", "name")
  )
end
|
||||
|
||||
# Normalize headers in payload to lower case during import phase
|
||||
# Normalize request/response header keys in a payload to lower case.
#
# Returns the input untouched when it is not a Hash. Works on a deep copy so
# the caller's payload is never mutated.
#
# @param payload [Hash, Object] raw WAF payload
# @return [Hash, Object] deep copy with downcased header keys, or the input
def self.normalize_payload_headers(payload)
  return payload unless payload.is_a?(Hash)

  # Deep copy to avoid modifying the original. Payload comes from our own
  # JSON parse, so Marshal round-tripping is safe here.
  normalized = Marshal.load(Marshal.dump(payload))

  # Normalize request headers.
  # `dig` already returns nil for missing keys, so a plain is_a? suffices
  # (the original `&.is_a?` was redundant).
  if normalized.dig("request", "headers").is_a?(Hash)
    normalized["request"]["headers"] = normalized["request"]["headers"].transform_keys(&:downcase)
  end

  # Normalize response headers if they exist
  if normalized.dig("response", "headers").is_a?(Hash)
    normalized["response"]["headers"] = normalized["response"]["headers"].transform_keys(&:downcase)
  end

  normalized
end
|
||||
|
||||
def self.normalize_action(action)
|
||||
return "allow" if action.nil? || action.blank?
|
||||
|
||||
@@ -195,7 +218,8 @@ class Event < ApplicationRecord
|
||||
end
|
||||
|
||||
# Request headers from the payload with keys normalized to lower case.
# Returns an empty Hash when the payload has no headers.
#
# @return [Hash] downcased-key request headers
def headers
  # The bare `payload&.dig(...) || {}` expression that preceded these lines
  # was dead code left over from an edit; only the normalized value is used.
  raw_headers = payload&.dig("request", "headers") || {}
  normalize_headers(raw_headers)
end
|
||||
|
||||
def query_params
|
||||
@@ -237,6 +261,69 @@ class Event < ApplicationRecord
|
||||
URI.parse(request_url).hostname rescue nil
|
||||
end
|
||||
|
||||
# Normalize headers to lower case keys during import phase
|
||||
# Return a copy of +headers+ with every key downcased.
# Any non-Hash input (nil, arrays, strings) yields an empty Hash.
def normalize_headers(headers)
  headers.is_a?(Hash) ? headers.transform_keys(&:downcase) : {}
end
|
||||
|
||||
# GeoIP enrichment methods
|
||||
# Look up and persist a country code for this event's IP address.
#
# No-op when there is no IP or when geo data is already present.
# Lookup/update failures are logged, never raised to the caller.
def enrich_geo_location!
  # Nothing to do without an IP, or when geo data already exists.
  return if ip_address.blank? || country_code.present?

  country = GeoIpService.lookup_country(ip_address)
  update!(country_code: country) if country.present?
rescue => e
  Rails.logger.error "Failed to enrich geo location for event #{id}: #{e.message}"
end
|
||||
|
||||
# Class method to enrich multiple events
|
||||
# Enrich a batch of events with GeoIP country codes.
#
# Defaults to all events that have an IP address but no country code yet.
# Unlike the original version, a single failed lookup/update no longer aborts
# the whole batch: failures are logged per event (matching the error-handling
# style of #enrich_geo_location!) and processing continues.
#
# @param events [ActiveRecord::Relation, nil] events to enrich; nil selects
#   events missing geo data
# @return [Integer] number of events actually updated
def self.enrich_geo_location_batch(events = nil)
  events ||= where(country_code: [nil, '']).where.not(ip_address: [nil, ''])
  # Reuse one service instance across the batch instead of per-event lookups.
  geo_service = GeoIpService.new
  updated_count = 0

  events.find_each do |event|
    next if event.country_code.present?

    begin
      country = geo_service.lookup_country(event.ip_address)
      if country.present?
        event.update!(country_code: country)
        updated_count += 1
      end
    rescue => e
      # One bad lookup or failed save must not abort the remaining events.
      Rails.logger.error "GeoIP batch enrichment failed for event #{event.id}: #{e.message}"
    end
  end

  updated_count
end
|
||||
|
||||
# Lookup country code for this event's IP
|
||||
# Country code for this event: the stored value when present, otherwise a
# live GeoIP lookup on the event's IP. Returns nil when no IP is available
# or the lookup fails (failures are logged, not raised).
def lookup_country
  return country_code if country_code.present?

  if ip_address.blank?
    nil
  else
    begin
      GeoIpService.lookup_country(ip_address)
    rescue => e
      Rails.logger.error "GeoIP lookup failed for #{ip_address}: #{e.message}"
      nil
    end
  end
end
|
||||
|
||||
# Check if event has valid geo location data
|
||||
# True when the event carries any geographic information (country or city).
def has_geo_data?
  [country_code, city].any?(&:present?)
end
|
||||
|
||||
# Get full geo location details
|
||||
# Full geo location details for this event as a Hash, including a
# convenience flag indicating whether any geo data is present.
def geo_location
  details = {
    country_code: country_code,
    city: city,
    ip_address: ip_address
  }
  details.merge(has_data: has_geo_data?)
end
|
||||
|
||||
private
|
||||
|
||||
def should_normalize?
|
||||
@@ -257,7 +344,12 @@ class Event < ApplicationRecord
|
||||
response_data = payload.dig("response") || {}
|
||||
|
||||
self.ip_address = request_data["ip"]
|
||||
self.user_agent = request_data.dig("headers", "User-Agent")
|
||||
|
||||
# Extract user agent with header name standardization
|
||||
headers = request_data["headers"] || {}
|
||||
normalized_headers = normalize_headers(headers)
|
||||
self.user_agent = normalized_headers["user-agent"] || normalized_headers["User-Agent"]
|
||||
|
||||
self.request_path = request_data["path"]
|
||||
self.request_url = request_data["url"]
|
||||
self.response_status = response_data["status_code"]
|
||||
@@ -265,10 +357,11 @@ class Event < ApplicationRecord
|
||||
self.rule_matched = payload["rule_matched"]
|
||||
self.blocked_reason = payload["blocked_reason"]
|
||||
|
||||
# Store original values for normalization (these will be normalized to IDs)
|
||||
@raw_request_method = request_data["method"]
|
||||
@raw_request_protocol = request_data["protocol"]
|
||||
@raw_action = payload["waf_action"]
|
||||
# Store original values for normalization only if they don't exist yet
|
||||
# This prevents overwriting during multiple callback runs
|
||||
@raw_request_method ||= request_data["method"]
|
||||
@raw_request_protocol ||= request_data["protocol"]
|
||||
@raw_action ||= payload["waf_action"]
|
||||
|
||||
# Extract server/environment info
|
||||
self.server_name = payload["server_name"]
|
||||
|
||||
Reference in New Issue
Block a user