????JFIF??x?x????'403WebShell
403Webshell
Server IP : 79.136.114.73  /  Your IP : 3.133.112.22
Web Server : Apache/2.4.7 (Ubuntu) PHP/5.5.9-1ubuntu4.29 OpenSSL/1.0.1f
System : Linux b8009 3.13.0-170-generic #220-Ubuntu SMP Thu May 9 12:40:49 UTC 2019 x86_64
User : www-data ( 33)
PHP Version : 5.5.9-1ubuntu4.29
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,
MySQL : ON  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib/python3/dist-packages/uaclient/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib/python3/dist-packages/uaclient/apt.py
import glob
import logging
import os
import re
import subprocess
import tempfile

from uaclient import exceptions
from uaclient import gpg
from uaclient import status
from uaclient import util

try:
    from typing import List  # noqa
except ImportError:
    # typing isn't available on trusty, so ignore its absence
    pass

APT_HELPER_TIMEOUT = 20.0  # 20 second timeout used for apt-helper call
APT_AUTH_COMMENT = "  # ubuntu-advantage-tools"
APT_CONFIG_AUTH_FILE = "Dir::Etc::netrc/"
APT_CONFIG_AUTH_PARTS_DIR = "Dir::Etc::netrcparts/"
APT_CONFIG_LISTS_DIR = "Dir::State::lists/"
APT_KEYS_DIR = "/etc/apt/trusted.gpg.d"
KEYRINGS_DIR = "/usr/share/keyrings"
APT_METHOD_HTTPS_FILE = "/usr/lib/apt/methods/https"
CA_CERTIFICATES_FILE = "/usr/sbin/update-ca-certificates"

# Since we generally have a person at the command line prompt. Don't loop
# for 5 minutes like charmhelpers because we expect the human to notice and
# resolve to apt conflict or try again.
# Hope for an optimal first try.
APT_RETRIES = [1.0, 5.0, 10.0]


def assert_valid_apt_credentials(repo_url, username, password):
    """Validate apt credentials for a PPA.

    @param repo_url: private-ppa url path
    @param username: PPA login username.
    @param password: PPA login password or resource token.

    @raises: UserFacingError for invalid credentials, timeout or unexpected
        errors.
    """
    protocol, repo_path = repo_url.split("://")
    if not os.path.exists("/usr/lib/apt/apt-helper"):
        return
    try:
        with tempfile.TemporaryDirectory() as tmpd:
            util.subp(
                [
                    "/usr/lib/apt/apt-helper",
                    "download-file",
                    "{}://{}:{}@{}/ubuntu/pool/".format(
                        protocol, username, password, repo_path
                    ),
                    os.path.join(tmpd, "apt-helper-output"),
                ],
                timeout=APT_HELPER_TIMEOUT,
            )
    except util.ProcessExecutionError as e:
        if e.exit_code == 100:
            stderr = str(e.stderr).lower()
            if re.search(r"401\s+unauthorized|httperror401", stderr):
                raise exceptions.UserFacingError(
                    "Invalid APT credentials provided for {}".format(repo_url)
                )
            elif re.search(r"connection timed out", stderr):
                raise exceptions.UserFacingError(
                    "Timeout trying to access APT repository at {}".format(
                        repo_url
                    )
                )
        raise exceptions.UserFacingError(
            "Unexpected APT error. See /var/log/ubuntu-advantage.log"
        )
    except subprocess.TimeoutExpired:
        raise exceptions.UserFacingError(
            "Cannot validate credentials for APT repo."
            " Timeout after {} seconds trying to reach {}.".format(
                APT_HELPER_TIMEOUT, repo_path
            )
        )


def run_apt_command(cmd, error_msg) -> str:
    """Run an apt command, retrying upon failure APT_RETRIES times.

    :return: stdout from successful run of the apt command.
    :raise UserFacingError: on issues running apt-cache policy.
    """
    try:
        out, _err = util.subp(cmd, capture=True, retry_sleeps=APT_RETRIES)
    except util.ProcessExecutionError as e:
        if "Could not get lock /var/lib/dpkg/lock" in str(e.stderr):
            error_msg += " Another process is running APT."
        raise exceptions.UserFacingError(error_msg)
    return out


