20 KiB
20 KiB
Velour Phase 3: Remote Sources & Import
Phase 3 extends the video library to support remote storage sources like S3, JellyFin servers, and web directories. This allows users to access and import videos from multiple locations.
Extended Storage Architecture
New Storage Location Types
# A place where video files live: a local directory, an S3 bucket, a JellyFin
# server, or a web directory.
#
# Type-specific settings are kept in the JSON-serialized +configuration+ store
# and exposed through the accessors declared below.
class StorageLocation < ApplicationRecord
  has_many :videos, dependent: :destroy

  enum storage_type: { local: 0, s3: 1, jellyfin: 2, web: 3 }

  validates :name, presence: true
  validates :storage_type, presence: true, inclusion: { in: %w[local s3 jellyfin web] }

  # NOTE(review): secret_access_key, api_key and password are written into this
  # JSON blob as-is; consider encrypting the column.
  store :configuration, accessors: [
    # S3 configuration
    :bucket, :region, :access_key_id, :secret_access_key, :endpoint,
    # JellyFin configuration
    :server_url, :api_key,
    # Web directory configuration
    :base_url, :auth_type, :password, :headers,
    # Shared by the JellyFin and web configurations (declared once)
    :username
  ], coder: JSON

  # Storage-type specific validations
  validate :validate_s3_configuration, if: -> { s3? }
  validate :validate_jellyfin_configuration, if: -> { jellyfin? }
  validate :validate_web_configuration, if: -> { web? }

  # Reports whether the backing storage can currently be reached.
  #
  # @return [Boolean] false for unknown storage types
  def accessible?
    case storage_type
    when 'local'
      # Guard against a blank path: File.exist?(nil) raises TypeError.
      path.present? && File.exist?(path) && File.readable?(path)
    when 's3'
      s3_bucket_exists?
    when 'jellyfin'
      jellyfin_client.ping?
    when 'web'
      # NOTE(review): web_accessible? is not defined in this class; confirm
      # where it is provided.
      web_accessible?
    else
      false
    end
  end

  # Builds the scanner that discovers videos on this storage type.
  #
  # @return [Object, nil] a scanner instance, or nil for an unknown type
  def scanner
    case storage_type
    when 'local'
      LocalFileScanner.new(self)
    when 's3'
      S3Scanner.new(self)
    when 'jellyfin'
      JellyFinScanner.new(self)
    when 'web'
      WebDirectoryScanner.new(self)
    end
  end

  # Builds the streamer that serves video bytes from this storage type.
  #
  # @return [Object, nil] a streamer instance, or nil for an unknown type
  def streamer
    case storage_type
    when 'local'
      LocalStreamer.new(self)
    when 's3'
      S3Streamer.new(self)
    when 'jellyfin'
      JellyFinStreamer.new(self)
    when 'web'
      WebStreamer.new(self)
    end
  end

  private

  # S3 client built from this record's stored credentials. A blank endpoint is
  # omitted so the SDK falls back to its regional default.
  def s3_client
    Aws::S3::Client.new(
      {
        region: region,
        access_key_id: access_key_id,
        secret_access_key: secret_access_key,
        endpoint: endpoint.presence
      }.compact
    )
  end

  # True when the configured bucket exists; service and network failures are
  # reported as "not accessible" instead of propagating.
  def s3_bucket_exists?
    Aws::S3::Bucket.new(bucket, client: s3_client).exists?
  rescue Aws::Errors::ServiceError, Seahorse::Client::NetworkingError
    false
  end

  # JellyFin API client built from this record's stored settings.
  def jellyfin_client
    JellyFinClient.new(server_url: server_url, api_key: api_key, username: username)
  end

  def validate_s3_configuration
    %w[bucket region access_key_id secret_access_key].each do |field|
      errors.add(:configuration, "#{field} is required for S3 storage") if public_send(field).blank?
    end
  end

  def validate_jellyfin_configuration
    %w[server_url api_key].each do |field|
      errors.add(:configuration, "#{field} is required for JellyFin storage") if public_send(field).blank?
    end
  end

  def validate_web_configuration
    errors.add(:configuration, "base_url is required for web storage") if base_url.blank?
  end
