# Source code for cmkinitramfs.entry

"""Entry point module for cmkinitramfs"""

from __future__ import annotations

import argparse
import configparser
import itertools
import locale
import logging
import os
import os.path
import shlex
import shutil
import sys
from collections import defaultdict
from dataclasses import dataclass
from typing import (
    DefaultDict, Dict, Iterable, List, Mapping, Optional, Tuple, overload
)

import cmkinitramfs
import cmkinitramfs.data as datamod
import cmkinitramfs.initramfs as mkramfs
from .bin import find_lib, find_lib_iter
from .init import (mkinit, Breakpoint, BUSYBOX_COMMON_DEPS,
                   BUSYBOX_KEYMAP_DEPS, BUSYBOX_KMOD_DEPS)
from .utils import removeprefix

# Logger for this module
logger = logging.getLogger(__name__)
# Version string passed to the argparse ``version`` actions of the entry
# points; ``%(prog)s`` is left as-is for argparse to substitute.
_VERSION_INFO = (
    f"%(prog)s ({cmkinitramfs.__name__}) {cmkinitramfs.__version__}"
)
# Bytes expected at the very beginning of a binary keymap file
BINARY_KEYMAP_MAGIC = b'bkeymap'


def _find_config_file() -> str:
    """Locate the configuration file to load

    Candidates are tried in order and the first one which is an existing
    regular file is returned: the path found in the ``CMKINITCFG``
    environment variable (when set), ``./cmkinitramfs.ini``, then
    ``/etc/cmkinitramfs.ini``.

    :return: Path of the first existing candidate
    :raises FileNotFoundError: No candidate is an existing file
    """
    candidates = []
    from_env = os.environ.get('CMKINITCFG')
    if from_env is not None:
        candidates.append(from_env)
    candidates += ['./cmkinitramfs.ini', '/etc/cmkinitramfs.ini']

    for candidate in candidates:
        if os.path.isfile(candidate):
            return candidate
    raise FileNotFoundError("Configuration file not found")


@dataclass
class Config:
    """Configuration information

    :param root: Rootfs data needed to boot
    :param mounts: Non-rootfs data needed to boot
    :param keymap: Keymap information tuple ``(source, build, dest)``,
        or ``None`` when the keymap is disabled:
        ``source``: keymap to convert (``None`` if not configured),
        ``build``: converted keymap,
        ``dest``: keymap path within the initramfs
    :param files: Files to add, as ``(source, destination)`` tuples
        (``destination`` may be ``None``),
        see :attr:`cmkinitramfs.data.Data.files`
    :param execs: Executables to add, same format as ``files``,
        see :attr:`cmkinitramfs.data.Data.execs`
    :param libs: Libraries to add, same format as ``files``,
        see :attr:`cmkinitramfs.data.Data.libs`
    :param busybox: Needed executables compatible with the busybox
        implementation
    :param init_path: Path where the init script will be generated
    :param cmkcpiodir_opts: Default options for cmkcpiodir
    :param cmkcpiolist_opts: Default options for cmkcpiolist
    :param modules: Kernel modules to be loaded in the initramfs:
        ``{module: (arg, ...)}``. See :func:`cmkinitramfs.init.mkinit`.
    :param has_modules_manual: ``True`` when at least one kernel module
        comes from the ``modules`` option of the configuration file
    :param scripts: User scripts to run at given breakpoints.
        See ``scripts`` for :func:`cmkinitramfs.init.mkinit`.
    """
    root: datamod.Data
    mounts: Iterable[datamod.Data]
    # The first element is read with ``.get('keymap-src')`` and may be None
    keymap: Optional[Tuple[Optional[str], str, str]]
    files: Iterable[Tuple[str, Optional[str]]]
    execs: Iterable[Tuple[str, Optional[str]]]
    libs: Iterable[Tuple[str, Optional[str]]]
    busybox: Iterable[str]
    init_path: str
    cmkcpiodir_opts: str
    cmkcpiolist_opts: str
    modules: Mapping[str, Iterable[str]]
    has_modules_manual: bool
    scripts: Mapping[Breakpoint, Iterable[str]]


