0.10.1-beta
This is a plugin for OpenVPN Community Edition that allows OpenVPN to authenticate directly against Okta, with support for TOTP and Okta Verify Push factors.
At a high level, OpenVPN communicates with this plugin via a “control file”, a temporary file that OpenVPN creates and polls periodicly. If the plugin writes the ASCII character “1” into the control file, the user in question is allowed to log in to OpenVPN, if we write the ASCII character “0” into the file, the user is denied.
Below are the key parts of the code for okta_openvpn.py
:
- Instantiate an OktaOpenVPNValidator object
- Load in configuration file and environment variables
- Authenticate the user
- Write result to the control file
The code flow for authenticating a user is as follows:
Here is how we instantiate an OktaOpenVPNValidator object:
<<main-loop>>
Here is the run()
method of the OktaOpenVPNValidator class, this
is what calls the methods which load the configuration file and
environment variables, then calls the authenticate()
method.
<<oktaopenvpnvalidator-run>>
Here is the authenticate()
method:
def authenticate(self):
if not self.username_trusted:
log.warning("Username %s is not trusted - failing",
self.okta_config['username'])
return False
try:
okta = self.cls(**self.okta_config)
self.user_valid = okta.auth()
return self.user_valid
except Exception as exception:
log.error(
"User %s (%s) authentication failed, "
"because %s() failed unexpectedly - %s",
self.okta_config['username'],
self.okta_config['client_ipaddr'],
self.cls.__name__,
exception
)
return False
This code in turns calls the auth()
method in the OktaAPIAuth
class, which does the following:
- Makes an authentication request to Okta, using the
preauth()
method. - Checks for errors
- Log the user in if the reply was
SUCCESS
- Deny the user if the reply is
MFA_ENROLL
orMFA_ENROLL_ACTIVATE
If the response is MFA_REQUIRED
or MFA_CHALLENGE
then we do the
following, for each factor that the user has registered:
- Skip the factor if this code doesn’t support that factor type.
- Call
doauth()
, the second phase authentication, using the passcode (if we have one) and thestateToken
.- Keep running
doauth()
if the response type isMFA_CHALLENGE
orWAITING
.
- Keep running
- If there response from
doauth()
isSUCCESS
then log the user in. - Fail otherwise.
When returning errors, we prefer the summary strings in
errorCauses
, over those in errorSummary
because the strings in
errorCauses
tend to be mroe descriptive. For more information,
see the documentation for Verify Security Question Factor.
try:
rv = self.preauth()
except Exception as s:
log.error('Error connecting to the Okta API: %s', s)
return False
# Check for erros from Okta
if 'errorCauses' in rv:
msg = rv['errorSummary']
log.info('User %s pre-authentication failed: %s',
self.username,
msg)
return False
elif 'status' in rv:
status = rv['status']
# Check authentication status from Okta
if status == "SUCCESS":
log.info('User %s authenticated without MFA', self.username)
return True
elif status == "MFA_ENROLL" or status == "MFA_ENROLL_ACTIVATE":
log.info('User %s needs to enroll first', self.username)
return False
elif status == "MFA_REQUIRED" or status == "MFA_CHALLENGE":
log.debug("User %s password validates, checking second factor",
self.username)
res = None
for factor in rv['_embedded']['factors']:
supported_factor_types = ["token:software:totp", "push"]
if factor['factorType'] not in supported_factor_types:
continue
fid = factor['id']
state_token = rv['stateToken']
try:
res = self.doauth(fid, state_token)
check_count = 0
fctr_rslt = 'factorResult'
while fctr_rslt in res and res[fctr_rslt] == 'WAITING':
print("Sleeping for {}".format(
self.mfa_push_delay_secs))
time.sleep(self.mfa_push_delay_secs)
res = self.doauth(fid, state_token)
check_count += 1
if check_count > self.mfa_push_max_retries:
log.info('User %s MFA push timed out' %
self.username)
return False
except Exception as e:
log.error('Unexpected error with the Okta API: %s', e)
return False
if 'status' in res and res['status'] == 'SUCCESS':
log.info("User %s is now authenticated "
"with MFA via Okta API", self.username)
return True
if 'errorCauses' in res:
msg = res['errorCauses'][0]['errorSummary']
log.debug('User %s MFA token authentication failed: %s',
self.username,
msg)
return False
else:
log.info("User %s is not allowed to authenticate: %s",
self.username,
status)
return False
Important: The key thing to know about OpenVPN plugins (like this one) are that they communicate with OpenVPN through a control file. When OpenVPN calls a plugin, it first creates a temporary file, passes the name of the temporary file to the plugin, then waits for the temporary file to be written.
If a ”1” is written to the file, OpenVPN logs the user in. If a ”0” is written to the file, the user is denied.
Here is what the code does below:
Because of how critical this control file is, we take the precaution of checking the permissions on the control file before writing anything to the file.
If the user authentication that happened previously was a success, we write a 1 to the file. Otherwise, we write a 0 to the file, denying the user by default.
def write_result_to_control_file(self):
self.check_control_file_permissions()
try:
with open(self.control_file, 'w') as f:
if self.user_valid:
f.write('1')
else:
f.write('0')
except IOError:
log.critical("Failed to write to OpenVPN control file '{}'".format(
self.control_file
))
Read the source on GitHub: https://github.com/okta/okta-openvpn
Key files to read:
- https://github.com/okta/okta-openvpn/blob/master/tests/test_OktaOpenVPNValidator.py
- https://github.com/okta/okta-openvpn/blob/master/okta_openvpn.py
These are the libraries that okta_openvpn.py
uses. Of note are:
cryptography
andcertifi
, which are used for key pinning checks.urllib3
which is used to make requests to the Okta API.okta_pinset
which contains the “keypins” or “fingerprints” used to verify that we are connecting directly to the Okta API.
import ConfigParser
from ConfigParser import MissingSectionHeaderError
import base64
import hashlib
import json
import logging
import logging.handlers
import os
import platform
import stat
import sys
import time
import urlparse
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import certifi
import urllib3
from okta_pinset import okta_pinset
This defines the User Agent for okta_openvpn.py
. Here is an
example of what a User Agent would look like:
OktaOpenVPN/0.10.0 (Darwin 12.4.0) CPython/2.7.5
This User Agent has three parts:
- The name and version of the software.
- The Operating System name and version.
In this case “Darwin” refers to the open-source Unix operating system from Apple. Version 12.4.0 of Darwin corrisponds with OS X Mountain Lion 10.8.4.
- The programming language interepreter and version.
In this case, “CPython” refers to the reference implementation of Python, which is written in C
version = "<<version>>"
# OktaOpenVPN/0.10.0 (Darwin 12.4.0) CPython/2.7.5
user_agent = ("OktaOpenVPN/{version} "
"({system} {system_version}) "
"{implementation}/{python_version}").format(
version=version,
system=platform.uname()[0],
system_version=platform.uname()[2],
implementation=platform.python_implementation(),
python_version=platform.python_version())
This sets up logging, by default we send everything
(logging.DEBUG
) to syslog. Also included is commented out code to
also log to STDERR and/or logging to a file
(/tmp/okta_openvpn.log
by default)
log = logging.getLogger('okta_openvpn')
log.setLevel(logging.DEBUG)
syslog = logging.handlers.SysLogHandler()
syslog_fmt = "%(module)s-%(processName)s[%(process)d]: %(name)s: %(message)s"
syslog.setFormatter(logging.Formatter(syslog_fmt))
log.addHandler(syslog)
# # Uncomment to enable logging to STDERR
# errlog = logging.StreamHandler()
# errlog.setFormatter(logging.Formatter(syslog_fmt))
# log.addHandler(errlog)
# # Uncomment to enable logging to a file
# filelog = logging.FileHandler('/tmp/okta_openvpn.log')
# filelog.setFormatter(logging.Formatter(syslog_fmt))
# log.addHandler(filelog)
These are custom exceptions that we use throw for error conditions that are unique to this script.
PinError
is used when the pin for the public key of the remote
TLS certificate isn’t found in our set of valid pins.
ControlFilePermissionsError
is used when we encounter a
permissions error related to the control file used to communicate
success/failure of an authentication to OpenVPN.
class PinError(Exception):
"Raised when a pin isn't found in a certificate"
pass
class ControlFilePermissionsError(Exception):
"Raised when the control file or containing directory have bad permissions"
pass
This code is used to implement HPKP-style key pinning with the Okta
API. The code works by extending the urllib3.HTTPSConnectionPool
object, implementing the _validate_conn
which is run to validate
connections.
Essentially, this code hashes the public key of the remote TLS certificate and compares the hash against a whitelist of hashes.
class PublicKeyPinsetConnectionPool(urllib3.HTTPSConnectionPool):
def __init__(self, *args, **kwargs):
self.pinset = kwargs.pop('assert_pinset', None)
super(PublicKeyPinsetConnectionPool, self).__init__(*args, **kwargs)
def _validate_conn(self, conn):
super(PublicKeyPinsetConnectionPool, self)._validate_conn(conn)
if not conn.is_verified:
raise Exception("Unexpected verification error.")
cert = conn.sock.getpeercert(binary_form=True)
public_key = x509.load_der_x509_certificate(
cert,
default_backend()).public_key()
public_key_raw = public_key.public_bytes(
serialization.Encoding.DER,
serialization.PublicFormat.SubjectPublicKeyInfo)
public_key_sha256 = hashlib.sha256(public_key_raw).digest()
public_key_sha256_base64 = base64.b64encode(public_key_sha256)
if public_key_sha256_base64 not in self.pinset:
pin_failure_message = (
'Refusing to authenticate '
'because host {remote_host} failed '
'a TLS public key pinning check. '
'Please contact [email protected] with this error message'
).format(remote_host=conn.host)
log.critical(pin_failure_message)
raise PinError("Public Key not found in pinset!")
This code that communicates with the Okta API.
class OktaAPIAuth(object):
def __init__(self, okta_url, okta_token,
username, password, client_ipaddr,
mfa_push_delay_secs=None,
mfa_push_max_retries=None,
assert_pinset=None):
passcode_len = 6
self.okta_url = None
self.okta_token = okta_token
self.username = username
self.password = password
self.client_ipaddr = client_ipaddr
self.passcode = None
self.okta_urlparse = urlparse.urlparse(okta_url)
self.mfa_push_delay_secs = mfa_push_delay_secs
self.mfa_push_max_retries = mfa_push_max_retries
if assert_pinset is None:
assert_pinset = okta_pinset
url_new = (self.okta_urlparse.scheme,
self.okta_urlparse.netloc,
'', '', '', '')
self.okta_url = urlparse.urlunparse(url_new)
if password and len(password) > passcode_len:
last = password[-passcode_len:]
if last.isdigit():
self.passcode = last
self.password = password[:-passcode_len]
self.pool = PublicKeyPinsetConnectionPool(
self.okta_urlparse.hostname,
self.okta_urlparse.port,
assert_pinset=assert_pinset,
cert_reqs='CERT_REQUIRED',
ca_certs=certifi.where(),
)
def okta_req(self, path, data):
ssws = "SSWS {token}".format(token=self.okta_token)
headers = {
'user-agent': user_agent,
'content-type': 'application/json',
'accept': 'application/json',
'authorization': ssws,
}
url = "{base}/api/v1{path}".format(base=self.okta_url, path=path)
req = self.pool.urlopen(
'POST',
url,
headers=headers,
body=json.dumps(data)
)
return json.loads(req.data)
def preauth(self):
path = "/authn"
data = {
'username': self.username,
'password': self.password,
}
return self.okta_req(path, data)
def doauth(self, fid, state_token):
path = "/authn/factors/{fid}/verify".format(fid=fid)
data = {
'fid': fid,
'stateToken': state_token,
'passCode': self.passcode,
}
return self.okta_req(path, data)
def auth(self):
username = self.username
password = self.password
status = False
rv = False
invalid_username_or_password = (
username is None or
username == '' or
password is None or
password == '')
if invalid_username_or_password:
log.info("Missing username or password for user: %s (%s) - "
"Reported username may be 'None' due to this",
username,
self.client_ipaddr)
return False
if not self.passcode:
log.info("No second factor found for username %s", username)
log.debug("Authenticating username %s", username)
<<okta-api-auth-auth-method>>
In short, this class gets the “environment” set up for the OktaAPIAuth class. It reads in configuration files and environment variables, makes sure that permissions are correct on the “Control File”, calls OktaAPIAuth and writes to the Control File as approprate.
class OktaOpenVPNValidator(object):
def __init__(self):
self.cls = OktaAPIAuth
self.username_trusted = False
self.user_valid = False
self.control_file = None
self.site_config = {}
self.config_file = None
self.env = os.environ
self.okta_config = {}
self.username_suffix = None
self.always_trust_username = False
# These can be modified in the 'okta_openvpn.ini' file.
# By default, we retry for 2 minutes:
self.mfa_push_max_retries = "20"
self.mfa_push_delay_secs = "3"
def read_configuration_file(self):
cfg_path_defaults = [
'/etc/openvpn/okta_openvpn.ini',
'/etc/okta_openvpn.ini',
'okta_openvpn.ini']
cfg_path = cfg_path_defaults
parser_defaults = {
'AllowUntrustedUsers': self.always_trust_username,
'UsernameSuffix': self.username_suffix,
'MFAPushMaxRetries': self.mfa_push_max_retries,
'MFAPushDelaySeconds': self.mfa_push_delay_secs,
}
if self.config_file:
cfg_path = []
cfg_path.append(self.config_file)
log.debug(cfg_path)
for cfg_file in cfg_path:
if os.path.isfile(cfg_file):
try:
cfg = ConfigParser.ConfigParser(defaults=parser_defaults)
cfg.read(cfg_file)
self.site_config = {
'okta_url': cfg.get('OktaAPI', 'Url'),
'okta_token': cfg.get('OktaAPI', 'Token'),
'mfa_push_max_retries': cfg.get('OktaAPI',
'MFAPushMaxRetries'),
'mfa_push_delay_secs': cfg.get('OktaAPI',
'MFAPushDelaySeconds'),
}
always_trust_username = cfg.get(
'OktaAPI',
'AllowUntrustedUsers')
if always_trust_username == 'True':
self.always_trust_username = True
self.username_suffix = cfg.get('OktaAPI', 'UsernameSuffix')
return True
except MissingSectionHeaderError as e:
log.debug(e)
if 'okta_url' not in self.site_config and \
'okta_token' not in self.site_config:
log.critical("Failed to load config")
return False
def load_environment_variables(self):
if 'okta_url' not in self.site_config:
log.critical('OKTA_URL not defined in configuration')
return False
if 'okta_token' not in self.site_config:
log.critical('OKTA_TOKEN not defined in configuration')
return False
# Taken from a validated VPN client-side SSL certificate
username = self.env.get('common_name')
password = self.env.get('password')
client_ipaddr = self.env.get('untrusted_ip', '0.0.0.0')
# Note:
# username_trusted is True if the username comes from a certificate
#
# Meaning, if self.common_name is NOT set, but self.username IS,
# then self.username_trusted will be False
if username is not None:
self.username_trusted = True
else:
# This is set according to what the VPN client has sent us
username = self.env.get('username')
if self.always_trust_username:
self.username_trusted = self.always_trust_username
if self.username_suffix and '@' not in username:
username = username + '@' + self.username_suffix
self.control_file = self.env.get('auth_control_file')
if self.control_file is None:
log.info(("No control file found, "
"if using a deferred plugin "
"authentication will stall and fail."))
self.okta_config = {
'okta_url': self.site_config['okta_url'],
'okta_token': self.site_config['okta_token'],
'username': username,
'password': password,
'client_ipaddr': client_ipaddr,
}
for item in ['mfa_push_max_retries', 'mfa_push_delay_secs']:
if item in self.site_config:
self.okta_config[item] = self.site_config[item]
assert_pin = self.env.get('assert_pin')
if assert_pin:
self.okta_config['assert_pinset'] = [assert_pin]
<<validator-authenticate>>
def check_control_file_permissions(self):
file_mode = os.stat(self.control_file).st_mode
if file_mode & stat.S_IWGRP or file_mode & stat.S_IWOTH:
log.critical(
'Refusing to authenticate. The file %s'
' must not be writable by non-owners.',
self.control_file
)
raise ControlFilePermissionsError()
dir_name = os.path.split(self.control_file)[0]
dir_mode = os.stat(dir_name).st_mode
if dir_mode & stat.S_IWGRP or dir_mode & stat.S_IWOTH:
log.critical(
'Refusing to authenticate.'
' The directory containing the file %s'
' must not be writable by non-owners.',
self.control_file
)
raise ControlFilePermissionsError()
<<write-result-to-control-file>>
<<oktaopenvpnvalidator-run>>
def run(self):
self.read_configuration_file()
self.load_environment_variables()
self.authenticate()
self.write_result_to_control_file()
If the user is valid, we exit with “0”. If the user is not valid,
we exit with “1”. This was split out into a seperate function to
avoid confusion seeing sys.exit(0)
in the code.
def return_error_code_for(validator):
if validator.user_valid:
sys.exit(0)
else:
sys.exit(1)
Checking if __name__
equals "__main__"
is the Pythonic way of
detecting if this code has been called from the command line (as
opposed to being included via an import
statement).
# This is tested by test_command.sh via tests/test_command.py
if __name__ == "__main__": # pragma: no cover
validator = OktaOpenVPNValidator()
validator.run()
return_error_code_for(validator)
#!/usr/bin/env python2
# vim: set noexpandtab:ts=4
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
# Contributors: [email protected]
<<imports>>
<<setup-useragent>>
<<setup-logging>>
<<custom-exceptions>>
<<publickey-pinset-connectionpool>>
<<okta-api-auth>>
<<okta-openvpn-validator>>
<<return-error-code-for>>
<<main-loop>>
Below are a list of “Pins” (or fingerprints) for the public keys that Okta uses, or will use, in TLS certificates.
There are total of 16 pins below, 4 pins per domain, for two public domains and two private domains that Okta uses for testing.
Here is how to generate a “pin” using openssl
command line
utilities:
<<fetch-tls-certificate>> |
<<extract-public-key>> |
<<convert-public-key-to-der>> |
<<create-sha256-base64-hash>>
Here is what this command does, line by line:
echo -n | openssl s_client -connect example.com:443
This fetches a TLS certificate from a server, printing the X.509
formatted certificate on STDOUT. echo -n
is needed because
s_client
expects something on STDIN.
openssl x509 -noout -pubkey
This takes an X.509 certificate on STDIN and prints a PEM formatted public key on STDOUT.
openssl rsa -pubin -outform der
This takes a PEM encoded public key on STDIN (-pubin
) and
prints the DER formatted key on STDOUT.
openssl dgst -sha256 -binary | base64
This makes a SHA-256 hash of STDIN, which is then converted to the Base64 encoding scheme.
# # Here is how a pin like those below may be generated:
# <<create-pins>>
okta_pinset = [
# okta.com
'r5EfzZxQVvQpKo3AgYRaT7X2bDO/kj3ACwmxfdT2zt8=',
'MaqlcUgk2mvY/RFSGeSwBRkI+rZ6/dxe/DuQfBT/vnQ=',
'72G5IEvDEWn+EThf3qjR7/bQSWaS2ZSLqolhnO6iyJI=',
'rrV6CLCCvqnk89gWibYT0JO6fNQ8cCit7GGoiVTjCOg=',
# oktapreview.com
'jZomPEBSDXoipA9un78hKRIeN/+U4ZteRaiX8YpWfqc=',
'axSbM6RQ+19oXxudaOTdwXJbSr6f7AahxbDHFy3p8s8=',
'SE4qe2vdD9tAegPwO79rMnZyhHvqj3i5g1c2HkyGUNE=',
'ylP0lMLMvBaiHn0ihLxHjzvlPVQNoyQ+rMiaj0da/Pw=',
# internal testing
'W2qOJ9F9eo3CYHzL5ZIjYEizINI1cUPEb7yD45ihTXg=',
'PJ1QGTlW5ViFNhswMsYKp4X8C7KdG8nDW4ZcXLmYMyI=',
'5LlRWGTBVjpfNXXU5T7cYVUbOSPcgpMgdjaWd/R9Leg=',
'lpaMLlEsp7/dVZoeWt3f9ciJIMGimixAIaKNsn9/bCY=',
# internal testing
'Uit61pzomPOIy0svL1z4OUx3FMBr9UWQVdyG7ZlSLK8=',
'Ul2vkypIA80/JDebYsXq8FGdtmtrx5WJAAHDlSwWOes=',
'rx1UuNLIkJs53Jd60G/zY947XcDIf56JyM/yFJyR/GE=',
'VvpiE4cl60BvOU8X4AfkWeUPsmRUSh/nVbJ2rnGDZHI=',
]
Here is how to debug using a local version of the mock server, via ngrok:
self.example_dot_com_pin = ( 'wiviOfSDwIlXvBBiGcwtOsGjCN+73Qo2Xxe5NRI0zwA=') self.herokuapp_dot_com_pin = ( - '2hLOYtjSs5a3Jxy5GVM5EMuqa3JHhR6gM99EoaDauug=') + 'zyLK9e1SySrnnTDsqXISq1MppH4OvOcJRM9eh0Rm8AA=') + # '2hLOYtjSs5a3Jxy5GVM5EMuqa3JHhR6gM99EoaDauug=') self.okta_url = os.environ.get( 'okta_url_mock', - 'https://mocked-okta-api.herokuapp.com') + 'https://0414e2d8.ngrok.io') + # 'https://mocked-okta-api.herokuapp.com') self.okta_token = 'mocked-token-for-openvpn' self.username_prefix = 'user_MFA_REQUIRED' self.username_suffix = 'example.com'
--- a/tests/test_ssl_public_key_pinning.py +++ b/tests/test_ssl_public_key_pinning.py @@ -80,7 +80,7 @@ class TestOktaAPIAuthTLSPinning(OktaTestCase): last_error = self.okta_log_messages['critical'][-1:][0] messages = [ 'efusing to authenticate', - 'mocked-okta-api.herokuapp.com', + # 'mocked-okta-api.herokuapp.com', 'TLS public key pinning check', 'lease contact [email protected]', ]