"""
Apache Software License 2.0
Copyright (c) 2020, 8x8, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import ast
import datetime
import enum
import json
from binascii import hexlify
import validators
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import Encoding
from dynaconf import LazySettings
from jinja2 import Environment
from jinja2 import FileSystemLoader
from loguru import logger
from ..backend import StoreObject
from .cert_engine import CertDnsEngine
class Cert(StoreObject):
    """Object representation of a TLS certificate.

    A Cert is created from a settings object and an optional common name,
    then populated from PEM files with :meth:`load` / :meth:`load_x509` or
    from a provider with :meth:`generate`. The ``info``, ``body``, ``data``
    and ``policy`` structures are rendered from Jinja2 templates found in the
    ``templates`` directory handed to ``FileSystemLoader``.
    """
    _body: str  #: Private, chain and public portions of the certificate; "" until loaded, then a map
    _info: str  #: Certificate details; "" until loaded, then a map
    _data: dict  #: Combined body and info map
    _path: str  #: Objects stored using <mount><path><name><type>
    _policy: str  #: Vault access policy, gen from jinja template, explicit to instance of cert
    _mount: str  #: Based on certificate its mount is either KNOX_VAULT_MOUNT or KNOX_VAULT_MOUNT/client
    _file: object  #: Raw file contents of certificate
    _x509: x509.Certificate  #: Parsed data object from raw file
    _common_name: str  #: Defaults to value from certificate
    _type: str  #: Certificate type identifier
    _jinja: Environment  #: Template engine

    class CertTypes(enum.Enum):
        """Certificate container formats Cert knows by name."""
        PEM = 1
        DER = 2
        PFX = 3

        @classmethod
        def valid(cls, name) -> bool:
            """Tell whether ``name`` is the name of one of the members.

            :param name: Candidate type name, e.g. ``'PEM'``
            :return: True when a member carries that name
            :rtype: bool
            """
            return any(member.name == name for member in cls)

    PEM = CertTypes.PEM
    DER = CertTypes.DER
    PFX = CertTypes.PFX

    def __init__(self, settings: LazySettings, common_name=None) -> None:
        """Constructor for Cert

        :param settings: Configuration; ``KNOX_VAULT_MOUNT`` is read by the mount properties
        :type settings: LazySettings
        :param common_name: Optional common name, normalised through :meth:`valid_name`
        :type common_name: str
        """
        self._settings = settings
        self._common_name = self.valid_name(common_name)
        self._body = ""
        self._info = ""
        self._type = ""
        super().__init__(name=self.name, path=self.path, body=self._body, info=self._info)
        self._jinja = Environment(loader=FileSystemLoader('templates'))
        self._tmpl_body = self._jinja.get_template('body_template.js')
        self._tmpl_info = self._jinja.get_template('info_template.js')
        self._tmpl_data = self._jinja.get_template('data_template.js')
        self._tmpl_policy = self._jinja.get_template('policy_template.js')

    def _has_domain_name(self) -> bool:
        """Tell whether the normalised name validates as a DNS domain.

        Shared by ``mount``, ``policy_mount``, ``path`` and ``load`` so they
        all classify the certificate the same way.
        """
        name = self.name
        return bool(name) and bool(validators.domain(name))

    def load_x509(self, path: str) -> None:
        """Given path to PEM x509 read in certificate

        :param path: File path to x509 PEM file
        :type path: str
        """
        self.type = Cert.PEM.name
        # Read-only access is enough; 'r+' would fail on files that are not writable.
        with open(path, mode='r', encoding='utf-8') as fp:
            self._file = fp.read()
        self._x509 = x509.load_pem_x509_certificate(bytes(self._file, 'utf-8'), default_backend())

        # Generate data structures using custom Jinja2 templates. The rendered
        # text is parsed with ast.literal_eval, so each template has to render
        # to a Python literal.
        self._info = ast.literal_eval(self._tmpl_info.render(cert=self))
        self._body = ast.literal_eval(self._tmpl_body.render(cert=self))
        self._data = ast.literal_eval(self._tmpl_data.render(cert=self))

        # Ensure raw file contents in public key, Jinja2 fails to parse if there are CR LF
        self.public = self._file

        # Match the object's common name to the true common name from the
        # certificate; the `name` property swaps '*' for the keyword wildcard.
        # If the certificate does not have a common name the user provided
        # name is kept.
        subject = self._data['cert_info']['subject']
        if 'commonName' in subject and len(subject['commonName']) > 0:
            self._common_name = subject['commonName']
        else:
            logger.warning(f'Certificate {self.name} does not have a value for common name.')

    def policy(self) -> str:
        """Render, remember and return the access policy for this certificate.

        :return: Output of the policy template rendered with this object
        :rtype: str
        """
        self._policy = self._tmpl_policy.render(cert=self)
        return self._policy

    def load(self, pub: str, key: str, certtype: enum.Enum = PEM, chain: str = None) -> None:
        """Read in components of a certificate, given filename paths for each

        :param pub: File name of public portion of key
        :type pub: str
        :param key: File name of private portion of key
        :type key: str
        :param chain: File name of intermediate certificates. Optional as they could be in pub
        :type chain: str
        :param certtype: Certificate type, either a ``CertTypes`` member or its name; only PEM is loaded
        :type certtype: Enum
        """
        # The default is an enum member while callers such as generate() pass
        # the member name; compare on the name so both spellings select PEM.
        type_name = certtype.name if isinstance(certtype, enum.Enum) else certtype

        if type_name == Cert.PEM.name:
            self.load_x509(pub)
            with open(key, mode="r", encoding='utf-8') as key_fp:
                self._body['cert_body']['private'] = key_fp.read()
            if chain:
                with open(chain, mode="r", encoding='utf-8') as chain_fp:
                    self._body['cert_body']['chain'] = chain_fp.read()
            self._data['cert_body'] = self._body['cert_body']
            self._data['cert_policy'] = self.policy()
        else:
            # Nothing is read for other types; say so instead of returning silently.
            logger.warning(f'Certificate type {type_name} is not supported, nothing was loaded')

        if not self._has_domain_name():
            logger.info(f'{self.name} appears to be a client/server certificate not a web/https certificate')

    @classmethod
    def valid_name(cls, value: str) -> str:
        """Some engines might have problems with asterisks, as they are used for glob searching and or RBAC.
        Replace it with the key word 'wildcard'. This does not affect the actual certificate.

        :param value: Common name; may be None or empty
        :type value: str
        :return: The substituted name when it validates as a domain, otherwise ``value`` unchanged
        :rtype: str
        """
        if not value:
            # The constructor's default common name is None; nothing to substitute.
            return value
        name = value.replace('*', 'wildcard')
        return name if validators.domain(name) else value

    @property
    def mount(self) -> str:
        """Store mount: ``KNOX_VAULT_MOUNT/`` for domain names, ``KNOX_VAULT_MOUNT/client`` otherwise."""
        # Classify on the wildcard-substituted name, as policy_mount and path
        # do, so '*.example.com' is not treated as a client certificate here only.
        if self._has_domain_name():
            self._mount = f"{self._settings['KNOX_VAULT_MOUNT']}/"
        else:
            self._mount = f"{self._settings['KNOX_VAULT_MOUNT']}/client"
        return self._mount

    @property
    def policy_mount(self) -> str:
        """Mount used in policies: ``KNOX_VAULT_MOUNT/data`` or ``KNOX_VAULT_MOUNT/data/client``."""
        # Returned directly; `_mount` belongs to the `mount` property.
        if self._has_domain_name():
            return f"{self._settings['KNOX_VAULT_MOUNT']}/data"
        return f"{self._settings['KNOX_VAULT_MOUNT']}/data/client"

    def subject(self) -> str:
        """Return the certificate subject details

        :return: JSON object of subject attributes plus an ``alternativeNames`` entry
        :rtype: str
        """
        subj = {attr.oid._name: attr.value for attr in self._x509.subject}
        subj['alternativeNames'] = self.subjectaltnames()
        return json.dumps(subj, indent=8)

    def subjectaltnames(self) -> str:
        """Return Subject alternate names

        :return: JSON string holding the list of DNS names, or '' when they cannot be read
        :rtype: str
        """
        try:
            ext = self._x509.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME)
            return json.dumps(f'{ext.value.get_values_for_type(x509.DNSName)}')
        except Exception:  # noqa E722
            # Best effort: any failure to read the extension yields no names.
            return ''

    @property
    def type(self) -> str:
        """Certificate type name; "" until a valid type has been set."""
        return self._type

    @type.setter
    def type(self, value: str) -> None:
        # Names that are not CertTypes members are ignored, keeping the previous value.
        if Cert.CertTypes.valid(value):
            self._type = value

    def issuer(self) -> str:
        """Return the certificate issuer details

        :return: JSON object of issuer attributes
        :rtype: str
        """
        return json.dumps({attr.oid._name: attr.value for attr in self._x509.issuer}, indent=8)

    def validity(self) -> str:
        """Return the certificates dates of validity

        :return: JSON object with ``not_valid_before`` and ``not_valid_after``
        :rtype: str
        """
        cert = self._x509
        return json.dumps({
            'not_valid_before': f'{cert.not_valid_before}',
            'not_valid_after': f'{cert.not_valid_after}',
        }, indent=8)

    def key_details(self) -> str:
        """Return characteristics of key used to generate the certificate

        :return: JSON object with version, SHA-256 fingerprint, serial number and key details
        :rtype: str
        :raises ValueError: If the public key is not RSA, DSA or elliptic curve
        """
        cert = self._x509
        public_key = cert.public_key()

        # Classify before touching key_size so an unsupported key type raises
        # the ValueError below instead of failing on a missing attribute.
        curve_name = None
        if isinstance(public_key, rsa.RSAPublicKey):
            key_type = 'RSA'
        elif isinstance(public_key, dsa.DSAPublicKey):
            key_type = 'DSA'
        elif isinstance(public_key, ec.EllipticCurvePublicKey):
            key_type = 'ECC'
            curve_name = public_key.curve.name
        else:
            raise ValueError('Invalid key type.')

        key_info = {'size': public_key.key_size}
        if curve_name is not None:
            key_info['curve'] = curve_name
        key_info['type'] = key_type

        return json.dumps({
            'version': cert.version.name,
            'fingerprint_sha256': hexlify(cert.fingerprint(hashes.SHA256())).decode(),
            'serial_number': f'{cert.serial_number}',
            'key': key_info
        }, indent=8)

    def isValid(self) -> bool:
        """Check certificate validity period

        :return: True when the current time lies strictly between the two validity dates
        :rtype: bool
        """
        # cryptography documents not_valid_before/not_valid_after as naive
        # datetimes in UTC, so compare against naive UTC rather than local time.
        now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        logger.trace(f'Is the date within the validity dates?\n\tNot valid after: {self._x509.not_valid_after}'
                     f'\n\tNow: {now}'
                     f'\n\tNot valid before {self._x509.not_valid_before}')
        return self._x509.not_valid_after > now > self._x509.not_valid_before

    @staticmethod
    def to_store_path(common_name: str) -> str:
        """Generate a backend store path based on the certificates common name
        www.example.com becomes /com/example/www

        :param common_name: Dot separated name
        :type common_name: str
        :return: str
        """
        domainsplit = common_name.split('.')
        return "/" + "/".join(reversed(domainsplit))

    @property
    def name(self) -> str:
        """Common name passed through :meth:`valid_name`; also stored in ``_name``."""
        self._name = self.valid_name(self._common_name)
        return self._name

    @property
    def path(self) -> str:
        """Store path built from the name when it is a domain, otherwise ''."""
        if self._has_domain_name():
            self._path = Cert.to_store_path(self.name)
        else:
            self._path = ''
        return self._path

    def __str__(self) -> str:
        return json.dumps(self._data, indent=4)

    @property
    def private(self) -> str:
        """Unless its a dict, its not loaded yet"""
        if isinstance(self._body, dict):
            return json.dumps(self._body['cert_body']['private']).replace('\n', '')
        return ""

    @property
    def chain(self) -> str:
        """Unless its a dict, its not loaded yet"""
        if isinstance(self._body, dict):
            return json.dumps(self._body['cert_body']['chain']).replace('\n', '')
        return ""

    @chain.setter
    def chain(self, value) -> None:
        # Ignored until a certificate has been loaded and the maps exist.
        if isinstance(self._body, dict):
            self._body['cert_body']['chain'] = value
            self._data['cert_body']['chain'] = value

    @property
    def public(self) -> str:
        """Convenience method for Jinja2 templates. Jinja2 does not process the string if it has carriage returns.

        :raises CertUnsupportedTypeException: If the certificate type is not PEM
        """
        if self.type == Cert.PEM.name:
            return self._x509.public_bytes(Encoding.PEM).decode('utf-8').replace('\n', '')
        raise CertUnsupportedTypeException(type=self.type)

    @public.setter
    def public(self, value: str) -> None:
        # Ignored until a certificate has been loaded and the maps exist.
        if isinstance(self._body, dict):
            self._body['cert_body']['public'] = value
            self._data['cert_body']['public'] = value

    def info(self) -> str:
        """Return the ``cert_info`` section as indented JSON."""
        return json.dumps(self._info['cert_info'], indent=4)

    def body(self) -> str:
        """Return the ``cert_body`` section as indented JSON."""
        return json.dumps(self._body['cert_body'], indent=4)

    @property
    def data(self) -> dict:
        """Content to persist, typically JSON"""
        return self._data

    def generate(self) -> None:
        """Generate certificate for a given common name

        Asks ``CertDnsEngine`` for certificate, chain and private key files
        and loads them as PEM. Failures are logged, not raised.
        """
        try:
            cde = CertDnsEngine(self._settings)
            certfile, chainfile, privkey = cde.call_provider(self._common_name)
            self.load(pub=certfile, key=privkey, chain=chainfile, certtype=Cert.PEM.name)
        except Exception as ex:
            # Still best effort, but keep the cause so the failure can be diagnosed.
            logger.error(f'Failed to generate certificate {self._common_name}: {ex}')
class CertUnsupportedTypeException(Exception):
    """Signals that a certificate type is not one this module can handle.

    The offending type name is kept on ``type`` and included in the text
    produced by ``str()``.
    """

    def __init__(self, type: str = None) -> None:
        """Remember the rejected type name.

        :param type: Name of the certificate type that was rejected
        :type type: str
        """
        super().__init__()
        self.type = type

    def __str__(self):
        """Return the human readable message naming the rejected type."""
        message = f'Unsupported certificate type: {self.type}'
        return message