def add_auth_apt_repo(
    repo_filename: str,
    repo_url: str,
    credentials: str,
    suites: "List[str]",
    keyring_file: str,
) -> None:
    """Add an authenticated apt repo and credentials to the system.

    @raises: InvalidAPTCredentialsError when the token provided can't access
        the repo PPA.
    """
    try:
        username, password = credentials.split(":")
    except ValueError:  # Then we have a bearer token
        username = "bearer"
        password = credentials
    series = util.get_platform_info()["series"]
    if repo_url.endswith("/"):
        repo_url = repo_url[:-1]
    assert_valid_apt_credentials(repo_url, username, password)

    # Does this system have updates suite enabled?
    updates_enabled = False
    policy = run_apt_command(
        ["apt-cache", "policy"], status.MESSAGE_APT_POLICY_FAILED
    )
    for line in policy.splitlines():
        # We only care about $suite-updates lines
        if "a={}-updates".format(series) not in line:
            continue
        # We only care about $suite-updates from the Ubuntu archive
        if "o=Ubuntu," not in line:
            continue
        updates_enabled = True
        break

    content = ""
    for suite in suites:
        if series not in suite:
            continue  # Only enable suites matching this current series
        maybe_comment = ""
        if "-updates" in suite and not updates_enabled:
            logging.debug(
                'Not enabling apt suite "%s" because "%s-updates" is not'
                " enabled",
                suite,
                series,
            )
            maybe_comment = "# "
        content += (
            "{maybe_comment}deb {url}/ubuntu {suite} main\n"
            "# deb-src {url}/ubuntu {suite} main\n".format(
                maybe_comment=maybe_comment, url=repo_url, suite=suite
            )
        )
    util.write_file(repo_filename, content)
    add_apt_auth_conf_entry(repo_url, username, password)
    source_keyring_file = os.path.join(KEYRINGS_DIR, keyring_file)
    destination_keyring_file = os.path.join(APT_KEYS_DIR, keyring_file)
    gpg.export_gpg_key(source_keyring_file, destination_keyring_file)


def add_apt_auth_conf_entry(repo_url, login, password):
    """Add or replace an apt auth line in apt's auth.conf file or conf.d."""
    apt_auth_file = get_apt_auth_file_from_apt_config()
    _protocol, repo_path = repo_url.split("://")
    if repo_path.endswith("/"):  # strip trailing slash
        repo_path = repo_path[:-1]
    if os.path.exists(apt_auth_file):
        orig_content = util.load_file(apt_auth_file)
    else:
        orig_content = ""
    repo_auth_line = (
        "machine {repo_path}/ login {login} password {password}"
        "{cmt}".format(
            repo_path=repo_path,
            login=login,
            password=password,
            cmt=APT_AUTH_COMMENT,
        )
    )
    added_new_auth = False
    new_lines = []
    for line in orig_content.splitlines():
        machine_match = re.match(r"machine\s+(?P<repo_url>[.\-\w]+)/?.*", line)
        if machine_match:
            matched_repo = machine_match.group("repo_url")
            if matched_repo == repo_path:
                # Replace old auth with new auth at same line
                new_lines.append(repo_auth_line)
                added_new_auth = True
                continue
            if matched_repo in repo_path:
                # Insert our repo before. We are a more specific apt repo match
                new_lines.append(repo_auth_line)
                added_new_auth = True
        new_lines.append(line)
    if not added_new_auth:
        new_lines.append(repo_auth_line)
    new_lines.append("")
    util.write_file(apt_auth_file, "\n".join(new_lines), mode=0o600)


def remove_repo_from_apt_auth_file(repo_url):
    """Remove a repo from the shared apt auth file"""
    _protocol, repo_path = repo_url.split("://")
    if repo_path.endswith("/"):  # strip trailing slash
        repo_path = repo_path[:-1]
    apt_auth_file = get_apt_auth_file_from_apt_config()
    if os.path.exists(apt_auth_file):
        apt_auth = util.load_file(apt_auth_file)
        auth_prefix = "machine {repo_path}/ login".format(repo_path=repo_path)
        content = "\n".join(
            [line for line in apt_auth.splitlines() if auth_prefix not in line]
        )
        if not content:
            os.unlink(apt_auth_file)
        else:
            util.write_file(apt_auth_file, content, mode=0o600)


