Clean package structure ahead of tests

This commit is contained in:
James Grogan 2024-03-29 12:23:14 +00:00
parent 43114d8c1f
commit 4843413883
10 changed files with 16 additions and 10 deletions

View file

View file

@ -0,0 +1,29 @@
import logging
from .util import run_op
class UfwInterface:
def __init__(self):
pass
def enable(self):
op = "ufw enable"
logging.info(f"Enabling ufw: {op}")
run_op(op)
def allow_app(self, app_name: str):
op = f"ufw allow {app_name}"
logging.info(f"Allowing ufw app: {op}")
run_op(op)
class Firewall:
def __init__(self):
self.ufw = UfwInterface()
def allow_app(self, app_name: str):
self.ufw.allow_app(app_name)
def enable(self):
self.ufw.enable()

View file

@ -0,0 +1,29 @@
from .firewall import Firewall
from .user import User, UserManager
from .ssh_config import SshConfig
from .package_manager import PackageManager
class Machine:
def __init__(self, default_user: User):
self.user = default_user
self.user_manager = UserManager()
self.firewall = Firewall()
self.ssh_config = SshConfig()
self.package_manager = PackageManager()
def enable_firewall(self):
self.firewall.allow_app("OpenSSH")
self.firewall.enable()
def secure_ssh_config(self):
self.ssh_config.sync_target_values()
self.ssh_config.restart_service()
def setup(self):
self.package_manager.update()
self.user_manager.setup_user(self.user)
self.enable_firewall()
self.secure_ssh_config()
self.package_manager.install_packages(["rsync", "fail2ban"])
self.ssh_config.copy_ssh_dir_to_user(self.user.name)

View file

@ -0,0 +1,40 @@
import logging
from .util import run_op
class AptInterface:
def __init__(self):
pass
def update(self):
op = "apt-get update"
logging.info(f"Updating apt: {op}")
run_op(op)
def upgrade(self):
op = "apt-get -y upgrade"
logging.info(f"Upgrading via apt: {op}")
run_op(op)
def install_packages(self, packages: list):
packages_str = "".join(packages)
op = f"apt-get install -y {packages_str}"
logging.info(f"Installing packages: {op}")
run_op(op)
class PackageManager:
def __init__(self):
self.apt = AptInterface()
def update(self):
self.apt.update()
def upgrade(self):
self.update()
self.apt.upgrade()
def install_packages(self, packages: list):
self.apt.install_packages(packages)

View file

@ -0,0 +1,26 @@
from pathlib import Path
import logging
from .util import run_op
class SshConfig:
def __init__(self):
self.config_path = Path("/etc/ssh/ssh_config")
self.target_values = {"PermitRootLogin": "No",
"PasswordAuthentication": "No",
"ChallengeResponseAuthentication": "No",
"UsePAM": "No"}
def sync_target_values(self):
logging.info(f"Updating ssh config in: {self.config_path}")
pass
def restart_service(self):
op = "systemctl restart ssh"
logging.info(f"Restarting ssh service: {op}")
run_op(op)
def copy_ssh_dir_to_user(self, username:str):
op = f"rsync --archive --chown={username}:{username} ~/.ssh /home/{username}"

27
src/machine_admin/user.py Normal file
View file

@ -0,0 +1,27 @@
import logging
from .util import run_op
class User:
def __init__(self, name, has_sudo=False):
self.name = name
self.has_sudo = has_sudo
class UserManager:
def __init__():
pass
def setup_user(self, user: User):
add_user(user)
if user.has_sudo:
add_user_to_sudo(user)
def add_user(self, user: User):
op = f"adduser {user.name}"
logging.info(f"Adding user: {op}")
run_op(op)
def add_user_to_sudo(self, user: User):
op = f"usermod -aG sudo {user.name}"
logging.info(f"Adding user to sudo: {op}")
run_op(op)

13
src/machine_admin/util.py Normal file
View file

@ -0,0 +1,13 @@
import subprocess
import logging
_DRY_RUN = False
def set_is_dry_run(is_dry_run: bool):
_DRY_RUN = is_dry_run
def run_op(op: str):
if _DRY_RUN:
return subprocess.run(op, shell=True)
else:
logging.info(f"Dry Run | {op}")

24
src/machine_setup.py Normal file
View file

@ -0,0 +1,24 @@
import argparse
import logging
from machine_admin.user import User
from machine_admin.machine import Machine
if __name__ == "__main__":
parser = argparse.ArgumentParser(
prog='MachineSetup',
description='Scripts for machine provisioning')
parser.add_argument('--username',
help="Name of the default non-root user")
parser.add_argument('--dry_run',
help="If set then don't change the system state - used for testing.",
default = False)
args = parser.parse_args()
user = User(args.username, has_sudo=True)
machine = Machine(user)
machine.setup()