Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions vertica_python/tests/integration_tests/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,57 @@ def test_sslcontext_verify_full(self):
res = self._query_and_fetchone(self.SSL_STATE_SQL)
self.assertEqual(res[0], 'Server')

def test_tls13_support_auto_negotiation(self):
"""
Verify that the client supports TLS 1.3 negotiation.
If the server supports TLS 1.3, the connection should establish using it.
If the server supports only TLS 1.2, the connection should still succeed.
"""

# Set up server certificates and enable TLS
CA_cert = self._generate_and_set_certificates()

# Create SSL context allowing both TLS 1.2 and 1.3
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
ssl_context.load_verify_locations(cadata=CA_cert)

# Assign SSL context to connection info
self._conn_info['ssl'] = ssl_context

with self._connect() as conn:
cur = conn.cursor()
res = self._query_and_fetchone(self.SSL_STATE_SQL)
self.assertEqual(res[0], 'Server')

# Try to get the negotiated TLS version from the socket
tls_version = None
try:
if hasattr(conn._socket, "_sslobj"):
tls_version = conn._socket._sslobj.version()
elif hasattr(conn._socket, "version"):
tls_version = conn._socket.version()
except Exception:
pass

# Log version for debug (optional)
print(f"Negotiated TLS version: {tls_version}")

# Ensure TLS negotiation was successful
self.assertIsNotNone(tls_version, "Could not determine negotiated TLS version")

# Accept both 1.2 and 1.3, but prefer 1.3 if available
self.assertIn(
tls_version, ("TLSv1.2", "TLSv1.3"),
msg=f"Unexpected TLS version negotiated: {tls_version}"
)

if tls_version == "TLSv1.3":
print("TLS 1.3 is successfully negotiated and supported.")
else:
print("Fell back to TLS 1.2 (TLS 1.3 not supported by server).")

def test_sslcontext_mutual_TLS(self):
# Setting certificates with TLS configuration
CA_cert = self._generate_and_set_certificates(mutual_mode=True)
Expand Down
Loading