Compare commits

..

2 commits

Author SHA1 Message Date
8974849b8b Add music tagging 2024-09-29 19:42:35 +01:00
29719cba6e Apply come formatting 2024-09-29 18:26:57 +01:00
10 changed files with 177 additions and 59 deletions

View file

@ -15,7 +15,7 @@ classifiers = [
"Topic :: System :: Distributed Computing" "Topic :: System :: Distributed Computing"
] ]
keywords = ["Personal tools and recipes"] keywords = ["Personal tools and recipes"]
dependencies = ["pydantic"] dependencies = ["pydantic", "tinytag"]
[project.urls] [project.urls]
Repository = "https://git.jmsgrogan.com/jgrogan/recipes" Repository = "https://git.jmsgrogan.com/jgrogan/recipes"

View file

@ -6,12 +6,14 @@ from .tasks import Task
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class ConversionConfig(NamedTuple): class ConversionConfig(NamedTuple):
input_dir: Path input_dir: Path
output_dir: Path output_dir: Path
output_ext: str output_ext: str
input_ext: str input_ext: str
def _get_converted_path(input_path: Path, config: ConversionConfig): def _get_converted_path(input_path: Path, config: ConversionConfig):
""" """
Return the path you would obtain if you moved the file in the input Return the path you would obtain if you moved the file in the input
@ -24,32 +26,34 @@ def _get_converted_path(input_path: Path, config: ConversionConfig):
output_path = config.output_dir / relative_path.parent / output_filename output_path = config.output_dir / relative_path.parent / output_filename
return output_path return output_path
def _build_conversion_tasks(input_files: list[Path],
def _build_conversion_tasks(
input_files: list[Path],
output_files: list[Path], output_files: list[Path],
config: ConversionConfig, config: ConversionConfig,
conversion_func: Callable) -> list[Tasks]: conversion_func: Callable,
) -> list[Tasks]:
tasks = [] tasks = []
for input_path, output_path in zip(input_files, output_files): for input_path, output_path in zip(input_files, output_files):
cmd = conversion_func(input_path, config.output_dir, config.output_ext) cmd = conversion_func(input_path, config.output_dir, config.output_ext)
tasks.append(Task(cmd, output_tmp, output_path)) tasks.append(Task(cmd, output_tmp, output_path))
return tasks return tasks
def get_unconverted_files(input_files: list[Path],
config: ConversionConfig) -> list[Path]: def get_unconverted_files(
input_files: list[Path], config: ConversionConfig
) -> list[Path]:
output_files = [_get_converted_path(f, config) for f in input_files] output_files = [_get_converted_path(f, config) for f in input_files]
return [f for f in output_files if not f.exists()] return [f for f in output_files if not f.exists()]
def convert(config: ConversionConfig, conversion_func: Callable): def convert(config: ConversionConfig, conversion_func: Callable):
logger.info("Converting files in %s", config.input_dir) logger.info("Converting files in %s", config.input_dir)
logger.info("Writing output to: %s", config.output_dir) logger.info("Writing output to: %s", config.output_dir)
input_files = get_files_recursive( input_files = get_files_recursive(config.input_dir.resolve(), config.input_ext)
config.input_dir.resolve(), config.input_ext)
output_files = get_uncoverted_files(input_files, config) output_files = get_uncoverted_files(input_files, config)
tasks = _build_conversion_tasks(input_files, tasks = _build_conversion_tasks(input_files, output_files, config, conversion_func)
output_files,
config,
conversion_func)
run_tasks(tasks) run_tasks(tasks)

View file

@ -0,0 +1 @@
from .filesystem import * # NOQA

View file

@ -1,6 +1,5 @@
from pathlib import Path from pathlib import Path
def get_files_recursive(search_path: Path,
extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))
def get_files_recursive(search_path: Path, extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))

View file

