Files
velour/docs/phases/phase_3.md
Dan Milne 88a906064f
Some checks failed
CI / scan_ruby (push) Has been cancelled
CI / scan_js (push) Has been cancelled
CI / lint (push) Has been cancelled
CI / test (push) Has been cancelled
CI / system-test (push) Has been cancelled
Much base work started
2025-10-31 14:36:14 +11:00

20 KiB

Velour Phase 3: Remote Sources & Import

Phase 3 extends the video library to support remote storage sources like S3, JellyFin servers, and web directories. This allows users to access and import videos from multiple locations.

Extended Storage Architecture

New Storage Location Types

class StorageLocation < ApplicationRecord
  has_many :videos, dependent: :destroy

  validates :name, presence: true
  validates :storage_type, presence: true, inclusion: { in: %w[local s3 jellyfin web] }

  store :configuration, accessors: [
    # S3 configuration
    :bucket, :region, :access_key_id, :secret_access_key, :endpoint,
    # JellyFin configuration
    :server_url, :api_key, :username,
    # Web directory configuration
    :base_url, :auth_type, :username, :password, :headers
  ], coder: JSON

  # Storage-type specific validations
  validate :validate_s3_configuration, if: -> { s3? }
  validate :validate_jellyfin_configuration, if: -> { jellyfin? }
  validate :validate_web_configuration, if: -> { web? }

  enum storage_type: { local: 0, s3: 1, jellyfin: 2, web: 3 }

  def accessible?
    case storage_type
    when 'local'
      File.exist?(path) && File.readable?(path)
    when 's3'
      s3_client&.bucket(bucket)&.exists?
    when 'jellyfin'
      jellyfin_client&.ping?
    when 'web'
      web_accessible?
    else
      false
    end
  end

  def scanner
    case storage_type
    when 'local'
      LocalFileScanner.new(self)
    when 's3'
      S3Scanner.new(self)
    when 'jellyfin'
      JellyFinScanner.new(self)
    when 'web'
      WebDirectoryScanner.new(self)
    end
  end

  def streamer
    case storage_type
    when 'local'
      LocalStreamer.new(self)
    when 's3'
      S3Streamer.new(self)
    when 'jellyfin'
      JellyFinStreamer.new(self)
    when 'web'
      WebStreamer.new(self)
    end
  end

  private

  def validate_s3_configuration
    %w[bucket region access_key_id secret_access_key].each do |field|
      errors.add(:configuration, "#{field} is required for S3 storage") if send(field).blank?
    end
  end

  def validate_jellyfin_configuration
    %w[server_url api_key].each do |field|
      errors.add(:configuration, "#{field} is required for JellyFin storage") if send(field).blank?
    end
  end

  def validate_web_configuration
    errors.add(:configuration, "base_url is required for web storage") if base_url.blank?
  end
end

S3 Storage Implementation

S3 Scanner Service

class S3Scanner
  def initialize(storage_location)
    @storage_location = storage_location
    @client = s3_client
  end

  def scan
    return failure_result("S3 bucket not accessible") unless @storage_location.accessible?

    video_files = find_video_files_in_s3
    new_videos = process_s3_files(video_files)

    success_result(new_videos)
  rescue Aws::Errors::ServiceError => e
    failure_result("S3 error: #{e.message}")
  end

  private

  def s3_client
    @s3_client ||= Aws::S3::Client.new(
      region: @storage_location.region,
      access_key_id: @storage_location.access_key_id,
      secret_access_key: @storage_location.secret_access_key,
      endpoint: @storage_location.endpoint # Optional for S3-compatible services
    )
  end

  def find_video_files_in_s3
    bucket = Aws::S3::Bucket.new(@storage_location.bucket, client: s3_client)

    video_extensions = %w[.mp4 .avi .mkv .mov .wmv .flv .webm .m4v]

    bucket.objects(prefix: "")
         .select { |obj| video_extensions.any? { |ext| obj.key.end_with?(ext) } }
         .to_a
  end

  def process_s3_files(s3_objects)
    new_videos = []

    s3_objects.each do |s3_object|
      filename = File.basename(s3_object.key)

      next if Video.exists?(filename: filename, storage_location: @storage_location)

      video = Video.create!(
        filename: filename,
        storage_location: @storage_location,
        work: Work.find_or_create_by(title: extract_title(filename)),
        file_size: s3_object.size,
        video_metadata: {
          remote_url: s3_object.key,
          last_modified: s3_object.last_modified
        }
      )

      new_videos << video
      VideoProcessorJob.perform_later(video.id)
    end

    new_videos
  end

  def extract_title(filename)
    File.basename(filename, ".*").gsub(/[\[\(].*?[\]\)]/, "").strip
  end

  def success_result(videos = [])
    { success: true, videos: videos, message: "Found #{videos.length} new videos in S3" }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end

