????JFIF??x?x????'
| Server IP : 79.136.114.73 / Your IP : 216.73.216.55 Web Server : Apache/2.4.7 (Ubuntu) PHP/5.5.9-1ubuntu4.29 OpenSSL/1.0.1f System : Linux b8009 3.13.0-170-generic #220-Ubuntu SMP Thu May 9 12:40:49 UTC 2019 x86_64 User : www-data ( 33) PHP Version : 5.5.9-1ubuntu4.29 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python3/dist-packages/uaclient/entitlements/ |
Upload File : |
import logging
from uaclient.entitlements import base
from uaclient import apt, exceptions, status
from uaclient import util
from uaclient.status import ApplicationStatus
SNAP_CMD = "/usr/bin/snap"
SNAP_INSTALL_RETRIES = [0.5, 1.0, 5.0]
try:
from typing import Any, Dict, Tuple # noqa: F401
except ImportError:
# typing isn't available on trusty, so ignore its absence
pass
ERROR_MSG_MAP = {
"Unknown Auth-Token": "Invalid Auth-Token provided to livepatch.",
"unsupported kernel": "Your running kernel is not supported by Livepatch.",
}
class LivepatchEntitlement(base.UAEntitlement):
help_doc_url = "https://ubuntu.com/livepatch"
name = "livepatch"
title = "Livepatch"
description = "Canonical Livepatch service"
# Use a lambda so we can mock util.is_container in tests
static_affordances = (
(
"Cannot install Livepatch on a container",
lambda: util.is_container(),
False,
),
)
def enable(self, *, silent_if_inapplicable: bool = False) -> bool:
"""Enable specific entitlement.
:param silent_if_inapplicable:
Don't emit any messages until after it has been determined that
this entitlement is applicable to the current machine.
@return: True on success, False otherwise.
"""
if not self.can_enable(silent=silent_if_inapplicable):
return False
if not util.which("/snap/bin/canonical-livepatch"):
if not util.which(SNAP_CMD):
print("Installing snapd")
print(status.MESSAGE_APT_UPDATING_LISTS)
try:
apt.run_apt_command(
["apt-get", "update"], status.MESSAGE_APT_UPDATE_FAILED
)
except exceptions.UserFacingError as e:
logging.debug(
"Trying to install snapd."
" Ignoring apt-get update failure: %s",
str(e),
)
util.subp(
["apt-get", "install", "--assume-yes", "snapd"],
capture=True,
retry_sleeps=apt.APT_RETRIES,
)
util.subp(
[SNAP_CMD, "wait", "system", "seed.loaded"], capture=True
)
elif "snapd" not in apt.get_installed_packages():
raise exceptions.UserFacingError(
"/usr/bin/snap is present but snapd is not installed;"
" cannot enable {}".format(self.title)
)
print("Installing canonical-livepatch snap")
try:
util.subp(
[SNAP_CMD, "install", "canonical-livepatch"],
capture=True,
retry_sleeps=SNAP_INSTALL_RETRIES,
)
except util.ProcessExecutionError as e:
msg = "Unable to install Livepatch client: " + str(e)
raise exceptions.UserFacingError(msg)
return self.setup_livepatch_config(
process_directives=True, process_token=True
)
def setup_livepatch_config(
self, process_directives: bool = True, process_token: bool = True
) -> bool:
"""Processs configuration setup for livepatch directives.
:param process_directives: Boolean set True when directives should be
processsed.
:param process_token: Boolean set True when token should be
processsed.
"""
entitlement_cfg = self.cfg.entitlements.get(self.name)
if process_directives:
try:
process_config_directives(entitlement_cfg)
except util.ProcessExecutionError as e:
msg = "Unable to configure Livepatch: " + str(e)
print(msg)
logging.error(msg)
return False
if process_token:
livepatch_token = entitlement_cfg.get("resourceToken")
if not livepatch_token:
logging.debug(
"No specific resourceToken present. Using machine token as"
" %s credentials",
self.title,
)
livepatch_token = self.cfg.machine_token["machineToken"]
application_status, _details = self.application_status()
if application_status != status.ApplicationStatus.DISABLED:
logging.info(
"Disabling %s prior to re-attach with new token",
self.title,
)
try:
util.subp(["/snap/bin/canonical-livepatch", "disable"])
except util.ProcessExecutionError as e:
logging.error(str(e))
return False
try:
util.subp(
[
"/snap/bin/canonical-livepatch",
"enable",
livepatch_token,
],
capture=True,
)
except util.ProcessExecutionError as e:
msg = "Unable to enable Livepatch: "
for error_message, print_message in ERROR_MSG_MAP.items():
if error_message in str(e):
msg += print_message
break
if msg == "Unable to enable Livepatch: ":
msg += str(e)
print(msg)
return False
print("Canonical livepatch enabled.")
return True
def disable(self, silent=False):
"""Disable specific entitlement
@return: True on success, False otherwise.
"""
if not self.can_disable(silent):
return False
if not util.which("/snap/bin/canonical-livepatch"):
return True
util.subp(["/snap/bin/canonical-livepatch", "disable"], capture=True)
logging.debug("Removing canonical-livepatch snap")
if not silent:
print("Removing canonical-livepatch snap")
util.subp([SNAP_CMD, "remove", "canonical-livepatch"], capture=True)
return True
def application_status(self) -> "Tuple[ApplicationStatus, str]":
status = (ApplicationStatus.ENABLED, "")
try:
util.subp(["/snap/bin/canonical-livepatch", "status"])
except util.ProcessExecutionError as e:
# TODO(May want to parse INACTIVE/failure assessment)
logging.debug("Livepatch not enabled. %s", str(e))
status = (ApplicationStatus.DISABLED, str(e))
return status
def process_contract_deltas(
self,
orig_access: "Dict[str, Any]",
deltas: "Dict[str, Any]",
allow_enable: bool = False,
) -> bool:
"""Process any contract access deltas for this entitlement.
:param orig_access: Dictionary containing the original
resourceEntitlement access details.
:param deltas: Dictionary which contains only the changed access keys
and values.
:param allow_enable: Boolean set True if allowed to perform the enable
operation. When False, a message will be logged to inform the user
about the recommended enabled service.
:return: True when delta operations are processed; False when noop.
"""
if super().process_contract_deltas(orig_access, deltas, allow_enable):
return True # Already processed parent class deltas
application_status, _ = self.application_status()
if application_status == status.ApplicationStatus.DISABLED:
return True # only operate on changed directives when ACTIVE
delta_entitlement = deltas.get("entitlement", {})
delta_directives = delta_entitlement.get("directives", {})
supported_deltas = set(["caCerts", "remoteServer"])
process_directives = bool(
supported_deltas.intersection(delta_directives)
)
process_token = bool(deltas.get("resourceToken", False))
if any([process_directives, process_token]):
logging.info("Updating '%s' on changed directives.", self.name)
return self.setup_livepatch_config(
process_directives=process_directives,
process_token=process_token,
)
return True
def process_config_directives(cfg):
"""Process livepatch configuration directives.
We process caCerts before remoteServer because changing remote-server
in the canonical-livepatch CLI performs a PUT against the new server name.
If new caCerts were required for the new remoteServer, this
canonical-livepatch client PUT could fail on unmatched old caCerts.
@raises: ProcessExecutionError if unable to configure livepatch.
"""
if not cfg:
return
directives = cfg.get("entitlement", {}).get("directives", {})
ca_certs = directives.get("caCerts")
if ca_certs:
util.subp(
[
"/snap/bin/canonical-livepatch",
"config",
"ca-certs={}".format(ca_certs),
],
capture=True,
)
remote_server = directives.get("remoteServer", "")
if remote_server.endswith("/"):
remote_server = remote_server[:-1]
if remote_server:
util.subp(
[
"/snap/bin/canonical-livepatch",
"config",
"remote-server={}".format(remote_server),
],
capture=True,
)