Start adding sync functions.

This commit is contained in:
James Grogan 2024-09-30 09:17:51 +01:00
parent 00325b36eb
commit b39807db44
10 changed files with 221 additions and 194 deletions

View file

@ -1,4 +1,5 @@
from pathlib import Path
from functools import partial
from typing import NamedTuple, Callable
import logging
@ -9,9 +10,9 @@ logger = logging.getLogger(__name__)
class ConversionConfig(NamedTuple):
input_dir: Path
output_ext: str
output_dir: Path
input_ext: str
output_ext: str
def _get_converted_path(input_path: Path, config: ConversionConfig):
"""
@ -21,7 +22,7 @@ def _get_converted_path(input_path: Path, config: ConversionConfig):
relative_path = input_path.relative_to(config.input_dir)
output_filename = str(input_path.stem) + f".{config.output_ext}"
return relative_path.parent / output_filename
return output_dir / relative_path.parent / output_filename
def get_unconverted_files(
@ -41,8 +42,7 @@ def convert(config: ConversionConfig, conversion_func: Callable):
logger.info("Converting files in %s", config.input_dir)
logger.info("Writing output to: %s", config.output_dir)
input_files = get_files_recursive(config.input_dir.resolve(), config.input_ext)
input_files = get_files_recursive(config.input_dir.resolve(), [config.input_ext])
output_files = get_uncoverted_files(input_files, config)
tasks = _build_conversion_tasks(input_files, output_files, config, conversion_func)
run_tasks(tasks)
run_tasks(conversion_func, zip(input_files, output_files))

View file

@ -3,18 +3,29 @@ import os
import sys
def get_files_recursive(search_path: Path, extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))
def delete_empty_dirs_recursive(search_path: Path):
for curdir, subdirs, files in os.walk(search_path):
if len(subdirs) == 0 and len(files) == 0:
#print(curdir)
def get_files_recursive(path: Path, extensions: list[str] | None) -> list[Path]:
if not extensions:
return list(path.rglob("*.*"))
else:
ret = []
for ext in extensions:
ret.append(list(search_path.rglob(f"*.{ext}")))
return ret
def _delete_empty_dirs(path: Path) -> bool:
found_empty = False
for curdir, subdirs, files in os.walk(path):
if len(subdirs) == 0 and len(files) == 0:
found_empty = True
os.rmdir(curdir)
return found_empty
def delete_empty_dirs_recursive(path: Path):
found_empty = True
while found_empty:
found_empty = _delete_empty_dirs(path)
if __name__ == "__main__":
input_path = sys.argv[1]
print(input_path)
delete_empty_dirs_recursive(Path(input_path).resolve())
def replace_filename(path: Path, replacement: str) -> Path:
return path.parent / (replacement + path.suffix)

View file

@ -3,26 +3,17 @@ import logging
from pathlib import Path
from jgutils import music
from jgutils.serialization import write_model
logger = logging.getLogger(__name__)
def cli_music_convert(args):
config = music.CovertionConfig(
args.input_dir.resolve(), args.output_dir.resolve(), "mp3", "flac"
)
music.convert(config)
def cli_music_metadata(args):
collection = music.get_metadata(args.input_dir.resolve(), "flac")
with open(args.output_path.resolve(), 'w') as f:
f.write(collection.model_dump_json(indent=4))
def cli_music_read(args):
collection = music.collection.read(args.collection.resolve())
write_model(collection)
def cli_music_refresh(args):
collection = music.refresh(args.input_dir.resolve())
music.collection.refresh(args.collection.resolve(),
args.work_dir.resolve())
def main_cli():
parser = argparse.ArgumentParser()
@ -30,40 +21,34 @@ def main_cli():
music_parser = subparsers.add_parser("music")
music_subparsers = music_parser.add_subparsers(required=True)
music_convert_parser = music_subparsers.add_parser("convert")
music_convert_parser.add_argument(
"--input_dir",
type=Path,
default=Path(),
help="Directory with input files for conversion.",
)
music_convert_parser.add_argument(
"--output_dir", type=Path, default=Path(), help="Directory for converted files"
)
music_convert_parser.set_defaults(func=cli_music_convert)
music_md_parser = music_subparsers.add_parser("metadata")
music_md_parser.add_argument(
"--input_dir",
music_read_parser = music_subparsers.add_parser("read")
music_read_parser.add_argument(
"--collection",
type=Path,
default=Path(),
help="Directory with input music files.",
)
music_md_parser.add_argument(
music_read_parser.add_argument(
"--output_path",
type=Path,
default=Path() / "music_collection.json",
help="Path to save collection to.",
help="Path to save collection info json to.",
)
music_md_parser.set_defaults(func=cli_music_metadata)
music_read_parser.set_defaults(func=cli_music_read)
music_refresh_parser = music_subparsers.add_parser("refresh")
music_refresh_parser.add_argument(
"--input_dir",
"--collection",
type=Path,
default=Path(),
help="Directory with input music files.",
help="Directory with the music collection.",
)
music_refresh_parser.add_argument(
"--work_dir",
type=Path,
default=Path(),
help="Directory for intermediate storage.",
)
music_refresh_parser.set_defaults(func=cli_music_refresh)

View file

@ -1 +0,0 @@
from .music import * # NOQA

View file

@ -0,0 +1,105 @@
import os
import logging
import shutil
from pathlib import Path
from tinytag import TinyTag
from pydantic import BaseModel
from jgutils.filesystem import get_files_recursive
from jgutils import converters
from .models import Song, Album, Artist
logger = logging.getLogger(__name__)
class MusicCollection(BaseModel):
"""
Representation of a collection of songs or audio tracks
"""
artists: list[Artist] = []
def get_artist(self, name:str) -> Artist | None:
for artist in self.artists:
if artist.name == name:
return artist
return None
_DEFAULT_EXTENSIONS = ["flac",
"mp3"]
def read(input_dir: Path, extensions: list[str] = None) -> MusicCollection:
"""
This function reads a music collection from a filesystem and
uses the metadata in each song file to construct a MusicCollection.
"""
if not extensions:
extensions = _DEFAULT_EXTENSIONS
collection = MusicCollection()
for eachFile in get_files_recursive(input_dir, extensions):
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
artist = collection.get_artist(tag.artist)
if not artist:
artist = Artist(name=tag.artist)
collection.artists.append(artist)
if tag.album:
album = artist.get_album(tag.album)
if not album:
album = Album(title=tag.album)
artist.albums.append(album)
song = Song(title=tag.title, identifier=str(uuid.uuid4()))
song.formats.append(eachFile.relative_to(input_dir))
if tag.album:
album.songs.append(song)
else:
artist.songs.append(song)
return collection
def refresh(collection_dir: Path, work_dir: Path):
"""
This method refreshes a music collection by making sure each song
has a consistently named directory and that external copies are synced
with the reference version.
"""
ext = "flac"
files = get_files_recursive(collection_dir, [ext])
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
artist_dir = input_dir / tag.artist
os.makedirs(artist_dir, exist_ok=True)
if tag.album:
os.makedirs(artist_dir / tag.album, exist_ok=True)
shutil.move(eachFile, artist_dir / tag.album / f"{tag.title}.{ext}")
else:
shutil.move(eachFile, artist_dir / f"{tag.title}.{ext}")
# Get any files not existing on the sync target, convert them to a suitable format
# and push them to the target.
config = converters.ConversionConfig(
collection_dir, work_dir, "flac", "mp3"
)
converters.convert(config)

View file

@ -0,0 +1,30 @@
import os
from pathlib import Path
import uuid
import shutil
import subprocess
import logging
from jgutils.filesystem import replace_filename
logger = logging.getLogger(__name__)
def ffmpeg(input_path: Path, output_path: Path) -> int:
identifier = str(uuid.uuid4())
input_tmp = replace_filename(input_path, identifier)
shutil.copy(input_path, input_tmp)
output_tmp = reaplce_filename(output_path, identifier)
cmd = f"ffmpeg -i '{input_tmp}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
os.makedirs(output_path.parent, exist_ok=True)
status = subprocess.run(cmd, shell=True)
input_tmp.unlink()
if status.returncode == 0:
shutil.move(output_tmp, output_path)
else:
logger.error("Error converting: %s", input_path)
return status.returncode

View file

@ -0,0 +1,25 @@
from pydantic import BaseModel
class Song(BaseModel):
title: str
identifier: str
formats: list[Path] = []
class Album(BaseModel):
title: str
songs: list[Song] = []
class Artist(BaseModel):
name: str
albums: list[Album] = []
songs: list[Song] = []
def get_album(self, title: str) -> Album | None:
for album in self.albums:
if album.title == title:
return album
return None

View file

@ -1,120 +0,0 @@
import os
import logging
import shutil
from pathlib import Path
import uuid
import subprocess
from tinytag import TinyTag
from pydantic import BaseModel
from jgutils.filesystem import get_files_recursive
logger = logging.getLogger(__name__)
class Song(BaseModel):
title: str
identifier: str
formats: list[Path] = []
class Album(BaseModel):
title: str
songs: list[Song] = []
class Artist(BaseModel):
name: str
albums: list[Album] = []
songs: list[Song] = []
def get_album(self, title: str) -> Album | None:
for album in self.albums:
if album.title == title:
return album
return None
class MusicCollection(BaseModel):
artists: list[Artist] = []
def get_artist(self, name:str) -> Artist | None:
for artist in self.artists:
if artist.name == name:
return artist
return None
def ffmpeg_convert(input_path: Path, output_path: Path) -> int:
identifier = str(uuid.uuid4())
input_tmp = input_path.parent / (identifier + input_path.suffix)
shutil.copy(input_path, input_tmp)
output_tmp = output_path.parent / (identifier + output_path.suffix)
cmd = f"ffmpeg -i '{input_tmp}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
os.makedirs(output_path.parent, exist_ok=True)
status = subprocess.run(cmd, shell=True)
input_tmp.unlink()
if status.returncode == 0:
shutil.move(output_tmp, output_path)
else:
logger.error("Error converting: %s", input_path)
return status.returncode
def get_metadata(input_dir: Path, extension: str) -> MusicCollection:
files = get_files_recursive(input_dir, extension)
collection = MusicCollection()
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
artist = collection.get_artist(tag.artist)
if not artist:
artist = Artist(name=tag.artist)
collection.artists.append(artist)
if tag.album:
album = artist.get_album(tag.album)
if not album:
album = Album(title=tag.album)
artist.albums.append(album)
song = Song(title=tag.title, identifier=str(uuid.uuid4()))
song.formats.append(eachFile.relative_to(input_dir))
if tag.album:
album.songs.append(song)
else:
artist.songs.append(song)
return collection
def refresh(input_dir: Path):
files = get_files_recursive(input_dir, "flac")
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
os.makedirs(input_dir / tag.artist, exist_ok=True)
if tag.album:
os.makedirs(input_dir / tag.artist / tag.album, exist_ok=True)
shutil.move(eachFile, input_dir / tag.artist / tag.album / f"{tag.title}.flac")
else:
shutil.move(eachFile, input_dir / tag.artist / f"{tag.title}.flac")

View file

@ -0,0 +1,8 @@
from pathlib import Path
from pydantic import BaseModel
def write_model(model: BaseModel, path: Path):
with open(path, 'w', encoding="utf-8") as f:
f.write(model.model_dump_json(indent=4))

View file

@ -1,25 +1,9 @@
import os
from pathlib import Path
import logging
import shutil
import subprocess
from typing import NamedTuple
from multiprocessing import Pool
logger = logging.getLogger(__name__)
class Task(NamedTuple):
cmd: str
output_tmp: Path
output_path: Path
def _run_task(args):
task = args[0]
subprocess.run(task.cmd, shell=True)
def run_tasks(tasks, pool_size: 10):
with Pool(10) as p:
p.map(_run_task, tasks)
def run_tasks(func, args: list, pool_size: int = 10):
with Pool(pool_size) as p:
p.map(func, args)