Cleaning some music collection converters.
This commit is contained in:
parent
b39807db44
commit
c09179f5da
8 changed files with 94 additions and 80 deletions
14
src/guile/hello-test.scm
Normal file
14
src/guile/hello-test.scm
Normal file
|
@ -0,0 +1,14 @@
|
|||
(use-modules (srfi srfi-64)
|
||||
(hello))
|
||||
|
||||
(test-begin "harness")
|
||||
|
||||
(test-equal "test-hello"
|
||||
"Hello world!\n"
|
||||
(hi))
|
||||
|
||||
(test-equal "test-named-hello"
|
||||
"Hello bob!\n"
|
||||
(hi "bob"))
|
||||
|
||||
(test-end "harness")
|
16
src/guile/hello.scm
Normal file
16
src/guile/hello.scm
Normal file
|
@ -0,0 +1,16 @@
|
|||
(define-module (hello))
|
||||
|
||||
(define GREETING_PREFIX "Hello ")
|
||||
(define GREETING_SUFFIX "!\n")
|
||||
(define DEFAULT_NAME "world")
|
||||
|
||||
(define-public hi
|
||||
"Says hello to the named person"
|
||||
(lambda* (#:optional name)
|
||||
(string-append GREETING_PREFIX (addressee name) GREETING_SUFFIX)))
|
||||
|
||||
(define addressee
|
||||
(lambda (name)
|
||||
(if name
|
||||
name
|
||||
DEFAULT_NAME)))
|
|
@ -1,48 +1,37 @@
|
|||
from pathlib import Path
|
||||
from functools import partial
|
||||
from typing import NamedTuple, Callable
|
||||
from typing import Callable
|
||||
import logging
|
||||
|
||||
from .tasks import Task
|
||||
from jgutils.filesystem import get_files_recursive
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ConversionConfig(NamedTuple):
|
||||
input_dir: Path
|
||||
output_dir: Path
|
||||
input_ext: str
|
||||
output_ext: str
|
||||
|
||||
def _get_converted_path(input_path: Path, config: ConversionConfig):
|
||||
def _get_converted_path(input_path: Path,
|
||||
output_dir: Path,
|
||||
output_ext: str):
|
||||
"""
|
||||
Return the path you would obtain if you moved the file in the input
|
||||
path to the output directory in the config and changed its extension.
|
||||
"""
|
||||
|
||||
relative_path = input_path.relative_to(config.input_dir)
|
||||
output_filename = str(input_path.stem) + f".{config.output_ext}"
|
||||
return output_dir / relative_path.parent / output_filename
|
||||
output_filename = str(input_path.stem) + f".{output_ext}"
|
||||
return output_dir / input_path.parent / output_filename
|
||||
|
||||
def convert(input_dir: Path,
|
||||
output_dir: Path,
|
||||
input_ext: str,
|
||||
output_ext: str,
|
||||
conversion_func: Callable):
|
||||
|
||||
def get_unconverted_files(
|
||||
input_files: list[Path], config: ConversionConfig
|
||||
) -> list[Path]:
|
||||
output_files = [_get_converted_path(f, config) for f in input_files]
|
||||
logger.info("Converting files in %s to %s", input_dir, output_dir)
|
||||
|
||||
conversion_pairs = []
|
||||
for input_file, output_file in zip(input_files, output_files):
|
||||
if not output_file.exists():
|
||||
conversion_pairs.append((input_file, output_file))
|
||||
return conversion_pairs
|
||||
input_files = get_files_recursive(input_dir.resolve(), [input_ext])
|
||||
output_files = [_get_converted_path(f.relative_to(input_dir),
|
||||
output_dir,
|
||||
output_ext) for f in input_files]
|
||||
conversion_pairs = [io_paths for io_paths in
|
||||
zip(input_files, output_files) if not io_paths[1].exists()]
|
||||
|
||||
|
||||
def convert(config: ConversionConfig, conversion_func: Callable):
|
||||
|
||||
logger.info("Converting files in %s", config.input_dir)
|
||||
logger.info("Writing output to: %s", config.output_dir)
|
||||
|
||||
input_files = get_files_recursive(config.input_dir.resolve(), [config.input_ext])
|
||||
output_files = get_uncoverted_files(input_files, config)
|
||||
|
||||
run_tasks(conversion_func, zip(input_files, output_files))
|
||||
run_tasks(conversion_func, conversion_pairs)
|
||||
|
|
|
@ -7,25 +7,29 @@ def get_files_recursive(path: Path, extensions: list[str] | None) -> list[Path]:
|
|||
if not extensions:
|
||||
return list(path.rglob("*.*"))
|
||||
else:
|
||||
ret = []
|
||||
for ext in extensions:
|
||||
ret.append(list(search_path.rglob(f"*.{ext}")))
|
||||
return ret
|
||||
paths_list = [list(path.rglob(f"*.{ext}")) for ext in extensions]
|
||||
return [p for paths in paths_list for p in paths]
|
||||
|
||||
def get_empty_dirs(path: Path) -> list[Path]:
|
||||
return [curdir
|
||||
for curdir, subdirs, files in os.walk(path)
|
||||
if len(subdirs)==0 and len(files) == 0]
|
||||
|
||||
def _delete_empty_dirs(path: Path) -> bool:
|
||||
found_empty = False
|
||||
for curdir, subdirs, files in os.walk(path):
|
||||
if len(subdirs) == 0 and len(files) == 0:
|
||||
found_empty = True
|
||||
os.rmdir(curdir)
|
||||
return found_empty
|
||||
empty_dirs = get_empty_dirs(path)
|
||||
map(os.rmdir, empty_dirs)
|
||||
return bool(empty_dirs)
|
||||
|
||||
def delete_empty_dirs_recursive(path: Path):
|
||||
found_empty = True
|
||||
while found_empty:
|
||||
found_empty = _delete_empty_dirs(path)
|
||||
|
||||
|
||||
def replace_filename(path: Path, replacement: str) -> Path:
|
||||
return path.parent / (replacement + path.suffix)
|
||||
|
||||
def move_file_new_name(src: Path, dst: Path, name: str):
|
||||
os.makedirs(dst, exist_ok=True)
|
||||
shutil.move(src, dst / f"{name}{src.suffix}")
|
||||
|
||||
|
||||
|
|
|
@ -12,7 +12,7 @@ def cli_music_read(args):
|
|||
write_model(collection)
|
||||
|
||||
def cli_music_refresh(args):
|
||||
music.collection.refresh(args.collection.resolve(),
|
||||
music.refresh(args.collection.resolve(),
|
||||
args.work_dir.resolve())
|
||||
|
||||
def main_cli():
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
from .collection import refresh # NOQA
|
|
@ -6,7 +6,7 @@ from pathlib import Path
|
|||
from tinytag import TinyTag
|
||||
from pydantic import BaseModel
|
||||
|
||||
from jgutils.filesystem import get_files_recursive
|
||||
from jgutils.filesystem import get_files_recursive, move_file_new_name
|
||||
from jgutils import converters
|
||||
|
||||
from .models import Song, Album, Artist
|
||||
|
@ -21,12 +21,6 @@ class MusicCollection(BaseModel):
|
|||
|
||||
artists: list[Artist] = []
|
||||
|
||||
def get_artist(self, name:str) -> Artist | None:
|
||||
for artist in self.artists:
|
||||
if artist.name == name:
|
||||
return artist
|
||||
return None
|
||||
|
||||
_DEFAULT_EXTENSIONS = ["flac",
|
||||
"mp3"]
|
||||
|
||||
|
@ -40,17 +34,18 @@ def read(input_dir: Path, extensions: list[str] = None) -> MusicCollection:
|
|||
if not extensions:
|
||||
extensions = _DEFAULT_EXTENSIONS
|
||||
|
||||
collection = MusicCollection()
|
||||
for eachFile in get_files_recursive(input_dir, extensions):
|
||||
tag = TinyTag.get(eachFile)
|
||||
if not tag.title:
|
||||
logger.warn("Found tag with no title, skipping: %s", tag)
|
||||
continue
|
||||
tags = [TingTag.get(f) for f in get_files_recursive(input_dir, extensions)]
|
||||
tags_with_titles = [t for t in tags if t.title]
|
||||
|
||||
artist = collection.get_artist(tag.artist)
|
||||
if not artist:
|
||||
artists = []
|
||||
for tag in tags_with_titles:
|
||||
|
||||
matching_artists = filter(lambda x: x.name == tag.artist, artists)
|
||||
if not matching_artists:
|
||||
artist = Artist(name=tag.artist)
|
||||
collection.artists.append(artist)
|
||||
artists.append(artist)
|
||||
else:
|
||||
artist = next(matching_artists)
|
||||
|
||||
if tag.album:
|
||||
album = artist.get_album(tag.album)
|
||||
|
@ -65,7 +60,15 @@ def read(input_dir: Path, extensions: list[str] = None) -> MusicCollection:
|
|||
else:
|
||||
artist.songs.append(song)
|
||||
|
||||
return collection
|
||||
return MusicCollection(artists=artists)
|
||||
|
||||
def _refresh_collection_dirs(tags: list, collection_dir: Path):
|
||||
for tag, song_file in tags:
|
||||
artist_dir = collection_dir / tag.artist
|
||||
if tag.album:
|
||||
move_file_new_name(song_file, artist_dir / tag.album, tag.title)
|
||||
else:
|
||||
move_file_new_name(song_file, artist_dir, tag.title)
|
||||
|
||||
def refresh(collection_dir: Path, work_dir: Path):
|
||||
|
||||
|
@ -76,29 +79,14 @@ def refresh(collection_dir: Path, work_dir: Path):
|
|||
"""
|
||||
|
||||
ext = "flac"
|
||||
files = get_files_recursive(collection_dir, [ext])
|
||||
tags = [(TinyTag.get(f), f) for f in get_files_recursive(collection_dir, [ext])]
|
||||
tags_with_titles = [t for t in tags if t[0].title]
|
||||
|
||||
for eachFile in files:
|
||||
tag = TinyTag.get(eachFile)
|
||||
if not tag.title:
|
||||
logger.warn("Found tag with no title, skipping: %s", tag)
|
||||
continue
|
||||
|
||||
artist_dir = input_dir / tag.artist
|
||||
os.makedirs(artist_dir, exist_ok=True)
|
||||
|
||||
if tag.album:
|
||||
os.makedirs(artist_dir / tag.album, exist_ok=True)
|
||||
shutil.move(eachFile, artist_dir / tag.album / f"{tag.title}.{ext}")
|
||||
else:
|
||||
shutil.move(eachFile, artist_dir / f"{tag.title}.{ext}")
|
||||
_refresh_collection_dirs(tags_with_titles, collection_dir)
|
||||
|
||||
# Get any files not existing on the sync target, convert them to a suitable format
|
||||
# and push them to the target.
|
||||
config = converters.ConversionConfig(
|
||||
collection_dir, work_dir, "flac", "mp3"
|
||||
)
|
||||
converters.convert(config)
|
||||
converters.convert(collection_dir, work_dir, "flac", "mp3")
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from pathlib import Path
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
class Song(BaseModel):
|
||||
|
|
Loading…
Reference in a new issue