end
S3 Storage Implementation
S3 Scanner Service
# Discovers video objects in the S3 bucket of a storage location and creates a
# Video record (plus a processing job) for each one not seen before.
class S3Scanner
  # Object-key suffixes treated as video files (compared case-insensitively).
  VIDEO_EXTENSIONS = %w[.mp4 .avi .mkv .mov .wmv .flv .webm .m4v].freeze

  # @param storage_location [StorageLocation] S3-type location to scan
  def initialize(storage_location)
    @storage_location = storage_location
    @client = s3_client
  end

  # Scans the bucket and registers new videos.
  #
  # @return [Hash] +{ success: true, videos:, message: }+ on success, or
  #   +{ success: false, message: }+ when the bucket is unreachable or the
  #   AWS SDK raises a service error
  def scan
    return failure_result("S3 bucket not accessible") unless @storage_location.accessible?

    video_files = find_video_files_in_s3
    new_videos = process_s3_files(video_files)
    success_result(new_videos)
  rescue Aws::Errors::ServiceError => e
    failure_result("S3 error: #{e.message}")
  end

  private

  def s3_client
    @s3_client ||= Aws::S3::Client.new(
      {
        region: @storage_location.region,
        access_key_id: @storage_location.access_key_id,
        secret_access_key: @storage_location.secret_access_key,
        # Optional: only set for S3-compatible services; omitted when nil so
        # the SDK uses its regional default.
        endpoint: @storage_location.endpoint
      }.compact
    )
  end

  # Lists every object in the bucket whose key looks like a video file.
  def find_video_files_in_s3
    bucket = Aws::S3::Bucket.new(@storage_location.bucket, client: s3_client)
    bucket.objects(prefix: "").select { |obj| video_file?(obj.key) }
  end

  # True when +key+ ends in a known video extension, regardless of case
  # (so "CLIP.MP4" is recognised as well as "clip.mp4").
  def video_file?(key)
    key.to_s.downcase.end_with?(*VIDEO_EXTENSIONS)
  end

  # Creates Video records for objects not yet known to this location.
  #
  # NOTE(review): de-duplication is by basename only, so two objects with the
  # same file name under different prefixes are treated as one video.
  def process_s3_files(s3_objects)
    s3_objects.each_with_object([]) do |s3_object, new_videos|
      filename = File.basename(s3_object.key)
      next if Video.exists?(filename: filename, storage_location: @storage_location)

      video = Video.create!(
        filename: filename,
        storage_location: @storage_location,
        work: Work.find_or_create_by(title: extract_title(filename)),
        file_size: s3_object.size,
        video_metadata: {
          remote_url: s3_object.key,
          last_modified: s3_object.last_modified
        }
      )
      new_videos << video
      VideoProcessorJob.perform_later(video.id)
    end
  end

  # Derives a work title from a file name: drops the extension and any
  # bracketed/parenthesised tags, then collapses the whitespace left behind.
  def extract_title(filename)
    File.basename(filename, ".*").gsub(/[\[\(].*?[\]\)]/, "").squeeze(" ").strip
  end

  def success_result(videos = [])
    { success: true, videos: videos, message: "Found #{videos.length} new videos in S3" }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end
