????JFIF??x?x????'
| Server IP : 79.136.114.73 / Your IP : 216.73.216.25 Web Server : Apache/2.4.7 (Ubuntu) PHP/5.5.9-1ubuntu4.29 OpenSSL/1.0.1f System : Linux b8009 3.13.0-170-generic #220-Ubuntu SMP Thu May 9 12:40:49 UTC 2019 x86_64 User : www-data ( 33) PHP Version : 5.5.9-1ubuntu4.29 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority, MySQL : ON | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python3/dist-packages/uaclient/ |
Upload File : |
import copy
from datetime import datetime
import json
import logging
import os
import yaml
from collections import namedtuple
from uaclient import status, util
from uaclient.defaults import CONFIG_DEFAULTS, DEFAULT_CONFIG_FILE
from uaclient import exceptions
try:
    from typing import Any, Dict, Optional, cast  # noqa: F401
except ImportError:
    # typing isn't available on trusty, so ignore its absence and provide
    # a stand-in for the one name that is called at runtime.
    def cast(_, x):  # type: ignore
        """Stand-in for typing.cast: hand the value back unchanged."""
        return x
# Baseline status document; the status helpers in this module deep-copy it
# before filling in details.
DEFAULT_STATUS = {
    "_doc": (
        "Content provided in json response is currently considered"
        " Experimental and may change"
    ),
    "attached": False,
    "expires": status.UserFacingStatus.INAPPLICABLE.value,
    "origin": None,
    "services": [],
    "techSupportLevel": status.UserFacingStatus.INAPPLICABLE.value,
}  # type: Dict[str, Any]

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Name of the sub-directory (below the data dir) that holds private files.
PRIVATE_SUBDIR = "private"

