Apply formatting

This commit is contained in:
James Grogan 2024-09-29 14:06:05 +01:00
parent a6382a9b96
commit 9a70b6206b
12 changed files with 136 additions and 121 deletions

View file

@ -6,6 +6,7 @@ from machine_admin.util import run_op
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DebPreseeder: class DebPreseeder:
def __init__(self, iso_path: Path, preseed_path: Path): def __init__(self, iso_path: Path, preseed_path: Path):
@ -59,16 +60,3 @@ class DebPreseeder:
self.add_preseed_file() self.add_preseed_file()
self.regenerate_checksum() self.regenerate_checksum()

View file

@ -4,6 +4,7 @@ from .util import run_op
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class UfwInterface: class UfwInterface:
def __init__(self): def __init__(self):
@ -19,6 +20,7 @@ class UfwInterface:
logger.info(f"Allowing ufw app: {op}") logger.info(f"Allowing ufw app: {op}")
run_op(op) run_op(op)
class Firewall: class Firewall:
def __init__(self): def __init__(self):

View file

@ -3,6 +3,7 @@ from .user import User, UserManager
from .ssh_config import SshConfig from .ssh_config import SshConfig
from .package_manager import PackageManager from .package_manager import PackageManager
class Machine: class Machine:
def __init__(self, default_user: User): def __init__(self, default_user: User):

View file

@ -5,6 +5,7 @@ from .util import run_op, has_program
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class AptInterface: class AptInterface:
def __init__(self): def __init__(self):
@ -26,6 +27,7 @@ class AptInterface:
logger.info(f"Installing packages: {op}") logger.info(f"Installing packages: {op}")
run_op(op) run_op(op)
class SystemPackage: class SystemPackage:
def __init__(self, name): def __init__(self, name):

View file

@ -1,5 +1,6 @@
from machine_admin.util import has_program from machine_admin.util import has_program
class Distro: class Distro:
def __init__(self, name): def __init__(self, name):

View file

@ -2,5 +2,6 @@ import sys
from .distro import Distro from .distro import Distro
def is_linux(): def is_linux():
return sys.platform == "linux" or sys.platform == "linux2" return sys.platform == "linux" or sys.platform == "linux2"

View file

@ -5,14 +5,17 @@ from .util import run_op
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class SshConfig: class SshConfig:
def __init__(self): def __init__(self):
self.config_path = Path("/etc/ssh/ssh_config") self.config_path = Path("/etc/ssh/ssh_config")
self.target_values = {"PermitRootLogin": "No", self.target_values = {
"PermitRootLogin": "No",
"PasswordAuthentication": "No", "PasswordAuthentication": "No",
"ChallengeResponseAuthentication": "No", "ChallengeResponseAuthentication": "No",
"UsePAM": "No"} "UsePAM": "No",
}
def sync_target_values(self): def sync_target_values(self):
logger.info(f"Updating ssh config in: {self.config_path}") logger.info(f"Updating ssh config in: {self.config_path}")
@ -27,4 +30,3 @@ class SshConfig:
op = f"rsync --archive --chown={username}:{username} ~/.ssh /home/{username}" op = f"rsync --archive --chown={username}:{username} ~/.ssh /home/{username}"
logger.info(f"Copying ssh dir to user: {op}") logger.info(f"Copying ssh dir to user: {op}")
run_op(op) run_op(op)

View file

@ -3,11 +3,13 @@ from .util import run_op
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class User: class User:
def __init__(self, name, has_sudo=False): def __init__(self, name, has_sudo=False):
self.name = name self.name = name
self.has_sudo = has_sudo self.has_sudo = has_sudo
class UserManager: class UserManager:
def __init__(self): def __init__(self):

View file