S3 Streamer
# Serves the bytes of a video stored in S3, either in full or as a byte range,
# and can hand out presigned URLs for direct playback.
class S3Streamer
  # @param storage_location [StorageLocation] S3-type location holding the videos
  def initialize(storage_location)
    @storage_location = storage_location
    @client = s3_client
  end

  # Fetches a video (or part of it) from S3.
  #
  # NOTE(review): get_object without a block buffers the requested bytes before
  # returning; callers serving large files should prefer +presigned_url+.
  #
  # @param video [Video] record whose +video_metadata['remote_url']+ is the object key
  # @param range [String, nil] byte range without the "bytes=" prefix, e.g. "0-1023"
  # @return [Hash] +{ body:, status:, headers: }+; status is 206 for ranged
  #   requests and 200 otherwise
  def stream(video, range: nil)
    return stream_full(video) unless range

    resp = @client.get_object(object_params(video).merge(range: "bytes=#{range}"))
    {
      body: resp.body,
      status: 206, # Partial content
      headers: {
        # Prefer the Content-Range S3 itself reports: it is correct even for
        # open-ended ranges ("0-") and needs no second request.
        'Content-Range' => resp.content_range || "bytes #{range}/#{object_size(video)}",
        'Content-Length' => resp.content_length,
        'Accept-Ranges' => 'bytes',
        'Content-Type' => 'video/mp4'
      }
    }
  end

  # Builds a time-limited URL that lets a client download the object directly.
  #
  # @param video [Video]
  # @param expires_in [ActiveSupport::Duration, Integer] validity period
  # @return [String]
  def presigned_url(video, expires_in: 1.hour)
    signer = Aws::S3::Presigner.new(client: @client)
    signer.presigned_url(
      :get_object,
      bucket: @storage_location.bucket,
      key: video.video_metadata['remote_url'],
      expires_in: expires_in.to_i
    )
  end

  private

  def s3_client
    @s3_client ||= Aws::S3::Client.new(
      {
        region: @storage_location.region,
        access_key_id: @storage_location.access_key_id,
        secret_access_key: @storage_location.secret_access_key,
        # Only set for S3-compatible services; omitted when nil.
        endpoint: @storage_location.endpoint
      }.compact
    )
  end

  # Whole-object response (HTTP 200).
  def stream_full(video)
    resp = @client.get_object(object_params(video))
    {
      body: resp.body,
      status: 200,
      headers: {
        'Content-Length' => resp.content_length,
        'Content-Type' => 'video/mp4'
      }
    }
  end

  # Bucket/key pair identifying the video's S3 object.
  def object_params(video)
    { bucket: @storage_location.bucket, key: video.video_metadata['remote_url'] }
  end

  # Total object size in bytes, looked up with a metadata-only HEAD request.
  # Used only when a ranged response carries no Content-Range of its own.
  def object_size(video)
    @client.head_object(object_params(video)).content_length
  end
end
JellyFin Integration
JellyFin Client
# Minimal HTTP client for the subset of the JellyFin API used by the scanner,
# streamer and import job.
class JellyFinClient
  # Raised when the server answers with a non-success status or unusable data.
  class Error < StandardError; end

  # @param server_url [String] base URL of the server; may include a path
  #   prefix such as "https://host/jellyfin"
  # @param api_key [String] API token sent as the X-Emby-Token header
  # @param username [String, nil] user whose library view is queried
  def initialize(server_url:, api_key:, username: nil)
    @server_url = server_url.to_s.chomp('/')
    @api_key = api_key
    @username = username
    @http = Faraday.new(url: @server_url) do |faraday|
      faraday.headers['X-Emby-Token'] = @api_key
      faraday.adapter Faraday.default_adapter
    end
  end

  # @return [Boolean] true when the server answers the ping with a success
  #   status; false on any error (best-effort reachability check)
  def ping?
    @http.get('System/Ping').success?
  rescue StandardError
    false
  end

  # @return [Object] parsed JSON of the server's virtual folders
  # @raise [Error] on a non-success response
  def libraries
    get_json('Library/VirtualFolders')
  end

  # @param library_id [String, nil] restrict the search to this parent item
  # @return [Array<Hash>, nil] the "Items" of the response
  # @raise [Error] on a non-success response
  def movies(library_id = nil)
    items_of_type('Movie', library_id)
  end

  # @param library_id [String, nil] restrict the search to this parent item
  # @return [Array<Hash>, nil] the "Items" of the response
  # @raise [Error] on a non-success response
  def tv_shows(library_id = nil)
    items_of_type('Series', library_id)
  end

  # @param show_id [String] id of the series
  # @return [Array<Hash>, nil] the "Items" of the response
  # @raise [Error] on a non-success response
  def episodes(show_id)
    get_json("Shows/#{show_id}/Episodes", 'UserId' => user_id)['Items']
  end

  # Builds a direct-stream URL for an item.
  #
  # NOTE(review): the API key is embedded in the query string, so this URL
  # should not be logged or exposed to untrusted clients.
  #
  # @param item_id [String]
  # @return [String]
  def streaming_url(item_id)
    query = URI.encode_www_form(
      'Static' => 'true',
      'MediaSourceId' => item_id,
      'DeviceId' => 'Velour',
      'api_key' => @api_key
    )
    "#{@server_url}/Videos/#{item_id}/stream?#{query}"
  end

  # @param item_id [String]
  # @return [Hash] parsed JSON describing the item
  # @raise [Error] on a non-success response
  def item_details(item_id)
    get_json("Users/#{user_id}/Items/#{item_id}")
  end

  private

  # GETs +path+ and parses the JSON body.
  #
  # Paths are deliberately relative (no leading slash): Faraday resolves an
  # absolute path against the host only, which would drop a path prefix
  # contained in the server URL.
  def get_json(path, params = {})
    response = @http.get(path, params)
    unless response.success?
      raise Error, "JellyFin request to #{path} failed with status #{response.status}"
    end

    JSON.parse(response.body)
  end

  # Recursive item query for one item type, optionally below +library_id+.
  def items_of_type(type, library_id)
    params = { 'IncludeItemTypes' => type, 'Recursive' => 'true' }
    params['ParentId'] = library_id if library_id
    get_json("Users/#{user_id}/Items", params)['Items']
  end

  # Id of the configured user, falling back to the first user the server
  # lists when no username is set or the name is not found.
  def user_id
    @user_id ||= begin
      users = get_json('Users')
      raise Error, 'JellyFin server returned no users' if !users.is_a?(Array) || users.empty?

      match = @username && users.find { |u| u['Name'] == @username }
      (match || users.first)['Id']
    end
  end
