From b39807db4492c1a3baff09da3cf245e81669a5be Mon Sep 17 00:00:00 2001 From: jgrogan Date: Mon, 30 Sep 2024 09:17:51 +0100 Subject: [PATCH] Start adding sync functions. --- src/jgutils/converters.py | 12 +-- src/jgutils/filesystem/filesystem.py | 35 +++++--- src/jgutils/main_cli.py | 57 +++++-------- src/jgutils/music/__init__.py | 1 - src/jgutils/music/collection.py | 105 +++++++++++++++++++++++ src/jgutils/music/converter.py | 30 +++++++ src/jgutils/music/models.py | 25 ++++++ src/jgutils/music/music.py | 120 --------------------------- src/jgutils/serialization.py | 8 ++ src/jgutils/tasks/tasks.py | 22 +---- 10 files changed, 221 insertions(+), 194 deletions(-) create mode 100644 src/jgutils/music/collection.py create mode 100644 src/jgutils/music/converter.py create mode 100644 src/jgutils/music/models.py delete mode 100644 src/jgutils/music/music.py create mode 100644 src/jgutils/serialization.py diff --git a/src/jgutils/converters.py b/src/jgutils/converters.py index 34324ca..e44aaae 100644 --- a/src/jgutils/converters.py +++ b/src/jgutils/converters.py @@ -1,4 +1,5 @@ from pathlib import Path +from functools import partial from typing import NamedTuple, Callable import logging @@ -9,9 +10,9 @@ logger = logging.getLogger(__name__) class ConversionConfig(NamedTuple): input_dir: Path - output_ext: str + output_dir: Path input_ext: str - + output_ext: str def _get_converted_path(input_path: Path, config: ConversionConfig): """ @@ -21,7 +22,7 @@ def _get_converted_path(input_path: Path, config: ConversionConfig): relative_path = input_path.relative_to(config.input_dir) output_filename = str(input_path.stem) + f".{config.output_ext}" - return relative_path.parent / output_filename + return output_dir / relative_path.parent / output_filename def get_unconverted_files( @@ -41,8 +42,7 @@ def convert(config: ConversionConfig, conversion_func: Callable): logger.info("Converting files in %s", config.input_dir) logger.info("Writing output to: %s", config.output_dir) - input_files = get_files_recursive(config.input_dir.resolve(), config.input_ext) + input_files = get_files_recursive(config.input_dir.resolve(), [config.input_ext]) output_files = get_uncoverted_files(input_files, config) - tasks = _build_conversion_tasks(input_files, output_files, config, conversion_func) - run_tasks(tasks) + run_tasks(conversion_func, zip(input_files, output_files)) diff --git a/src/jgutils/filesystem/filesystem.py b/src/jgutils/filesystem/filesystem.py index 0cd0199..4e6870b 100644 --- a/src/jgutils/filesystem/filesystem.py +++ b/src/jgutils/filesystem/filesystem.py @@ -3,18 +3,29 @@ import os import sys -def get_files_recursive(search_path: Path, extension: str) -> list[Path]: - return list(search_path.rglob(f"*.{extension}")) - -def delete_empty_dirs_recursive(search_path: Path): - for curdir, subdirs, files in os.walk(search_path): - if len(subdirs) == 0 and len(files) == 0: - #print(curdir) +def get_files_recursive(path: Path, extensions: list[str] | None) -> list[Path]: + if not extensions: + return list(path.rglob("*.*")) + else: + ret = [] + for ext in extensions: + ret.append(list(search_path.rglob(f"*.{ext}"))) + return ret + +def _delete_empty_dirs(path: Path) -> bool: + found_empty = False + for curdir, subdirs, files in os.walk(path): + if len(subdirs) == 0 and len(files) == 0: + found_empty = True os.rmdir(curdir) + return found_empty + +def delete_empty_dirs_recursive(path: Path): + found_empty = True + while found_empty: + found_empty = _delete_empty_dirs(path) -if __name__ == "__main__": - - input_path = sys.argv[1] - print(input_path) - delete_empty_dirs_recursive(Path(input_path).resolve()) +def replace_filename(path: Path, replacement: str) -> Path: + return path.parent / (replacement + path.suffix) + diff --git a/src/jgutils/main_cli.py b/src/jgutils/main_cli.py index 753d34e..a5ef01c 100644 --- a/src/jgutils/main_cli.py +++ b/src/jgutils/main_cli.py @@ -3,26 +3,17 @@ import logging from pathlib import Path from jgutils import music +from jgutils.serialization import write_model logger = logging.getLogger(__name__) - -def cli_music_convert(args): - - config = music.CovertionConfig( - args.input_dir.resolve(), args.output_dir.resolve(), "mp3", "flac" - ) - music.convert(config) - -def cli_music_metadata(args): - - collection = music.get_metadata(args.input_dir.resolve(), "flac") - with open(args.output_path.resolve(), 'w') as f: - f.write(collection.model_dump_json(indent=4)) +def cli_music_read(args): + collection = music.collection.read(args.collection.resolve()) + write_model(collection) def cli_music_refresh(args): - - collection = music.refresh(args.input_dir.resolve()) + music.collection.refresh(args.collection.resolve(), + args.work_dir.resolve()) def main_cli(): parser = argparse.ArgumentParser() @@ -30,40 +21,34 @@ def main_cli(): music_parser = subparsers.add_parser("music") music_subparsers = music_parser.add_subparsers(required=True) - - music_convert_parser = music_subparsers.add_parser("convert") - music_convert_parser.add_argument( - "--input_dir", - type=Path, - default=Path(), - help="Directory with input files for conversion.", - ) - music_convert_parser.add_argument( - "--output_dir", type=Path, default=Path(), help="Directory for converted files" - ) - music_convert_parser.set_defaults(func=cli_music_convert) - - music_md_parser = music_subparsers.add_parser("metadata") - music_md_parser.add_argument( - "--input_dir", + + music_read_parser = music_subparsers.add_parser("read") + music_read_parser.add_argument( + "--collection", type=Path, default=Path(), help="Directory with input music files.", ) - music_md_parser.add_argument( + music_read_parser.add_argument( "--output_path", type=Path, default=Path() / "music_collection.json", - help="Path to save collection to.", + help="Path to save collection info json to.", ) - music_md_parser.set_defaults(func=cli_music_metadata) + music_read_parser.set_defaults(func=cli_music_read) music_refresh_parser = music_subparsers.add_parser("refresh") music_refresh_parser.add_argument( - "--input_dir", + "--collection", type=Path, default=Path(), - help="Directory with input music files.", + help="Directory with the music collection.", + ) + music_refresh_parser.add_argument( + "--work_dir", + type=Path, + default=Path(), + help="Directory for intermediate storage.", ) music_refresh_parser.set_defaults(func=cli_music_refresh) diff --git a/src/jgutils/music/__init__.py b/src/jgutils/music/__init__.py index eec2a70..e69de29 100644 --- a/src/jgutils/music/__init__.py +++ b/src/jgutils/music/__init__.py @@ -1 +0,0 @@ -from .music import * # NOQA diff --git a/src/jgutils/music/collection.py b/src/jgutils/music/collection.py new file mode 100644 index 0000000..d597721 --- /dev/null +++ b/src/jgutils/music/collection.py @@ -0,0 +1,105 @@ +import os +import logging +import shutil +from pathlib import Path + +from tinytag import TinyTag +from pydantic import BaseModel + +from jgutils.filesystem import get_files_recursive +from jgutils import converters + +from .models import Song, Album, Artist + +logger = logging.getLogger(__name__) + +class MusicCollection(BaseModel): + + """ + Representation of a collection of songs or audio tracks + """ + + artists: list[Artist] = [] + + def get_artist(self, name:str) -> Artist | None: + for artist in self.artists: + if artist.name == name: + return artist + return None + +_DEFAULT_EXTENSIONS = ["flac", + "mp3"] + +def read(input_dir: Path, extensions: list[str] = None) -> MusicCollection: + + """ + This function reads a music collection from a filesystem and + uses the metadata in each song file to construct a MusicCollection. + """ + + if not extensions: + extensions = _DEFAULT_EXTENSIONS + + collection = MusicCollection() + for eachFile in get_files_recursive(input_dir, extensions): + tag = TinyTag.get(eachFile) + if not tag.title: + logger.warn("Found tag with no title, skipping: %s", tag) + continue + + artist = collection.get_artist(tag.artist) + if not artist: + artist = Artist(name=tag.artist) + collection.artists.append(artist) + + if tag.album: + album = artist.get_album(tag.album) + if not album: + album = Album(title=tag.album) + artist.albums.append(album) + + song = Song(title=tag.title, identifier=str(uuid.uuid4())) + song.formats.append(eachFile.relative_to(input_dir)) + if tag.album: + album.songs.append(song) + else: + artist.songs.append(song) + + return collection + +def refresh(collection_dir: Path, work_dir: Path): + + """ + This method refreshes a music collection by making sure each song + has a consistently named directory and that external copies are synced + with the reference version. + """ + + ext = "flac" + files = get_files_recursive(collection_dir, [ext]) + + for eachFile in files: + tag = TinyTag.get(eachFile) + if not tag.title: + logger.warn("Found tag with no title, skipping: %s", tag) + continue + + artist_dir = input_dir / tag.artist + os.makedirs(artist_dir, exist_ok=True) + + if tag.album: + os.makedirs(artist_dir / tag.album, exist_ok=True) + shutil.move(eachFile, artist_dir / tag.album / f"{tag.title}.{ext}") + else: + shutil.move(eachFile, artist_dir / f"{tag.title}.{ext}") + + # Get any files not existing on the sync target, convert them to a suitable format + # and push them to the target. + config = converters.ConversionConfig( + collection_dir, work_dir, "flac", "mp3" + ) + converters.convert(config) + + + + diff --git a/src/jgutils/music/converter.py b/src/jgutils/music/converter.py new file mode 100644 index 0000000..2277e68 --- /dev/null +++ b/src/jgutils/music/converter.py @@ -0,0 +1,30 @@ +import os +from pathlib import Path +import uuid +import shutil +import subprocess +import logging + +from jgutils.filesystem import replace_filename + +logger = logging.getLogger(__name__) + +def ffmpeg(input_path: Path, output_path: Path) -> int: + + identifier = str(uuid.uuid4()) + input_tmp = replace_filename(input_path, identifier) + shutil.copy(input_path, input_tmp) + + output_tmp = reaplce_filename(output_path, identifier) + cmd = f"ffmpeg -i '{input_tmp}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'" + + os.makedirs(output_path.parent, exist_ok=True) + status = subprocess.run(cmd, shell=True) + + input_tmp.unlink() + if status.returncode == 0: + shutil.move(output_tmp, output_path) + else: + logger.error("Error converting: %s", input_path) + return status.returncode + diff --git a/src/jgutils/music/models.py b/src/jgutils/music/models.py new file mode 100644 index 0000000..45c1241 --- /dev/null +++ b/src/jgutils/music/models.py @@ -0,0 +1,25 @@ +from pydantic import BaseModel + +class Song(BaseModel): + + title: str + identifier: str + formats: list[Path] = [] + +class Album(BaseModel): + + title: str + songs: list[Song] = [] + +class Artist(BaseModel): + + name: str + albums: list[Album] = [] + songs: list[Song] = [] + + def get_album(self, title: str) -> Album | None: + for album in self.albums: + if album.title == title: + return album + return None + diff --git a/src/jgutils/music/music.py b/src/jgutils/music/music.py deleted file mode 100644 index c9c1498..0000000 --- a/src/jgutils/music/music.py +++ /dev/null @@ -1,120 +0,0 @@ -import os -import logging -import shutil -from pathlib import Path -import uuid -import subprocess - -from tinytag import TinyTag -from pydantic import BaseModel - -from jgutils.filesystem import get_files_recursive - -logger = logging.getLogger(__name__) - - -class Song(BaseModel): - - title: str - identifier: str - formats: list[Path] = [] - -class Album(BaseModel): - - title: str - songs: list[Song] = [] - -class Artist(BaseModel): - - name: str - albums: list[Album] = [] - songs: list[Song] = [] - - def get_album(self, title: str) -> Album | None: - for album in self.albums: - if album.title == title: - return album - return None - -class MusicCollection(BaseModel): - - artists: list[Artist] = [] - - def get_artist(self, name:str) -> Artist | None: - for artist in self.artists: - if artist.name == name: - return artist - return None - - -def ffmpeg_convert(input_path: Path, output_path: Path) -> int: - - identifier = str(uuid.uuid4()) - input_tmp = input_path.parent / (identifier + input_path.suffix) - shutil.copy(input_path, input_tmp) - - output_tmp = output_path.parent / (identifier + output_path.suffix) - cmd = f"ffmpeg -i '{input_tmp}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'" - - os.makedirs(output_path.parent, exist_ok=True) - status = subprocess.run(cmd, shell=True) - - input_tmp.unlink() - if status.returncode == 0: - shutil.move(output_tmp, output_path) - else: - logger.error("Error converting: %s", input_path) - return status.returncode - - -def get_metadata(input_dir: Path, extension: str) -> MusicCollection: - - files = get_files_recursive(input_dir, extension) - - collection = MusicCollection() - - for eachFile in files: - tag = TinyTag.get(eachFile) - if not tag.title: - logger.warn("Found tag with no title, skipping: %s", tag) - continue - - artist = collection.get_artist(tag.artist) - if not artist: - artist = Artist(name=tag.artist) - collection.artists.append(artist) - - if tag.album: - album = artist.get_album(tag.album) - if not album: - album = Album(title=tag.album) - artist.albums.append(album) - - song = Song(title=tag.title, identifier=str(uuid.uuid4())) - song.formats.append(eachFile.relative_to(input_dir)) - if tag.album: - album.songs.append(song) - else: - artist.songs.append(song) - - return collection - -def refresh(input_dir: Path): - - files = get_files_recursive(input_dir, "flac") - - for eachFile in files: - tag = TinyTag.get(eachFile) - if not tag.title: - logger.warn("Found tag with no title, skipping: %s", tag) - continue - - os.makedirs(input_dir / tag.artist, exist_ok=True) - - if tag.album: - os.makedirs(input_dir / tag.artist / tag.album, exist_ok=True) - shutil.move(eachFile, input_dir / tag.artist / tag.album / f"{tag.title}.flac") - else: - shutil.move(eachFile, input_dir / tag.artist / f"{tag.title}.flac") - - diff --git a/src/jgutils/serialization.py b/src/jgutils/serialization.py new file mode 100644 index 0000000..deb2b74 --- /dev/null +++ b/src/jgutils/serialization.py @@ -0,0 +1,8 @@ +from pathlib import Path + +from pydantic import BaseModel + +def write_model(model: BaseModel, path: Path): + with open(path, 'w', encoding="utf-8") as f: + f.write(model.model_dump_json(indent=4)) + diff --git a/src/jgutils/tasks/tasks.py b/src/jgutils/tasks/tasks.py index a392744..7b2a82f 100644 --- a/src/jgutils/tasks/tasks.py +++ b/src/jgutils/tasks/tasks.py @@ -1,25 +1,9 @@ -import os -from pathlib import Path import logging -import shutil -import subprocess -from typing import NamedTuple from multiprocessing import Pool logger = logging.getLogger(__name__) -class Task(NamedTuple): - cmd: str - output_tmp: Path - output_path: Path - - -def _run_task(args): - task = args[0] - - subprocess.run(task.cmd, shell=True) - -def run_tasks(tasks, pool_size: 10): - with Pool(10) as p: - p.map(_run_task, tasks) +def run_tasks(func, args: list, pool_size: int = 10): + with Pool(pool_size) as p: + p.map(func, args)