S3 Streamer

class S3Streamer
  def initialize(storage_location)
    @storage_location = storage_location
    @client = s3_client
  end

  def stream(video, range: nil)
    s3_object = s3_object_for_video(video)

    if range
      # Handle byte-range requests for seeking
      range_header = "bytes=#{range}"
      resp = @client.get_object(
        bucket: @storage_location.bucket,
        key: video.video_metadata['remote_url'],
        range: range_header
      )

      {
        body: resp.body,
        status: 206, # Partial content
        headers: {
          'Content-Range' => "bytes #{range}/#{s3_object.size}",
          'Content-Length' => resp.content_length,
          'Accept-Ranges' => 'bytes',
          'Content-Type' => 'video/mp4'
        }
      }
    else
      resp = @client.get_object(
        bucket: @storage_location.bucket,
        key: video.video_metadata['remote_url']
      )

      {
        body: resp.body,
        status: 200,
        headers: {
          'Content-Length' => resp.content_length,
          'Content-Type' => 'video/mp4'
        }
      }
    end
  end

  def presigned_url(video, expires_in: 1.hour)
    signer = Aws::S3::Presigner.new(client: @client)

    signer.presigned_url(
      :get_object,
      bucket: @storage_location.bucket,
      key: video.video_metadata['remote_url'],
      expires_in: expires_in.to_i
    )
  end

  private

  def s3_client
    @s3_client ||= Aws::S3::Client.new(
      region: @storage_location.region,
      access_key_id: @storage_location.access_key_id,
      secret_access_key: @storage_location.secret_access_key,
      endpoint: @storage_location.endpoint
    )
  end

  def s3_object_for_video(video)
    @client.get_object(
      bucket: @storage_location.bucket,
      key: video.video_metadata['remote_url']
    )
  end
end

JellyFin Integration

JellyFin Client

class JellyFinClient
  def initialize(server_url:, api_key:, username: nil)
    @server_url = server_url.chomp('/')
    @api_key = api_key
    @username = username
    @http = Faraday.new(url: @server_url) do |faraday|
      faraday.headers['X-Emby-Token'] = @api_key
      faraday.adapter Faraday.default_adapter
    end
  end

  def ping?
    response = @http.get('/System/Ping')
    response.success?
  rescue
    false
  end

  def libraries
    response = @http.get('/Library/VirtualFolders')
    JSON.parse(response.body)
  end

  def movies(library_id = nil)
    path = library_id ? "/Users/#{user_id}/Items?ParentId=#{library_id}&IncludeItemTypes=Movie&Recursive=true" : "/Users/#{user_id}/Items?IncludeItemTypes=Movie&Recursive=true"
    response = @http.get(path)
    JSON.parse(response.body)['Items']
  end

  def tv_shows(library_id = nil)
    path = library_id ? "/Users/#{user_id}/Items?ParentId=#{library_id}&IncludeItemTypes=Series&Recursive=true" : "/Users/#{user_id}/Items?IncludeItemTypes=Series&Recursive=true"
    response = @http.get(path)
    JSON.parse(response.body)['Items']
  end

  def episodes(show_id)
    response = @http.get("/Shows/#{show_id}/Episodes?UserId=#{user_id}")
    JSON.parse(response.body)['Items']
  end

  def streaming_url(item_id)
    "#{@server_url}/Videos/#{item_id}/stream?Static=true&MediaSourceId=#{item_id}&DeviceId=Velour&api_key=#{@api_key}"
  end

  def item_details(item_id)
    response = @http.get("/Users/#{user_id}/Items/#{item_id}")
    JSON.parse(response.body)
  end

  private

  def user_id
    @user_id ||= begin
      response = @http.get('/Users')
      users = JSON.parse(response.body)

      if @username
        user = users.find { |u| u['Name'] == @username }
        user&.dig('Id') || users.first['Id']
      else
        users.first['Id']
      end
    end
  end
