Change to python project format

This commit is contained in:
James Grogan 2024-09-29 18:23:09 +01:00
parent b20465a3d7
commit 61599cb76c
27 changed files with 268 additions and 123 deletions

0
src/jgutils/__init__.py Normal file
View file

55
src/jgutils/converters.py Normal file
View file

@ -0,0 +1,55 @@
from pathlib import Path
from typing import NamedTuple, Callable
import logging
from .tasks import Task
logger = logging.getLogger(__name__)
class ConversionConfig(NamedTuple):
input_dir: Path
output_dir: Path
output_ext: str
input_ext: str
def _get_converted_path(input_path: Path, config: ConversionConfig):
"""
Return the path you would obtain if you moved the file in the input
path to the output directory in the config and changed its extension.
"""
relative_path = input_path.relative_to(config.input_dir)
output_filename = str(input_path.stem) + f".{config.output_ext}"
output_filename.replace("'", "")
output_path = config.output_dir / relative_path.parent / output_filename
return output_path
def _build_conversion_tasks(input_files: list[Path],
output_files: list[Path],
config: ConversionConfig,
conversion_func: Callable) -> list[Tasks]:
tasks = []
for input_path, output_path in zip(input_files, output_files):
cmd = conversion_func(input_path, config.output_dir, config.output_ext)
tasks.append(Task(cmd, output_tmp, output_path))
return tasks
def get_unconverted_files(input_files: list[Path],
config: ConversionConfig) -> list[Path]:
output_files = [_get_converted_path(f, config) for f in input_files]
return [f for f in output_files if not f.exists()]
def convert(config: ConversionConfig, conversion_func: Callable):
logger.info("Converting files in %s", config.input_dir)
logger.info("Writing output to: %s", config.output_dir)
input_files = get_files_recursive(
config.input_dir.resolve(), config.input_ext)
output_files = get_uncoverted_files(input_files, config)
tasks = _build_conversion_tasks(input_files,
output_files,
config,
conversion_func)
run_tasks(tasks)

View file

View file

@ -0,0 +1,6 @@
from pathlib import Path
def get_files_recursive(search_path: Path,
extension: str) -> list[Path]:
return list(search_path.rglob(f"*.{extension}"))

View file

View file

@ -0,0 +1,62 @@
from pathlib import Path
import logging
from machine_admin.package_manager import SystemPackage
from machine_admin.util import run_op
logger = logging.getLogger(__name__)
class DebPreseeder:
def __init__(self, iso_path: Path, preseed_path: Path):
self.iso_path = iso_path
self.preseed_path = preseed_path
self.udevil = SystemPackage("udevil")
self.gzip = SystemPackage("gzip")
self.mount_point = None
def mount_iso(self):
self.udevil.check_available()
op = f"udevil mount {self.iso_path}"
logger.info(f"Mounting iso | {op}")
output = run_op(op, capture_output=True)
self.mount_point = Path(output.split(" ")[3])
def add_preseed_file(self):
# Copy ISO content to writeable location
op = f"cp -rT {self.mount_point} {self.workdir}/isofiles/"
run_op(op)
# Make path writeable
install_arch_path = f"{self.workdir}/isofiles/install.amd"
run_op(f"chmod +w -R {install_arch_path}")
# Extract initrd
self.gzip.check_available()
initrd_path = f"{install_arch_path}/initrd"
run_op(f"gunzip {initrd_path}.gz")
# Add the preseed file
op = f"echo {self.preseed_path} | cpio -H newc -o -A -F {initrd_path}"
run_op(op)
# Recompress the initrd
run_op(f"gzip {initrd_path}")
# Restore path permissions
run_op(f"chmod -w -R {install_arch_path}")
def regenerate_checksum(self):
checksum_path = f"{self.workdir}/isofiles/md5sum.txt"
run_op(f"chmod +w {checksum_path}")
def regenerate_iso(self, workdir: Path):
self.mount_iso()
self.add_preseed_file()
self.regenerate_checksum()

View file

@ -0,0 +1,33 @@
import logging
from .util import run_op
logger = logging.getLogger(__name__)
class UfwInterface:
def __init__(self):
pass
def enable(self):
op = "ufw enable"
logger.info(f"Enabling ufw: {op}")
run_op(op)
def allow_app(self, app_name: str):
op = f"ufw allow {app_name}"
logger.info(f"Allowing ufw app: {op}")
run_op(op)
class Firewall:
def __init__(self):
self.ufw = UfwInterface()
def allow_app(self, app_name: str):
self.ufw.allow_app(app_name)
def enable(self):
self.ufw.enable()

View file