def read_config(config_file: Optional[str] = None) -> Config:
    """Read a configuration file and generate data structures from it

    Every non-default section describes a data source, selected by its
    ``type`` option. The ``DEFAULT`` section provides the global options
    (``root``, ``mountpoints``, ``files``, ``execs``, ``libs``,
    ``busybox``, ``modules``, ``scripts``, keymap and path options).

    :param config_file: Configuration file to use. Defaults to, in order:
        ``CMKINITCFG`` environment variable,
        ``./cmkinitramfs.ini``,
        ``/etc/cmkinitramfs.ini``.
    :return: Configuration dictionary, described by :class:`Config`
    :raises ValueError: Config file parsing error (unreadable file or
        unknown section ``type``)
    :raises KeyError: A mandatory option is missing, or a data string
        refers to a data source which is not known (yet)
    """

    @overload
    def find_data(data_str: None) -> None: ...

    @overload
    def find_data(data_str: str) -> datamod.Data: ...

    def find_data(data_str: Optional[str]) -> Optional[datamod.Data]:
        """Find a Data object from a data string

        Prefixed strings (``PATH=``, ``UUID=``, ``LABEL=``, ``PARTUUID=``,
        ``PARTLABEL=``) create the matching data object on first use and
        register it in ``data_dic`` under the un-prefixed value.
        ``DATA=`` and bare strings are looked up in ``data_dic``; a bare
        absolute path which is not registered becomes a ``PathData``.
        """
        if data_str is None:
            return None
        if data_str.startswith('PATH='):
            data_str = removeprefix(data_str, 'PATH=')
            if data_dic.get(data_str) is None:
                data_dic[data_str] = datamod.PathData(data_str)
        elif data_str.startswith('UUID='):
            data_str = removeprefix(data_str, 'UUID=')
            if data_dic.get(data_str) is None:
                data_dic[data_str] = datamod.UuidData(data_str, False)
        elif data_str.startswith('LABEL='):
            data_str = removeprefix(data_str, 'LABEL=')
            if data_dic.get(data_str) is None:
                data_dic[data_str] = datamod.LabelData(data_str, False)
        elif data_str.startswith('PARTUUID='):
            data_str = removeprefix(data_str, 'PARTUUID=')
            if data_dic.get(data_str) is None:
                data_dic[data_str] = datamod.UuidData(data_str, True)
        elif data_str.startswith('PARTLABEL='):
            data_str = removeprefix(data_str, 'PARTLABEL=')
            if data_dic.get(data_str) is None:
                data_dic[data_str] = datamod.LabelData(data_str, True)
        elif data_str.startswith('DATA='):
            data_str = removeprefix(data_str, 'DATA=')
        elif data_dic.get(data_str) is None and os.path.isabs(data_str):
            data_dic[data_str] = datamod.PathData(data_str)
        return data_dic[data_str]

    # Read config file
    if config_file is None:
        config_file = _find_config_file()
    config = configparser.ConfigParser()
    if config.read(config_file) != [config_file]:
        raise ValueError(f"Could not read configuration {config_file}")
    default = config['DEFAULT']

    # Get all data sources in data_dic
    data_dic: Dict[str, datamod.Data] = {}
    for data_id in config.sections():
        data_config = config[data_id]
        data_type = data_config['type']
        if data_type == 'luks':
            data_dic[data_id] = datamod.LuksData(
                find_data(data_config['source']),
                data_config['name'],
                find_data(data_config.get('key')),
                find_data(data_config.get('header')),
                data_config.getboolean('discard', fallback=False),
            )
        elif data_type == 'lvm':
            data_dic[data_id] = datamod.LvmData(
                data_config['vg-name'],
                data_config['lv-name'],
            )
        elif data_type == 'mount':
            data_dic[data_id] = datamod.MountData(
                find_data(data_config['source']),
                data_config['mountpoint'],
                data_config['filesystem'],
                data_config.get('options', 'ro'),
            )
        elif data_type == 'md':
            data_dic[data_id] = datamod.MdData(
                [find_data(k.strip())
                 for k in data_config['source'].strip().split('\n')],
                data_config['name'],
            )
        elif data_type == 'zfspool':
            data_dic[data_id] = datamod.ZFSPoolData(
                data_config['pool'],
                find_data(data_config.get('cache')),
            )
        elif data_type == 'zfscrypt':
            data_dic[data_id] = datamod.ZFSCryptData(
                # The pool defaults to the first component of the dataset
                find_data(data_config.get(
                    'pool', data_config['dataset'].split('/')[0]
                )),
                data_config['dataset'],
                find_data(data_config.get('key')),
            )
        elif data_type == 'network':
            data_dic[data_id] = datamod.Network(
                data_config['device'],
                data_config.get('ip'),
                data_config.get('mask'),
                data_config.get('gateway'),
            )
        elif data_type == 'iscsi':
            data_dic[data_id] = datamod.ISCSI(
                data_config['initiator'],
                data_config['target'],
                int(data_config['portal-group']),
                data_config['address'],
                int(data_config.get('port', '3260')),
                data_config.get('username'),
                data_config.get('password'),
                data_config.get('username-in'),
                data_config.get('password-in'),
            )
        else:
            # ValueError (a subclass of Exception) matches the documented
            # contract for configuration errors
            raise ValueError(f"Unknown config type {data_type}")

    # Configure dependencies
    # Iterate over a snapshot: find_data() may register new entries in
    # data_dic, which is not allowed while iterating over the dict itself.
    for data_id, data in list(data_dic.items()):
        if not config.has_section(data_id):
            continue
        data_config = config[data_id]
        for dep in data_config.get('need', '').split(','):
            if dep.strip():
                data.add_dep(find_data(dep.strip()))
        for ldep in data_config.get('load-need', '').split(','):
            if ldep.strip():
                data.add_load_dep(find_data(ldep.strip()))

    # Define Data for root and for other mounts
    root = find_data(default['root'])
    mounts = tuple(
        find_data(k.strip())
        for k in default.get('mountpoints', '').split(',')
        if k.strip()
    )

    # Define needed files, execs and libs
    files = set()
    for data in itertools.chain((root,), mounts):
        files |= data.files
        for ddep in data.iter_all_deps():
            files |= ddep.files
    for line in default.get('files', '').split('\n'):
        if line:
            # "source[:destination]", destination is optional
            src, *dest = line.split(':', maxsplit=1)
            files.add((src, dest[0] if dest else None))

    execs = set()
    for data in itertools.chain((root,), mounts):
        execs |= data.execs
        for ddep in data.iter_all_deps():
            execs |= ddep.execs
    for line in default.get('execs', '').split('\n'):
        if line:
            src, *dest = line.split(':', maxsplit=1)
            execs.add((src, dest[0] if dest else None))

    libs = set()
    for data in itertools.chain((root,), mounts):
        libs |= data.libs
        for ddep in data.iter_all_deps():
            libs |= ddep.libs
    for line in default.get('libs', '').split('\n'):
        if line:
            src, *dest = line.split(':', maxsplit=1)
            libs.add((src, dest[0] if dest else None))

    busybox = set()
    for data in itertools.chain((root,), mounts):
        busybox |= data.busybox
        for ddep in data.iter_all_deps():
            busybox |= ddep.busybox
    for line in default.get('busybox', '').split('\n'):
        if line:
            busybox.add(line.strip())

    # Kernel modules: manually configured ones first, then the ones
    # required by the data sources
    modules: DefaultDict[str, List[str]] = defaultdict(list)

    def add_kmod_deps(kmods: Iterable[Tuple[str, Tuple[str, ...]]]) -> None:
        for mod, param in kmods:
            modules[mod].extend(param)

    has_modules_manual = False
    for module in default.get('modules', '').split('\n'):
        if module:
            mod_name, *mod_args = module.split()
            modules[mod_name].extend(mod_args)
            has_modules_manual = True
    for data in itertools.chain((root,), mounts):
        add_kmod_deps(data.kmods)
        for ddep in data.iter_all_deps():
            add_kmod_deps(ddep.kmods)

    # User scripts
    breakpoints = {
        'early': Breakpoint.EARLY,
        'init': Breakpoint.INIT,
        'module': Breakpoint.MODULE,
        'rootfs': Breakpoint.ROOTFS,
        'mount': Breakpoint.MOUNT,
    }
    scripts: Dict[Breakpoint, List[str]] = {k: [] for k in Breakpoint}
    for script_line in default.get('scripts', '').split('\n'):
        if script_line:
            # "breakpoint:script"
            bname, script = script_line.split(':', maxsplit=1)
            scripts[breakpoints[bname.strip().lower()]].append(script.strip())

    # Create the configuration object to return
    ret_cfg = Config(
        root=root,
        mounts=mounts,
        keymap=(
            default.get('keymap-src'),
            default.get('keymap-path', '/tmp/keymap.bmap'),
            default.get('keymap-dest', '/root/keymap.bmap'),
        ) if default.getboolean('keymap', fallback=False) else None,
        files=files,
        execs=execs,
        libs=libs,
        busybox=busybox,
        init_path=default.get('init-path', '/tmp/init.sh'),
        cmkcpiodir_opts=default.get('cmkcpiodir-default-opts', ''),
        cmkcpiolist_opts=default.get('cmkcpiolist-default-opts', ''),
        modules=modules,
        has_modules_manual=has_modules_manual,
        scripts=scripts,
    )

    # Configure final data sources
    for data in itertools.chain(ret_cfg.mounts, (ret_cfg.root,)):
        data.set_final()

    return ret_cfg


