Start setting up conversion workflow

This commit is contained in:
James Grogan 2024-09-29 20:28:44 +01:00
parent 8974849b8b
commit 00325b36eb
4 changed files with 46 additions and 36 deletions

View file

@ -9,7 +9,6 @@ logger = logging.getLogger(__name__)
class ConversionConfig(NamedTuple): class ConversionConfig(NamedTuple):
input_dir: Path input_dir: Path
output_dir: Path
output_ext: str output_ext: str
input_ext: str input_ext: str
@ -22,29 +21,19 @@ def _get_converted_path(input_path: Path, config: ConversionConfig):
relative_path = input_path.relative_to(config.input_dir) relative_path = input_path.relative_to(config.input_dir)
output_filename = str(input_path.stem) + f".{config.output_ext}" output_filename = str(input_path.stem) + f".{config.output_ext}"
output_filename.replace("'", "") return relative_path.parent / output_filename
output_path = config.output_dir / relative_path.parent / output_filename
return output_path
def _build_conversion_tasks(
input_files: list[Path],
output_files: list[Path],
config: ConversionConfig,
conversion_func: Callable,
) -> list[Tasks]:
tasks = []
for input_path, output_path in zip(input_files, output_files):
cmd = conversion_func(input_path, config.output_dir, config.output_ext)
tasks.append(Task(cmd, output_tmp, output_path))
return tasks
def get_unconverted_files( def get_unconverted_files(
input_files: list[Path], config: ConversionConfig input_files: list[Path], config: ConversionConfig
) -> list[Path]: ) -> list[Path]:
output_files = [_get_converted_path(f, config) for f in input_files] output_files = [_get_converted_path(f, config) for f in input_files]
return [f for f in output_files if not f.exists()]
conversion_pairs = []
for input_file, output_file in zip(input_files, output_files):
if not output_file.exists():
conversion_pairs.append((input_file, output_file))
return conversion_pairs
def convert(config: ConversionConfig, conversion_func: Callable): def convert(config: ConversionConfig, conversion_func: Callable):

View file

@ -1,5 +1,20 @@
from pathlib import Path from pathlib import Path
import os
import sys
def get_files_recursive(search_path: Path, extension: str) -> list[Path]: def get_files_recursive(search_path: Path, extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}")) return list(search_path.rglob(f"*.{extension}"))
def delete_empty_dirs_recursive(search_path: Path):
for curdir, subdirs, files in os.walk(search_path):
if len(subdirs) == 0 and len(files) == 0:
#print(curdir)
os.rmdir(curdir)
if __name__ == "__main__":
input_path = sys.argv[1]
print(input_path)
delete_empty_dirs_recursive(Path(input_path).resolve())

View file

@ -3,6 +3,7 @@ import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
import uuid import uuid
import subprocess
from tinytag import TinyTag from tinytag import TinyTag
from pydantic import BaseModel from pydantic import BaseModel
@ -46,23 +47,25 @@ class MusicCollection(BaseModel):
return None return None
def ffmpeg_convert(input_path: Path, output_dir: Path, output_ext: str) -> str: def ffmpeg_convert(input_path: Path, output_path: Path) -> int:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
identifier = str(uuid.uuid4())
input_tmp = input_path.parent / (identifier + input_path.suffix)
shutil.copy(input_path, input_tmp)
def move(input_dir: Path, output_dir: Path): output_tmp = output_path.parent / (identifier + output_path.suffix)
cmd = f"ffmpeg -i '{input_tmp}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
logger.info("Moving files in %s to %s", input_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)
mp3_files = list(input_dir.resolve().rglob("*.mp3"))
for idx, path in enumerate(mp3_files):
logger.info("Moving file %d of %d", idx, len(mp3_files))
relative_path = path.relative_to(input_dir)
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True) os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path) status = subprocess.run(cmd, shell=True)
input_tmp.unlink()
if status.returncode == 0:
shutil.move(output_tmp, output_path)
else:
logger.error("Error converting: %s", input_path)
return status.returncode
def get_metadata(input_dir: Path, extension: str) -> MusicCollection: def get_metadata(input_dir: Path, extension: str) -> MusicCollection:
@ -106,6 +109,12 @@ def refresh(input_dir: Path):
logger.warn("Found tag with no title, skipping: %s", tag) logger.warn("Found tag with no title, skipping: %s", tag)
continue continue
os.make_dirs(input_dir / tag.artist, exist_ok=True) os.makedirs(input_dir / tag.artist, exist_ok=True)
if tag.album:
os.makedirs(input_dir / tag.artist / tag.album, exist_ok=True)
shutil.move(eachFile, input_dir / tag.artist / tag.album / f"{tag.title}.flac")
else:
shutil.move(eachFile, input_dir / tag.artist / f"{tag.title}.flac")

View file

@ -18,10 +18,7 @@ class Task(NamedTuple):
def _run_task(args): def _run_task(args):
task = args[0] task = args[0]
subprocess.run(cmd, shell=True) subprocess.run(task.cmd, shell=True)
os.makedirs(task.output_path.parent, exist_ok=True)
shutil.move(task.output_tmp, task.output_path)
def run_tasks(tasks, pool_size: 10): def run_tasks(tasks, pool_size: 10):
with Pool(10) as p: with Pool(10) as p: