Source code for cmkinitramfs.data

"""Module providing the Data class for init script data sources management

The :class:`Data` class defines an abstract object containing data,
it has multiple subclasses for multiple types of data.
The main methods of those classes are :meth:`Data.load`, :meth:`Data.unload`,
and :meth:`Data.set_final`.

Most functions will write into a stream (:term:`text file`) the content
of the init script.
Use a :class:`io.StringIO` if you need to use strings rather than a stream.
"""

from __future__ import annotations

import itertools
import os.path
from shlex import quote
from typing import Iterable, Iterator, IO, List, Optional, Set, Tuple


[docs]class Data: """Base class representing any data on the system This is an abstract class representing data on the system. Its main methods are :meth:`load` and :meth:`unload`. :meth:`set_final` declare the object as required for the final boot environment (e.g. root fs, usr fs), this will prevent the data from being unloaded. :param files: Files directly needed in the initramfs. Each file is a tuple in the format (``src``, ``dest``), where ``src`` is the source file on the current system, and ``dest`` is the destination in the initramfs (relative to its root directory). If ``dest`` is :data:`None`, then ``src`` is used. :param execs: Executables directly needed in the initramfs. Same format as :attr:`files`. :param libs: Libraries directly needed in the initramfs. Same format as :attr:`files`. :param busybox: Busybox compatible commands needed in the initramfs. Any commands that are compatible with Busybox's implementation should be added. Exception: special shell built-in commands and reserved words are guaranteed to be available and *can* be ommitted (a list is defined in :data:`cmkinitramfs.initramfs.SHELL_SPECIAL_BUILTIN` and :data:`cmkinitramfs.initramfs.SHELL_RESERVED_WORDS`). :param kmods: Kernel modules directly needed in the initramfs. Each module is a tuple in the format ``(module, params)``, where ``params`` is a tuple of module parameters (may be empty). :param _need: Loading and runtime dependencies :param _lneed: Loading only dependencies :param _needed_by: Reverse dependencies :param _is_final: The :class:`Data` should not be unloaded :param _is_loaded: The :class:`Data` is currently loaded """ files: Set[Tuple[str, Optional[str]]] execs: Set[Tuple[str, Optional[str]]] libs: Set[Tuple[str, Optional[str]]] busybox: Set[str] kmods: Set[Tuple[str, Tuple[str, ...]]] _need: List[Data] _lneed: List[Data] _needed_by: List[Data] _is_final: bool _is_loaded: bool
[docs] @classmethod def initialize(cls, out: IO[str]) -> None: """Initialize the data class Initialize the environment for the use of this data class: define needed functions and variables. A Data class should be initialized only once in the init script, before the first :meth:`Data.load` call of the class. Default initialization is a no-op and should be redefined by subclasses. Subclasses should call their parent :class:`Data` class' :meth:`Data.initialize` method. :param out: Stream to write into """
def __init__(self) -> None: self.files = set() self.execs = set() self.libs = set() self.busybox = set() self.kmods = set() self._need = [] self._lneed = [] self._needed_by = [] self._is_final = False self._is_loaded = False def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and self.files == other.files \ and self.execs == other.execs and self.libs == other.libs \ and self._need == other._need and self._lneed == other._lneed \ and self._is_final == other._is_final
[docs] def iter_all_deps(self) -> Iterator[Data]: """Recursivelly get dependencies :return: Iterator over all the dependencies """ for dep in itertools.chain(self._need, self._lneed): yield dep yield from dep.iter_all_deps()
[docs] def is_final(self) -> bool: """Returns a :class:`bool` indicating if the :class:`Data` is final""" return self._is_final
[docs] def set_final(self) -> None: """This function set the data object as final This means the data is required by the final boot environment and should never be unloaded (it would be pointless). This will also mark its hard dependencies as final. """ self._is_final = True for k in self._need: k.set_final()
[docs] def add_dep(self, dep: Data) -> None: """Add a :class:`Data` object to the hard dependencies""" if dep in self._lneed: self._lneed.remove(dep) if dep not in self._need: self._need.append(dep) if self not in dep._needed_by: dep._needed_by.append(self)
[docs] def add_load_dep(self, dep: Data) -> None: """Add a :class:`Data` object to the loading dependencies""" if dep not in self._lneed and dep not in self._need: self._lneed.append(dep) if self not in dep._needed_by: dep._needed_by.append(self)
[docs] def _pre_load(self, out: IO[str]) -> None: """This function does preparation for loading the Data Loads all the needed dependencies. It should be called from :meth:`load` before the actual loading of the data. This method *should not* be called if the :class:`Data` is already loaded. :param out: Stream to write into :raises DataError: Already loaded """ if self._is_loaded: raise DataError(f"{self} is already loaded") self._is_loaded = True # Load dependencies for k in self._need + self._lneed: if not k._is_loaded: k.load(out)
[docs] def _post_load(self, out: IO[str]) -> None: """This function does post loading cleanup If the object is a loading dependency only, it will load all its reverse dependencies in order to be unloaded as soon as possible. Unloading quickly can be useful when dealing with sensitive data (e.g. a LUKS key). It should be called from :meth:`load` after the actual loading of the data. :param out: Stream to write into """ # If not final, load data needing self, this will allow an # unloading as soon as possible if not self._is_final: for k in self._needed_by: if not k._is_loaded: k.load(out) # Unload data not needed anymore for k in self._lneed: k._needed_by.remove(self) if not k._needed_by: k.unload(out)
[docs] def load(self, out: IO[str]) -> None: """This function loads the data It should be redefined by subclasses, this definition is a no-op only dealing with dependencies. Before loading, this function should load the dependencies with :meth:`_pre_load`. After loading, this function should unload unnecessary dependencies with :meth:`_post_load`. This method *should not* be called if the data is already loaded. :param out: Stream to write into """ self._pre_load(out) self._post_load(out)
[docs] def _pre_unload(self, out: IO[str]) -> None: """This function does pre unloading sanity checks It should be called from :meth:`unload` before the actual unloading of the data. :param out: Stream to write into :raises DataError: Not loaded or dependency issue """ if not self._is_loaded: raise DataError(f"{self} is not loaded") if self._is_final or self._needed_by: raise DataError(f"{self} is still needed or not temporary")
[docs] def _post_unload(self, out: IO[str]) -> None: """This function does post unloading cleanup It removes itself from the :attr:`_needed_by` reverse dependencies of all its dependencies, and check if the dependency can be unloaded. This method should be called from :meth:`unload` after the actual unloading of the data. This *should not* be called if the data is not loaded. :param out: Stream to write into """ for k in self._need: k._needed_by.remove(self) if not k._needed_by: k.unload(out) self._is_loaded = False
[docs] def unload(self, out: IO[str]) -> None: """This function unloads data It should be redefined by subclasses, this definition is a no-op only dealing with dependencies. Before unloading, this function should check for any dependency error, with :meth:`_pre_unload`. After unloading, this function should unload all unneeded dependencies, with :meth:`_post_unload`. :param out: Stream to write into """ self._pre_unload(out) self._post_unload(out)
[docs] def __str__(self) -> str: """Get the name of the data This string may be quoted with simple quotes in the script. This **has** to be implemented by subclasses. """ raise NotImplementedError()
[docs] def path(self) -> str: """Get the path of this data This function provides a string allowing access to data from within the init environment, this string can be a path or a command in a subshell (e.g. ``"$(findfs UUID=foobar)"``). This string should be ready to be used in the script without being quoted nor escaped. This **has** to be implemented by subclasses. """ raise NotImplementedError()
[docs]class DataError(Exception): """Error in the :class:`Data` object"""
[docs]class PathData(Data): """Absolute path :param datapath: Path of the data """ datapath: str def __init__(self, datapath: str): super().__init__() self.datapath = datapath def __str__(self) -> str: return self.datapath def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.datapath == other.datapath def path(self) -> str: return quote(self.datapath)
[docs]class UuidData(Data): """UUID of a data The UUID can be a filesystem UUID, or other UUID known by other :class:`Data` classes (e.g. a MD UUID). :param uuid: UUID of the data :param partition: If :data:`True`, the UUID is treated as a partition UUID """ uuid: str partition: bool def __init__(self, uuid: str, partition: bool = False): super().__init__() if partition: # PARTUUID is only available in util-linux findfs self.execs |= {('findfs', None)} else: self.busybox |= {'findfs'} self.uuid = uuid self.partition = partition def __str__(self) -> str: return ('PARTUUID=' if self.partition else 'UUID=') + self.uuid def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.uuid == other.uuid and self.partition == other.partition def path(self) -> str: prefix = 'PARTUUID=' if self.partition else 'UUID=' return '"$(findfs ' + quote(prefix + self.uuid) + ')"'
[docs]class LabelData(Data): """Label of a data The label can be a filesystem or partition label, or a label known by other :class:`Data` classes. :param label: Label of the data :param partition: If :data:`True`, the label is treated as a partition label """ label: str partition: bool def __init__(self, label: str, partition: bool = False): super().__init__() if partition: # PARTLABEL is only available in util-linux findfs self.execs |= {('findfs', None)} else: self.busybox |= {'findfs'} self.label = label self.partition = partition def __str__(self) -> str: return ('PARTLABEL=' if self.partition else 'LABEL=') + self.label def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.label == other.label and self.partition == other.partition def path(self) -> str: prefix = 'PARTLABEL=' if self.partition else 'LABEL=' return '"$(findfs ' + quote(prefix + self.label) + ')"'
[docs]class LuksData(Data): """LUKS encrypted block device :param source: :class:`Data` to unlock (crypto_LUKS volume), it will be set as a hard dependency :param name: Name for the LUKS volume :param key: :class:`Data` to use as key file, it will be set as a load dependency :param header: :class:`Data` containing the LUKS header, it will be set as a load dependency :param discard: Enable discards """ source: Data name: str key: Optional[Data] header: Optional[Data] discard: bool def __init__(self, source: Data, name: str, key: Optional[Data] = None, header: Optional[Data] = None, discard: bool = False): super().__init__() self.execs.add(('cryptsetup', None)) self.libs.add(('libgcc_s.so.1', None)) self.kmods.add(('dm-crypt', ())) self.source = source self.name = name self.key = key self.header = header self.discard = discard self.add_dep(self.source) if self.key: self.add_load_dep(self.key) if self.header: self.add_load_dep(self.header) def __str__(self) -> str: return self.name def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.source == other.source and self.name == other.name \ and self.key == other.key and self.header == other.header \ and self.discard == other.discard def load(self, out: IO[str]) -> None: header = f'--header {self.header.path()} ' if self.header else '' key_file = f'--key-file {self.key.path()} ' if self.key else '' discard = '--allow-discards ' if self.discard else '' self._pre_load(out) out.writelines(( f"info 'Unlocking LUKS device {self}'\n", "cryptsetup ", header, key_file, discard, f"open {self.source.path()} {quote(self.name)} || die ", quote(f'Failed to unlock LUKS device {self}'), '\n', "\n", )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) out.writelines(( f"info 'Closing LUKS device {self}'\n", f"cryptsetup close {quote(self.name)} || die ", quote(f'Failed to close LUKS device {self}'), '\n', "\n", )) self._post_unload(out) def path(self) -> str: return quote('/dev/mapper/' + self.name)
[docs]class LvmData(Data): """LVM logical volume :param vg_name: Name of the volume group :param lv_name: Name of the logical volume """ vg_name: str lv_name: str @staticmethod def __lvm_conf(out: IO[str]) -> None: """Create LVM config in ``/etc/lvm/lvmlocal.conf`` This override some configurations specific to the initramfs environment. Note: if ``/etc/lvm/lvmlocal.conf`` exists, we append to it, which may cause duplicate configuration warnings from LVM. """ out.writelines(( "debug 'Writing LVM configuration'\n", "mkdir -p /etc/lvm && touch /etc/lvm/lvmlocal.conf || warn ", "'Failed to create LVM configuration file'\n", "{\n", "\techo 'activation/monitoring = 0'\n", "\techo 'activation/udev_rules = 0'\n", "\techo 'activation/udev_sync = 0'\n", "\techo 'devices/external_device_info_source = \"none\"'\n", "\techo 'devices/md_component_detection = 0'\n", "\techo 'devices/multipath_component_detection = 0'\n", "\techo 'devices/obtain_device_list_from_udev = 0'\n", "\techo 'global/locking_type = 4'\n", "\techo 'global/use_lvmetad = 0'\n", "\techo 'global/use_lvmlockd = 0'\n", "\techo 'global/use_lvmpolld = 0'\n", "} >>/etc/lvm/lvmlocal.conf || warn ", "'Failed to write LVM configuration file'\n" "\n", )) @classmethod def initialize(cls, out: IO[str]) -> None: super().initialize(out) LvmData.__lvm_conf(out) def __init__(self, vg_name: str, lv_name: str): super().__init__() self.execs.add(('lvm', None)) self.vg_name = vg_name self.lv_name = lv_name def __str__(self) -> str: return self.vg_name + "/" + self.lv_name def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.vg_name == other.vg_name and self.lv_name == other.lv_name def load(self, out: IO[str]) -> None: self._pre_load(out) out.writelines(( f"info 'Enabling LVM logical volume {self}'\n", "lvm lvchange --sysinit -a ly ", f"{quote(f'{self.vg_name}/{self.lv_name}')} || die ", quote(f'Failed to enable LVM logical volume {self}'), '\n', "lvm vgmknodes || err ", quote(f'Failed to create LVM nodes for {self}'), '\n', "\n", )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) out.writelines(( f"info 'Disabling LVM logical volume {self}'\n", "lvm lvchange --sysinit -a ln ", f"{quote(f'{self.vg_name}/{self.lv_name}')} || die ", quote(f'Failed to disable LVM logical volume {self}'), '\n', "lvm vgmknodes || err ", quote(f'Failed to remove LVM nodes for {self}'), '\n', "\n", )) self._post_unload(out) def path(self) -> str: # If LV or VG name has an hyphen '-', LVM doubles it in the path return quote('/dev/mapper/' + self.vg_name.replace('-', '--') + '-' + self.lv_name.replace('-', '--'))
[docs]class MountData(Data): """Mount point :param source: :class:`Data` to use as source (e.g. /dev/sda1, my-luks-data), it will be set as a hard dependency :param mountpoint: Absolute path of the mountpoint :param filesystem: Filesystem (used for ``mount -t filesystem``) :param options: Mount options """ source: Data mountpoint: str filesystem: str options: str @staticmethod def __fun_fsck(out: IO[str]) -> None: """Define the mount_fsck function This function takes any number of arguments, which will be passed to ``fsck``. This function checks the return code of the ``fsck`` command and acts accordingly. This functions calls fsck with ``$@``. It checks the return code of ``fsck`` and : - No error: returns 0. - Non fatal error: prints an error and returns 0. - Non fatal error requiring reboot: prints an error and reboot. - Fatal error: returns 1. :param out: Stream to write into """ fsck_err = { 1: ('notice', "Filesystem errors corrected"), 2: ('notice', "System should be rebooted"), 4: ('alert', "Filesystem errors left uncorrected"), 8: ('crit', "Operational error"), 16: ('crit', "Usage or syntax error"), 32: ('err', "Checking canceled by user request"), 128: ('crit', "Shared-library error"), } code_err = 4 | 8 | 16 | 32 | 64 | 128 code_reboot = 2 out.writelines(( 'mount_fsck()\n', '{\n', '\tFSTAB_FILE=/dev/null "$@"\n', '\tfsck_ret=$?\n' '\t[ "${fsck_ret}" -eq 0 ] && return 0\n', )) for err_code, err_data in fsck_err.items(): err_call, err_str = err_data out.writelines(( f'\t[ "$((fsck_ret & {err_code}))" -eq {err_code} ] && ', err_call, ' ', quote(f"fsck: {err_str}"), '\n', )) out.writelines(( '\t[ "$((fsck_ret & ', str(code_err), '))" -ne 0 ] && return 1\n', '\tif [ "$((fsck_ret & ', str(code_reboot), '))" -eq 2 ]; then ', 'notice \'Rebooting...\'; reboot -f; fi\n', '\treturn 0\n', '}\n', '\n', )) @staticmethod def mkdir(path: str, fatal: bool = False) -> Iterable[str]: """Create a directory""" return ( f'[ -d {quote(path)} ] || mkdir {quote(path)} || ', 'die ' if fatal else 'err ', quote(f'Failed to create directory {quote(path)}'), '\n', ) @classmethod def initialize(cls, out: IO[str]) -> None: super().initialize(out) MountData.__fun_fsck(out) def __init__(self, source: Data, mountpoint: str, filesystem: str, options: str = "ro"): super().__init__() self.busybox |= {'fsck', '[', 'reboot', 'mkdir', 'mount', 'umount'} self.source = source if source else PathData("none") self.mountpoint = mountpoint self.filesystem = filesystem self.options = options if self.filesystem in ('btrfs',): self.execs.add(('btrfs', None)) self.execs.add(('fsck.btrfs', None)) self.kmods.add(('btrfs', ())) elif self.filesystem in ('ext4',): self.execs.add(('fsck.ext4', None)) self.execs.add(('e2fsck', None)) self.kmods.add(('ext4', ())) elif self.filesystem in ('xfs',): self.execs.add(('fsck.xfs', None)) self.execs.add(('xfs_repair', None)) self.kmods.add(('xfs', ())) elif self.filesystem in ('fat', 'vfat'): self.execs.add(('fsck.fat', None)) self.execs.add(('fsck.vfat', None)) self.kmods.add(('vfat', ())) elif self.filesystem in ('exfat',): self.execs.add(('fsck.exfat', None)) self.kmods.add(('exfat', ())) elif self.filesystem in ('f2fs',): self.execs.add(('fsck.f2fs', None)) self.kmods.add(('f2fs', ())) elif self.filesystem in ('zfs',): self.execs.add(('fsck.zfs', None)) self.kmods.add(('zfs', ())) self.add_dep(self.source) def __str__(self) -> str: return self.mountpoint def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.source == other.source \ and self.mountpoint == other.mountpoint \ and self.filesystem == other.filesystem \ and self.options == other.options def load(self, out: IO[str]) -> None: fsck_exec = f'fsck -t {quote(self.filesystem)}' \ if self.filesystem != 'zfs' else 'fsck.zfs' fsck = ( f'mount_fsck {fsck_exec} {self.source.path()} || die ', quote(f'Failed to check filesystem {self}'), '\n', ) if self.source.path() != 'none' else () mkdir = self.mkdir(self.mountpoint) \ if os.path.dirname(self.mountpoint) == '/mnt' else () self._pre_load(out) out.writelines(( f"info 'Mounting filesystem {self}'\n", *fsck, *mkdir, f"mount -t {quote(self.filesystem)} -o {quote(self.options)} ", f"{self.source.path()} {quote(self.mountpoint)} || die ", quote(f'Failed to mount filesystem {self}'), '\n', "\n", )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) out.writelines(( f"info 'Unmounting filesystem {self}'\n", f"umount {quote(self.mountpoint)} || die ", quote(f'Failed to unmount filesystem {self}'), '\n', "\n", )) self._post_unload(out) def path(self) -> str: return quote(self.mountpoint)
[docs]class MdData(Data): """MD RAID :param sources: :class:`Data` to use as sources (e.g. /dev/sda1 and /dev/sdb1; or UUID=foo), they will be set as hard dependencies :param name: Name for the MD device :raises ValueError: No :class:`Data` source """ sources: Tuple[Data, ...] name: str def __init__(self, sources: Iterable[Data], name: str): super().__init__() self.execs.add(('mdadm', None)) self.sources = tuple(sources) self.name = name if not self.sources: raise ValueError(f"{self} has no source defined") for source in self.sources: self.add_dep(source) def __str__(self) -> str: return self.name def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.sources == other.sources and self.name == other.name def load(self, out: IO[str]) -> None: # Get the string containing all sources to use sources: Set[str] = set() for source in self.sources: if isinstance(source, UuidData): sources.add(f"--uuid {quote(source.uuid)} ") else: sources.add(f"{source.path()} ") self._pre_load(out) out.writelines(( f"info 'Assembling MD RAID {self}'\n", "MDADM_NO_UDEV=1 ", "mdadm --assemble ", *sources, f"{quote(self.name)} || die ", quote(f'Failed to assemble MD RAID {self}'), '\n', "\n", )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) out.writelines(( f"info 'Stopping MD RAID {self}'\n", "MDADM_NO_UDEV=1 ", f"mdadm --stop {quote(self.name)} || die ", quote(f'Failed to stop MD RAID {self}'), '\n', "\n", )) self._post_unload(out) def path(self) -> str: return quote('/dev/md/' + self.name)
[docs]class CloneData(Data): """Clone a :class:`Data` to another :param source: :class:`Data` to use as source, it will be set as a load dependency :param dest: :class:`Data` to use as destination, it will be set as a hard dependency """ source: Data dest: Data def __init__(self, source: Data, dest: Data): super().__init__() self.busybox |= {'cp'} self.source = source self.dest = dest self.add_load_dep(self.source) self.add_dep(self.dest) def __str__(self) -> str: return f"{self.source} to {self.dest}" def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.source == other.source and self.dest == other.dest def load(self, out: IO[str]) -> None: self._pre_load(out) out.writelines(( f"info 'Cloning {self}'\n", f"cp -aT {self.source.path()} {self.dest.path()} || die ", quote(f'Failed to clone {self}'), '\n', "\n", )) self._post_load(out) def path(self) -> str: return self.dest.path()
[docs]class ZFSPoolData(Data): """ ZFS pool :param pool: Pool name :param cache: :class:`Data` containing a ZFS cache file, it will be set as a load dependency """ pool: str cache: Optional[Data] def __init__(self, pool: str, cache: Optional[Data]): super().__init__() self.pool = pool self.cache = cache if self.cache is not None: self.add_load_dep(self.cache) self.execs.add(('zpool', None)) self.kmods.add(('zfs', ())) def __str__(self) -> str: return f'ZFS pool {self.pool}' def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.pool == other.pool and self.cache == other.cache def load(self, out: IO[str]) -> None: self._pre_load(out) cache = f'-c {self.cache.path()} ' if self.cache is not None else '' out.writelines(( 'info ', quote(f'Importing {self}'), '\n', 'zpool import -N ', cache, quote(self.pool), ' || die ', quote(f'Failed to import {self}'), '\n', '\n', )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) out.writelines(( 'info ', quote(f'Importing {self}'), '\n', 'zpool export ', quote(self.pool), ' || die', quote(f'Failed to export {self}'), '\n', '\n', )) self._post_unload(out) def path(self) -> str: return quote(self.pool)
[docs]class ZFSCryptData(Data): """ZFS encrypted dataset :param pool: :class:`ZFSPoolData` containing the encrypted dataset, it will be set as a hard dependency :param dataset: Dataset name :param key: :class:`Data` to use as key file, it will be set as a load dependency """ pool: ZFSPoolData dataset: str key: Optional[Data] def __init__(self, pool: Data, dataset: str, key: Optional[Data] = None): super().__init__() if not isinstance(pool, ZFSPoolData): raise TypeError(f"{self.pool} is not a {ZFSPoolData}") self.pool = pool self.dataset = dataset self.key = key if self.pool.pool != dataset.split('/')[0]: raise Exception(f"{self} is not on pool {self.pool}") self.add_dep(self.pool) if self.key is not None: self.add_load_dep(self.key) self.execs.add(('zfs', None)) def __str__(self) -> str: return f'ZFS encrypted dataset {self.dataset}' def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.pool == other.pool and self.dataset == other.dataset \ and self.key == other.key def load(self, out: IO[str]) -> None: self._pre_load(out) key = f'-L {self.key.path()} ' if self.key is not None else '' out.writelines(( 'info ', quote(f'Unlocking {self}'), '\n', f'zfs load-key -r {key}{quote(self.dataset)} 1>&2 || die ', quote(f'Failed to unlock {self}'), '\n', '\n', )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) out.writelines(( 'info ', quote(f'Locking {self}'), '\n', f'zfs unload-key -r {quote(self.dataset)} || die ', quote(f'Failed to lock {self}'), '\n', '\n', )) self._post_unload(out) def path(self) -> str: return quote(self.dataset)
class Network(Data): """Networking configuration :param device: MAC address of the device :param ip: IP address (None for DHCP) :param mask: IP mask (defaults to classful or DHCP) :param gateway: default route IP (optional) """ device: str ip: Optional[str] mask: Optional[str] gateway: Optional[str] @staticmethod def classful_mask(ip: str) -> str: first = int(ip.split('.')[0]) if first < 128: return '255.0.0.0' elif first < 192: return '255.255.0.0' elif first < 224: return '255.255.255.0' else: raise ValueError(f"No classful network mask for {ip}") @staticmethod def __fun_find_iface(out: IO[str]) -> None: """"Define the ``find_iface`` function This function takes one MAC address and outputs the corresponding network interface name (e.g. ``eth0``). Return value: 0 on success, 1 on failure. """ out.writelines(( 'find_iface()\n', '{\n', '\tfor k in /sys/class/net/*; do\n', '\t\tif ! grep -q "${1}" "${k}/address" 1>/dev/null 2>&1; ', 'then continue; fi\n', '\t\techo "$(basename -- "${k}")"\n', '\t\treturn 0\n', '\tdone\n', '\treturn 1\n' '}\n', '\n', )) @classmethod def initialize(cls, out: IO[str]) -> None: super().initialize(out) Network.__fun_find_iface(out) def __init__( self, device: str, ip: Optional[str] = None, mask: Optional[str] = None, gateway: Optional[str] = None ): super().__init__() self.device = device if mask is None and ip is not None: mask = Network.classful_mask(ip) self.ip = ip self.mask = mask self.gateway = gateway self.busybox |= {'ip', 'udhcpc'} self.files |= { ('/usr/share/udhcpc/default.script', '/etc/udhcpc.script'), } def __str__(self) -> str: return f'network interface {self.device}' def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.ip == other.ip and self.mask == other.mask \ and self.gateway == other.gateway and self.device == other.device def load(self, out: IO[str]) -> None: device = quote(self.device) ip = quote(self.ip if self.ip is not None else '') mask = quote(self.mask if self.mask is not None else '') gateway = quote(self.gateway if self.gateway is not None else '') iface = '"${iface}"' iface_full = quote(f'{self.device} (') + iface + quote(')') static_ip = ( f'ip addr add {ip}/{mask} dev {iface} || die ', quote(f'Failed to add {self.ip} to '), iface_full, '\n', ) dhcp_ip = ( f'udhcpc -nqfS -s /etc/udhcpc.script -i {iface} || die ', quote('DHCP failed on '), iface_full, '\n', ) gw_route = ( f'ip route add default via {gateway} dev {iface} || die ', quote(f'Failed to set gateway {self.gateway} on '), iface_full, '\n', ) self._pre_load(out) out.writelines(( 'info ', quote(f'Raising {self}'), '\n', f'iface="$(find_iface {device})" || die ', quote(f'Failed to find network interface {self.device}'), '\n', f'ip link set {iface} up || die ', quote('Failed to raise network interface '), iface_full, '\n', *(static_ip if self.ip is not None else dhcp_ip), *(gw_route if self.gateway is not None else ()), '\n', )) self._post_load(out) def unload(self, out: IO[str]) -> None: device = quote(self.device) iface = '"${iface}"' iface_full = quote(f'{self.device} (') + iface + quote(')') self._pre_unload(out) out.writelines(( 'info ', quote(f'Shutting down {self}'), '\n', f'iface="$(find_iface {device})" || die ', quote(f'Failed to find network interface {self.device}'), '\n', f'ip link set {iface} down || die ', quote('Failed to shutdown network interface '), iface_full, '\n', '\n', )) self._post_unload(out) class ISCSI(Data): """iSCSI target :param initiator: Initiator name :param target: iSCSI target :param portal_group: Target portal group tag :param address: iSCSI server address :param port: iSCSI server port :param username: Authentication username :param password: Authentication password :param username_in: Incoming authentication username :param password_in: Incoming authentication password """ initiator: str target: str portal_group: int address: str port: int username: Optional[str] password: Optional[str] username_in: Optional[str] password_in: Optional[str] def __init__( self, initiator: str, target: str, portal_group: int, address: str, port: int = 3260, username: Optional[str] = None, password: Optional[str] = None, username_in: Optional[str] = None, password_in: Optional[str] = None, ): super().__init__() self.initiator = initiator self.target = target self.portal_group = portal_group self.address = address self.port = port self.username = username self.password = password self.username_in = username_in self.password_in = password_in if (self.username is None) != (self.password is None): raise ValueError("Both username and password must be set") if (self.username_in is None) != (self.password_in is None): raise ValueError("Both username_in and password_in must be set") self.execs |= {('iscsistart', None)} def __str__(self) -> str: return f'iSCSI target {self.target}' def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and super().__eq__(other) \ and self.initiator == other.initiator \ and self.target == other.target \ and self.portal_group == other.portal_group \ and self.address == other.address \ and self.port == other.port \ and self.username == other.username \ and self.password == other.password \ and self.username_in == other.username_in \ and self.password_in == other.password_in def load(self, out: IO[str]) -> None: auth = ( ' -u ', quote( self.username if self.username is not None else '' ), ' -w ', quote( self.password if self.password is not None else '' ), ) auth_in = ( ' -U ', quote( self.username_in if self.username_in is not None else '' ), ' -W ', quote( self.password_in if self.password_in is not None else '' ), ) self._pre_load(out) out.writelines(( 'info ', quote(f'Loading {self}'), '\n', 'iscsistart', ' -i ', quote(self.initiator), ' -t ', quote(self.target), ' -g ', str(self.portal_group), ' -a ', quote(self.address), ' -p ', str(self.port), *(auth if self.username is not None else ()), *(auth_in if self.username_in is not None else ()), ' || die ', quote(f'Failed to load {self}'), '\n', '\n', )) self._post_load(out) def unload(self, out: IO[str]) -> None: self._pre_unload(out) self._post_unload(out)