Apply come formatting

This commit is contained in:
James Grogan 2024-09-29 18:26:57 +01:00
parent 61599cb76c
commit 29719cba6e
6 changed files with 29 additions and 31 deletions

View file

@ -6,12 +6,14 @@ from .tasks import Task
logger = logging.getLogger(__name__)
class ConversionConfig(NamedTuple):
input_dir: Path
output_dir: Path
output_ext: str
input_ext: str
def _get_converted_path(input_path: Path, config: ConversionConfig):
"""
Return the path you would obtain if you moved the file in the input
@ -24,32 +26,34 @@ def _get_converted_path(input_path: Path, config: ConversionConfig):
output_path = config.output_dir / relative_path.parent / output_filename
return output_path
def _build_conversion_tasks(input_files: list[Path],
output_files: list[Path],
config: ConversionConfig,
conversion_func: Callable) -> list[Tasks]:
def _build_conversion_tasks(
input_files: list[Path],
output_files: list[Path],
config: ConversionConfig,
conversion_func: Callable,
) -> list[Tasks]:
tasks = []
for input_path, output_path in zip(input_files, output_files):
cmd = conversion_func(input_path, config.output_dir, config.output_ext)
tasks.append(Task(cmd, output_tmp, output_path))
return tasks
def get_unconverted_files(input_files: list[Path],
config: ConversionConfig) -> list[Path]:
def get_unconverted_files(
input_files: list[Path], config: ConversionConfig
) -> list[Path]:
output_files = [_get_converted_path(f, config) for f in input_files]
return [f for f in output_files if not f.exists()]
def convert(config: ConversionConfig, conversion_func: Callable):
logger.info("Converting files in %s", config.input_dir)
logger.info("Writing output to: %s", config.output_dir)
input_files = get_files_recursive(
config.input_dir.resolve(), config.input_ext)
input_files = get_files_recursive(config.input_dir.resolve(), config.input_ext)
output_files = get_uncoverted_files(input_files, config)
tasks = _build_conversion_tasks(input_files,
output_files,
config,
conversion_func)
tasks = _build_conversion_tasks(input_files, output_files, config, conversion_func)
run_tasks(tasks)

View file

@ -1,6 +1,5 @@
from pathlib import Path
def get_files_recursive(search_path: Path,
extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))
def get_files_recursive(search_path: Path, extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))

View file

@ -5,16 +5,15 @@ from jgutils import music
logger = logging.getLogger(__name__)
def cli_music_convert(args):
config = music.CovertionConfig(
args.input_dir.resolve(),
args.output_dir.resolve(),
"mp3",
"flac"
args.input_dir.resolve(), args.output_dir.resolve(), "mp3", "flac"
)
music.convert(config)
def main_cli():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)
@ -31,9 +30,7 @@ def main_cli():
)
music_convert_parser.add_argument(
"--output_dir",
type=Path, default=Path(),
help="Directory for converted files"
"--output_dir", type=Path, default=Path(), help="Directory for converted files"
)
music_convert_parser.set_defaults(func=cli_replace_in_files)
@ -43,5 +40,6 @@ def main_cli():
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main_cli()

View file

@ -6,9 +6,8 @@ import uuid
logger = logging.getLogger(__name__)
def ffmpeg_convert(input_path: Path,
output_dir: Path,
output_ext: str) -> str:
def ffmpeg_convert(input_path: Path, output_dir: Path, output_ext: str) -> str:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
@ -25,5 +24,3 @@ def move(input_dir: Path, output_dir: Path):
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path)

View file

@ -1 +1 @@
from .tasks import * # NOQA
from .tasks import * # NOQA

View file

@ -22,7 +22,7 @@ def _run_task(args):
os.makedirs(task.output_path.parent, exist_ok=True)
shutil.move(task.output_tmp, task.output_path)
def run_tasks(tasks, pool_size: 10):
with Pool(10) as p:
p.map(_run_task, tasks)
def run_tasks(tasks, pool_size: 10):
with Pool(10) as p:
p.map(_run_task, tasks)