def entry_cmkinit() -> None:
    """Main entry point of the module

    Parse the command line, read the configuration and write the
    generated init script to the standard output.
    """
    parser = argparse.ArgumentParser(description="Build an init script")
    parser.add_argument('--version', action='version', version=_VERSION_INFO)
    # Parse before loading the configuration so that --help and --version
    # work even when no configuration file is available
    parser.parse_args()

    config = read_config()
    mkinit(
        out=sys.stdout,
        root=config.root,
        mounts=config.mounts,
        # Only the keymap path within the initramfs is passed on
        keymap=(None if config.keymap is None else config.keymap[2]),
        modules=config.modules,
        scripts=config.scripts,
    )


def _common_parser_logging(verbose: bool = False, quiet: int = 0) \
        -> argparse.ArgumentParser:
    """Create the common parser for entry points with a logger

    The returned parser has no ``--help`` option so that it can be used
    in the ``parents`` of another parser.

    :param verbose: Default verbose value
    :param quiet: Default quiet value
    :return: Parser providing ``--verbose/-v`` and ``--quiet/-q``
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument(
        '--verbose', '-v', action='store_true', default=verbose,
        help="be verbose",
    )
    parser.add_argument(
        '--quiet', '-q', action='count', default=quiet,
        help="be quiet (can be repeated)",
    )
    return parser


def _set_logging_level(verbose: bool, quiet: int) -> None:
    """Set global logging level according to verbose and quiet

    :param verbose: If true, use ``DEBUG`` whatever the value of ``quiet``
    :param quiet: Quietness level: ``0`` (or less) is ``INFO``,
        ``1`` is ``WARNING``, ``2`` is ``ERROR``,
        ``3`` and more is ``CRITICAL``
    """
    if verbose:
        level = logging.DEBUG
    elif quiet >= 3:
        level = logging.CRITICAL
    elif quiet >= 2:
        level = logging.ERROR
    elif quiet >= 1:
        level = logging.WARNING
    else:
        level = logging.INFO
    # Applies to the root logger
    logging.getLogger().setLevel(level)


def entry_findlib() -> None:
    """Entry point for the findlib utility

    Search each library given on the command line and print the paths
    found. Exits with status ``1`` if at least one search raised
    :exc:`FileNotFoundError`, ``0`` otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Find a library on the system",
        parents=(_common_parser_logging(),),
    )
    parser.add_argument('--version', action='version', version=_VERSION_INFO)
    parser.add_argument(
        '--compatible', '-c', type=str, default=None,
        help="set a binary the library must be compatible with",
    )
    parser.add_argument(
        '--root', '-r', type=str, default='/',
        help="set the root directory to search for the library",
    )
    parser.add_argument(
        '--null', '-0', action='store_true', default=False,
        help="paths will be delimited by null characters instead of newlines",
    )
    parser.add_argument(
        '--glob', '-g', action='store_true', default=False,
        help="library names are glob patterns",
    )
    parser.add_argument(
        'libs', metavar='LIB', type=str, nargs='+',
        help="library to search",
    )
    args = parser.parse_args()
    # One extra level of quietness compared to the other entry points
    _set_logging_level(args.verbose, args.quiet + 1)

    delimiter = '\0' if args.null else '\n'
    errors = False
    for lib in args.libs:
        logger.info("Searching library: %s", lib)
        try:
            if args.glob:
                lib_iter = find_lib_iter(
                    lib, compat=args.compatible, root=args.root
                )
            else:
                # Single result, wrapped to share the loop below
                lib_iter = (
                    find_lib(lib, compat=args.compatible, root=args.root),
                )
            # Each result is unpacked as a pair, only the first element
            # is printed
            for found, _ in lib_iter:
                if args.quiet < 3:
                    print(found, end=delimiter)
        except FileNotFoundError:
            logger.error("%s: Library not found", lib)
            errors = True
    sys.exit(0 if not errors else 1)