@ -0,0 +1,30 @@
from .firewall import Firewall
from .user import User, UserManager
from .ssh_config import SshConfig
from .package_manager import PackageManager
class Machine:
def __init__(self, default_user: User):
self.user = default_user
self.user_manager = UserManager()
self.firewall = Firewall()
self.ssh_config = SshConfig()
self.package_manager = PackageManager()
def enable_firewall(self):
self.firewall.allow_app("OpenSSH")
self.firewall.enable()
def secure_ssh_config(self):
self.ssh_config.sync_target_values()
self.ssh_config.restart_service()
def setup(self):
self.package_manager.upgrade()
self.user_manager.setup_user(self.user)
self.enable_firewall()
self.secure_ssh_config()
self.package_manager.install_packages(["rsync", "fail2ban"])
self.ssh_config.copy_ssh_dir_to_user(self.user.name)

View file

@ -0,0 +1,60 @@
import logging
from .platform.distro import Distro
from .util import run_op, has_program
logger = logging.getLogger(__name__)
class AptInterface:
def __init__(self):
pass
def update(self):
op = "apt-get update"
logger.info(f"Updating apt: {op}")
run_op(op)
def upgrade(self):
op = "apt-get -y upgrade"
logger.info(f"Upgrading via apt: {op}")
run_op(op)
def install_packages(self, packages: list):
packages_str = " ".join(packages)
op = f"apt-get install -y {packages_str}"
logger.info(f"Installing packages: {op}")
run_op(op)
class SystemPackage:
def __init__(self, name):
self.name = name
self.package_alt_names = {}
def check_available(self):
if has_program(self.name):
return
msg = f"Program {self.name} not found"
if Distro.has_apt():
msg = f"Program not found. Install with: sudo apt-get install {self.name}"
raise RuntimeError(msg)
class PackageManager:
def __init__(self):
self.apt = AptInterface()
def update(self):
self.apt.update()
def upgrade(self):
self.update()
self.apt.upgrade()
def install_packages(self, packages: list):
self.apt.install_packages(packages)

View file

@ -0,0 +1,12 @@
from machine_admin.util import has_program
class Distro:
def __init__(self, name):
self.name = name
self.version = ""
@staticmethod
def has_apt(self):
return has_program("apt-get")

View file

@ -0,0 +1,7 @@
import sys
from .distro import Distro
def is_linux():
return sys.platform == "linux" or sys.platform == "linux2"

View file

@ -0,0 +1,32 @@
from pathlib import Path
import logging
from .util import run_op
logger = logging.getLogger(__name__)
class SshConfig:
def __init__(self):
self.config_path = Path("/etc/ssh/ssh_config")
self.target_values = {
"PermitRootLogin": "No",
"PasswordAuthentication": "No",
"ChallengeResponseAuthentication": "No",
"UsePAM": "No",
}
def sync_target_values(self):
logger.info(f"Updating ssh config in: {self.config_path}")
pass
def restart_service(self):
op = "systemctl restart ssh"
logger.info(f"Restarting ssh service: {op}")
run_op(op)
def copy_ssh_dir_to_user(self, username: str):
op = f"rsync --archive --chown={username}:{username} ~/.ssh /home/{username}"
logger.info(f"Copying ssh dir to user: {op}")
run_op(op)

View file

@ -0,0 +1,31 @@
import logging
from .util import run_op
logger = logging.getLogger(__name__)
class User:
def __init__(self, name, has_sudo=False):
self.name = name
self.has_sudo = has_sudo
class UserManager:
def __init__(self):
pass
def setup_user(self, user: User):
self.add_user(user)
if user.has_sudo:
self.add_user_to_sudo(user)
def add_user(self, user: User):
op = f'adduser {user.name} --disabled-password --comment ""'
logger.info(f"Adding user: {op}")
run_op(op)
def add_user_to_sudo(self, user: User):
op = f"usermod -aG sudo {user.name}"
logger.info(f"Adding user to sudo: {op}")
run_op(op)

View file

@ -0,0 +1,31 @@
import subprocess
import logging
logger = logging.getLogger(__name__)
_DRY_RUN = False
def set_is_dry_run(is_dry_run: bool):
_DRY_RUN = is_dry_run
def run_op(op: str, capture_output: bool = False, cwd: str = None):
if not _DRY_RUN:
if capture_output:
ret = subprocess.run(
op, shell=True, capture_output=True, text=True, cwd=cwd
)
ret.check_returncode()
return ret.stdout
else:
ret = subprocess.run(op, shell=True, cwd=cwd)
ret.check_returncode()
else:
logger.info(f"Dry Run | {op}")
def has_program(program_name: str):
op = f"which {program_name}"
ret = subprocess.run(op, shell=True, capture_output=True, text=True)
return len(ret.stdout)

View file

