Files
velour/docs/phases/phase_4.md
Dan Milne 88a906064f
Some checks failed
CI / scan_ruby (push) Has been cancelled
CI / scan_js (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / test (push) Has been cancelled
CI / system-test (push) Has been cancelled
Much base work started
2025-10-31 14:36:14 +11:00

17 KiB

Velour Phase 4: Federation

Phase 4 enables federation between multiple Velour instances, allowing users to share and access video libraries across different servers while maintaining security and access controls.

Federation Architecture

Overview

Velour federation allows instances to:

  • Share video libraries with other trusted instances
  • Stream videos from remote servers
  • Sync metadata and work information
  • Maintain access control and authentication

Federation Storage Location Type

class StorageLocation < ApplicationRecord
  # ... existing code ...

  validates :storage_type, inclusion: { in: %w[local s3 jellyfin web velour] }

  store :configuration, accessors: [
    # Existing configurations...
    # Velour federation configuration
    :remote_instance_url, :api_key, :instance_name, :trusted_instances
  ], coder: JSON

  enum storage_type: { local: 0, s3: 1, jellyfin: 2, web: 3, velour: 4 }

  def federation_client
    return nil unless velour?
    @federation_client ||= VelourFederationClient.new(
      instance_url: remote_instance_url,
      api_key: api_key
    )
  end

  def accessible?
    case storage_type
    # ... existing cases ...
    when 'velour'
      federation_client&.ping?
    else
      false
    end
  end

  def scanner
    case storage_type
    # ... existing cases ...
    when 'velour'
      VelourFederationScanner.new(self)
    else
      super
    end
  end

  def streamer
    case storage_type
    # ... existing cases ...
    when 'velour'
      VelourFederationStreamer.new(self)
    else
      super
    end
  end
end

Federation API Authentication

API Key Management

class ApiKey < ApplicationRecord
  belongs_to :user
  has_many :federation_connections, dependent: :destroy

  validates :key, presence: true, uniqueness: true
  validates :name, presence: true
  validates :permissions, presence: true

  store :permissions, coder: JSON, accessors: [:can_read, :can_stream, :can_metadata]

  before_validation :generate_key, on: :create

  def self.generate_key
    SecureRandom.hex(32)
  end

  private

  def generate_key
    self.key ||= self.class.generate_key
  end
end

Federation Connections

class FederationConnection < ApplicationRecord
  belongs_to :api_key
  belongs_to :storage_location

  validates :remote_instance_url, presence: true, uniqueness: { scope: :api_key_id }
  validates :status, inclusion: { in: %w[pending active suspended rejected] }

  enum status: { pending: 0, active: 1, suspended: 2, rejected: 3 }

  def active?
    status == "active"
  end
end

Federation Client

Velour Federation Client

class VelourFederationClient
  def initialize(instance_url:, api_key:)
    @instance_url = instance_url.chomp('/')
    @api_key = api_key
    @http = Faraday.new(url: @instance_url) do |faraday|
      faraday.headers['Authorization'] = "Bearer #{@api_key}"
      faraday.headers['User-Agent'] = "Velour/#{Velour::VERSION}"
      faraday.request :json
      faraday.response :json
      faraday.adapter Faraday.default_adapter
    end
  end

  def ping?
    response = @http.get('/api/v1/ping')
    response.success?
  rescue
    false
  end

  def instance_info
    response = @http.get('/api/v1/instance')
    response.success? ? response.body : nil
  end

  def works(page: 1, per_page: 50)
    response = @http.get('/api/v1/works', params: {
      page: page,
      per_page: per_page
    })
    response.success? ? response.body : []
  end

  def work_details(work_id)
    response = @http.get("/api/v1/works/#{work_id}")
    response.success? ? response.body : nil
  end

  def video_stream_url(video_id)
    "#{@instance_url}/api/v1/videos/#{video_id}/stream"
  end

  def video_metadata(video_id)
    response = @http.get("/api/v1/videos/#{video_id}")
    response.success? ? response.body : nil
  end

  def search_videos(query:, page: 1, per_page: 20)
    response = @http.get('/api/v1/videos/search', params: {
      query: query,
      page: page,
      per_page: per_page
    })
    response.success? ? response.body : []
  end
end

Federation Scanner

Velour Federation Scanner

class VelourFederationScanner
  def initialize(storage_location)
    @storage_location = storage_location
    @client = @storage_location.federation_client
  end

  def scan
    return failure_result("Remote Velour instance not accessible") unless @storage_location.accessible?

    instance_info = @client.instance_info
    return failure_result("Failed to get instance info") unless instance_info

    works = @client.works(per_page: 100) # Start with first 100 works
    new_videos = process_remote_works(works)

    success_result(new_videos, instance_info)
  rescue => e
    failure_result("Federation error: #{e.message}")
  end

  private

  def process_remote_works(works)
    new_videos = []

    works.each do |work_data|
      # Create or find local work
      work = Work.find_or_create_by(
        title: work_data['title'],
        year: work_data['year']
      ) do |w|
        w.description = work_data['description']
        w.director = work_data['director']
        w.rating = work_data['rating']
      end

      # Process videos for this work
      work_data['videos'].each do |video_data|
        video = Video.find_or_initialize_by(
          filename: video_data['id'], # Use remote ID as filename
          storage_location: @storage_location
        )

        if video.new_record?
          video.update!(
            work: work,
            file_size: video_data['file_size'],
            web_compatible: video_data['web_compatible'],
            video_metadata: {
              remote_video_id: video_data['id'],
              remote_instance_url: @storage_location.remote_instance_url,
              duration: video_data['duration'],
              width: video_data['width'],
              height: video_data['height'],
              video_codec: video_data['video_codec'],
              audio_codec: video_data['audio_codec']
            },
            fingerprints: video_data['fingerprints']
          )

          new_videos << video
          # Note: We don't process remote videos locally
          # We just catalog them for streaming
        end
      end
    end

    new_videos
  end

  def success_result(videos = [], instance_info = {})
    {
      success: true,
      videos: videos,
      message: "Found #{videos.length} videos from #{@storage_location.instance_name}",
      instance_info: instance_info
    }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end

Federation Streamer

Velour Federation Streamer

class VelourFederationStreamer
  def initialize(storage_location)
    @storage_location = storage_location
    @client = @storage_location.federation_client
  end

  def stream(video, range: nil)
    remote_video_id = video.video_metadata['remote_video_id']
    stream_url = @client.video_stream_url(remote_video_id)

    if range
      proxy_stream_with_range(stream_url, range)
    else
      proxy_stream(stream_url)
    end
  end

  def thumbnail_url(video)
    remote_video_id = video.video_metadata['remote_video_id']
    "#{@storage_location.remote_instance_url}/api/v1/videos/#{remote_video_id}/thumbnail"
  end

  private

  def proxy_stream(url)
    response = Faraday.get(url) do |req|
      req.headers['Authorization'] = "Bearer #{@storage_location.api_key}"
    end

    {
      body: response.body,
      status: response.status,
      headers: response.headers
    }
  end

  def proxy_stream_with_range(url, range)
    response = Faraday.get(url) do |req|
      req.headers['Authorization'] = "Bearer #{@storage_location.api_key}"
      req.headers['Range'] = "bytes=#{range}"
    end

    {
      body: response.body,
      status: response.status,
      headers: response.headers
    }
  end
end

Federation API Endpoints

API Controllers

# app/controllers/api/v1/base_controller.rb
class Api::V1::BaseController < ActionController::Base
  before_action :authenticate_api_request

  private

  def authenticate_api_request
    token = request.headers['Authorization']&.gsub(/^Bearer\s+/, '')
    api_key = ApiKey.find_by(key: token)

    if api_key&.active?
      @current_api_key = api_key
      @current_user = api_key.user
    else
      render json: { error: 'Unauthorized' }, status: :unauthorized
    end
  end

  def authorize_federation
    head :forbidden unless @current_api_key&.can_read
  end

  def authorize_streaming
    head :forbidden unless @current_api_key&.can_stream
  end
end

# app/controllers/api/v1/instance_controller.rb
class Api::V1::InstanceController < Api::V1::BaseController
  before_action :authorize_federation

  def ping
    render json: { status: 'ok', timestamp: Time.current.iso8601 }
  end

  def show
    render json: {
      name: 'Velour Instance',
      version: Velour::VERSION,
      description: 'Video library federation node',
      total_works: Work.count,
      total_videos: Video.count,
      total_storage: Video.sum(:file_size)
    }
  end
end

# app/controllers/api/v1/works_controller.rb
class Api::V1::WorksController < Api::V1::BaseController
  before_action :authorize_federation

  def index
    works = Work.includes(:videos)
                .order(:title)
                .page(params[:page])
                .per(params[:per_page] || 50)

    render json: {
      works: works.map { |work| serialize_work(work) },
      pagination: {
        current_page: works.current_page,
        total_pages: works.total_pages,
        total_count: works.total_count
      }
    }
  end

  def show
    work = Work.includes(videos: :storage_location).find(params[:id])
    render json: serialize_work(work)
  end

  private

  def serialize_work(work)
    {
      id: work.id,
      title: work.title,
      year: work.year,
      description: work.description,
      director: work.director,
      rating: work.rating,
      organized: work.organized,
      created_at: work.created_at.iso8601,
      videos: work.videos.map { |video| serialize_video(video) }
    }
  end

  def serialize_video(video)
    {
      id: video.id,
      filename: video.filename,
      file_size: video.file_size,
      web_compatible: video.web_compatible?,
      duration: video.duration,
      width: video.width,
      height: video.height,
      video_codec: video.video_codec,
      audio_codec: video.audio_codec,
      fingerprints: video.fingerprints,
      storage_type: video.storage_location.storage_type,
      created_at: video.created_at.iso8601
    }
  end
end

# app/controllers/api/v1/videos_controller.rb
class Api::V1::VideosController < Api::V1::BaseController
  before_action :set_video, only: [:show, :stream, :thumbnail]
  before_action :authorize_federation, except: [:stream, :thumbnail]
  before_action :authorize_streaming, only: [:stream, :thumbnail]

  def show
    render json: serialize_video(@video)
  end

  def stream
    streamer = @video.storage_location.streamer
    result = streamer.stream(@video, range: request.headers['Range'])

    send_data result[:body],
      type: result[:headers]['Content-Type'] || 'video/mp4',
      disposition: 'inline',
      status: result[:status],
      headers: result[:headers].slice('Content-Range', 'Content-Length', 'Accept-Ranges')
  end

  def thumbnail
    if @video.video_assets.where(asset_type: 'thumbnail').exists?
      asset = @video.video_assets.where(asset_type: 'thumbnail').first
      redirect_to rails_blob_url(asset.file, disposition: 'inline')
    else
      head :not_found
    end
  end

  def search
    query = params[:query]
    return render json: [] if query.blank?

    videos = Video.joins(:work)
                  .where('works.title ILIKE ?', "%#{query}%")
                  .includes(:work, :storage_location)
                  .page(params[:page])
                  .per(params[:per_page] || 20)

    render json: {
      videos: videos.map { |video| serialize_video(video) },
      pagination: {
        current_page: videos.current_page,
        total_pages: videos.total_pages,
        total_count: videos.total_count
      }
    }
  end

  private

  def set_video
    @video = Video.find(params[:id])
  end

  def serialize_video(video)
    {
      id: video.id,
      filename: video.filename,
      work_id: video.work_id,
      work_title: video.work.title,
      file_size: video.file_size,
      web_compatible: video.web_compatible?,
      duration: video.duration,
      width: video.width,
      height: video.height,
      video_codec: video.video_codec,
      audio_codec: video.audio_codec,
      fingerprints: video.fingerprints,
      storage_type: video.storage_location.storage_type,
      created_at: video.created_at.iso8601
    }
  end
end

Federation Management UI

Federation Connections Management

<!-- app/views/admin/federation_connections/index.html.erb -->
<div class="container mx-auto p-6">
  <h1 class="text-3xl font-bold mb-6">Federation Connections</h1>

  <% if @api_keys.any? %>
    <div class="mb-6">
      <%= link_to 'Create New Connection', new_admin_federation_connection_path, class: 'bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded' %>
    </div>

    <div class="bg-white shadow overflow-hidden sm:rounded-md">
      <ul class="divide-y divide-gray-200">
        <% @connections.each do |connection| %>
          <li class="px-6 py-4">
            <div class="flex items-center justify-between">
              <div>
                <p class="text-sm font-medium text-gray-900">
                  <%= connection.storage_location.name %>
                </p>
                <p class="text-sm text-gray-500">
                  <%= connection.remote_instance_url %>
                </p>
                <p class="text-sm text-gray-500">
                  API Key: <%= connection.api_key.name %>
                </p>
              </div>
              <div class="flex items-center space-x-2">
                <span class="inline-flex items-center px-2.5 py-0.5 rounded-full text-xs font-medium
                  <%= case connection.status
                      when 'active' then 'bg-green-100 text-green-800'
                      when 'pending' then 'bg-yellow-100 text-yellow-800'
                      when 'suspended' then 'bg-red-100 text-red-800'
                      when 'rejected' then 'bg-gray-100 text-gray-800'
                      end %>">
                  <%= connection.status.titleize %>
                </span>
                <%= link_to 'Edit', edit_admin_federation_connection_path(connection), class: 'text-blue-600 hover:text-blue-900' %>
                <%= link_to 'Test', test_admin_federation_connection_path(connection), method: :post, class: 'text-green-600 hover:text-green-900' %>
                <%= link_to 'Delete', admin_federation_connection_path(connection), method: :delete,
                    data: { confirm: 'Are you sure?' }, class: 'text-red-600 hover:text-red-900' %>
              </div>
            </div>
          </li>
        <% end %>
      </ul>
    </div>
  <% else %>
    <div class="bg-yellow-50 border-l-4 border-yellow-400 p-4">
      <div class="flex">
        <div class="flex-shrink-0">
          <svg class="h-5 w-5 text-yellow-400" viewBox="0 0 20 20" fill="currentColor">
            <path fill-rule="evenodd" d="M8.257 3.099c.765-1.36 2.722-1.36 3.486 0l5.58 9.92c.75 1.334-.213 2.98-1.742 2.98H4.42c-1.53 0-2.493-1.646-1.743-2.98l5.58-9.92zM11 13a1 1 0 11-2 0 1 1 0 012 0zm-1-8a1 1 0 00-1 1v3a1 1 0 002 0V6a1 1 0 00-1-1z" clip-rule="evenodd"/>
          </svg>
        </div>
        <div class="ml-3">
          <p class="text-sm text-yellow-700">
            You need to create an API key before you can establish federation connections.
          </p>
          <div class="mt-2">
            <%= link_to 'Create API Key', new_admin_api_key_path, class: 'text-yellow-700 underline hover:text-yellow-600' %>
          </div>
        </div>
      </div>
    </div>
  <% end %>
</div>

Security Considerations

Federation Security Best Practices

  1. API Key Management: Regular rotation and limited scope
  2. Access Control: Minimum required permissions
  3. Rate Limiting: Prevent abuse from federated instances
  4. Audit Logging: Track all federated access
  5. Network Security: HTTPS-only federation connections

Rate Limiting

# app/controllers/concerns/rate_limited.rb
module RateLimited
  extend ActiveSupport::Concern

  included do
    before_action :check_rate_limit, only: [:index, :show, :stream]
  end

  private

  def check_rate_limit
    return unless @current_api_key

    key = "federation_rate_limit:#{@current_api_key.id}"
    count = Rails.cache.increment(key, 1, expires_in: 1.hour)

    if count > rate_limit
      Rails.logger.warn "Rate limit exceeded for API key #{@current_api_key.id}"
      render json: { error: 'Rate limit exceeded' }, status: :too_many_requests
    end
  end

  def rate_limit
    case action_name
    when 'stream'
      1000 # requests per hour
    else
      5000 # requests per hour
    end
  end
end

Environment Configuration

Federation Configuration

# Federation settings
FEDERATION_ENABLED=true
MAX_FEDERATED_CONNECTIONS=10
FEDERATION_RATE_LIMIT_PER_HOUR=5000

# Federation security
FEDERATION_REQUIRE_HTTPS=true
FEDERATION_TOKEN_EXPIRY_HOURS=24

Phase 4 enables a distributed network of Velour instances that can share video libraries while maintaining security and access controls. This allows organizations to federate their video collections across multiple servers or locations.