end

JellyFin Scanner

class JellyFinScanner
  def initialize(storage_location)
    @storage_location = storage_location
    @client = jellyfin_client
  end

  def scan
    return failure_result("JellyFin server not accessible") unless @storage_location.accessible?

    movies = @client.movies
    shows = @client.tv_shows
    episodes = []

    shows.each do |show|
      episodes.concat(@client.episodes(show['Id']))
    end

    all_items = movies + episodes
    new_videos = process_jellyfin_items(all_items)

    success_result(new_videos)
  rescue => e
    failure_result("JellyFin error: #{e.message}")
  end

  private

  def jellyfin_client
    @client ||= JellyFinClient.new(
      server_url: @storage_location.server_url,
      api_key: @storage_location.api_key,
      username: @storage_location.username
    )
  end

  def process_jellyfin_items(items)
    new_videos = []

    items.each do |item|
      next unless item['MediaType'] == 'Video'

      title = item['Name']
      year = item['ProductionYear']
      work_title = year ? "#{title} (#{year})" : title

      work = Work.find_or_create_by(title: work_title) do |w|
        w.year = year
        w.description = item['Overview']
      end

      video = Video.find_or_initialize_by(
        filename: item['Id'],
        storage_location: @storage_location
      )

      if video.new_record?
        video.update!(
          work: work,
          video_metadata: {
            jellyfin_id: item['Id'],
            media_type: item['Type'],
            runtime: item['RunTimeTicks'] ? item['RunTimeTicks'] / 10_000_000 : nil,
            premiere_date: item['PremiereDate'],
            community_rating: item['CommunityRating'],
            genres: item['Genres']
          }
        )

        new_videos << video
        VideoProcessorJob.perform_later(video.id)
      end
    end

    new_videos
  end

  def success_result(videos = [])
    { success: true, videos: videos, message: "Found #{videos.length} new videos from JellyFin" }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end

JellyFin Streamer

class JellyFinStreamer
  def initialize(storage_location)
    @storage_location = storage_location
    @client = jellyfin_client
  end

  def stream(video, range: nil)
    jellyfin_id = video.video_metadata['jellyfin_id']
    stream_url = @client.streaming_url(jellyfin_id)

    # For JellyFin, we typically proxy the stream
    if range
      proxy_stream_with_range(stream_url, range)
    else
      proxy_stream(stream_url)
    end
  end

  private

  def jellyfin_client
    @client ||= JellyFinClient.new(
      server_url: @storage_location.server_url,
      api_key: @storage_location.api_key,
      username: @storage_location.username
    )
  end

  def proxy_stream(url)
    response = Faraday.get(url)

    {
      body: response.body,
      status: response.status,
      headers: response.headers
    }
  end

  def proxy_stream_with_range(url, range)
    response = Faraday.get(url, nil, { 'Range' => "bytes=#{range}" })

    {
      body: response.body,
      status: response.status,
      headers: response.headers
    }
  end
end

Video Import System

Import Job with Progress Tracking