@ -1,20 +1,29 @@
import argparse import argparse
import logger import logging
from pathlib import Path
from jgutils import music from jgutils import music
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def cli_music_convert(args): def cli_music_convert(args):
config = music.CovertionConfig( config = music.CovertionConfig(
args.input_dir.resolve(), args.input_dir.resolve(), args.output_dir.resolve(), "mp3", "flac"
args.output_dir.resolve(),
"mp3",
"flac"
) )
music.convert(config) music.convert(config)
def cli_music_metadata(args):
collection = music.get_metadata(args.input_dir.resolve(), "flac")
with open(args.output_path.resolve(), 'w') as f:
f.write(collection.model_dump_json(indent=4))
def cli_music_refresh(args):
collection = music.refresh(args.input_dir.resolve())
def main_cli(): def main_cli():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True) subparsers = parser.add_subparsers(required=True)
@ -29,13 +38,34 @@ def main_cli():
default=Path(), default=Path(),
help="Directory with input files for conversion.", help="Directory with input files for conversion.",
) )
music_convert_parser.add_argument( music_convert_parser.add_argument(
"--output_dir", "--output_dir", type=Path, default=Path(), help="Directory for converted files"
type=Path, default=Path(),
help="Directory for converted files"
) )
music_convert_parser.set_defaults(func=cli_replace_in_files) music_convert_parser.set_defaults(func=cli_music_convert)
music_md_parser = music_subparsers.add_parser("metadata")
music_md_parser.add_argument(
"--input_dir",
type=Path,
default=Path(),
help="Directory with input music files.",
)
music_md_parser.add_argument(
"--output_path",
type=Path,
default=Path() / "music_collection.json",
help="Path to save collection to.",
)
music_md_parser.set_defaults(func=cli_music_metadata)
music_refresh_parser = music_subparsers.add_parser("refresh")
music_refresh_parser.add_argument(
"--input_dir",
type=Path,
default=Path(),
help="Directory with input music files.",
)
music_refresh_parser.set_defaults(func=cli_music_refresh)
logging.basicConfig() logging.basicConfig()
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
@ -43,5 +73,6 @@ def main_cli():
args = parser.parse_args() args = parser.parse_args()
args.func(args) args.func(args)
if __name__ == "__main__": if __name__ == "__main__":
main_cli() main_cli()

View file

@ -0,0 +1 @@
from .music import * # NOQA

View file

@ -1,29 +0,0 @@
import os
import logging
import shutil
from pathlib import Path
import uuid
logger = logging.getLogger(__name__)
def ffmpeg_convert(input_path: Path,
output_dir: Path,
output_ext: str) -> str:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
def move(input_dir: Path, output_dir: Path):
logger.info("Moving files in %s to %s", input_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)
mp3_files = list(input_dir.resolve().rglob("*.mp3"))
for idx, path in enumerate(mp3_files):
logger.info("Moving file %d of %d", idx, len(mp3_files))
relative_path = path.relative_to(input_dir)
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path)

111
src/jgutils/music/music.py Normal file
View file

@ -0,0 +1,111 @@
import os
import logging
import shutil
from pathlib import Path
import uuid
from tinytag import TinyTag
from pydantic import BaseModel
from jgutils.filesystem import get_files_recursive
logger = logging.getLogger(__name__)
class Song(BaseModel):
title: str
identifier: str
formats: list[Path] = []
class Album(BaseModel):
title: str
songs: list[Song] = []
class Artist(BaseModel):
name: str
albums: list[Album] = []
songs: list[Song] = []
def get_album(self, title: str) -> Album | None:
for album in self.albums:
if album.title == title:
return album
return None
class MusicCollection(BaseModel):
artists: list[Artist] = []
def get_artist(self, name:str) -> Artist | None:
for artist in self.artists:
if artist.name == name:
return artist
return None
def ffmpeg_convert(input_path: Path, output_dir: Path, output_ext: str) -> str:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
def move(input_dir: Path, output_dir: Path):
logger.info("Moving files in %s to %s", input_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)
mp3_files = list(input_dir.resolve().rglob("*.mp3"))
for idx, path in enumerate(mp3_files):
logger.info("Moving file %d of %d", idx, len(mp3_files))
relative_path = path.relative_to(input_dir)
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path)
def get_metadata(input_dir: Path, extension: str) -> MusicCollection:
files = get_files_recursive(input_dir, extension)
collection = MusicCollection()
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
artist = collection.get_artist(tag.artist)
if not artist:
artist = Artist(name=tag.artist)
collection.artists.append(artist)
if tag.album:
album = artist.get_album(tag.album)
if not album:
album = Album(title=tag.album)
artist.albums.append(album)
song = Song(title=tag.title, identifier=str(uuid.uuid4()))
song.formats.append(eachFile.relative_to(input_dir))
if tag.album:
album.songs.append(song)
else:
artist.songs.append(song)
return collection
def refresh(input_dir: Path):
files = get_files_recursive(input_dir, "flac")
for eachFile in files:
tag = TinyTag.get(eachFile)
if not tag.title:
logger.warn("Found tag with no title, skipping: %s", tag)
continue
os.make_dirs(input_dir / tag.artist, exist_ok=True)

View file

@ -22,7 +22,7 @@ def _run_task(args):
os.makedirs(task.output_path.parent, exist_ok=True) os.makedirs(task.output_path.parent, exist_ok=True)
shutil.move(task.output_tmp, task.output_path) shutil.move(task.output_tmp, task.output_path)
def run_tasks(tasks, pool_size: 10): def run_tasks(tasks, pool_size: 10):
with Pool(10) as p: with Pool(10) as p:
p.map(_run_task, tasks) p.map(_run_task, tasks)