????JFIF??x?x????'
| Server IP : 79.136.114.73 / Your IP : 216.73.216.57 Web Server : Apache/2.4.7 (Ubuntu) PHP/5.5.9-1ubuntu4.29 OpenSSL/1.0.1f System : Linux b8009 3.13.0-170-generic #220-Ubuntu SMP Thu May 9 12:40:49 UTC 2019 x86_64 User : www-data ( 33) PHP Version : 5.5.9-1ubuntu4.29 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python3/dist-packages/uaclient/entitlements/ |
Upload File : |
import abc
from datetime import datetime
import logging
import re
try:
from typing import Any, Callable, Dict, Optional, Tuple # noqa: F401
StaticAffordance = Tuple[str, Callable[[], Any], bool]
except ImportError:
# typing isn't available on trusty, so ignore its absence
pass
from uaclient import config
from uaclient import contract
from uaclient import status
from uaclient import util
from uaclient.status import (
ApplicabilityStatus,
ContractStatus,
UserFacingStatus,
)
RE_KERNEL_UNAME = (
r"(?P<major>[\d]+)[.-](?P<minor>[\d]+)[.-](?P<patch>[\d]+\-[\d]+)"
r"-(?P<flavor>[A-Za-z0-9_-]+)"
)
class UAEntitlement(metaclass=abc.ABCMeta):
# Optional URL for top-level product service information
help_doc_url = None # type: str
@property
@abc.abstractmethod
def name(self) -> str:
"""The lowercase name of this entitlement"""
pass
@property
@abc.abstractmethod
def title(self) -> str:
"""The human readable title of this entitlement"""
pass
@property
@abc.abstractmethod
def description(self) -> str:
"""A sentence describing this entitlement"""
pass
# A tuple of 3-tuples with (failure_message, functor, expected_results)
# If any static_affordance does not match expected_results fail with
# <failure_message>. Overridden in livepatch and fips
static_affordances = () # type: Tuple[StaticAffordance, ...]
def __init__(self, cfg: "Optional[config.UAConfig]" = None) -> None:
"""Setup UAEntitlement instance
@param config: Parsed configuration dictionary
"""
if not cfg:
cfg = config.UAConfig()
self.cfg = cfg
@abc.abstractmethod
def enable(self, *, silent_if_inapplicable: bool = False) -> bool:
"""Enable specific entitlement.
:param silent_if_inapplicable:
Don't emit any messages until after it has been determined that
this entitlement is applicable to the current machine.
@return: True on success, False otherwise.
"""
pass
def can_disable(self, silent: bool = False) -> bool:
"""Report whether or not disabling is possible for the entitlement.
@param silent: Boolean set True to silence printed messages/warnings.
"""
application_status, _ = self.application_status()
if application_status == status.ApplicationStatus.DISABLED:
if not silent:
print(
status.MESSAGE_ALREADY_DISABLED_TMPL.format(
title=self.title
)
)
return False
return True
def can_enable(self, silent: bool = False) -> bool:
"""
Report whether or not enabling is possible for the entitlement.
:param silent: if True, suppress output
"""
if self.is_access_expired():
token = self.cfg.machine_token["machineToken"]
contract_client = contract.UAContractClient(self.cfg)
contract_client.request_resource_machine_access(token, self.name)
if not self.contract_status() == ContractStatus.ENTITLED:
if not silent:
print(status.MESSAGE_UNENTITLED_TMPL.format(title=self.title))
return False
application_status, _ = self.application_status()
if application_status != status.ApplicationStatus.DISABLED:
if not silent:
print(
status.MESSAGE_ALREADY_ENABLED_TMPL.format(
title=self.title
)
)
return False
applicability_status, details = self.applicability_status()
if applicability_status == status.ApplicabilityStatus.INAPPLICABLE:
if not silent:
print(details)
return False
return True
def applicability_status(self) -> "Tuple[ApplicabilityStatus, str]":
"""Check all contract affordances to vet current platform
Affordances are a list of support constraints for the entitlement.
Examples include a list of supported series, architectures for kernel
revisions.
:return:
tuple of (ApplicabilityStatus, detailed_message). APPLICABLE if
platform passes all defined affordances, INAPPLICABLE if it doesn't
meet all of the provided constraints.
"""
entitlement_cfg = self.cfg.entitlements.get(self.name)
if not entitlement_cfg:
return (
ApplicabilityStatus.APPLICABLE,
"no entitlement affordances checked",
)
for error_message, functor, expected_result in self.static_affordances:
if functor() != expected_result:
return ApplicabilityStatus.INAPPLICABLE, error_message
affordances = entitlement_cfg["entitlement"].get("affordances", {})
platform = util.get_platform_info()
affordance_arches = affordances.get("architectures", [])
if affordance_arches and platform["arch"] not in affordance_arches:
return (
ApplicabilityStatus.INAPPLICABLE,
status.MESSAGE_INAPPLICABLE_ARCH_TMPL.format(
title=self.title,
arch=platform["arch"],
supported_arches=", ".join(affordance_arches),
),
)
affordance_series = affordances.get("series", [])
if affordance_series and platform["series"] not in affordance_series:
return (
ApplicabilityStatus.INAPPLICABLE,
status.MESSAGE_INAPPLICABLE_SERIES_TMPL.format(
title=self.title, series=platform["version"]
),
)
kernel = platform["kernel"]
affordance_kernels = affordances.get("kernelFlavors", [])
affordance_min_kernel = affordances.get("minKernelVersion")
match = re.match(RE_KERNEL_UNAME, kernel)
if affordance_kernels:
if not match or match.group("flavor") not in affordance_kernels:
return (
ApplicabilityStatus.INAPPLICABLE,
status.MESSAGE_INAPPLICABLE_KERNEL_TMPL.format(
title=self.title,
kernel=kernel,
supported_kernels=", ".join(affordance_kernels),
),
)
if affordance_min_kernel:
invalid_msg = status.MESSAGE_INAPPLICABLE_KERNEL_VER_TMPL.format(
title=self.title,
kernel=kernel,
min_kernel=affordance_min_kernel,
)
try:
kernel_major, kernel_minor = affordance_min_kernel.split(".")
min_kern_major = int(kernel_major)
min_kern_minor = int(kernel_minor)
except ValueError:
logging.warning(
"Could not parse minKernelVersion: %s",
affordance_min_kernel,
)
return (ApplicabilityStatus.INAPPLICABLE, invalid_msg)
if not match:
return ApplicabilityStatus.INAPPLICABLE, invalid_msg
kernel_major = int(match.group("major"))
kernel_minor = int(match.group("minor"))
if kernel_major < min_kern_major:
return ApplicabilityStatus.INAPPLICABLE, invalid_msg
elif (
kernel_major == min_kern_major
and kernel_minor < min_kern_minor
):
return ApplicabilityStatus.INAPPLICABLE, invalid_msg
return ApplicabilityStatus.APPLICABLE, ""
@abc.abstractmethod
def disable(self, silent: bool = False) -> bool:
"""Disable specific entitlement
@param silent: Boolean set True to silence print/log of messages
@return: True on success, False otherwise.
"""
pass
def contract_status(self) -> ContractStatus:
"""Return whether the user is entitled to the entitlement or not"""
if not self.cfg.is_attached:
return ContractStatus.UNENTITLED
entitlement_cfg = self.cfg.entitlements.get(self.name, {})
if entitlement_cfg and entitlement_cfg["entitlement"].get("entitled"):
return ContractStatus.ENTITLED
return ContractStatus.UNENTITLED
def is_access_expired(self) -> bool:
"""Return entitlement access info as stale and needing refresh."""
entitlement_contract = self.cfg.entitlements.get(self.name, {})
# TODO(No expiry per resource in MVP yet)
expire_str = entitlement_contract.get("expires")
if not expire_str:
return False
expiry = datetime.strptime(expire_str, "%Y-%m-%dT%H:%M:%S.%fZ")
if expiry >= datetime.utcnow():
return False
return True
def process_contract_deltas(
self,
orig_access: "Dict[str, Any]",
deltas: "Dict[str, Any]",
allow_enable: bool = False,
) -> bool:
"""Process any contract access deltas for this entitlement.
:param orig_access: Dictionary containing the original
resourceEntitlement access details.
:param deltas: Dictionary which contains only the changed access keys
and values.
:param allow_enable: Boolean set True if allowed to perform the enable
operation. When False, a message will be logged to inform the user
about the recommended enabled service.
:return: True when delta operations are processed; False when noop.
:raise: UserFacingError when auto-enable fails unexpectedly.
"""
if not deltas:
return True # We processed all deltas that needed processing
delta_entitlement = deltas.get("entitlement", {})
transition_to_unentitled = bool(delta_entitlement == util.DROPPED_KEY)
if not transition_to_unentitled:
if delta_entitlement:
util.apply_series_overrides(deltas)
delta_entitlement = deltas["entitlement"]
if orig_access and "entitled" in delta_entitlement:
transition_to_unentitled = delta_entitlement["entitled"] in (
False,
util.DROPPED_KEY,
)
if transition_to_unentitled:
application_status, _ = self.application_status()
if application_status != status.ApplicationStatus.DISABLED:
if self.can_disable(silent=True):
self.disable()
logging.info(
"Due to contract refresh, '%s' is now disabled.",
self.name,
)
else:
logging.warning(
"Unable to disable '%s' as recommended during contract"
" refresh. Service is still active. See"
" `ua status`",
self.name,
)
# Clean up former entitled machine-access-<name> response cache
# file because uaclient doesn't access machine-access-* routes or
# responses on unentitled services.
self.cfg.delete_cache_key("machine-access-{}".format(self.name))
return True
resourceToken = orig_access.get("resourceToken")
if not resourceToken:
resourceToken = deltas.get("resourceToken")
delta_obligations = delta_entitlement.get("obligations", {})
can_enable = self.can_enable(silent=True)
enableByDefault = bool(
delta_obligations.get("enableByDefault") and resourceToken
)
if can_enable and enableByDefault:
if allow_enable:
msg = status.MESSAGE_ENABLE_BY_DEFAULT_TMPL.format(
name=self.name
)
logging.info(msg)
self.enable()
else:
msg = status.MESSAGE_ENABLE_BY_DEFAULT_MANUAL_TMPL.format(
name=self.name
)
logging.info(msg)
return True
return False
def user_facing_status(self) -> "Tuple[UserFacingStatus, str]":
"""Return (user-facing status, details) for entitlement"""
applicability, details = self.applicability_status()
if applicability != ApplicabilityStatus.APPLICABLE:
return UserFacingStatus.INAPPLICABLE, details
entitlement_cfg = self.cfg.entitlements.get(self.name)
if not entitlement_cfg:
return (
UserFacingStatus.UNAVAILABLE,
"{} is not entitled".format(self.title),
)
elif entitlement_cfg["entitlement"].get("entitled", False) is False:
return (
UserFacingStatus.UNAVAILABLE,
"{} is not entitled".format(self.title),
)
application_status, explanation = self.application_status()
user_facing_status = {
status.ApplicationStatus.ENABLED: UserFacingStatus.ACTIVE,
status.ApplicationStatus.DISABLED: UserFacingStatus.INACTIVE,
}[application_status]
return user_facing_status, explanation
@abc.abstractmethod
def application_status(self) -> "Tuple[status.ApplicationStatus, str]":
"""
The current status of application of this entitlement
:return:
A tuple of (ApplicationStatus, human-friendly reason)
"""
pass