"""Build and install AUR packages inside an archinstall target.

``Plugin.on_pacstrap`` splits the requested package list into packages that
``pacman -Ss`` finds and packages that it does not.  The latter are treated as
AUR packages: a temporary build user is created in the target, the AUR
snapshots are downloaded and built with ``makepkg`` as that user, and the user
is removed again afterwards.
"""

import asyncio
import logging
import os
import re
import secrets
import shutil
from pathlib import Path

import archinstall

__version__ = 0.4

# Path of the temporary sudoers drop-in, relative to the target mount point.
SUDOERS_FMT = "etc/sudoers.d/{user}"

# Body of a ``validpgpkeys=( ... )`` array; DOTALL so that the common
# one-key-per-line layout is captured as well.
_VALIDPGPKEYS_RE = re.compile(r"validpgpkeys=\((.*?)\)", re.DOTALL)

# A key fingerprint token: hexadecimal digits only.
_FINGERPRINT_RE = re.compile(r"[0-9A-Fa-f]+")


def _parse_validpgpkeys(pkgbuild: str) -> list[str]:
    """Return the key fingerprints listed in ``validpgpkeys`` of a PKGBUILD.

    Handles several keys per array, arrays spread over multiple lines,
    single/double quoting and trailing ``#`` comments.  Tokens that are not
    purely hexadecimal are dropped so that they are never interpolated into a
    shell command.  Duplicates are removed; first-seen order is kept.
    """
    keys: list[str] = []
    for body in _VALIDPGPKEYS_RE.findall(pkgbuild):
        for line in body.splitlines():
            # Everything after '#' is a comment (usually the key owner's name).
            code = line.split("#", 1)[0]
            for token in code.split():
                candidate = token.strip("'\"")
                if _FINGERPRINT_RE.fullmatch(candidate):
                    keys.append(candidate)
    return list(dict.fromkeys(keys))


class PackageClassifier:
    """Sort package names into "found by pacman" and "assumed AUR"."""

    @staticmethod
    def _in_sync_repos(pkg: str) -> bool:
        """Return True when ``pacman -Ss <pkg>`` exits with status 0.

        A ``SysCallError`` raised while running the command is treated the
        same as a non-zero exit status.
        """
        # NOTE(review): ``pacman -Ss`` is a regex *search*, not an exact-name
        # lookup, so an AUR package whose name merely matches some repository
        # package may be classified as a repository package -- confirm whether
        # an exact match is wanted here.
        try:
            return (
                archinstall.lib.general.SysCommand(f"pacman -Ss {pkg}").exit_code
                == 0
            )
        except archinstall.lib.exceptions.SysCallError:
            return False

    def split(self, packages):
        """Partition *packages* into ``(std, aur)`` lists, preserving order.

        Returns:
            A tuple ``(std, aur)``: names pacman found, and names it did not.
        """
        std, aur = [], []
        for pkg in packages:
            (std if self._in_sync_repos(pkg) else aur).append(pkg)
        return std, aur


class UserManager:
    """Create and remove the temporary build user inside the target."""

    def __init__(self, user: str, mount_location: Path, installation):
        self.user = user
        self.mount_location = mount_location
        # Session object providing arch_chroot(), create_users() and
        # add_additional_packages().
        self.installation = installation
        self.user_created = False

    def _run_chroot(self, command):
        """Run *command* in the target chroot as the build user."""
        return self.installation.arch_chroot(command, run_as=self.user)

    def _sudoers(self) -> Path:
        """Return the host-side path of the user's sudoers drop-in file."""
        return self.mount_location / SUDOERS_FMT.format(user=self.user)

    def _home(self) -> Path:
        """Return the host-side path of the user's home directory."""
        return self.mount_location / f"home/{self.user}"

    def create(self):
        """Create the build user with passwordless sudo; no-op if it exists."""
        if self.user_created:
            return
        archinstall.lib.output.log(
            f"Creating temporary user {self.user}", level=logging.INFO
        )
        self.installation.add_additional_packages(["fakeroot", "base-devel"])
        self._sudoers().write_text(f"{self.user} ALL=(ALL:ALL) NOPASSWD: ALL\n")
        # The account has passwordless sudo, so never give it a guessable
        # password; the value is not needed again after creation.
        password = archinstall.lib.models.users.Password(
            plaintext=secrets.token_urlsafe(32)
        )
        user = archinstall.lib.models.users.User(
            username=self.user, password=password, groups=[], sudo=False
        )
        self.installation.create_users([user])
        self._home().mkdir(parents=True, exist_ok=True)
        self.installation.arch_chroot(
            f"/usr/bin/chown {self.user}:{self.user} /home/{self.user}"
        )
        self.user_created = True

    def cleanup(self):
        """Remove the build user, its home and its sudoers file.

        Does nothing when ``create()`` has not completed.  Stopping the
        user's processes is best-effort; a failing ``userdel`` propagates, but
        the home directory and the sudoers file are removed regardless.
        """
        if not self.user_created:
            return
        archinstall.lib.output.log(f"Cleaning up user {self.user}", level=logging.INFO)
        # Each command gets its own try so that one failing does not skip
        # the other.
        for command in (
            "/usr/bin/gpgconf --kill gpg-agent",
            f"/usr/bin/killall -u {self.user}",
        ):
            try:
                self._run_chroot(command)
            except archinstall.lib.exceptions.SysCallError as err:
                archinstall.lib.output.log(
                    f"Ignoring failure of '{command}': {err}", level=logging.DEBUG
                )
        try:
            self.installation.arch_chroot(f"/usr/bin/userdel {self.user}")
        finally:
            # Always revoke the NOPASSWD rule, even when userdel failed.
            shutil.rmtree(self._home(), ignore_errors=True)
            self._sudoers().unlink(missing_ok=True)
        self.user_created = False


class PackageDownloader:
    """Download AUR snapshot tarballs into the build user's home."""

    def __init__(self, user: str, mount_location: Path, installation):
        self.user = user
        self.mount_location = mount_location
        # Names of packages whose snapshot was fetched successfully.
        self.downloaded = []
        self.installation = installation

    def _run(self, command):
        """Run *command* in the target chroot as the build user."""
        return self.installation.arch_chroot(command, run_as=self.user)

    async def _download_package(self, package: str):
        """Fetch one snapshot; record *package* in ``downloaded`` on success.

        Failures are logged and swallowed so that one bad package does not
        abort the other downloads.
        """
        package_file = f"{package}.tar.gz"
        url = f"https://aur.archlinux.org/cgit/aur.git/snapshot/{package_file}"
        dest = f"/home/{self.user}/{package_file}"
        # --fail makes curl exit non-zero on HTTP errors instead of saving the
        # error page as if it were the tarball.
        command = f"/usr/bin/curl --fail {url} > {dest}"
        loop = asyncio.get_running_loop()
        try:
            # arch_chroot blocks, so run it on the default executor.
            result = await loop.run_in_executor(None, self._run, command)
        except Exception as e:
            archinstall.lib.output.log(
                f"Failed to download {package}: {e}", level=logging.ERROR, fg="red"
            )
            return
        if result.exit_code != 0:
            archinstall.lib.output.log(
                f"Failed to download {package}: curl exited with {result.exit_code}",
                level=logging.ERROR,
                fg="red",
            )
            return
        self.downloaded.append(package)

    async def download(self, packages, max_concurrent: int = 3):
        """Download all *packages*, at most *max_concurrent* at a time."""
        archinstall.lib.output.log(
            f"Downloading AUR packages: {packages}", level=logging.INFO
        )
        semaphore = asyncio.Semaphore(max_concurrent)

        async def sem_task(pkg):
            async with semaphore:
                await self._download_package(pkg)

        await asyncio.gather(*(sem_task(pkg) for pkg in packages))


class PackageInstaller:
    """Unpack, build and install downloaded AUR snapshots with makepkg."""

    def __init__(self, user: str, mount_location: Path, installation):
        self.user = user
        self.mount_location = mount_location
        self.installation = installation
        # Names of packages that were built and installed successfully.
        self.installed = []
        # Host-side build directories of those packages.
        self.installed_dirs = []

    def _run(self, command):
        """Run *command* in the target chroot as the build user."""
        return self.installation.arch_chroot(command, run_as=self.user)

    def _untar(self, package: str):
        """Extract ``<package>.tar.gz`` into the build user's home."""
        self._run(
            f"/usr/bin/tar --directory /home/{self.user}/ -xvzf /home/{self.user}/{package}.tar.gz"
        )

    def install(self, package: str):
        """Build and install *package* from its downloaded snapshot.

        Imports every key listed in the PKGBUILD's ``validpgpkeys`` before
        running ``makepkg``.  A missing PKGBUILD or a failed build is logged
        and the method returns without recording the package.
        """
        self._untar(package)
        build_dir = self.mount_location / "home" / self.user / package
        try:
            content = (build_dir / "PKGBUILD").read_text(
                encoding="utf-8", errors="replace"
            )
        except FileNotFoundError:
            archinstall.lib.output.log(
                f"Missing PKGBUILD for {package}", level=logging.ERROR, fg="red"
            )
            return
        for key in _parse_validpgpkeys(content):
            self._run(f"/usr/bin/gpg --recv-keys {key}")
        cmd = (
            f"cd /home/{self.user}/{package} && makepkg --force --noconfirm --needed -si"
        )
        result = self._run(f'/bin/bash -c "{cmd}"')
        if result.exit_code != 0:
            archinstall.lib.output.log(
                f"Build failed for {package}", level=logging.ERROR, fg="red"
            )
            return
        self.installed.append(package)
        self.installed_dirs.append(build_dir)


class Plugin:
    """Entry point wiring the helpers together for the pacstrap hook."""

    def __init__(self):
        # Name of the temporary build user; override with $AUR_USER.
        self.user = os.getenv("AUR_USER", "packagebuilder")
        self.lazy_initd = False

    def _lazy_init(self):
        """Bind to the current installation session and build the helpers.

        Deferred until the first hook call because the session is read from
        ``archinstall.lib.storage.storage`` at that point.
        """
        self.installation = archinstall.lib.storage.storage["session"]
        self.classifier = PackageClassifier()
        self.mount_location = Path(self.installation.target)
        self.usermgr = UserManager(self.user, self.mount_location, self.installation)
        self.downloader = PackageDownloader(
            self.user, self.mount_location, self.installation
        )
        self.installer = PackageInstaller(
            self.user, self.mount_location, self.installation
        )
        self.lazy_initd = True

    def on_pacstrap(self, packages: list[str]) -> list[str]:
        """Install the AUR part of *packages* and return the remainder.

        Returns:
            The packages pacman found, for the caller to install itself.
        """
        if not self.lazy_initd:
            self._lazy_init()
        std, aur = self.classifier.split(packages)
        if not aur:
            return std
        try:
            self.usermgr.create()
            asyncio.run(self.downloader.download(aur))
            for pkg in self.downloader.downloaded:
                self.installer.install(pkg)
        finally:
            # Never leave the passwordless-sudo build user behind, even when
            # a download or build raised.  cleanup() is a no-op if create()
            # did not complete.
            self.usermgr.cleanup()
        return std