@ -5,18 +5,17 @@ logger = logging.getLogger(__name__)
_DRY_RUN = False _DRY_RUN = False
def set_is_dry_run(is_dry_run: bool): def set_is_dry_run(is_dry_run: bool):
_DRY_RUN = is_dry_run _DRY_RUN = is_dry_run
def run_op(op: str, capture_output:bool = False,
cwd:str = None): def run_op(op: str, capture_output: bool = False, cwd: str = None):
if not _DRY_RUN: if not _DRY_RUN:
if capture_output: if capture_output:
ret = subprocess.run(op, ret = subprocess.run(
shell=True, op, shell=True, capture_output=True, text=True, cwd=cwd
capture_output = True, )
text = True,
cwd=cwd)
ret.check_returncode() ret.check_returncode()
return ret.stdout return ret.stdout
else: else:
@ -25,10 +24,8 @@ def run_op(op: str, capture_output:bool = False,
else: else:
logger.info(f"Dry Run | {op}") logger.info(f"Dry Run | {op}")
def has_program(program_name: str): def has_program(program_name: str):
op = f"which {program_name}" op = f"which {program_name}"
ret = subprocess.run(op, ret = subprocess.run(op, shell=True, capture_output=True, text=True)
shell=True,
capture_output = True,
text = True)
return len(ret.stdout) return len(ret.stdout)

View file

@ -10,14 +10,15 @@ logger = logging.getLogger(__name__)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog='MachineSetup', prog="MachineSetup", description="Scripts for machine provisioning"
description='Scripts for machine provisioning') )
parser.add_argument('username', parser.add_argument("username", help="Name of the default non-root user")
help="Name of the default non-root user") parser.add_argument(
parser.add_argument('--dry_run', "--dry_run",
help="If set then don't change the system state - used for testing.", help="If set then don't change the system state - used for testing.",
default = False) default=False,
)
args = parser.parse_args() args = parser.parse_args()
set_is_dry_run(args.dry_run) set_is_dry_run(args.dry_run)
@ -28,4 +29,3 @@ if __name__ == "__main__":
user = User(args.username, has_sudo=True) user = User(args.username, has_sudo=True)
machine = Machine(user) machine = Machine(user)
machine.setup() machine.setup()

View file