def remove_auth_apt_repo(
    repo_filename: str, repo_url: str, keyring_file: str = None
) -> None:
    """Remove an authenticated apt repo and credentials to the system"""
    util.del_file(repo_filename)
    if keyring_file:
        keyring_file = os.path.join(APT_KEYS_DIR, keyring_file)
        util.del_file(keyring_file)
    remove_repo_from_apt_auth_file(repo_url)


def restore_commented_apt_list_file(filename: str) -> None:
    """Uncomment commented deb lines in the given file."""
    file_content = util.load_file(filename)
    file_content = file_content.replace("# deb ", "deb ")
    util.write_file(filename, file_content)


def add_ppa_pinning(apt_preference_file, repo_url, origin, priority):
    """Add an apt preferences file and pin for a PPA."""
    series = util.get_platform_info()["series"]
    _protocol, repo_path = repo_url.split("://")
    if repo_path.endswith("/"):  # strip trailing slash
        repo_path = repo_path[:-1]
    content = (
        "Package: *\n"
        "Pin: release o={origin}, n={series}\n"
        "Pin-Priority: {priority}\n".format(
            origin=origin, priority=priority, series=series
        )
    )
    util.write_file(apt_preference_file, content)


def get_apt_auth_file_from_apt_config():
    """Return to patch to the system configured APT auth file."""
    out, _err = util.subp(
        ["apt-config", "shell", "key", APT_CONFIG_AUTH_PARTS_DIR]
    )
    if out:  # then auth.conf.d parts is present
        return out.split("'")[1] + "90ubuntu-advantage"
    else:  # then use configured /etc/apt/auth.conf
        out, _err = util.subp(
            ["apt-config", "shell", "key", APT_CONFIG_AUTH_FILE]
        )
        return out.split("'")[1].rstrip("/")


def find_apt_list_files(repo_url, series):
    """List any apt files in APT_CONFIG_LISTS_DIR given repo_url and series."""
    _protocol, repo_path = repo_url.split("://")
    if repo_path.endswith("/"):  # strip trailing slash
        repo_path = repo_path[:-1]
    lists_dir = "/var/lib/apt/lists"
    out, _err = util.subp(["apt-config", "shell", "key", APT_CONFIG_LISTS_DIR])
    if out:  # then lists dir is present in config
        lists_dir = out.split("'")[1]

    aptlist_filename = repo_path.replace("/", "_")
    return sorted(
        glob.glob(
            os.path.join(
                lists_dir, aptlist_filename + "_dists_{}*".format(series)
            )
        )
    )


def remove_apt_list_files(repo_url, series):
    """Remove any apt list files present for this repo_url and series."""
    for path in find_apt_list_files(repo_url, series):
        if os.path.exists(path):
            os.unlink(path)


def clean_apt_sources(*, _entitlements=None):
    """
    Clean apt sources list files written by uaclient

    :param _entitlements:
        The uaclient.entitlements module to use, defaults to
        uaclient.entitlements. (This is only present for testing, because the
        import happens within the function to avoid circular imports.)
    """
    if _entitlements is None:
        from uaclient import entitlements as _entitlements

    for ent_cls in _entitlements.ENTITLEMENT_CLASSES:
        if not hasattr(ent_cls, "repo_url"):
            continue
        repo_list_glob = ent_cls.repo_list_file_tmpl.format(
            name=ent_cls.name, series="*"
        )

        # Remove list files
        for path in glob.glob(repo_list_glob):
            logging.info("Removing apt source file: %s", path)
            os.unlink(path)


def get_installed_packages() -> "List[str]":
    out, _ = util.subp(["dpkg-query", "-W", "--showformat=${Package}\\n"])
    return out.splitlines()

Youez - 2016 - github.com/yon3zu
LinuXploit