Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
meng-han committed Oct 14, 2024
1 parent d630ab9 commit afbb479
Showing 1 changed file with 79 additions and 79 deletions.
158 changes: 79 additions & 79 deletions confidant/routes/certificates.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
from confidant.utils import misc

logger = logging.getLogger(__name__)
blueprint = blueprints.Blueprint("certificates", __name__)
blueprint = blueprints.Blueprint('certificates', __name__)

acl_module_check = misc.load_module(settings.ACL_MODULE)


@blueprint.route("/v1/certificates/<ca>/<cn>", methods=["GET"])
@blueprint.route('/v1/certificates/<ca>/<cn>', methods=['GET'])
@authnz.require_auth
def get_certificate(ca, cn):
"""
'''
Get a certificate from the provided CA, for the provided CN.
.. :quickref: Certificate; Get certificate from the provided CA, for the
Expand Down Expand Up @@ -61,52 +61,52 @@ def get_certificate(ca, cn):
Content-Type: application/json
{
'certificate': '---...BEGIN...',
'certificate_chain': '---...BEGIN...',
'key': '---...BEGIN...'
"certificate": "---...BEGIN...",
"certificate_chain": "---...BEGIN...",
"key": "---...BEGIN..."
}
:resheader Content-Type: application/json
:statuscode 200: success
:statuscode 403: client does not have access to generate the requested
certificate.
"""
'''
try:
ca_object = certificatemanager.get_ca(ca)
except CertificateAuthorityNotFoundError:
return jsonify({"error": "Provided CA not found."}), 404
san = request.args.getlist("san")
return jsonify({'error': 'Provided CA not found.'}), 404
san = request.args.getlist('san')

logged_in_user = authnz.get_logged_in_user()
if not acl_module_check(
resource_type="certificate",
action="get",
resource_type='certificate',
action='get',
resource_id=cn,
kwargs={
"ca": ca,
"san": san,
'ca': ca,
'san': san,
},
):
msg = (
"{} does not have access to get certificate cn {} against" " ca {}"
'{} does not have access to get certificate cn {} against' ' ca {}'
).format(
authnz.get_logged_in_user(),
cn,
ca,
)
error_msg = {"error": msg, "reference": cn}
error_msg = {'error': msg, 'reference': cn}
return jsonify(error_msg), 403

logger.info(
"get_certificate called on id=%s for ca=%s by user=%s",
'get_certificate called on id=%s for ca=%s by user=%s',
cn,
ca,
logged_in_user,
)

validity = request.args.get(
"validity",
default=ca_object.settings["max_validity_days"],
'validity',
default=ca_object.settings['max_validity_days'],
type=int,
)
try:
Expand All @@ -117,23 +117,23 @@ def get_certificate(ca, cn):
)
except CertificateNotReadyError:
# Ratelimit response for a locked certificate in the cache
error_msg = "Certificate being requested, please wait and try again."
error_msg = 'Certificate being requested, please wait and try again.'
response = jsonify(error_msg)
response.retry_after = 2
return response, 429
certificate_response = CertificateResponse(
certificate=certificate["certificate"],
certificate_chain=certificate["certificate_chain"],
key=certificate["key"],
certificate=certificate['certificate'],
certificate_chain=certificate['certificate_chain'],
key=certificate['key'],
)
return certificate_expanded_response_schema.dumps(certificate_response)


@blueprint.route("/v1/certificates/<ca>", methods=["POST"])
@blueprint.route('/v1/certificates/<ca>', methods=['POST'])
@authnz.require_auth
@authnz.require_csrf_token
def get_certificate_from_csr(ca):
"""
'''
Get a certificate from the ca provided in the url, using the CSR, validity
and san provided in the POST body.
Expand Down Expand Up @@ -164,8 +164,8 @@ def get_certificate_from_csr(ca):
Content-Type: application/json
{
'certificate': '---...BEGIN...',
'certificate_chain': '---...BEGIN...'
"certificate": "---...BEGIN...",
"certificate_chain": "---...BEGIN..."
}
:resheader Content-Type: application/json
Expand All @@ -174,30 +174,30 @@ def get_certificate_from_csr(ca):
or was missing from the request.
:statuscode 403: Client does not have access to generate the requested
certificate.
"""
'''
try:
ca_object = certificatemanager.get_ca(ca)
except CertificateAuthorityNotFoundError:
return jsonify({"error": "Provided CA not found."}), 404
return jsonify({'error': 'Provided CA not found.'}), 404
data = request.get_json()
if not data or not data.get("csr"):
if not data or not data.get('csr'):
return (
jsonify(
{"error": "csr must be provided in the POST body."},
{'error': 'csr must be provided in the POST body.'},
),
400,
)
validity = data.get(
"validity",
ca_object.settings["max_validity_days"],
'validity',
ca_object.settings['max_validity_days'],
)
try:
csr = ca_object.decode_csr(data["csr"])
csr = ca_object.decode_csr(data['csr'])
except Exception:
logger.exception("Failed to decode PEM csr")
logger.exception('Failed to decode PEM csr')
return (
jsonify(
{"error": "csr could not be decoded"},
{'error': 'csr could not be decoded'},
),
400,
)
Expand All @@ -208,40 +208,40 @@ def get_certificate_from_csr(ca):

logged_in_user = authnz.get_logged_in_user()
if not acl_module_check(
resource_type="certificate",
action="get",
resource_type='certificate',
action='get',
resource_id=cn,
kwargs={
"ca": ca,
"san": san,
'ca': ca,
'san': san,
},
):
msg = (
f"{authnz.get_logged_in_user()} does not have access to get"
"certificate cn {cn} against ca {ca}"
f'{authnz.get_logged_in_user()} does not have access to get'
'certificate cn {cn} against ca {ca}'
)
error_msg = {"error": msg, "reference": cn}
error_msg = {'error': msg, 'reference': cn}
return jsonify(error_msg), 403

logger.info(
"get_certificate called on id=%s for ca=%s by user=%s",
'get_certificate called on id=%s for ca=%s by user=%s',
cn,
ca,
logged_in_user,
)

certificate = ca_object.issue_certificate(data["csr"], validity)
certificate = ca_object.issue_certificate(data['csr'], validity)
certificate_response = CertificateResponse(
certificate=certificate["certificate"],
certificate_chain=certificate["certificate_chain"],
certificate=certificate['certificate'],
certificate_chain=certificate['certificate_chain'],
)
return certificate_response_schema.dumps(certificate_response)


@blueprint.route("/v1/cas", methods=["GET"])
@blueprint.route('/v1/cas', methods=['GET'])
@authnz.require_auth
def list_cas():
"""
'''
List the configured CAs.
.. :quickref: Certificate Authorities; Get a list of the detailed
Expand All @@ -261,12 +261,12 @@ def list_cas():
Content-Type: application/json
{
'cas': [
'example-ca': {
'certificate': '---...BEGIN...',
'certificate_chain': '---...BEGIN...',
'tags': {
'hello': 'world'
"cas": [
"example-ca": {
"certificate": "---...BEGIN...",
"certificate_chain": "---...BEGIN...",
"tags": {
"hello": "world"
},
...
]
Expand All @@ -275,31 +275,31 @@ def list_cas():
:resheader Content-Type: application/json
:statuscode 200: success
:statuscode 403: client does not have access to list CAs
"""
'''

