Compare commits

..

No commits in common. "8974849b8b3513f88005dd1058352f992154bd58" and "61599cb76ca377003442db37eca4033e1ad5ba38" have entirely different histories.

10 changed files with 58 additions and 176 deletions

View file

@ -15,7 +15,7 @@ classifiers = [
"Topic :: System :: Distributed Computing"
]
keywords = ["Personal tools and recipes"]
dependencies = ["pydantic", "tinytag"]
dependencies = ["pydantic"]
[project.urls]
Repository = "https://git.jmsgrogan.com/jgrogan/recipes"

View file

@ -6,14 +6,12 @@ from .tasks import Task
logger = logging.getLogger(__name__)
class ConversionConfig(NamedTuple):
input_dir: Path
output_dir: Path
output_ext: str
input_ext: str
def _get_converted_path(input_path: Path, config: ConversionConfig):
"""
Return the path you would obtain if you moved the file in the input
@ -26,34 +24,32 @@ def _get_converted_path(input_path: Path, config: ConversionConfig):
output_path = config.output_dir / relative_path.parent / output_filename
return output_path
def _build_conversion_tasks(
input_files: list[Path],
output_files: list[Path],
config: ConversionConfig,
conversion_func: Callable,
) -> list[Tasks]:
def _build_conversion_tasks(input_files: list[Path],
output_files: list[Path],
config: ConversionConfig,
conversion_func: Callable) -> list[Tasks]:
tasks = []
for input_path, output_path in zip(input_files, output_files):
cmd = conversion_func(input_path, config.output_dir, config.output_ext)
tasks.append(Task(cmd, output_tmp, output_path))
return tasks
def get_unconverted_files(
input_files: list[Path], config: ConversionConfig
) -> list[Path]:
def get_unconverted_files(input_files: list[Path],
config: ConversionConfig) -> list[Path]:
output_files = [_get_converted_path(f, config) for f in input_files]
return [f for f in output_files if not f.exists()]
def convert(config: ConversionConfig, conversion_func: Callable):
logger.info("Converting files in %s", config.input_dir)
logger.info("Writing output to: %s", config.output_dir)
input_files = get_files_recursive(config.input_dir.resolve(), config.input_ext)
input_files = get_files_recursive(
config.input_dir.resolve(), config.input_ext)
output_files = get_uncoverted_files(input_files, config)
tasks = _build_conversion_tasks(input_files, output_files, config, conversion_func)
tasks = _build_conversion_tasks(input_files,
output_files,
config,
conversion_func)
run_tasks(tasks)

View file

@ -1 +0,0 @@
from .filesystem import * # NOQA

View file

@ -1,5 +1,6 @@
from pathlib import Path
def get_files_recursive(search_path: Path, extension: str) -> list[Path]:
def get_files_recursive(search_path: Path,
extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))

View file

@ -1,29 +1,20 @@
import argparse
import logging
from pathlib import Path
import logger
from jgutils import music
logger = logging.getLogger(__name__)
def cli_music_convert(args):
config = music.CovertionConfig(
args.input_dir.resolve(), args.output_dir.resolve(), "mp3", "flac"
args.input_dir.resolve(),
args.output_dir.resolve(),
"mp3",
"flac"
)
music.convert(config)
def cli_music_metadata(args):
collection = music.get_metadata(args.input_dir.resolve(), "flac")
with open(args.output_path.resolve(), 'w') as f:
f.write(collection.model_dump_json(indent=4))
def cli_music_refresh(args):
collection = music.refresh(args.input_dir.resolve())
def main_cli():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)
@ -38,34 +29,13 @@ def main_cli():
default=Path(),
help="Directory with input files for conversion.",
)
music_convert_parser.add_argument(
"--output_dir", type=Path, default=Path(), help="Directory for converted files"
"--output_dir",
type=Path, default=Path(),
help="Directory for converted files"
)
music_convert_parser.set_defaults(func=cli_music_convert)
music_md_parser = music_subparsers.add_parser("metadata")
music_md_parser.add_argument(
"--input_dir",
type=Path,
default=Path(),
help="Directory with input music files.",
)
music_md_parser.add_argument(
"--output_path",
type=Path,
default=Path() / "music_collection.json",
help="Path to save collection to.",
)
music_md_parser.set_defaults(func=cli_music_metadata)
music_refresh_parser = music_subparsers.add_parser("refresh")
music_refresh_parser.add_argument(
"--input_dir",
type=Path,
default=Path(),
help="Directory with input music files.",
)
music_refresh_parser.set_defaults(func=cli_music_refresh)
music_convert_parser.set_defaults(func=cli_replace_in_files)
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
@ -73,6 +43,5 @@ def main_cli():
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main_cli()

View file

@ -1 +0,0 @@
from .music import * # NOQA

View file

@ -0,0 +1,29 @@
import os
import logging
import shutil
from pathlib import Path
import uuid
logger = logging.getLogger(__name__)
def ffmpeg_convert(input_path: Path,
output_dir: Path,
output_ext: str) -> str:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
def move(input_dir: Path, output_dir: Path):
logger.info("Moving files in %s to %s", input_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)
mp3_files = list(input_dir.resolve().rglob("*.mp3"))
for idx, path in enumerate(mp3_files):
logger.info("Moving file %d of %d", idx, len(mp3_files))
relative_path = path.relative_to(input_dir)
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path)

View file

@ -1,111 +0,0 @@
import os
import logging
import shutil
from pathlib import Path
import uuid
from tinytag import TinyTag
from pydantic import BaseModel
from jgutils.filesystem import get_files_recursive
logger = logging.getLogger(__name__)
class Song(BaseModel):
title: str
identifier: str
formats: list[Path] = []
class Album(BaseModel):
title: str
songs: list[Song] = []
class Artist(BaseModel):
name: str
albums: list[Album] = []
songs: list[Song] = []
def get_album(self, title: str) -> Album | None:
for album in self.albums:
if album.title == title:
return album
return None
class MusicCollection(BaseModel):
artists: list[Artist] = []
def get_artist(self, name:str) -> Artist | None:
for artist in self.artists:
if artist.name == name:
return artist
return None
def ffmpeg_convert(input_path: Path, output_dir: Path, output_ext: str) -> str:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
def move(input_dir: Path, output_dir: Path):
logger.info("Moving files in %s to %s", input_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)
mp3_files = list(input_dir.resolve().rglob("*.mp3"))
for idx, path in enumerate(mp3_files):
logger.info("Moving file %d of %d", idx, len(mp3_files))
relative_path = path.relative_to(input_dir)
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path)
def get_metadata(input_dir: Path, extension: str) -> MusicCollection:
files = get_files_recursive(input_dir, extension)
collection = MusicCollection()
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
artist = collection.get_artist(tag.artist)
if not artist:
artist = Artist(name=tag.artist)
collection.artists.append(artist)
if tag.album:
album = artist.get_album(tag.album)
if not album:
album = Album(title=tag.album)
artist.albums.append(album)
song = Song(title=tag.title, identifier=str(uuid.uuid4()))
song.formats.append(eachFile.relative_to(input_dir))
if tag.album:
album.songs.append(song)
else:
artist.songs.append(song)
return collection
def refresh(input_dir: Path):
files = get_files_recursive(input_dir, "flac")
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
os.make_dirs(input_dir / tag.artist, exist_ok=True)

View file

@ -1 +1 @@
from .tasks import * # NOQA
from .tasks import * # NOQA

View file

@ -22,7 +22,7 @@ def _run_task(args):
os.makedirs(task.output_path.parent, exist_ok=True)
shutil.move(task.output_tmp, task.output_path)
def run_tasks(tasks, pool_size: 10):
with Pool(10) as p:
with Pool(10) as p:
p.map(_run_task, tasks)