summaryrefslogtreecommitdiff
path: root/aur.py
diff options
context:
space:
mode:
Diffstat (limited to 'aur.py')
-rw-r--r--aur.py235
1 files changed, 235 insertions, 0 deletions
diff --git a/aur.py b/aur.py
new file mode 100644
index 0000000..2bea1d1
--- /dev/null
+++ b/aur.py
@@ -0,0 +1,235 @@
+import archinstall
+import re
+import shutil
+import logging
+import asyncio
+import os
+from pathlib import Path
+
+__version__ = 0.4
+
+SUDOERS_FMT = "etc/sudoers.d/{user}"
+
+
+class PipelineStep:
+ def cleanup(self):
+ pass
+
+
+class PackageClassifier(PipelineStep):
+ def split(self, packages):
+ aur, std = [], []
+ for pkg in packages:
+ try:
+ if (
+ archinstall.lib.general.SysCommand(f"pacman -Ss {pkg}").exit_code
+ == 0
+ ):
+ std.append(pkg)
+ else:
+ aur.append(pkg)
+ except archinstall.lib.exceptions.SysCallError:
+ aur.append(pkg)
+ return std, aur
+
+
+class UserManager(PipelineStep):
+ def __init__(self, user: str, mount_location: Path, installation):
+ self.user = user
+ self.mount_location = mount_location
+ self.installation = installation
+ self.user_created = False
+
+ def _run_chroot(self, command):
+ return self.installation.arch_chroot(command, run_as=self.user)
+
+ def create(self):
+ if self.user_created:
+ return
+
+ archinstall.lib.output.log(
+ f"Creating temporary user {self.user}", level=logging.INFO
+ )
+ self.installation.add_additional_packages(["fakeroot", "base-devel"])
+
+ sudoers_path = self.mount_location / SUDOERS_FMT.format(user=self.user)
+ sudoers_path.write_text(f"{self.user} ALL=(ALL:ALL) NOPASSWD: ALL\n")
+
+ password = archinstall.lib.models.users.Password(plaintext="somethingrandom")
+ user = archinstall.lib.models.users.User(
+ username=self.user, password=password, groups=[], sudo=False
+ )
+ self.installation.create_users([user])
+
+ home_path = self.mount_location / "home" / self.user
+ home_path.mkdir(parents=True, exist_ok=True)
+ self.installation.arch_chroot(
+ f"/usr/bin/chown {self.user}:{self.user} /home/{self.user}"
+ )
+
+ self.user_created = True
+
+ def cleanup(self):
+ if not self.user_created:
+ return
+
+ archinstall.lib.output.log(f"Cleaning up user {self.user}", level=logging.INFO)
+ try:
+ self._run_chroot("/usr/bin/gpgconf --kill gpg-agent")
+ self._run_chroot(f"/usr/bin/killall -u {self.user}")
+ except archinstall.lib.exceptions.SysCallError:
+ pass
+
+ self.installation.arch_chroot(f"/usr/bin/userdel {self.user}")
+ shutil.rmtree(self.mount_location / f"home/{self.user}", ignore_errors=True)
+ (self.mount_location / SUDOERS_FMT.format(user=self.user)).unlink(
+ missing_ok=True
+ )
+ self.user_created = False
+
+
+class PackageDownloader(PipelineStep):
+ def __init__(self, user: str, mount_location: Path, installation):
+ self.user = user
+ self.mount_location = mount_location
+ self.downloaded = []
+ self.installation = installation
+
+ def _run(self, command):
+ return self.installation.arch_chroot(command, run_as=self.user)
+
+ async def _download_package(self, package: str):
+ package_file = f"{package}.tar.gz"
+ url = f"https://aur.archlinux.org/cgit/aur.git/snapshot/{package_file}"
+ dest = f"/home/{self.user}/{package_file}"
+
+ try:
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(
+ None, lambda: self._run(f"/usr/bin/curl {url} > {dest}")
+ )
+ self.downloaded.append(package)
+ except Exception as e:
+ archinstall.lib.output.log(
+ f"Failed to download {package}: {e}", level=logging.ERROR, fg="red"
+ )
+
+ async def download(self, packages):
+ archinstall.log(f"Downloading AUR packages: {packages}", level=logging.INFO)
+ semaphore = asyncio.Semaphore(3)
+
+ async def sem_task(pkg):
+ async with semaphore:
+ await self._download_package(pkg)
+
+ await asyncio.gather(*(sem_task(pkg) for pkg in packages))
+
+ def cleanup(self):
+ for pkg in self.downloaded:
+ tar_path = self.mount_location / "home" / self.user / f"{pkg}.tar.gz"
+ tar_path.unlink(missing_ok=True)
+
+
+class PackageInstaller(PipelineStep):
+ def __init__(self, user: str, mount_location: Path, installation):
+ self.user = user
+ self.mount_location = mount_location
+ self.installation = installation
+ self.installed = []
+ self.installed_dirs = []
+
+ def _run(self, command):
+ return self.installation.arch_chroot(command, run_as=self.user)
+
+ def _untar(self, package: str):
+ self._run(
+ f"/usr/bin/tar --directory /home/{self.user}/ -xvzf /home/{self.user}/{package}.tar.gz"
+ )
+
+ def install(self, package: str):
+ self._untar(package)
+ build_dir = self.mount_location / "home" / self.user / package
+
+ try:
+ with (build_dir / "PKGBUILD").open() as fh:
+ content = fh.read()
+ gpgkeys = re.findall(r"validpgpkeys=\((.*)\)", content)
+ for key in gpgkeys:
+ key = key.strip("'\" ")
+ self._run(f"/usr/bin/gpg --recv-keys {key}")
+ except FileNotFoundError:
+ archinstall.lib.output.log(
+ f"Missing PKGBUILD for {package}", level=logging.ERROR, fg="red"
+ )
+ return
+
+ cmd = (
+ f"cd /home/{self.user}/{package} && makepkg --force --noconfirm --needed -si"
+ )
+ result = self._run(f'/bin/bash -c "{cmd}"')
+ if result.exit_code != 0:
+ archinstall.lib.output.log(
+ f"Build failed for {package}", level=logging.ERROR, fg="red"
+ )
+ return
+
+# packages = list(build_dir.glob("*.tar.zst"))
+# if not packages:
+# archinstall.log(
+# f"No built packages found for {package}", level=logging.ERROR, fg="red"
+# )
+# return
+#
+# self._run(
+# f"/usr/bin/pacman --noconfirm -U /home/{self.user}/{package}/{packages[0].name}"
+# )
+# self.installed.append(package)
+ self.installed_dirs.append(build_dir)
+
+ def cleanup(self):
+ for build_dir in self.installed_dirs:
+ shutil.rmtree(build_dir, ignore_errors=True)
+ self.installed_dirs.clear()
+
+
+class Plugin:
+ def __init__(self):
+ self.user = os.getenv("AUR_USER", "aoffline_usr")
+ self.lazy_initd = False
+
+ def _lazy_init(self):
+ self.installation = archinstall.lib.storage.storage["session"]
+ self.classifier = PackageClassifier()
+ self.mount_location = Path(self.installation.target)
+ self.usermgr = UserManager(self.user, self.mount_location, self.installation)
+ self.downloader = PackageDownloader(
+ self.user, self.mount_location, self.installation
+ )
+ self.installer = PackageInstaller(
+ self.user, self.mount_location, self.installation
+ )
+ self.lazy_initd = True
+
+ """
+ TODO:
+ Use the nobody account to run makepkg.
+ Clone the AUR repo, chown it to nobody, then use sudo -u nobody makepkg to build it.
+ """
+ def on_pacstrap(self, packages: list[str]) -> list[str]:
+ if not self.lazy_initd:
+ self._lazy_init()
+
+ std, aur = self.classifier.split(packages)
+ if not aur:
+ return std
+
+ self.usermgr.create()
+ asyncio.run(self.downloader.download(aur))
+
+ for pkg in self.downloader.downloaded:
+ self.installer.install(pkg)
+
+ for step in reversed([self.installer, self.downloader, self.usermgr]):
+ step.cleanup()
+
+ return std