@ -0,0 +1,31 @@
import argparse
import logging
from machine_admin.user import User
from machine_admin.machine import Machine
from machine_admin.util import set_is_dry_run
logger = logging.getLogger(__name__)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog="MachineSetup", description="Scripts for machine provisioning"
)
parser.add_argument("username", help="Name of the default non-root user")
parser.add_argument(
"--dry_run",
help="If set then don't change the system state - used for testing.",
default=False,
)
args = parser.parse_args()
set_is_dry_run(args.dry_run)
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
user = User(args.username, has_sudo=True)
machine = Machine(user)
machine.setup()

47
src/jgutils/main_cli.py Normal file
View file

@ -0,0 +1,47 @@
import argparse
import logger
from jgutils import music
logger = logging.getLogger(__name__)
def cli_music_convert(args):
config = music.CovertionConfig(
args.input_dir.resolve(),
args.output_dir.resolve(),
"mp3",
"flac"
)
music.convert(config)
def main_cli():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(required=True)
music_parser = subparsers.add_parser("music")
music_subparsers = music_parser.add_subparsers(required=True)
music_convert_parser = music_subparsers.add_parser("convert")
music_convert_parser.add_argument(
"--input_dir",
type=Path,
default=Path(),
help="Directory with input files for conversion.",
)
music_convert_parser.add_argument(
"--output_dir",
type=Path, default=Path(),
help="Directory for converted files"
)
music_convert_parser.set_defaults(func=cli_replace_in_files)
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
args = parser.parse_args()
args.func(args)
if __name__ == "__main__":
main_cli()

View file

@ -0,0 +1,29 @@
import os
import logging
import shutil
from pathlib import Path
import uuid
logger = logging.getLogger(__name__)
def ffmpeg_convert(input_path: Path,
output_dir: Path,
output_ext: str) -> str:
output_tmp = output_dir / (str(uuid.uuid4()) + f".{output_ext}")
cmd = f"ffmpeg -i '{input_path}' -ab 320k -map_metadata 0 -id3v2_version 3 '{output_tmp}'"
def move(input_dir: Path, output_dir: Path):
logger.info("Moving files in %s to %s", input_dir, output_dir)
os.makedirs(output_dir, exist_ok=True)
mp3_files = list(input_dir.resolve().rglob("*.mp3"))
for idx, path in enumerate(mp3_files):
logger.info("Moving file %d of %d", idx, len(mp3_files))
relative_path = path.relative_to(input_dir)
output_path = output_dir / relative_path
os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path)

View file

@ -0,0 +1,74 @@
import argparse
import os
import logging
import shutil
from pathlib import Path
import subprocess
import uuid
from fabric import Connection
logger = logging.getLogger(__name__)
def sync(source_dir: Path, target_dir: Path, host: str):
logger.info("Syncing files in %s to %s on % s", source_dir, target_dir, host)
cmd = f"find {target_dir} -type f"
if host:
result = Connection(host).run(cmd, hide=True)
target_files = result.stdout.splitlines()
else:
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
target_files = result.stdout.splitlines()
target_files = [Path(f).relative_to(target_dir) for f in target_files]
source_files = []
for dir_path, _, files in os.walk(source_dir):
for each_file in files:
source_files.append(
Path(dir_path + "/" + each_file).relative_to(source_dir)
)
sync_files = []
for source_file in source_files:
if source_file not in target_files:
sync_files.append(source_file)
sync_dir = Path(os.getcwd()) / "sync"
os.makedirs(sync_dir, exist_ok=True)
for sync_file in sync_files:
os.makedirs((sync_dir / sync_file).parent, exist_ok=True)
shutil.copy(source_dir / sync_file, sync_dir / sync_file)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--source_dir",
type=Path,
default=Path(),
help="Directory with source files to sync from.",
)
parser.add_argument(
"--host", type=str, default="", help="Name of host system to sync with."
)
parser.add_argument(
"--target_dir",
type=Path,
default=Path(),
help="Directory with source files to sync to.",
)
args = parser.parse_args()
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
sync(args.source_dir.resolve(), args.target_dir.resolve(), args.host)

View file

@ -0,0 +1,4 @@
for f in $(find . -name '*.HEIC');
do echo "Converting $f";
/home/jgrogan/code/ImageMagick-7.1.1-35/utilities/magick mogrify -format png -quality 100% "$f";
done;

0
src/jgutils/py.typed Normal file
View file

View file

@ -0,0 +1 @@
from .tasks import * # NOQA

View file

@ -0,0 +1,28 @@
import os
from pathlib import Path
import logging
import shutil
import subprocess
from typing import NamedTuple
from multiprocessing import Pool
logger = logging.getLogger(__name__)
class Task(NamedTuple):
cmd: str
output_tmp: Path
output_path: Path
def _run_task(args):
task = args[0]
subprocess.run(cmd, shell=True)
os.makedirs(task.output_path.parent, exist_ok=True)
shutil.move(task.output_tmp, task.output_path)
def run_tasks(tasks, pool_size: 10):
with Pool(10) as p:
p.map(_run_task, tasks)