Added initial version of re-encode-publish-Q-Tube.py

version 0.01 beta
This commit is contained in:
crowetic 2025-04-16 01:44:51 +00:00
parent 5922d1acfd
commit 79dac22c83

508
re-encode-publish-Q-Tube.py Normal file
View File

@ -0,0 +1,508 @@
import os
import json
import base64
import hashlib
import string
import random
import subprocess
import shlex
import requests
import getpass
import argparse
from datetime import datetime
# Configuration
API_URL = "http://localhost:12391"
VIDEO_EXTENSIONS = ['.mp4', '.mkv', '.avi', '.mov', '.flv']
SIZE_LIMIT_MB = 410
DEFAULT_CODEC = 'h264'
DEFAULT_USE_NVIDIA = False
LOG_FILE = "video-encode-and-publish.log"
# Logging
def log(msg):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
line = f"[{timestamp}] {msg}"
print(line)
with open(LOG_FILE, "a") as log_file:
log_file.write(line + "\n")
# Utility functions
def slugify(text, max_length=32):
import re
text = text.lower()
text = re.sub(r'[^a-z0-9]+', '-', text).strip('-')
return text[:max_length]
def generate_short_id(length=6):
alphabet = string.ascii_letters + string.digits
return ''.join(random.choices(alphabet, k=length))
def get_video_duration_and_size(filepath):
result = subprocess.run([
'ffprobe', '-v', 'error', '-show_entries',
'format=duration,size', '-of', 'json', filepath
], capture_output=True, text=True)
metadata = json.loads(result.stdout)
duration = float(metadata['format']['duration'])
size = int(metadata['format']['size'])
return duration, size
def generate_thumbnail(video_path, timestamp, output_suffix):
thumbnail_path_webp = f"{video_path}.{output_suffix}.webp"
thumbnail_path_jpeg = f"{video_path}.{output_suffix}.jpg"
# Attempt WebP first
try:
subprocess.run([
'ffmpeg', '-ss', str(timestamp), '-i', video_path,
'-vframes', '1', '-vf', 'scale=320:-1', '-y', thumbnail_path_webp
], check=True)
if os.path.exists(thumbnail_path_webp):
return thumbnail_path_webp
except subprocess.CalledProcessError as e:
log(f"[WARNING] WebP encoding returned error: {e}. Checking if file was written anyway...")
# Accept WebP if it was written despite error
if os.path.exists(thumbnail_path_webp):
log(f"[RECOVERED] WebP thumbnail exists despite ffmpeg error.")
return thumbnail_path_webp
# Fallback to JPEG
try:
subprocess.run([
'ffmpeg', '-ss', str(timestamp), '-i', video_path,
'-vframes', '1', '-vf', 'scale=320:-1', '-c:v', 'mjpeg', '-y', thumbnail_path_jpeg
], check=True)
if os.path.exists(thumbnail_path_jpeg):
log(f"[FALLBACK] JPEG thumbnail created: {thumbnail_path_jpeg}")
return thumbnail_path_jpeg
except subprocess.CalledProcessError as e:
log(f"[ERROR] JPEG thumbnail also failed at {timestamp}s: {e}")
log(f"[SKIPPED] No thumbnail generated for timestamp {timestamp}s.")
return None
def generate_all_thumbnails(video_path, duration):
thumbnails = []
intervals = [duration * i / 4 for i in range(1, 5)]
for idx, ts in enumerate(intervals, start=1):
thumb_path = generate_thumbnail(video_path, ts, f"thumb{idx}")
if thumb_path:
thumbnails.append(f"data:image/webp;base64,{base64_encode_file(thumb_path)}")
return thumbnails
def base64_encode_file(path):
with open(path, 'rb') as f:
return base64.b64encode(f.read()).decode('utf-8')
def get_private_key():
return getpass.getpass("Enter your QDN private key: ")
def post(endpoint, data):
response = requests.post(f"{API_URL}/{endpoint}", json=data)
response.raise_for_status()
return response.json()
def get_api_key():
key_paths = [
os.path.expanduser("~/.qortal/apikey.txt"),
os.path.expanduser("~/qortal/apikey.txt")
]
for path in key_paths:
if os.path.exists(path):
with open(path, 'r') as f:
return f.read().strip()
return input("Enter your Qortal API key: ").strip()
def build_sign_publish_from_file(service, identifier, name, file_path, private_key, dry_run=False, metadata={}):
"""
Publishes a file to QDN with metadata and signs it locally.
Args:
service: 'VIDEO' or 'DOCUMENT'
identifier: QDN identifier
name: QDN channel name
file_path: Path to file
private_key: Base58 private key
dry_run: If True, log actions but do not execute
metadata: Dictionary containing optional title, description, category, fee, preview
"""
api_key = get_api_key()
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Set defaults if metadata keys are missing
params = {}
if "title" in metadata:
params["title"] = metadata["title"]
if "fullDescription" in metadata:
params["description"] = metadata["fullDescription"]
category_val = metadata.get("category")
if isinstance(category_val, str) and category_val.strip():
params["category"] = category_val.strip()
params["fee"] = metadata.get("fee", "01000000")
params["preview"] = str(metadata.get("preview", "false")).lower()
url = f"{API_URL}/arbitrary/{service}/{name}/{identifier}"
if dry_run:
log(f"[DRY RUN] Would publish {file_path} to {url} with metadata: {params}")
return
# Step 1: Build raw transaction
headers = {
'X-API-KEY': api_key,
'Content-Type': 'text/plain',
'Accept': 'text/plain'
}
log(f"[PUBLISH TX] Creating raw transaction for {identifier}")
log(f"[API URL] {url}")
log(f"[HEADERS] {headers}")
log(f"[PARAMS] {params}")
log(f"[BODY] {file_path}")
try:
response = requests.post(
url,
headers=headers,
params=params,
data=file_path.encode()
)
except Exception as e:
log(f"[EXCEPTION] During POST to /arbitrary: {str(e)}")
raise
log(f"[RESPONSE CODE] {response.status_code}")
log(f"[RESPONSE TEXT] {response.text}")
raw_tx = response.text.strip()
# Step 2: Sign the transaction
sign_response = requests.post(
f"{API_URL}/transactions/sign",
headers={
'Content-Type': 'application/json',
'Accept': 'application/json'
},
json={
"privateKey": private_key,
"transactionBytes": raw_tx
}
)
if sign_response.status_code != 200:
log(f"[ERROR] Transaction signing failed: {sign_response.status_code} {sign_response.text}")
raise Exception("Signing transaction failed")
signed_tx = sign_response.text.strip()
log(f"[SIGNED TX] {signed_tx[:80]}...")
# Step 3: Broadcast the signed transaction
broadcast_response = requests.post(
f"{API_URL}/transactions/process",
headers={
'Content-Type': 'text/plain',
'Accept': 'text/plain'
},
data=signed_tx
)
if broadcast_response.status_code != 200:
log(f"[ERROR] Broadcast failed: {broadcast_response.status_code} {broadcast_response.text}")
raise Exception("Broadcast failed")
log(f"[PUBLISHED] {identifier} ({service})")
def encoder_available(encoder):
result = subprocess.run(['ffmpeg', '-hide_banner', '-encoders'], capture_output=True, text=True)
return encoder in result.stdout
def should_reencode(video_path):
try:
# Get codec and size
result = subprocess.run([
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=codec_name', '-of',
'default=noprint_wrappers=1:nokey=1', video_path
], capture_output=True, text=True)
codec = result.stdout.strip()
_, size_bytes = get_video_duration_and_size(video_path)
size_mb = size_bytes / (1024 * 1024)
unsupported_codecs = ['hevc', 'vp9', 'av1'] # Common ones Chromium won't hardware-decode
if codec in unsupported_codecs:
log(f"[CHECK] Re-encoding required due to unsupported codec: {codec}")
return True
if size_mb > SIZE_LIMIT_MB:
log(f"[CHECK] Re-encoding required due to file size: {round(size_mb, 2)}MB")
return True
log(f"[CHECK] Skipping re-encoding (codec: {codec}, size: {round(size_mb, 2)}MB)")
return False
except Exception as e:
log(f"[ERROR] Failed to check codec or size: {e}")
return True # Fail safe: re-encode if unsure
def reencode_video(video_path, codec=DEFAULT_CODEC, use_nvidia=DEFAULT_USE_NVIDIA):
base_name, _ = os.path.splitext(video_path)
output_path = f"{base_name}_{codec}_encoded.mp4"
duration, _ = get_video_duration_and_size(video_path)
bitrate_override = template.get("bitrate") if 'template' in locals() else None
try:
if codec == 'av1':
encoder = 'libsvtav1'
crf = "30"
if not encoder_available(encoder):
log(f"[ERROR] Encoder '{encoder}' not available. Falling back to 'libx264'.")
encoder = 'libx264'
codec = 'h264'
use_nvidia = False
elif codec == 'h264' and use_nvidia:
encoder = 'h264_nvenc'
else:
encoder = 'libx264'
if not encoder_available(encoder):
log(f"[ERROR] Encoder '{encoder}' not available. Falling back to 'libx264'.")
encoder = 'libx264'
# Build ffmpeg command
command = ['ffmpeg', '-hwaccel', 'auto', '-i', video_path]
if codec == 'av1':
command += ['-c:v', encoder, '-crf', crf]
else:
if bitrate_override:
target_bitrate = bitrate_override
else:
if duration <= 90:
target_bitrate = "500k"
elif duration <= 300:
target_bitrate = "500k"
else:
target_bitrate = "400k"
command += ['-c:v', encoder, '-b:v', target_bitrate]
command += ['-c:a', 'libvorbis', '-y', output_path]
log(f"[ENCODING] Running ffmpeg: {' '.join(command)}")
subprocess.run(command, check=True)
except subprocess.CalledProcessError:
log(f"[ERROR] Encoding failed with {encoder}.")
return None
# Move original to ORIGINALS
originals_dir = os.path.join(os.path.dirname(video_path), 'ORIGINALS')
os.makedirs(originals_dir, exist_ok=True)
os.rename(video_path, os.path.join(originals_dir, os.path.basename(video_path)))
_, final_size = get_video_duration_and_size(output_path)
if final_size > SIZE_LIMIT_MB * 1024 * 1024:
too_large_dir = os.path.join(os.path.dirname(video_path), 'too_large')
os.makedirs(too_large_dir, exist_ok=True)
os.rename(output_path, os.path.join(too_large_dir, os.path.basename(output_path)))
log(f"[TOO LARGE] {os.path.basename(output_path)} moved to 'too_large' folder")
return None
log(f"[ENCODED] {output_path} ({final_size // 1024 // 1024}MB)")
return output_path
def load_metadata(path):
if os.path.exists(path):
with open(path) as f:
return json.load(f)
log(f"[INFO] No QDN.json found. Using defaults.")
return {
"codec": DEFAULT_CODEC,
"use_nvidia": DEFAULT_USE_NVIDIA
}
def save_metadata_template(path, metadata):
with open(path, 'w') as f:
json.dump(metadata, f, indent=2)
log(f"Saved QDN.json template to: {path}")
def prompt_for_metadata():
title = input("Title: ")
description = input("Full Description: ")
html_description = input("HTML Description: ")
category = int(input("Category (number): "))
codec = input("Video codec (av1/h264): ").strip().lower() or DEFAULT_CODEC
if codec not in ['av1', 'h264']:
codec = DEFAULT_CODEC
use_nvidia = input("Use NVIDIA hardware acceleration? (yes/no): ").strip().lower() == 'yes'
return title, description, html_description, category, codec, use_nvidia
def publish_qtube(video_path, private_key, mode='auto', dry_run=False):
if not video_path:
return
base_name = os.path.splitext(os.path.basename(video_path))[0]
root_dir = os.path.dirname(video_path)
metadata_path = os.path.join(root_dir, 'QDN.json')
template = load_metadata(metadata_path) if mode == 'auto' else {}
title = template.get("title")
fullDescription = template.get("fullDescription")
htmlDescription = template.get("htmlDescription")
category = template.get("category", 9)
name = template.get("videoReference", {}).get("name") or input("Enter channel name: ")
codec = template.get("codec", DEFAULT_CODEC)
use_nvidia = template.get("use_nvidia", DEFAULT_USE_NVIDIA)
if mode == 'manual' or not all([title, fullDescription, htmlDescription]):
use_auto = input("[PROMPT] QDN.json loaded, would you like to use it as-is? (yes/no): ").strip().lower()
if use_auto != 'yes':
title, fullDescription, htmlDescription, category, codec, use_nvidia = prompt_for_metadata()
slug = slugify(title)
base_slug = f"qtube_vid_{slug}"
identifier = None
short_id = None
try:
response = requests.get(
f"{API_URL}/arbitrary/resources/searchsimple",
params={
"service": "DOCUMENT",
"name": name,
"limit": 0,
"reverse": "true"
}
)
if response.status_code == 200:
results = response.json()
for item in results:
ident = item.get("identifier", "")
if ident.startswith(base_slug) and ident.endswith("_metadata"):
identifier = ident.replace("_metadata", "")
short_id = identifier.split("_")[-1]
log(f"[REUSE] Existing publish found. Reusing identifier: {identifier}")
break
except Exception as e:
log(f"[WARNING] Could not query searchSimple: {e}")
# If not found, create new identifier
if not identifier:
short_id = generate_short_id()
identifier = f"{base_slug}_{short_id}"
log(f"[NEW] No existing identifier found. Creating new: {identifier}")
comments_id = f"qtube_vid__cm_{short_id}"
metadata_identifier = f"{identifier}_metadata"
duration, file_size = get_video_duration_and_size(video_path)
video_image_path = generate_thumbnail(video_path, 1, "poster")
extracts = generate_all_thumbnails(video_path, duration)
metadata = {
"title": title,
"version": 1,
"fullDescription": fullDescription,
"htmlDescription": htmlDescription,
"videoImage": f"data:image/webp;base64,{base64_encode_file(video_image_path)}" if video_image_path else "",
"videoReference": {
"name": name,
"identifier": identifier,
"service": "VIDEO"
},
"extracts": extracts,
"commentsId": comments_id,
"category": category,
"subcategory": template.get("subcategory", ""),
"code": template.get("code", short_id),
"videoType": "video/mp4",
"filename": os.path.basename(video_path),
"fileSize": file_size,
"duration": duration,
"codec": codec,
"use_nvidia": use_nvidia
}
if mode == 'manual':
save_metadata_template(metadata_path, metadata)
# Save metadata JSON file for publishing
metadata_file_path = os.path.join(root_dir, f"{identifier}_metadata.json")
with open(metadata_file_path, 'w') as f:
json.dump(metadata, f, indent=2)
log(f"[WRITE] Saved metadata JSON to: {metadata_file_path}")
# Publish video
log(f"Publishing VIDEO: {identifier}")
build_sign_publish_from_file("VIDEO", identifier, name, video_path, private_key, dry_run)
# Publish metadata
log(f"Publishing METADATA: {metadata_identifier}")
build_sign_publish_from_file("DOCUMENT", metadata_identifier, name, metadata_file_path, private_key, dry_run)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--mode', choices=['auto', 'manual'], default='auto')
parser.add_argument('--dry-run', action='store_true')
args = parser.parse_args()
private_key = get_private_key()
for root, dirs, files in os.walk(os.getcwd()):
# Skip folders we don't want to process
if 'ORIGINALS' in dirs:
dirs.remove('ORIGINALS')
if 'too_large' in dirs:
dirs.remove('too_large')
qdn_template_cache = None
apply_template_for_folder = False
for file in files:
if any(file.lower().endswith(ext) for ext in VIDEO_EXTENSIONS):
try:
full_path = os.path.join(root, file)
# Per-folder QDN.json check
if not qdn_template_cache:
metadata_path = os.path.join(root, 'QDN.json')
if os.path.exists(metadata_path):
qdn_template_cache = load_metadata(metadata_path)
print(f"Found QDN.json in: {root}")
use_for_folder = input("Use this QDN.json for all videos in this folder? (yes/no): ").strip().lower()
apply_template_for_folder = (use_for_folder == 'yes')
template = qdn_template_cache if apply_template_for_folder else {}
codec = template.get("codec", DEFAULT_CODEC)
use_nvidia = template.get("use_nvidia", DEFAULT_USE_NVIDIA)
if should_reencode(full_path):
reencoded_path = reencode_video(full_path, codec=codec, use_nvidia=use_nvidia)
if reencoded_path:
publish_qtube(reencoded_path, private_key, mode=args.mode, dry_run=args.dry_run)
else:
publish_qtube(full_path, private_key, mode=args.mode, dry_run=args.dry_run)
except Exception as e:
log(f"Failed to process {file}: {e}")
if __name__ == "__main__":
main()