end
JellyFin Scanner
# Imports movie and episode entries from a JellyFin server as Video records
# attached to a storage location.
class JellyFinScanner
  # @param storage_location [StorageLocation] JellyFin-type location to scan
  def initialize(storage_location)
    @storage_location = storage_location
    @client = jellyfin_client
  end

  # Fetches movies and all episodes of every show, then registers the ones
  # not yet known.
  #
  # @return [Hash] +{ success: true, videos:, message: }+, or
  #   +{ success: false, message: }+ when the server is unreachable or any
  #   error is raised along the way
  def scan
    unless @storage_location.accessible?
      return failure_result("JellyFin server not accessible")
    end

    movie_items = @client.movies
    episode_items = @client.tv_shows.each_with_object([]) do |show, collected|
      collected.concat(@client.episodes(show['Id']))
    end

    success_result(process_jellyfin_items(movie_items + episode_items))
  rescue => error
    failure_result("JellyFin error: #{error.message}")
  end

  private

  def jellyfin_client
    @client ||= JellyFinClient.new(
      server_url: @storage_location.server_url,
      api_key: @storage_location.api_key,
      username: @storage_location.username
    )
  end

  # Creates a Work (if needed) and a Video for each video item that has no
  # Video record yet; returns the newly created videos.
  def process_jellyfin_items(items)
    items.each_with_object([]) do |item, created|
      next unless item['MediaType'] == 'Video'

      year = item['ProductionYear']
      name = item['Name']
      # The work is looked up (and possibly created) before checking whether
      # the video itself already exists.
      work = Work.find_or_create_by(title: year ? "#{name} (#{year})" : name) do |new_work|
        new_work.year = year
        new_work.description = item['Overview']
      end

      # The JellyFin item id doubles as the video's filename.
      video = Video.find_or_initialize_by(
        filename: item['Id'],
        storage_location: @storage_location
      )
      next unless video.new_record?

      ticks = item['RunTimeTicks']
      video.update!(
        work: work,
        video_metadata: {
          jellyfin_id: item['Id'],
          media_type: item['Type'],
          runtime: ticks ? ticks / 10_000_000 : nil,
          premiere_date: item['PremiereDate'],
          community_rating: item['CommunityRating'],
          genres: item['Genres']
        }
      )
      created << video
      VideoProcessorJob.perform_later(video.id)
    end
  end

  def success_result(videos = [])
    {
      success: true,
      videos: videos,
      message: "Found #{videos.length} new videos from JellyFin"
    }
  end

  def failure_result(message)
    { success: false, message: message }
  end