class VideoImportJob < ApplicationJob
  include ActiveJob::Statuses

  def perform(video_id, destination_storage_location_id)
    video = Video.find(video_id)
    destination = StorageLocation.find(destination_storage_location_id)

    progress.update(stage: "download", total: 100, current: 0)

    # Download file from source
    downloaded_file = download_video(video, destination) do |current, total|
      progress.update(current: (current.to_f / total * 50).to_i) # Download is 50% of progress
    end

    progress.update(stage: "process", total: 100, current: 50)

    # Create new video record in destination
    new_video = Video.create!(
      filename: video.filename,
      storage_location: destination,
      work: video.work,
      file_size: video.file_size
    )

    # Copy file to destination
    destination_path = File.join(destination.path, video.filename)
    FileUtils.cp(downloaded_file.path, destination_path)

    # Process the new video
    VideoProcessorJob.perform_later(new_video.id)

    progress.update(stage: "complete", total: 100, current: 100)

    # Clean up temp file
    File.delete(downloaded_file.path)
  end

  private

  def download_video(video, destination, &block)
    case video.storage_location.storage_type
    when 's3'
      download_from_s3(video, &block)
    when 'jellyfin'
      download_from_jellyfin(video, &block)
    when 'web'
      download_from_web(video, &block)
    else
      raise "Unsupported import from #{video.storage_location.storage_type}"
    end
  end

  def download_from_s3(video, &block)
    temp_file = Tempfile.new(['video_import', File.extname(video.filename)])

    s3_client = Aws::S3::Client.new(
      region: video.storage_location.region,
      access_key_id: video.storage_location.access_key_id,
      secret_access_key: video.storage_location.secret_access_key
    )

    s3_client.get_object(
      bucket: video.storage_location.bucket,
      key: video.video_metadata['remote_url'],
      response_target: temp_file.path
    ) do |chunk|
      yield(chunk.bytes_written, chunk.size) if block_given?
    end

    temp_file
  end

  def download_from_jellyfin(video, &block)
    temp_file = Tempfile.new(['video_import', File.extname(video.filename)])

    jellyfin_id = video.video_metadata['jellyfin_id']
    client = JellyFinClient.new(
      server_url: video.storage_location.server_url,
      api_key: video.storage_location.api_key
    )

    stream_url = client.streaming_url(jellyfin_id)

    # Download with progress tracking
    uri = URI(stream_url)
    Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
      request = Net::HTTP::Get.new(uri)
      http.request(request) do |response|
        total_size = response['Content-Length'].to_i
        downloaded = 0

        response.read_body do |chunk|
          temp_file.write(chunk)
          downloaded += chunk.bytesize
          yield(downloaded, total_size) if block_given?
        end
      end
    end

    temp_file
  end
end

Import UI

<!-- app/views/videos/_import_button.html.erb -->
<% if video.storage_location.remote? && current_user.admin? %>
  <div data-controller="import-dialog">
    <button
      data-action="click->import-dialog#show"
      class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded">
      Import to Local Storage
    </button>

    <div
      data-import-dialog-target="dialog"
      class="hidden fixed inset-0 bg-gray-600 bg-opacity-50 overflow-y-auto h-full w-full z-50">
      <div class="relative top-20 mx-auto p-5 border w-96 shadow-lg rounded-md bg-white">
        <h3 class="text-lg font-bold text-gray-900 mb-4">Import Video</h3>

        <p class="text-gray-600 mb-4">
          Import "<%= video.filename %>" to a local storage location for offline access and transcoding.
        </p>

        <div class="mb-4">
          <label class="block text-gray-700 text-sm font-bold mb-2">
            Destination Storage:
          </label>
          <select
            name="destination_storage_location_id"
            data-import-dialog-target="destination"
            class="shadow appearance-none border rounded w-full py-2 px-3 text-gray-700 leading-tight">
            <% StorageLocation.local.accessible.each do |location| %>
              <option value="<%= location.id %>"><%= location.name %></option>
            <% end %>
          </select>
        </div>

        <div class="flex justify-end space-x-2">
          <button
            data-action="click->import-dialog#hide"
            class="bg-gray-500 hover:bg-gray-700 text-white font-bold py-2 px-4 rounded">
            Cancel
          </button>
          <button
            data-action="click->import-dialog#import"
            data-video-id="<%= video.id %>"
            class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded">
            Import
          </button>
        </div>
      </div>
    </div>

    <!-- Progress display -->
    <div
      data-import-dialog-target="progress"
      class="hidden mt-2">
      <div class="bg-blue-100 border-l-4 border-blue-500 text-blue-700 p-4">
        <p class="font-bold">Importing video...</p>
        <div class="mt-2">
          <div class="bg-blue-200 rounded-full h-2">
            <div
              data-import-dialog-target="progressBar"
              class="bg-blue-600 h-2 rounded-full transition-all duration-300"
              style="width: 0%"></div>
          </div>
          <p class="text-sm mt-1" data-import-dialog-target="progressText">Starting...</p>
        </div>
      </div>
    </div>
  </div>