logged_in_user = authnz.get_logged_in_user()
if not acl_module_check(
resource_type="ca",
action="list",
resource_type='ca',
action='list',
):
msg = "{} does not have access to list cas".format(
msg = '{} does not have access to list cas'.format(
authnz.get_logged_in_user(),
)
error_msg = {"error": msg}
error_msg = {'error': msg}
return jsonify(error_msg), 403

cas = certificatemanager.list_cas()

logger.info("list_cas called by user=%s", logged_in_user)
logger.info('list_cas called by user=%s', logged_in_user)

cas_response = CertificateAuthoritiesResponse.from_cas(cas)
return certificate_authorities_response_schema.dumps(cas_response)


@blueprint.route("/v1/cas/<ca>", methods=["GET"])
@blueprint.route('/v1/cas/<ca>', methods=['GET'])
@authnz.require_auth
def get_ca(ca):
"""
'''
Get the CA information for the provided ca.
.. :quickref: Certificate Authorities; Get the detailed certificate
Expand All @@ -323,42 +323,42 @@ def get_ca(ca):
Content-Type: application/json
{
'example-ca': {
'certificate': '---...BEGIN...',
'certificate_chain': '---...BEGIN...',
'tags': {
'hello': 'world'
"example-ca": {
"certificate": "---...BEGIN...",
"certificate_chain": "---...BEGIN...",
"tags": {
"hello": "world"
}
}
:resheader Content-Type: application/json
:statuscode 200: Success
:statuscode 403: Client does not have access to get the requested CA.
"""
'''
try:
ca_object = certificatemanager.get_ca(ca)
except CertificateAuthorityNotFoundError:
return jsonify({"error": "Provided CA not found."}), 404
return jsonify({'error': 'Provided CA not found.'}), 404

logged_in_user = authnz.get_logged_in_user()

if not acl_module_check(
resource_type="ca",
action="get",
resource_type='ca',
action='get',
resource_id=ca,
):
msg = f"""
msg = f'''
{authnz.get_logged_in_user()} does not have access to get ca {ca}
"""
error_msg = {"error": msg, "reference": ca}
'''
error_msg = {'error': msg, 'reference': ca}
return jsonify(error_msg), 403

logger.info("get_ca called on id=%s by user=%s", ca, logged_in_user)
logger.info('get_ca called on id=%s by user=%s', ca, logged_in_user)
_ca = ca_object.get_certificate_authority_certificate()
ca_response = CertificateAuthorityResponse(
ca=_ca["ca"],
certificate=_ca["certificate"],
certificate_chain=_ca["certificate_chain"],
tags=_ca["tags"],
ca=_ca['ca'],
certificate=_ca['certificate'],
certificate_chain=_ca['certificate_chain'],
tags=_ca['tags'],
)
return certificate_authority_response_schema.dumps(ca_response)

0 comments on commit afbb479

Please sign in to comment.