end
JellyFin Streamer
# Proxies video bytes from a JellyFin server, optionally for a byte range.
class JellyFinStreamer
  # @param storage_location [StorageLocation] JellyFin-type location
  def initialize(storage_location)
    @storage_location = storage_location
    @client = jellyfin_client
  end

  # Fetches the video's stream from JellyFin and relays the response.
  #
  # @param video [Video] record whose +video_metadata['jellyfin_id']+ is the item id
  # @param range [String, nil] byte range without the "bytes=" prefix
  # @return [Hash] +{ body:, status:, headers: }+ copied from the upstream response
  def stream(video, range: nil)
    item_id = video.video_metadata['jellyfin_id']
    upstream_url = @client.streaming_url(item_id)

    # The stream is proxied through this application rather than redirected.
    return proxy_stream(upstream_url) unless range

    proxy_stream_with_range(upstream_url, range)
  end

  private

  def jellyfin_client
    @client ||= JellyFinClient.new(
      server_url: @storage_location.server_url,
      api_key: @storage_location.api_key,
      username: @storage_location.username
    )
  end

  # Plain GET of the whole stream.
  def proxy_stream(url)
    response_parts(Faraday.get(url))
  end

  # GET with a Range header so the upstream serves only the requested bytes.
  def proxy_stream_with_range(url, range)
    range_headers = { 'Range' => "bytes=#{range}" }
    response_parts(Faraday.get(url, nil, range_headers))
  end

  # Picks the body, status and headers out of an upstream response.
  def response_parts(upstream)
    { body: upstream.body, status: upstream.status, headers: upstream.headers }
  end