# A data path is a filename, and an attribute ("private") indicating whether it
# should only be readable by root
DataPath = namedtuple("DataPath", ["filename", "private"])
class UAConfig:
    """UA client configuration plus access to cached response files.

    Wraps the parsed configuration dict (``self.cfg``) and offers helpers to
    read, write and delete the cache files kept under ``cfg["data_dir"]``,
    as well as to build the status document.
    """

    # Known cache keys mapped to their file name and whether the file lives
    # in the private sub-directory.
    data_paths = {
        "machine-access-cc-eal": DataPath("machine-access-cc-eal.json", True),
        "machine-access-cis-audit": DataPath(
            "machine-access-cis-audit.json", True
        ),
        "machine-access-esm-infra": DataPath(
            "machine-access-esm-infra.json", True
        ),
        "machine-access-fips": DataPath("machine-access-fips.json", True),
        "machine-access-fips-updates": DataPath(
            "machine-access-fips-updates.json", True
        ),
        "machine-access-livepatch": DataPath(
            "machine-access-livepatch.json", True
        ),
        "machine-access-support": DataPath(
            "machine-access-support.json", True
        ),
        "machine-id": DataPath("machine-id", True),
        "machine-token": DataPath("machine-token.json", True),
        "status-cache": DataPath("status.json", False),
    }  # type: Dict[str, DataPath]

    _entitlements = None  # caching to avoid repetitive file reads
    _machine_token = None  # caching to avoid repetitive file reading

    def __init__(self, cfg: "Optional[Dict[str, Any]]" = None) -> None:
        """Store the given configuration, or parse it when none is given.

        @param cfg: Optional dict of configuration values. When None or
            empty, the result of parse_config() is used instead.
        """
        if cfg:
            self.cfg = cfg
        else:
            self.cfg = parse_config()

    @property
    def accounts(self):
        """Return the list of accounts that apply to this authorized user."""
        if self.is_attached:
            accountInfo = self.machine_token["machineTokenInfo"]["accountInfo"]
            return [accountInfo]
        return []

    @property
    def contract_url(self):
        """Return the configured contract_url, or the canonical default."""
        return self.cfg.get("contract_url", "https://contracts.canonical.com")

    @property
    def data_dir(self):
        """Return the configured data directory."""
        return self.cfg["data_dir"]

    @property
    def log_level(self):
        """Return the logging module attribute named by cfg["log_level"].

        Falls back to the attribute named by CONFIG_DEFAULTS["log_level"]
        when the configured value is missing or unknown.
        """
        log_level = self.cfg.get("log_level")
        try:
            return getattr(logging, log_level.upper())
        except AttributeError:
            # Raised both when log_level has no upper() (e.g. None) and when
            # logging has no attribute of that name.
            return getattr(logging, CONFIG_DEFAULTS["log_level"])

    @property
    def log_file(self):
        """Return the configured log file, or the CONFIG_DEFAULTS value."""
        return self.cfg.get("log_file", CONFIG_DEFAULTS["log_file"])

    @property
    def entitlements(self):
        """Return a dictionary of entitlements keyed by entitlement name.

        Return an empty dict if no entitlements are present.
        """
        if self._entitlements:
            return self._entitlements
        machine_token = self.machine_token
        if not machine_token:
            return {}

        contractInfo = machine_token["machineTokenInfo"]["contractInfo"]
        ent_by_name = {
            e["type"]: e for e in contractInfo["resourceEntitlements"]
        }
        # Build into a local dict and publish it only once complete, so an
        # exception part-way through cannot leave a partial cache behind.
        entitlements = {}
        for entitlement_name, ent_value in ent_by_name.items():
            entitlement_cfg = {}
            if ent_value.get("entitled"):
                entitlement_cfg = self.read_cache(
                    "machine-access-{}".format(entitlement_name), silent=True
                )
            if not entitlement_cfg:
                # Fallback to machine-token info on unentitled
                entitlement_cfg = {"entitlement": ent_value}
            util.apply_series_overrides(entitlement_cfg)
            entitlements[entitlement_name] = entitlement_cfg
        self._entitlements = entitlements
        return entitlements

    @property
    def is_attached(self):
        """Report whether this machine configuration is attached to UA."""
        return bool(self.machine_token)  # machine_token is removed on detach

    @property
    def machine_token(self):
        """Return the machine-token if cached in the machine token response."""
        if not self._machine_token:
            self._machine_token = self.read_cache("machine-token")
        return self._machine_token

    def data_path(self, key: "Optional[str]" = None) -> str:
        """Return the file path in the data directory represented by the key

        @param key: Optional cache key. Without a key the private
            sub-directory itself is returned. Keys not listed in data_paths
            are placed in the private sub-directory under their own name.
        """
        data_dir = self.cfg["data_dir"]
        if not key:
            return os.path.join(data_dir, PRIVATE_SUBDIR)
        if key in self.data_paths:
            data_path = self.data_paths[key]
            if data_path.private:
                return os.path.join(
                    data_dir, PRIVATE_SUBDIR, data_path.filename
                )
            return os.path.join(data_dir, data_path.filename)
        return os.path.join(data_dir, PRIVATE_SUBDIR, key)

    def _perform_delete(self, cache_path: str) -> None:
        """Delete the given cache_path if it exists.

        (This is a separate method to allow easier disabling of deletion during
        tests.)
        """
        if os.path.exists(cache_path):
            os.unlink(cache_path)

    def delete_cache_key(self, key: str) -> None:
        """Remove specific cache file.

        @raises RuntimeError: When key is empty.
        """
        if not key:
            raise RuntimeError(
                "Invalid or empty key provided to delete_cache_key"
            )
        if key.startswith("machine-access") or key == "machine-token":
            # Drop the in-memory copies derived from these files.
            self._entitlements = None
            self._machine_token = None
        cache_path = self.data_path(key)
        self._perform_delete(cache_path)

    def delete_cache(self):
        """Remove configuration cached response files class attributes."""
        for path_key in self.data_paths:
            self.delete_cache_key(path_key)

    def read_cache(self, key: str, silent: bool = False) -> "Optional[Any]":
        """Return the content of the cache file for key.

        @param key: Cache key, resolved to a path with data_path().
        @param silent: When True, do not log a debug message for a missing
            file.

        @return: The JSON-decoded content, the raw content when it is not
            valid JSON, or None when the file could not be loaded.
        """
        cache_path = self.data_path(key)
        try:
            content = util.load_file(cache_path)
        except Exception:
            # Best effort: any load failure is reported as "no cache".
            if not os.path.exists(cache_path) and not silent:
                LOG.debug("File does not exist: %s", cache_path)
            return None
        try:
            return json.loads(content, cls=util.DatetimeAwareJSONDecoder)
        except ValueError:
            return content

    def write_cache(self, key: str, content: "Any") -> None:
        """Write content to the cache file for key.

        Non-string content is serialised to JSON first. Files are written
        with mode 0o600 unless the key is listed in data_paths as not
        private, in which case 0o644 is used.
        """
        filepath = self.data_path(key)
        data_dir = os.path.dirname(filepath)
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
            # NOTE(review): chmod is assumed to apply only to a directory
            # created just above; confirm against the upstream layout.
            if os.path.basename(data_dir) == PRIVATE_SUBDIR:
                os.chmod(data_dir, 0o700)
        if key.startswith("machine-access") or key == "machine-token":
            # Invalidate in-memory copies so the next read sees the new file.
            self._machine_token = None
            self._entitlements = None
        if not isinstance(content, str):
            content = json.dumps(content, cls=util.DatetimeAwareJSONEncoder)
        mode = 0o600
        if key in self.data_paths:
            if not self.data_paths[key].private:
                mode = 0o644
        util.write_file(filepath, content, mode=mode)

    def _unattached_status(self) -> "Dict[str, Any]":
        """Return unattached status as a dict."""
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import - confirm.
        from uaclient.contract import get_available_resources
        from uaclient.entitlements import ENTITLEMENT_CLASS_BY_NAME

        response = copy.deepcopy(DEFAULT_STATUS)
        resources = get_available_resources(self)
        for resource in sorted(resources, key=lambda x: x["name"]):
            if resource["available"]:
                available = status.UserFacingAvailability.AVAILABLE.value
            else:
                available = status.UserFacingAvailability.UNAVAILABLE.value
            ent_cls = ENTITLEMENT_CLASS_BY_NAME.get(resource["name"])
            if not ent_cls:
                LOG.debug(
                    "Ignoring availability of unknown service %s"
                    " from contract server",
                    resource["name"],
                )
                continue
            response["services"].append(
                {
                    "name": resource["name"],
                    "description": ent_cls.description,
                    "available": available,
                }
            )
        return response

    def _attached_service_status(
        self, ent, inapplicable_resources
    ) -> "Dict[str, str]":
        """Return the status entry for a single entitlement.

        @param ent: Entitlement instance providing name, description,
            contract_status() and user_facing_status().
        @param inapplicable_resources: Container of resource names that are
            reported as inapplicable.
        """
        details = ""
        contract_status = ent.contract_status()
        if contract_status == status.ContractStatus.UNENTITLED:
            ent_status = status.UserFacingStatus.UNAVAILABLE
        else:
            if ent.name in inapplicable_resources:
                ent_status = status.UserFacingStatus.INAPPLICABLE
            else:
                ent_status, details = ent.user_facing_status()
        return {
            "name": ent.name,
            "description": ent.description,
            "entitled": contract_status.value,
            "status": ent_status.value,
            "statusDetails": details,
        }

    def _attached_status(self) -> "Dict[str, Any]":
        """Return configuration of attached status as a dictionary."""
        from uaclient.contract import get_available_resources
        from uaclient.entitlements import ENTITLEMENT_CLASSES

        response = copy.deepcopy(DEFAULT_STATUS)
        contractInfo = self.machine_token["machineTokenInfo"]["contractInfo"]
        response.update(
            {
                "attached": True,
                "account": self.accounts[0]["name"],
                "account-id": self.accounts[0]["id"],
                "origin": contractInfo.get("origin"),
                "subscription": contractInfo["name"],
                "subscription-id": contractInfo["id"],
            }
        )
        if contractInfo.get("effectiveTo"):
            response["expires"] = datetime.strptime(
                contractInfo["effectiveTo"], "%Y-%m-%dT%H:%M:%SZ"
            )

        resources = get_available_resources(self)
        inapplicable_resources = [
            resource["name"]
            for resource in sorted(resources, key=lambda x: x["name"])
            if not resource["available"]
        ]

        for ent_cls in ENTITLEMENT_CLASSES:
            ent = ent_cls(self)
            response["services"].append(
                self._attached_service_status(ent, inapplicable_resources)
            )
        support = self.entitlements.get("support", {}).get("entitlement")
        if support:
            supportLevel = support.get("affordances", {}).get("supportLevel")
            if not supportLevel:
                supportLevel = DEFAULT_STATUS["techSupportLevel"]
            response["techSupportLevel"] = supportLevel
        return response

    def status(self) -> "Dict[str, Any]":
        """Return status as a dict, using a cache for non-root users

        When unattached, get available resources from the contract service
        to report detailed availability of different resources for this
        machine.

        Write the status-cache when called by root.
        """
        if os.getuid() != 0:
            response = cast("Dict[str, Any]", self.read_cache("status-cache"))
            if not response:
                response = self._unattached_status()
        elif not self.is_attached:
            response = self._unattached_status()
        else:
            response = self._attached_status()
        if os.getuid() == 0:
            self.write_cache("status-cache", response)
        return response