<% end %>

Import Stimulus Controller

// app/javascript/controllers/import_dialog_controller.js
import { Controller } from "@hotwired/stimulus"
import { get } from "@rails/request.js"

export default class extends Controller {
  static targets = ["dialog", "progress", "progressBar", "progressText", "destination"]

  show() {
    this.dialogTarget.classList.remove("hidden")
  }

  hide() {
    this.dialogTarget.classList.add("hidden")
  }

  async import(event) {
    const videoId = event.target.dataset.videoId
    const destinationId = this.destinationTarget.value

    this.hide()
    this.progressTarget.classList.remove("hidden")

    try {
      // Start import job
      const response = await post("/videos/import", {
        body: JSON.stringify({
          video_id: videoId,
          destination_storage_location_id: destinationId
        })
      })

      const { jobId } = await response.json

      // Poll for progress
      this.pollProgress(jobId)
    } catch (error) {
      console.error("Import failed:", error)
      this.progressTarget.innerHTML = `
        <div class="bg-red-100 border-l-4 border-red-500 text-red-700 p-4">
          <p class="font-bold">Import failed</p>
          <p class="text-sm">${error.message}</p>
        </div>
      `
    }
  }

  async pollProgress(jobId) {
    const updateProgress = async () => {
      try {
        const response = await get(`/jobs/${jobId}/progress`)
        const progress = await response.json

        this.progressBarTarget.style.width = `${progress.current}%`
        this.progressTextTarget.textContent = `${progress.stage}: ${progress.current}%`

        if (progress.stage === "complete") {
          this.progressTarget.innerHTML = `
            <div class="bg-green-100 border-l-4 border-green-500 text-green-700 p-4">
              <p class="font-bold">Import complete!</p>
            </div>
          `
          setTimeout(() => {
            window.location.reload()
          }, 2000)
        } else if (progress.stage === "failed") {
          this.progressTarget.innerHTML = `
            <div class="bg-red-100 border-l-4 border-red-500 text-red-700 p-4">
              <p class="font-bold">Import failed</p>
              <p class="text-sm">${progress.error || "Unknown error"}</p>
            </div>
          `
        } else {
          setTimeout(updateProgress, 1000)
        }
      } catch (error) {
        console.error("Failed to get progress:", error)
        setTimeout(updateProgress, 1000)
      }
    }

    updateProgress()
  }
}

Environment Configuration

New Environment Variables

# S3 Configuration
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_DEFAULT_REGION=us-east-1

# Import Settings
MAX_IMPORT_SIZE_GB=10
IMPORT_TIMEOUT_MINUTES=60

# Rate limiting
JELLYFIN_RATE_LIMIT_DELAY=1 # seconds between requests

New Gems

# Gemfile
gem 'aws-sdk-s3', '~> 1'
gem 'faraday', '~> 2.0'
gem 'httparty', '~> 0.21'

Routes for Phase 3

# Add to existing routes
resources :videos, only: [] do
  member do
    post :import
  end
end

namespace :admin do
  resources :storage_locations do
    member do
      post :test_connection
    end
  end
end

Phase 3 enables users to access video libraries from multiple remote sources while maintaining a unified interface. The import system allows bringing remote videos into local storage for offline access and transcoding.