end
Video Import System
Import Job with Progress Tracking
# Copies a video from a remote storage location into a local one, reporting
# progress as it goes: download (0-50), process (50), complete (100), or
# "failed" with the error message.
class VideoImportJob < ApplicationJob
  # NOTE(review): confirm which gem provides this module and that its
  # +progress+ object accepts +update(hash)+ as used below.
  include ActiveJob::Statuses

  # Portion of the 0-100 progress scale covered by the download stage.
  DOWNLOAD_PROGRESS_SHARE = 50

  # @param video_id [Integer] id of the remote Video to import
  # @param destination_storage_location_id [Integer] id of a local StorageLocation
  # @raise [ArgumentError] when the destination is not a local location
  # @raise [StandardError] any download/copy error, after the "failed" stage
  #   has been reported
  def perform(video_id, destination_storage_location_id)
    downloaded_file = nil
    video = Video.find(video_id)
    destination = StorageLocation.find(destination_storage_location_id)
    # destination.path is only meaningful for local locations.
    raise ArgumentError, "Destination must be a local storage location" unless destination.local?

    progress.update(stage: "download", total: 100, current: 0)

    # Download file from source. Progress is only written when the whole
    # percentage changes, not once per received chunk.
    last_percent = 0
    downloaded_file = download_video(video, destination) do |current, total|
      percent = download_percent(current, total)
      next if percent == last_percent

      last_percent = percent
      progress.update(current: percent)
    end

    progress.update(stage: "process", total: 100, current: 50)

    # Copy the file first so that a failed copy does not leave a Video record
    # pointing at a file that is not there. basename guards against a stored
    # filename that contains directory components.
    destination_path = File.join(destination.path, File.basename(video.filename))
    FileUtils.cp(downloaded_file.path, destination_path)

    # Create new video record in destination
    new_video = Video.create!(
      filename: video.filename,
      storage_location: destination,
      work: video.work,
      file_size: video.file_size
    )

    # Process the new video
    VideoProcessorJob.perform_later(new_video.id)
    progress.update(stage: "complete", total: 100, current: 100)
  rescue StandardError => e
    # The import UI polls for this stage to stop and show the error.
    progress.update(stage: "failed", error: e.message)
    raise
  ensure
    # Clean up temp file, whether the import succeeded or not.
    downloaded_file&.close!
  end

  private

  # Maps downloaded/total bytes onto the download share of the progress scale.
  # Returns 0 when the total is unknown (nil or 0) instead of dividing by zero.
  def download_percent(current, total)
    return 0 unless total.to_i.positive?

    ((current.to_f / total) * DOWNLOAD_PROGRESS_SHARE).to_i.clamp(0, DOWNLOAD_PROGRESS_SHARE)
  end

  # Dispatches to the downloader for the video's source storage type.
  #
  # @yield [downloaded_bytes, total_bytes] after each received chunk
  # @return [Tempfile] flushed temp file holding the downloaded bytes
  def download_video(video, _destination, &block)
    case video.storage_location.storage_type
    when 's3'
      download_from_s3(video, &block)
    when 'jellyfin'
      download_from_jellyfin(video, &block)
    when 'web'
      # TODO: no web downloader exists in this class yet; fail with a clear
      # message rather than a NoMethodError.
      raise "Import from web storage is not implemented yet"
    else
      raise "Unsupported import from #{video.storage_location.storage_type}"
    end
  end

  # Creates a binary temp file, yields it for writing, and returns it flushed
  # so that readers going through its path see every byte. The file is
  # removed if the block raises.
  def download_to_temp_file(video)
    temp_file = Tempfile.new(['video_import', File.extname(video.filename)])
    temp_file.binmode
    yield temp_file
    temp_file.flush
    temp_file
  rescue StandardError
    temp_file&.close!
    raise
  end

  def download_from_s3(video, &on_progress)
    location = video.storage_location
    s3_client = Aws::S3::Client.new(
      {
        region: location.region,
        access_key_id: location.access_key_id,
        secret_access_key: location.secret_access_key,
        # Needed for S3-compatible services; omitted when not configured.
        endpoint: location.endpoint
      }.compact
    )
    # Size recorded when the object was scanned; 0 when unknown.
    total_size = video.file_size.to_i

    download_to_temp_file(video) do |file|
      downloaded = 0
      # With a block, get_object yields the body in chunks instead of
      # writing it to a response target.
      s3_client.get_object(
        bucket: location.bucket,
        key: video.video_metadata['remote_url']
      ) do |chunk|
        file.write(chunk)
        downloaded += chunk.bytesize
        on_progress&.call(downloaded, total_size)
      end
    end
  end

  def download_from_jellyfin(video, &on_progress)
    location = video.storage_location
    client = JellyFinClient.new(
      server_url: location.server_url,
      api_key: location.api_key
    )
    uri = URI(client.streaming_url(video.video_metadata['jellyfin_id']))

    # Download with progress tracking
    download_to_temp_file(video) do |file|
      Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
        http.request(Net::HTTP::Get.new(uri)) do |response|
          response.value # raises unless the server answered with a 2xx status

          total_size = response['Content-Length'].to_i
          downloaded = 0
          response.read_body do |chunk|
            file.write(chunk)
            downloaded += chunk.bytesize
            on_progress&.call(downloaded, total_size)
          end
        end
      end
    end
  end
end
Import UI
<!-- app/views/videos/_import_button.html.erb -->
<%# Import button, destination dialog and progress display for a remote video.
    Rendered only for admins and only when the video is not on local storage. %>