def _common_parser_cmkcpio() -> argparse.ArgumentParser:
    """Create the common parser for cmkcpio* entry points

    The returned parser has no ``--help`` option so that it can be used
    in the ``parents`` of another parser. It includes the options of
    :func:`_common_parser_logging`.

    :return: Parser providing ``--version``, ``--debug/-d``,
        ``--output/-o``, ``--binroot/-r``, ``--kernel/-K`` and
        ``--no-kmod``
    """
    parser = argparse.ArgumentParser(
        add_help=False, parents=(_common_parser_logging(),),
    )
    parser.add_argument(
        '--version', action='version', version=_VERSION_INFO
    )
    parser.add_argument(
        "--debug", "-d", action="store_true", default=False,
        help="debugging mode: non-root, implies -k"
    )
    parser.add_argument(
        "--output", "-o", type=str, default='/usr/src/initramfs.cpio',
        help="set the output of the CPIO archive"
    )
    parser.add_argument(
        '--binroot', '-r', type=str, default='/',
        help="set the root directory for binaries (executables and libraries)"
    )
    parser.add_argument(
        '--kernel', '-K', action='append', type=str, default=None,
        help=("set the target kernel versions of the initramfs, "
              "defaults to the running kernel")
    )
    parser.add_argument(
        '--no-kmod', action='store_true', default=False,
        help="disable kernel modules support",
    )
    return parser