def parse_config(config_path=None):
    """Parse known UA config file

    Start from a copy of CONFIG_DEFAULTS, overlay the values loaded from the
    config file (if it exists), then overlay any environment variable whose
    name starts with 'UA_' (case-insensitive; the prefix is stripped and the
    rest lower-cased to form the key).

    The config file is chosen in this order of precedence:
      1. the path in the UA_CONFIG_FILE environment variable, if set;
      2. a file in the current working directory with the same basename as
         config_path;
      3. config_path itself.

    @param config_path: Fullpath to ua configfile. If unspecified, use
        DEFAULT_CONFIG_FILE.

    @return: Dict of configuration values.

    @raises exceptions.UserFacingError: When contract_url is not accepted by
        util.is_service_url.
    """
    if not config_path:
        config_path = DEFAULT_CONFIG_FILE
    cfg = copy.copy(CONFIG_DEFAULTS)
    local_cfg = os.path.join(os.getcwd(), os.path.basename(config_path))
    if os.path.exists(local_cfg):
        config_path = local_cfg
    env_config_path = os.environ.get("UA_CONFIG_FILE")
    if env_config_path:
        config_path = env_config_path
    LOG.debug("Using UA client configuration file at %s", config_path)
    if os.path.exists(config_path):
        file_cfg = yaml.safe_load(util.load_file(config_path))
        # safe_load yields None for an empty document; passing that to
        # dict.update would raise TypeError, so treat it as "no overrides".
        if file_cfg:
            cfg.update(file_cfg)
    env_keys = {}
    for key, value in os.environ.items():
        key = key.lower()
        if key.startswith("ua_"):
            env_keys[key[3:]] = value  # Strip leading UA_
    cfg.update(env_keys)
    cfg["log_level"] = cfg["log_level"].upper()
    cfg["data_dir"] = os.path.expanduser(cfg["data_dir"])
    if not util.is_service_url(cfg["contract_url"]):
        raise exceptions.UserFacingError(
            "Invalid url in config. contract_url: {}".format(
                cfg["contract_url"]
            )
        )
    return cfg