@ -3,12 +3,14 @@ import os
import logging import logging
import shutil import shutil
from pathlib import Path from pathlib import Path
from typing import NamedTuple
import subprocess import subprocess
import uuid import uuid
from multiprocessing import Pool from multiprocessing import Pool
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def convert_audio_file(args): def convert_audio_file(args):
cmd, output_tmp, output_path = args cmd, output_tmp, output_path = args
@ -16,34 +18,47 @@ def convert_audio_file(args):
os.makedirs(output_path.parent, exist_ok=True) os.makedirs(output_path.parent, exist_ok=True)
shutil.move(output_tmp, output_path) shutil.move(output_tmp, output_path)
class AudioFileConverter():
def __init__(self, input_dir: Path, output_dir: Path): class ConversionConfig(NamedTuple):
self.input_dir = input_dir input_dir: Path
self.output_dir = output_dir output_dir: Path
self.output_fmt = "mp3" output_fmt: str
self.input_fmt = "flac" input_fmt: str
def get_converted_path(self, input_path: Path):
relative_path = input_path.relative_to(self.input_dir) def _get_converted_path(input_path: Path, config: ConversionConfig):
output_filename = str(input_path.stem) + f".{self.output_fmt}" """
Return the path you would obtain if you moved the file in the input
path to the output directory in the config and changed its extension.
"""
relative_path = input_path.relative_to(config.input_dir)
output_filename = str(input_path.stem) + f".{config.output_fmt}"
output_filename.replace("'", "") output_filename.replace("'", "")
output_path = self.output_dir / relative_path.parent / output_filename output_path = config.output_dir / relative_path.parent / output_filename
return output_path return output_path
def convert(self):
logger.info("Converting files in %s", self.input_dir) def get_files_recursive(search_path: Path, extension: str):
logger.info("Writing output to: %s", self.output_dir) return list(search_path.rglob(f"*.{extension}"))
os.makedirs(self.output_dir, exist_ok=True)
input_files = list(self.input_dir.resolve().rglob(f"*.{self.input_fmt}")) def convert(config: ConversionConfig):
logger.info("Converting files in %s", config.input_dir)
logger.info("Writing output to: %s", config.output_dir)
os.makedirs(config.output_dir, exist_ok=True)
input_files = get_files_recursive(
config.input_dir.resolve(), config.input_fmt
)
output_files = [] output_files = []
for input_file in input_files: for input_file in input_files:
candidate_output = self.get_converted_path(input_file) candidate_output = _get_converted_path(input_file)
if not candidate_output.exists(): if not candidate_output.exists():
output_files.append(candidate_output) output_files.append(candidate_output)
print(output_files)
tasks = [] tasks = []
for idx, (input_path, output_path) in enumerate(zip(input_files, output_files)): for idx, (input_path, output_path) in enumerate(zip(input_files, output_files)):
@ -71,19 +86,21 @@ def move(input_dir: Path, output_dir: Path):
os.makedirs(output_path.parent, exist_ok=True) os.makedirs(output_path.parent, exist_ok=True)
shutil.move(path, output_path) shutil.move(path, output_path)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--input_dir', parser.add_argument(
"--input_dir",
type=Path, type=Path,
default=Path(), default=Path(),
help="Directory with input files for conversion.") help="Directory with input files for conversion.",
)
parser.add_argument('--output_dir', parser.add_argument(
type=Path, "--output_dir", type=Path, default=Path(), help="Directory for converted files"
default=Path(), )
help="Directory for converted files")
args = parser.parse_args() args = parser.parse_args()
@ -93,5 +110,3 @@ if __name__ == "__main__":
converter = AudioFileConverter(args.input_dir.resolve(), args.output_dir.resolve()) converter = AudioFileConverter(args.input_dir.resolve(), args.output_dir.resolve())
converter.convert() converter.convert()
# move(args.input_dir.resolve(), args.output_dir.resolve()) # move(args.input_dir.resolve(), args.output_dir.resolve())

View file

@ -28,7 +28,9 @@ def sync(source_dir: Path, target_dir: Path, host: str):
source_files = [] source_files = []
for dir_path, _, files in os.walk(source_dir): for dir_path, _, files in os.walk(source_dir):
for each_file in files: for each_file in files:
source_files.append(Path(dir_path + "/" + each_file).relative_to(source_dir)) source_files.append(
Path(dir_path + "/" + each_file).relative_to(source_dir)
)
sync_files = [] sync_files = []
for source_file in source_files: for source_file in source_files:
@ -41,24 +43,28 @@ def sync(source_dir: Path, target_dir: Path, host: str):
os.makedirs((sync_dir / sync_file).parent, exist_ok=True) os.makedirs((sync_dir / sync_file).parent, exist_ok=True)
shutil.copy(source_dir / sync_file, sync_dir / sync_file) shutil.copy(source_dir / sync_file, sync_dir / sync_file)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--source_dir', parser.add_argument(
"--source_dir",
type=Path, type=Path,
default=Path(), default=Path(),
help="Directory with source files to sync from.") help="Directory with source files to sync from.",
)
parser.add_argument('--host', parser.add_argument(
type=str, "--host", type=str, default="", help="Name of host system to sync with."
default = "", )
help="Name of host system to sync with.")
parser.add_argument('--target_dir', parser.add_argument(
"--target_dir",
type=Path, type=Path,
default=Path(), default=Path(),
help="Directory with source files to sync to.") help="Directory with source files to sync to.",
)
args = parser.parse_args() args = parser.parse_args()
@ -66,5 +72,3 @@ if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO) logging.getLogger().setLevel(logging.INFO)
sync(args.source_dir.resolve(), args.target_dir.resolve(), args.host) sync(args.source_dir.resolve(), args.target_dir.resolve(), args.host)