def _build_initramfs(initramfs: mkramfs.Initramfs, config: Config) -> None:
    """Add files to the initramfs from the configuration

    :param initramfs: Initramfs object to populate
    :param config: Configuration listing the items to add
    """
    # Copy: the set is extended below without touching the configuration
    busybox_deps = set(config.busybox) | BUSYBOX_COMMON_DEPS

    # Add necessary files
    for src, dest in config.files:
        logger.info("Adding file %s", src)
        initramfs.add_file(src, dest)
    for src, dest in config.libs:
        logger.info("Adding library %s", src)
        initramfs.add_library(src, dest)
    for src, dest in config.execs:
        logger.info("Adding executable %s", src)
        initramfs.add_executable(src, dest)

    # Add keymap
    if config.keymap is not None:
        busybox_deps |= BUSYBOX_KEYMAP_DEPS
        logger.info("Adding keymap as %s", config.keymap[2])
        with open(config.keymap[1], 'rb') as bkeymap:
            # A bad magic number is only reported, the file is still added
            if bkeymap.read(len(BINARY_KEYMAP_MAGIC)) != BINARY_KEYMAP_MAGIC:
                logger.error("Binary keymap %s: bad file format",
                             config.keymap[1])
        # (built keymap, destination within the initramfs)
        initramfs.add_file(*config.keymap[1:3], mode=0o644)

    # Add module
    if initramfs.kernels:
        busybox_deps |= BUSYBOX_KMOD_DEPS
        for module in config.modules:
            logger.info("Adding kernel module %s", module)
            initramfs.add_kmod(module)

    # Add /init
    logger.info("Adding init script")
    initramfs.add_file(config.init_path, '/init', mode=0o755)

    # Add busybox
    logger.info("Adding busybox")
    initramfs.add_busybox(needed=busybox_deps)