<% if !video.storage_location.local? && current_user.admin? %>
  <div data-controller="import-dialog">
    <button
      type="button"
      data-action="click->import-dialog#show"
      class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded">
      Import to Local Storage
    </button>
    <div
      data-import-dialog-target="dialog"
      role="dialog"
      aria-modal="true"
      class="hidden fixed inset-0 bg-gray-600 bg-opacity-50 overflow-y-auto h-full w-full z-50">
      <div class="relative top-20 mx-auto p-5 border w-96 shadow-lg rounded-md bg-white">
        <h3 class="text-lg font-bold text-gray-900 mb-4">Import Video</h3>
        <p class="text-gray-600 mb-4">
          Import "<%= video.filename %>" to a local storage location for offline access and transcoding.
        </p>
        <div class="mb-4">
          <label class="block text-gray-700 text-sm font-bold mb-2">
            Destination Storage:
          </label>
          <select
            name="destination_storage_location_id"
            data-import-dialog-target="destination"
            class="shadow appearance-none border rounded w-full py-2 px-3 text-gray-700 leading-tight">
            <%# accessible? is an instance check, so filter the loaded records
                rather than chaining a scope. %>
            <% StorageLocation.local.select(&:accessible?).each do |location| %>
              <option value="<%= location.id %>"><%= location.name %></option>
            <% end %>
          </select>
        </div>
        <div class="flex justify-end space-x-2">
          <button
            type="button"
            data-action="click->import-dialog#hide"
            class="bg-gray-500 hover:bg-gray-700 text-white font-bold py-2 px-4 rounded">
            Cancel
          </button>
          <button
            type="button"
            data-action="click->import-dialog#import"
            data-video-id="<%= video.id %>"
            class="bg-blue-500 hover:bg-blue-700 text-white font-bold py-2 px-4 rounded">
            Import
          </button>
        </div>
      </div>
    </div>
    <!-- Progress display -->
    <div
      data-import-dialog-target="progress"
      class="hidden mt-2">
      <div class="bg-blue-100 border-l-4 border-blue-500 text-blue-700 p-4">
        <p class="font-bold">Importing video...</p>
        <div class="mt-2">
          <div class="bg-blue-200 rounded-full h-2">
            <div
              data-import-dialog-target="progressBar"
              class="bg-blue-600 h-2 rounded-full transition-all duration-300"
              style="width: 0%"></div>
          </div>
          <p class="text-sm mt-1" data-import-dialog-target="progressText">Starting...</p>
        </div>
      </div>
    </div>
  </div>
<% end %>
Import Stimulus Controller
// app/javascript/controllers/import_dialog_controller.js
import { Controller } from "@hotwired/stimulus"
import { get, post } from "@rails/request.js"

// Delay between two progress polls.
const POLL_INTERVAL_MS = 1000
// Consecutive failed polls tolerated before giving up.
const MAX_POLL_FAILURES = 5
// Delay before reloading the page after a successful import.
const RELOAD_DELAY_MS = 2000

// Full class names are spelled out so CSS tooling that scans for class names
// can find them.
const MESSAGE_CLASSES = {
  success: "bg-green-100 border-l-4 border-green-500 text-green-700 p-4",
  error: "bg-red-100 border-l-4 border-red-500 text-red-700 p-4"
}

// Drives the "Import to Local Storage" dialog: starts the import job on the
// server and polls its progress until it completes or fails.
export default class extends Controller {
  static targets = ["dialog", "progress", "progressBar", "progressText", "destination"]

  // Stop any pending poll or reload when the element leaves the page.
  disconnect() {
    this.stopTimers()
  }

  show() {
    this.dialogTarget.classList.remove("hidden")
  }

  hide() {
    this.dialogTarget.classList.add("hidden")
  }

  // Click handler of the Import button, which carries data-video-id.
  async import(event) {
    // currentTarget is the element the action is bound to, even when the
    // click lands on a child node.
    const videoId = event.currentTarget.dataset.videoId
    const destinationId = this.destinationTarget.value

    this.hide()
    this.progressTarget.classList.remove("hidden")

    try {
      // Start import job (member route: POST /videos/:id/import)
      const response = await post(`/videos/${encodeURIComponent(videoId)}/import`, {
        body: JSON.stringify({
          video_id: videoId,
          destination_storage_location_id: destinationId
        }),
        responseKind: "json"
      })
      if (!response.ok) {
        throw new Error(`Server responded with status ${response.statusCode}`)
      }

      const { jobId } = await response.json
      if (!jobId) {
        throw new Error("Server did not return a job id")
      }

      // Poll for progress
      this.pollProgress(jobId)
    } catch (error) {
      console.error("Import failed:", error)
      this.showMessage("error", "Import failed", error.message)
    }
  }

