Source code for knox.backend.store_vault

"""
Apache Software License 2.0

Copyright (c) 2020, 8x8, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import json
import sys
from datetime import datetime

import hvac
import requests
import validators
from dynaconf import LazySettings
from loguru import logger

from .store_engine import StoreEngine
from .store_object import StoreObject


[docs]class VaultClient: """Client commands not available via hvac""" __vault_client: hvac.Client __headers = {'Content-Type': 'application/json', 'X-Vault-Token': ''} __url: str #: Vault server URL __token: str #: Auth token __approle: str #: Application Role ID __secretid: str #: Application Role Secret ID __mount: str #: Engine mount path __mounts: json #: Map of Vault mounts match = 'False' def __init__(self, settings: LazySettings) -> None: """Constructor for VaultClient""" self.__url = settings.VAULT_URL self.__approle = settings.VAULT_APPROLE self.__secretid = settings.VAULT_SECRET_ID self.__mount = settings.VAULT_MOUNT self.__mounts = {} self.__vault_client = hvac.Client(url=self.__url) self.__settings = settings
[docs] def initialize(self) -> bool: """During initialization, if in admin mode, ensure the kv mount point has been registered with Vault. To enable admin mode use the hidden param --admin with any command. knox --admin store find """ if self.__settings.CTX.obj['ADMIN_MODE']: return self.new_mount(mount=self.mount) else: return self.connect()
[docs] def connect(self) -> bool: """Knox uses an approle scheme to authenticate with Vault. This requires fetching a fresh, short lived, API token for every call to the API.""" try: logger.trace(f'Connecting to Vault approle: {self.__approle} secret_id: {self.__secretid}') resp = self.__vault_client.auth.approle.login(role_id=self.__approle, secret_id=self.__secretid, use_token=True) self.__token = resp['auth']['client_token'] logger.trace(f'client_token: {self.__token}') self.__headers['X-Vault-Token'] = self.__token except requests.exceptions.ConnectionError as err: logger.error(f'Failed to connect to {self.__url}: {err}') sys.exit(2) except hvac.exceptions.VaultError as err: logger.error(f'Failed to authenticate with Vault {err}') sys.exit(2) else: return True
@property def url(self) -> str: return self.__url @property def mount(self) -> str: return self.__mount @property def token(self) -> str: return self.__token
[docs] def logout(self) -> bool: return self.__vault_client.logout()
[docs] def _get(self, path: str) -> json: """GET REST API wrapper method :param path: Vault API to query :type path: String :return: JSON paylod """ try: """Connect refreshes the temp Vault auth token""" self.connect() response = requests.get(url=f'{self.__url}{path}', headers=self.__headers) response.raise_for_status() return json.loads(response.content.decode('utf-8')) except requests.exceptions.HTTPError as errh: logger.error(f'Http Error: {errh}') except requests.exceptions.ConnectionError as errc: logger.error(f'Error Connecting: {errc}') except requests.exceptions.Timeout as errt: logger.error(f'Timeout Error: {errt}') except requests.exceptions.RequestException as err: logger.error(f'Error: {err}')
[docs] def _post(self, path: str, data: json) -> json: """POST REST API wrapper method :param path: Vault API to change or create :type path: String :param data: Required request body :type data: JSON :return: requests.Response object """ try: """Connect refreshes the temp Vault auth token""" self.connect() response = requests.post(url=f'{self.__url}{path}', headers=self.__headers, data=data) response.raise_for_status() return json.loads(response.content.decode('utf-8')) except requests.exceptions.HTTPError as errh: logger.error(f'Http Error: {errh}') except requests.exceptions.ConnectionError as errc: logger.error(f'Error Connecting: {errc}') except requests.exceptions.Timeout as errt: logger.error(f'Timeout Error: {errt}') except requests.exceptions.RequestException as err: logger.error(f'Error: {err}')
[docs] def _put(self, path: str, data: json) -> requests.Response: """PUT REST API wrapper method :param path: Vault API to change or create :type path: String :param data: Required request body :type data: JSON :return: requests.Response object """ try: """Connect refreshes the temp Vault auth token""" self.connect() response = requests.put(url=f'{self.__url}{path}', headers=self.__headers, data=data) response.raise_for_status() return response except requests.exceptions.HTTPError as errh: logger.error(f'Http Error: {errh}') except requests.exceptions.ConnectionError as errc: logger.error(f'Error Connecting: {errc}') except requests.exceptions.Timeout as errt: logger.error(f'Timeout Error: {errt}') except requests.exceptions.RequestException as err: logger.error(f'Error: {err}')
[docs] def get_mounts(self) -> json: """Refresh set of mounts from Vault :return: JSON """ self.__mounts = self._get("/v1/sys/mounts") return self.__mounts
[docs] def new_mount(self, mount: str) -> bool: """Will create a vault mount of type k/v V2 if it doesn't exist :param mount: Name of the Vault K/V Secret Engine :type mount: String :return: Boolean """ self.__mounts = self._get("/v1/sys/mounts") logger.trace(f'{self.__class__}::new_mount {self.__mounts}') if self.__mounts and f'{mount}/' not in self.__mounts['data'].keys(): logger.info(f'Vault mount {self.__url}/v1/sys/mounts/{mount} does not exist, creating') self._put(f'/v1/sys/mounts/{mount}', '{"type": "kv-v2"}') self.__mounts = self._get("/v1/sys/mounts") return bool(self.__mounts)
[docs] def upsert(self, obj: StoreObject) -> bool: """Given a StoreObject create or update it into Vault. Metadata and content are stored separately to allow querying of non sensitive details. :param obj: The object to store :type obj: StoreObject :return: Boolean """ client = self.__vault_client if validators.domain(obj.name): mp = self.mount op = obj.path_name else: mp = f'{self.mount}' op = f'client/{obj.name}/{obj.type}' policyname = "" try: self.connect() logger.trace(f'updating secret {mp}/{op}/cert_body') client.secrets.kv.v2.create_or_update_secret(path=f'{op}/cert_body', mount_point=mp, secret=obj.data['cert_body']) self.connect() logger.trace(f'updating secret {mp}/{op}/cert_info') client.secrets.kv.v2.create_or_update_secret(path=f'{op}/cert_info', mount_point=mp, secret=obj.data['cert_info']) self.connect() list_policies_resp = client.sys.list_policies()['data']['policies'] if 'commonName' in obj.data['cert_info']['subject']: commonname = obj.data['cert_info']['subject']['commonName'] else: commonname = obj.name policyname = f'knox-read-{commonname}' if policyname in list_policies_resp: pass else: policy = obj.data['cert_policy'] logger.debug(f'Creating explict read access policy {policyname} for {op}/cert_body') logger.trace(f'{self.__class__}::upsert {policyname}:\n{policy}') self.connect() client.sys.create_or_update_policy(name=policyname, policy=policy) except hvac.exceptions.Forbidden as ve: policyphrase = "" if len(policyname) > 0: policyphrase = f'and or {policyname}' logger.error(f'Permission denied writing {op} {policyphrase}: {ve}') sys.exit(2) except hvac.exceptions.InvalidPath as ve: policyphrase = "" if len(policyname) > 0: policyphrase = f'and or {policyname}' logger.error(f'Path invalid for {op} {policyphrase}: {ve}') sys.exit(2) except hvac.exceptions.Unauthorized as ve: policyphrase = "" if len(policyname) > 0: policyphrase = f'and or {policyname}' logger.error(f'Credentials not authorized to write {op} {policyphrase}: {ve}') sys.exit(2) except Exception as vex: logger.error(f'Failed to write StoreObject to Vault {vex}') sys.exit(2) else: logger.info(f'Successfully stored {op} and {policyname}') return True
[docs] def read(self, path: str, name: str, type: str = None) -> tuple: """Given a path and name retrieve a tuple of dictionaries to create a StoreObject cert_body cert_info :param path: The path where the StoreObjects data is stored :type path: str :param name: Name of the StoreObject to retrieve :type name: str :param type: The type of StoreObject i.e. PEM :type type: str """ client = self.__vault_client if type: fullpathbody = f'{path}/{name}/{type}/cert_body' fullpathinfo = f'{path}/{name}/{type}/cert_info' else: fullpathbody = f'{path}/{name}/cert_body' fullpathinfo = f'{path}/{name}/cert_info' try: self.connect() logger.trace(f'Attempting to read \n\tbody:{fullpathbody}\n\tinfo:{fullpathinfo}') logger.trace(f'client.url: {client.url}') logger.trace(f'mount: {self.mount}') certbody = client.secrets.kv.v2.read_secret_version(path=fullpathbody, mount_point=self.mount) self.connect() certinfo = client.secrets.kv.v2.read_secret_version(path=fullpathinfo, mount_point=self.mount) return certbody, certinfo except hvac.exceptions.Forbidden as ve: logger.error(f'Permission denied reading {path}/{name}: {ve}') sys.exit(2) except hvac.exceptions.InvalidPath as ve: logger.error(f'Path invalid for {path}/{name}: {ve}') sys.exit(2) except hvac.exceptions.Unauthorized as ve: logger.error(f'Credentials not authorized to read {path}/{name}: {ve}') sys.exit(2)
[docs] def search(self, rootpath: str, rootkey: str, searchresults: list, pattern: str = None) -> list: """Search for 'cert_info' for a given vault path :param rootpath: Beginning search path :type rootpath: str :param rootkey: Used to get commonname from search path :type rootkey: str :param searchresults: Stores the search results..default is empty :type searchresults: list :param pattern: Unaltered search pattern :type pattern: str :return: list """ try: client = self.__vault_client self.connect() rootpath = rootpath.replace('//', '/') logger.trace(f'Searching {rootpath}') secrets = client.secrets.kv.list_secrets(path=rootpath, mount_point=self.mount) secrets_keys = secrets.get('data').get('keys') if isinstance(secrets_keys, list): if 'cert_info' not in secrets_keys: for key in secrets_keys: subpaths = rootpath + key self.search(rootpath=subpaths, rootkey=key, pattern=pattern, searchresults=searchresults) else: cert_info_path = rootpath + "cert_info" cert_common_name = rootpath.split('/')[-3] self.connect() cert_info = client.secrets.kv.v2.read_secret_version(path=cert_info_path, mount_point=self.mount)['data']['data'] cert_info_str = json.dumps(cert_info) logger.trace(f'does {pattern} match {cert_info_str}') if len(cert_info.keys()) > 0 and (pattern == "*" or cert_info_str.find(pattern) > 0): logger.trace(f'yes, {pattern} matches {cert_info_str}') current_date = datetime.now() cert_expiry_date = datetime.strptime(cert_info['validity']['not_valid_after'], '%Y-%m-%d %H:%M:%S') days_to_expire = cert_expiry_date - current_date results_dict = {'common_name': cert_common_name, 'vault_cert_path': f'/{self.mount}{rootpath}', 'cert_issue_date': cert_info['validity']['not_valid_before'], 'cert_expiry_date': cert_info['validity']['not_valid_after'], 'days_to_expire': days_to_expire.days } if 'commonName' in cert_info['issuer']: results_dict['issuer'] = cert_info['issuer']['commonName'] else: results_dict['issuer'] = "" if 'alternativeNames' in cert_info['subject']: results_dict['alternativeNames'] = cert_info['subject']['alternativeNames'] else: results_dict['alternativeNames'] = "" searchresults.append(results_dict) else: logger.trace(f'no, {pattern} not found {cert_info_path}') except requests.exceptions.ConnectionError as ve: logger.error(f'Failed to connect to {self.__url}: {ve}') sys.exit(2) except hvac.exceptions.Forbidden as ve: logger.error(f'Permission denied for reading from {rootpath}: {ve}') sys.exit(2) except hvac.exceptions.InvalidPath as ve: logger.error(f'Path not found for {rootpath}: {ve}') sys.exit(2) except hvac.exceptions.Unauthorized as ve: logger.error(f'Credentials not authorized to access {rootpath}: {ve}') sys.exit(2) else: return searchresults
[docs]class VaultStoreEngine(StoreEngine): """Vault implementation of the StoreEngine interface""" __client: VaultClient def __init__(self, settings) -> None: """Constructor for VaultStoreEngine""" super().__init__() self._settings = settings self.__client = VaultClient(settings) logger.debug(f'🔐 Vault backend configuration loaded. {self.__client.url}') if self.initialize(): logger.debug("🔐 Vault backend initialized.") else: logger.error(f'🛑 Failed to connect to Vault {self.__client.url}')
[docs] def initialize(self) -> bool: """Ensure the Vault client is initialized""" return self.__client.initialize()
[docs] def open(self) -> bool: """Ensure the Vault client is connected""" return self.__client.connect()
[docs] def close(self) -> bool: """Ensure we close the vault connection""" return self.__client.logout()
def __del__(self): """When the object is destroyed make sure we close the connection to Vault""" return self.close()
[docs] def write(self, obj: StoreObject) -> bool: """Given a StoreObject, store it into vault using mount/path/name == body,info :param obj: The StoreObject to persist :type obj: StoreObject :return: bool """ return self.__client.upsert(obj)
[docs] def read(self, path: str, name: str, type=None) -> StoreObject: """Using the provided path and name retrieve the data from the store and create a new StoreObject :param path: Store path to the object :type path: str :param name: Name of the object to retrieve :type name: str :param type: StoreObject type, if known :type type: str :return: StoreObject """ try: certbody, certinfo = self.__client.read(path, name, type) cert = StoreObject(name=name, path=path, body=certbody['data']['data'], info=certinfo['data']['data'], type=type) cert._data = {'cert_body': certbody['data']['data'], 'cert_info': certinfo['data']['data']} except Exception as vex: logger.error(f'Failed to read StoreObject /{self.__client.mount}{path}/{name} {vex}') sys.exit(2) else: logger.info(f' Successfully read {cert.path_name}') return cert
[docs] def find(self, pattern) -> list: """Search certificate info for a given search pattern :param pattern: Search glob pattern ex: *, abc.8x8.com, abc.8x8.com/*, 8x8.com/* :type pattern: str :return: list """ return self.__client.search(rootpath='/', rootkey="", pattern=pattern, searchresults=[])