def entry_cmkcpiolist() -> None:
    """Entry point for cmkcpiolist

    Build the keymap (if configured) and the init script, generate a CPIO
    list describing the initramfs, build the CPIO archive from this list,
    then remove the intermediate files unless asked to keep them.
    """
    # Load configuration
    # Needed before parsing: it provides default command line options
    config = read_config()

    # Arguments
    parser = argparse.ArgumentParser(
        description="Build an initramfs using a CPIO list",
        parents=(_common_parser_cmkcpio(),)
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '--only-build-archive', '-c', action="store_true", default=False,
        help="only build the CPIO archive from an existing CPIO list"
    )
    group.add_argument(
        '--only-build-list', '-L', action="store_true", default=False,
        help="only build the CPIO list, implies -k"
    )
    parser.add_argument(
        '--keep', '-k', action="store_true", default=False,
        help="keep the created CPIO list"
    )
    parser.add_argument(
        '--cpio-list', '-l', type=str, default='/tmp/initramfs.list',
        help="set the location of the CPIO list"
    )
    # Options from the configuration come first, the command line follows
    args = parser.parse_args(
        shlex.split(config.cmkcpiolist_opts, posix=True) + sys.argv[1:]
    )
    _set_logging_level(args.verbose, args.quiet)

    # Parse arguments
    if args.debug or args.only_build_list:
        args.keep = True
    if args.no_kmod and config.has_modules_manual:
        logger.warning("Kernel modules disabled but configuration "
                       "has manually configured modules")

    # Keymap
    if config.keymap is not None and config.keymap[0] is not None:
        with open(config.keymap[1], 'wb') as keymap_bin:
            # NOTE(review): locale.getdefaultlocale() is deprecated since
            # Python 3.11, a replacement will be needed eventually
            mkramfs.keymap_build(
                config.keymap[0], keymap_bin,
                unicode=(locale.getdefaultlocale()[1] == 'UTF-8')
            )

    # Init
    with open(config.init_path, 'w') as init_file:
        mkinit(
            out=init_file,
            root=config.root,
            mounts=config.mounts,
            keymap=(None if config.keymap is None else config.keymap[2]),
            modules=(None if args.no_kmod else config.modules),
            scripts=config.scripts,
        )

    # Initramfs
    if not args.only_build_archive:
        assert config.init_path is not None
        logger.info("Creating initramfs")
        initramfs = mkramfs.Initramfs(
            # Debugging mode: files are owned by the current user
            user=(0 if not args.debug else os.getuid()),
            group=(0 if not args.debug else os.getgid()),
            binroot=args.binroot,
            kernels=(() if args.no_kmod else args.kernel),
        )
        _build_initramfs(initramfs, config)

        # CPIO list
        logger.info("Generating CPIO list")
        # The list only goes to stdout when no archive has to read it back
        if args.cpio_list == '-' and args.only_build_list:
            initramfs.build_to_cpio_list(sys.stdout)
        else:
            with open(args.cpio_list, 'w') as cpiolist:
                initramfs.build_to_cpio_list(cpiolist)

    if not args.only_build_list:
        # Build CPIO archive
        logger.info("Generating CPIO archive to %s", args.output)
        if args.output == '-':
            mkramfs.mkcpio_from_list(args.cpio_list, sys.stdout.buffer)
        else:
            with open(args.output, 'wb') as cpiodest:
                mkramfs.mkcpio_from_list(args.cpio_list, cpiodest)

    if not args.keep:
        # Cleanup temporary files
        if os.path.exists(args.cpio_list):
            logger.info("Cleaning %s", args.cpio_list)
            os.remove(args.cpio_list)
        if config.keymap is not None and os.path.exists(config.keymap[1]):
            logger.info("Cleaning %s", config.keymap[1])
            os.remove(config.keymap[1])
        if os.path.exists(config.init_path):
            logger.info("Cleaning %s", config.init_path)
            os.remove(config.init_path)