  // Polls the job's progress endpoint until the job reports "complete" or
  // "failed", or until too many consecutive polls have failed.
  async pollProgress(jobId) {
    let failures = 0

    const updateProgress = async () => {
      try {
        const response = await get(`/jobs/${encodeURIComponent(jobId)}/progress`, { responseKind: "json" })
        if (!response.ok) {
          throw new Error(`Server responded with status ${response.statusCode}`)
        }
        const progress = await response.json
        failures = 0

        if (progress.stage === "complete") {
          this.showMessage("success", "Import complete!")
          this.reloadTimer = setTimeout(() => {
            window.location.reload()
          }, RELOAD_DELAY_MS)
        } else if (progress.stage === "failed") {
          this.showMessage("error", "Import failed", progress.error || "Unknown error")
        } else {
          this.renderProgress(progress)
          this.pollTimer = setTimeout(updateProgress, POLL_INTERVAL_MS)
        }
      } catch (error) {
        console.error("Failed to get progress:", error)
        failures += 1
        if (failures >= MAX_POLL_FAILURES) {
          this.showMessage("error", "Import status unavailable", error.message)
        } else {
          this.pollTimer = setTimeout(updateProgress, POLL_INTERVAL_MS)
        }
      }
    }

    updateProgress()
  }

  // Updates the bar and caption; the percentage is clamped to 0-100.
  renderProgress(progress) {
    const percent = Math.min(100, Math.max(0, Number(progress.current) || 0))
    if (this.hasProgressBarTarget) {
      this.progressBarTarget.style.width = `${percent}%`
    }
    if (this.hasProgressTextTarget) {
      this.progressTextTarget.textContent = `${progress.stage}: ${percent}%`
    }
  }

  // Replaces the progress area with a status box. Text is assigned through
  // textContent so server-provided messages are never parsed as HTML.
  showMessage(kind, title, detail) {
    const box = document.createElement("div")
    box.className = MESSAGE_CLASSES[kind]

    const heading = document.createElement("p")
    heading.className = "font-bold"
    heading.textContent = title
    box.appendChild(heading)

    if (detail) {
      const body = document.createElement("p")
      body.className = "text-sm"
      body.textContent = detail
      box.appendChild(body)
    }

    this.progressTarget.replaceChildren(box)
  }

  stopTimers() {
    clearTimeout(this.pollTimer)
    clearTimeout(this.reloadTimer)
  }
}
Environment Configuration
New Environment Variables
# S3 Configuration
# NOTE(review): the S3 clients shown in this document are built from each
# storage location's stored credentials; confirm where these AWS_* variables
# are read.
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_ACCESS_KEY=your_secret_key
AWS_DEFAULT_REGION=us-east-1
# Import Settings
# NOTE(review): no code visible in this document reads these two settings;
# confirm where the size and timeout limits are enforced.
MAX_IMPORT_SIZE_GB=10
IMPORT_TIMEOUT_MINUTES=60
# Rate limiting
# NOTE(review): no code visible in this document reads this setting; confirm
# where the delay is applied.
JELLYFIN_RATE_LIMIT_DELAY=1 # seconds between requests
New Gems
# Gemfile
# Aws::S3 client, bucket and presigner classes used by the S3 scanner,
# streamer and import job.
gem 'aws-sdk-s3', '~> 1'
# HTTP client used by JellyFinClient and JellyFinStreamer.
gem 'faraday', '~> 2.0'
# NOTE(review): nothing in the code shown in this document references
# HTTParty; confirm it is needed.
# NOTE(review): VideoImportJob includes ActiveJob::Statuses, which none of the
# gems listed here is known to provide; confirm the dependency.
gem 'httparty', '~> 0.21'
Routes for Phase 3
# Add to existing routes

# POST /videos/:id/import — no other video routes are generated here.
resources :videos, only: [] do
  post :import, on: :member
end

namespace :admin do
  # Standard CRUD routes plus POST /admin/storage_locations/:id/test_connection.
  resources :storage_locations do
    post :test_connection, on: :member
  end
end
Phase 3 enables users to access video libraries from multiple remote sources while maintaining a unified interface. The import system allows bringing remote videos into local storage for offline access and transcoding.