def entry_cmkcpiodir() -> None:
    """Entry point for cmkcpiodir

    Build the keymap (if configured) and the init script, build the
    initramfs into a directory, build the CPIO archive from this
    directory, then remove the intermediate files unless asked to keep
    them.
    """
    # Load configuration
    # Needed before parsing: it provides default command line options
    config = read_config()

    # Arguments
    parser = argparse.ArgumentParser(
        description="Build an initramfs using a directory.",
        parents=(_common_parser_cmkcpio(),)
    )
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        '--only-build-archive', '-c', action="store_true", default=False,
        help="only build the CPIO archive from an existing initramfs directory"
    )
    group.add_argument(
        '--only-build-directory', '-D', action="store_true", default=False,
        help="only build the initramfs directory, implies -k"
    )
    parser.add_argument(
        '--keep', '-k', action="store_true", default=False,
        help="keep the created initramfs directory"
    )
    parser.add_argument(
        "--clean", "-C", action="store_true", default=False,
        help="overwrite temporary directory if it exists, use carefully"
    )
    parser.add_argument(
        '--build-dir', '-b', type=str, default='/tmp/initramfs',
        help="set the location of the initramfs directory"
    )
    # Options from the configuration come first, the command line follows
    args = parser.parse_args(
        shlex.split(config.cmkcpiodir_opts, posix=True) + sys.argv[1:]
    )
    _set_logging_level(args.verbose, args.quiet)

    # Parse arguments
    if args.debug or args.only_build_directory:
        args.keep = True
    if args.no_kmod and config.has_modules_manual:
        logger.warning("Kernel modules disabled but configuration "
                       "has manually configured modules")

    # Keymap
    if config.keymap is not None and config.keymap[0] is not None:
        with open(config.keymap[1], 'wb') as keymap_bin:
            # NOTE(review): locale.getdefaultlocale() is deprecated since
            # Python 3.11, a replacement will be needed eventually
            mkramfs.keymap_build(
                config.keymap[0], keymap_bin,
                unicode=(locale.getdefaultlocale()[1] == 'UTF-8')
            )

    # Init
    with open(config.init_path, 'w') as init_file:
        mkinit(
            out=init_file,
            root=config.root,
            mounts=config.mounts,
            keymap=(None if config.keymap is None else config.keymap[2]),
            modules=(None if args.no_kmod else config.modules),
            scripts=config.scripts,
        )

    # Initramfs
    if not args.only_build_archive:
        assert config.init_path is not None
        logger.info("Creating initramfs")
        initramfs = mkramfs.Initramfs(
            # Debugging mode: files are owned by the current user
            user=(0 if not args.debug else os.getuid()),
            group=(0 if not args.debug else os.getgid()),
            binroot=args.binroot,
            kernels=(() if args.no_kmod else args.kernel),
        )
        _build_initramfs(initramfs, config)

        # Pre-build cleanup
        if args.clean and os.path.exists(args.build_dir):
            logger.warning("Overwriting %s", args.build_dir)
            shutil.rmtree(args.build_dir)

        # Build
        logger.info("Building initramfs to directory %s", args.build_dir)
        initramfs.build_to_directory(args.build_dir, do_nodes=(not args.debug))

    if not args.only_build_directory:
        # Create CPIO archive
        logger.info("Generating CPIO archive to %s from %s",
                    args.output, args.build_dir)
        if args.output == '-':
            mkramfs.mkcpio_from_dir(args.build_dir, sys.stdout.buffer)
        else:
            with open(args.output, 'wb') as cpiodest:
                mkramfs.mkcpio_from_dir(args.build_dir, cpiodest)

    if not args.keep:
        # Cleanup temporary files
        if os.path.exists(args.build_dir):
            logger.info("Cleaning %s", args.build_dir)
            shutil.rmtree(args.build_dir)
        if config.keymap is not None and os.path.exists(config.keymap[1]):
            logger.info("Cleaning %s", config.keymap[1])
            os.remove(config.keymap[1])
        if os.path.exists(config.init_path):
            logger.info("Cleaning %s", config.init_path)
            os.remove(config.init_path)