Open
Description
Summary
This implementation scales from development environments to production systems handling multi-million record comparisons.
#!/usr/bin/env python3
import hashlib
import hmac
import logging
import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from contextlib import contextmanager
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Tuple, Optional, Dict, Any, Iterator

import mysql.connector
from mysql.connector import Error, pooling
# Process-wide logging setup, executed at import time: INFO and above is
# written both to stdout and to a log file in the current working directory.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('cryptolib_db_validation.log'),
    ],
)

# Module-level logger shared by every class and function in this file.
logger = logging.getLogger('CryptoLib.DBValidation')
class ValidationLevel(Enum):
    """Strictness levels accepted by the dataset validation routines.

    In ``SecureBinaryComparator.validate_crypto_arrays`` the ``BASIC`` and
    ``STANDARD`` levels enable the hash-based fast path; every other level
    goes straight to the element-by-element comparison.
    """

    BASIC = "basic"
    STANDARD = "standard"
    FIPS_140 = "fips_140"
    HIGH_ASSURANCE = "high_assurance"
@dataclass
class CryptoDBConfig:
    """Connection and pool settings for one MySQL server.

    Only ``host`` is required; every other field has a default. The values
    are forwarded to the connection pool by ``CryptoDatabase``.
    """

    host: str
    port: int = 3306
    database: str = "cryptolib_test"
    user: str = "crypto_validator"
    # Kept out of the generated repr so the credential cannot end up in log
    # lines or tracebacks that print the config object.
    password: str = field(default="", repr=False)
    # TLS options: the certificate/key paths are only used when
    # ssl_disabled is False.
    ssl_disabled: bool = False
    ssl_ca: Optional[str] = None
    ssl_cert: Optional[str] = None
    ssl_key: Optional[str] = None
    # Connection pool options.
    pool_name: str = "crypto_pool"
    pool_size: int = 3  # Conservative for crypto workloads
    connection_timeout: int = 10
    autocommit: bool = True
    charset: str = 'utf8mb4'
    use_unicode: bool = True
class CryptoValidationError(Exception):
    """Error raised by the validation and database layers of this module.

    Attributes:
        error_code: Short machine-readable code for the failure.
        details: Extra context about the failure; an empty dict when none
            (or an empty mapping) was supplied.
    """

    def __init__(self, message: str, error_code: str = "CRYPTO_VALIDATION_ERROR", details: Optional[Dict] = None):
        super().__init__(message)
        self.error_code = error_code
        self.details = details if details else {}
class SecureBinaryComparator:
    """Comparison helpers for cryptographic test data read from two sources."""

    # Upper bound on the number of per-record mismatch details in a report.
    _MAX_DETAILED_MISMATCHES = 100

    @staticmethod
    def constant_time_compare(data1: bytes, data2: bytes) -> bool:
        """Return True when both byte strings are equal.

        Delegates to ``hmac.compare_digest``, which avoids content-based
        short-circuiting, instead of a hand-written Python loop.
        Inputs of different length compare as unequal.
        """
        return hmac.compare_digest(data1, data2)

    @staticmethod
    def validate_crypto_arrays(arr1: List[Tuple], arr2: List[Tuple],
                               validation_level: ValidationLevel = ValidationLevel.STANDARD) -> Dict[str, Any]:
        """Compare two record lists and return a validation report.

        Args:
            arr1: Expected records.
            arr2: Actual records.
            validation_level: BASIC and STANDARD try a hash-based fast path
                first; other levels compare element by element only.

        Returns:
            A dict that always contains ``valid`` and ``execution_time_ns``.
            Failures add ``error_code`` and ``severity``; data mismatches
            also carry ``total_mismatches``, ``detailed_mismatches`` (at most
            100 entries), ``crypto_errors`` and ``total_records``.
        """
        start_ns = time.perf_counter_ns()

        def elapsed_ns() -> int:
            # Integer nanoseconds since entry, without float rounding.
            return time.perf_counter_ns() - start_ns

        if len(arr1) != len(arr2):
            return {
                'valid': False,
                'error_code': 'LENGTH_MISMATCH',
                'expected_length': len(arr1),
                'actual_length': len(arr2),
                'severity': 'CRITICAL',
                'execution_time_ns': elapsed_ns()
            }
        if not arr1 and not arr2:
            return {
                'valid': True,
                'message': 'Both datasets empty - validation passed',
                'execution_time_ns': elapsed_ns()
            }

        if validation_level in (ValidationLevel.BASIC, ValidationLevel.STANDARD):
            # Fast path: hash the string form of the sorted records.
            # NOTE(review): because the records are sorted first, this path
            # accepts datasets holding the same records in a different order,
            # while the element-wise path below is order-sensitive - confirm
            # that this asymmetry is intended.
            try:
                hash1 = hashlib.sha256(str(sorted(arr1)).encode('utf-8')).hexdigest()
                hash2 = hashlib.sha256(str(sorted(arr2)).encode('utf-8')).hexdigest()
                if hash1 == hash2:
                    return {
                        'valid': True,
                        'message': f'Datasets identical - {len(arr1)} records validated',
                        'validation_method': 'HASH_COMPARISON',
                        'execution_time_ns': elapsed_ns()
                    }
            except Exception as e:
                # Best effort only (e.g. records that cannot be ordered).
                logger.warning(f"Hash comparison failed, falling back to element-wise: {e}")

        mismatches: List[Dict[str, Any]] = []
        crypto_specific_errors: List[Dict[str, Any]] = []
        total_mismatches = 0
        detail_cap = SecureBinaryComparator._MAX_DETAILED_MISMATCHES
        # Single pass: every mismatch is counted, but details are only kept
        # for the first `detail_cap` of them.
        for idx, (record1, record2) in enumerate(zip(arr1, arr2)):
            if record1 != record2:
                total_mismatches += 1
                if len(mismatches) < detail_cap:
                    mismatches.append({
                        'index': idx,
                        'expected': record1,
                        'actual': record2,
                        'field_analysis': SecureBinaryComparator._analyze_crypto_fields(record1, record2)
                    })
                    if SecureBinaryComparator._is_crypto_sensitive_mismatch(record1, record2):
                        crypto_specific_errors.append({
                            'index': idx,
                            'error_type': 'CRYPTO_SENSITIVE_MISMATCH',
                            'details': 'Mismatch in cryptographically sensitive field'
                        })

        execution_time_ns = elapsed_ns()
        if mismatches:
            severity = 'CRITICAL' if crypto_specific_errors else 'HIGH'
            return {
                'valid': False,
                'error_code': 'DATA_MISMATCH',
                'total_mismatches': total_mismatches,
                'detailed_mismatches': mismatches,
                'crypto_errors': crypto_specific_errors,
                'severity': severity,
                'total_records': len(arr1),
                'execution_time_ns': execution_time_ns
            }
        return {
            'valid': True,
            'message': f'All {len(arr1)} records validated successfully',
            'validation_method': 'ELEMENT_WISE',
            'execution_time_ns': execution_time_ns
        }

    @staticmethod
    def _analyze_crypto_fields(record1: Tuple, record2: Tuple) -> Dict[str, str]:
        """Classify each differing field of two records.

        Returns a mapping ``field_<i>`` -> mismatch category, or a single
        ``structure`` entry when the records have different field counts.
        """
        analysis: Dict[str, str] = {}
        if len(record1) != len(record2):
            analysis['structure'] = 'FIELD_COUNT_MISMATCH'
            return analysis
        for i, (field1, field2) in enumerate(zip(record1, record2)):
            if field1 != field2:
                if isinstance(field1, (bytes, bytearray)) or isinstance(field2, (bytes, bytearray)):
                    analysis[f'field_{i}'] = 'BINARY_DATA_MISMATCH'
                elif str(field1).startswith(('0x', '0X')) or str(field2).startswith(('0x', '0X')):
                    analysis[f'field_{i}'] = 'HEX_VALUE_MISMATCH'
                else:
                    analysis[f'field_{i}'] = 'VALUE_MISMATCH'
        return analysis

    @staticmethod
    def _is_crypto_sensitive_mismatch(record1: Tuple, record2: Tuple) -> bool:
        """Return True when any paired field value of ``record1`` contains a crypto keyword.

        NOTE(review): only ``record1``'s values are inspected, and fields are
        not restricted to the ones that actually differ - confirm that this
        is the intended heuristic.
        """
        keywords = ('key', 'iv', 'nonce', 'hash', 'signature', 'cipher')
        # zip() keeps the scan limited to positions present in both records.
        for field1, _field2 in zip(record1, record2):
            field_str = str(field1).lower()
            if any(keyword in field_str for keyword in keywords):
                return True
        return False
class CryptoDatabase:
    """Pooled MySQL access layer used to read cryptographic test data."""

    # A plain or schema-qualified identifier. Table and column names cannot
    # be bound as query parameters, so they are validated against this
    # pattern before being interpolated into SQL text.
    _IDENT = r"[A-Za-z_][A-Za-z0-9_$]*(?:\.[A-Za-z_][A-Za-z0-9_$]*)?"
    _IDENTIFIER_RE = re.compile(_IDENT)
    # Comma-separated identifiers, each with an optional sort direction.
    _ORDER_TERM = _IDENT + r"(?:\s+(?:ASC|DESC))?"
    _ORDER_BY_RE = re.compile(
        r"\s*" + _ORDER_TERM + r"(?:\s*,\s*" + _ORDER_TERM + r")*\s*",
        re.IGNORECASE,
    )

    def __init__(self, config: CryptoDBConfig):
        """Store ``config`` and create the connection pool immediately.

        Raises:
            CryptoValidationError: If the pool cannot be created.
        """
        self.config = config
        self._connection_pool = None
        self._initialize_secure_pool()

    @classmethod
    def _require_identifier(cls, value: str, kind: str) -> str:
        """Return ``value`` if it is a safe SQL identifier, else raise."""
        if not isinstance(value, str) or not cls._IDENTIFIER_RE.fullmatch(value):
            raise CryptoValidationError(
                f"Invalid {kind}: {value!r}",
                "INVALID_IDENTIFIER",
                {kind: value}
            )
        return value

    def _initialize_secure_pool(self) -> None:
        """Build the MySQL connection pool from ``self.config``.

        Raises:
            CryptoValidationError: With code ``POOL_INIT_FAILED`` when the
                connector reports an error.
        """
        try:
            pool_config = {
                'pool_name': self.config.pool_name,
                'pool_size': self.config.pool_size,
                'pool_reset_session': True,
                'host': self.config.host,
                'port': self.config.port,
                'database': self.config.database,
                'user': self.config.user,
                'password': self.config.password,
                'autocommit': self.config.autocommit,
                'use_unicode': self.config.use_unicode,
                'charset': self.config.charset,
                'connection_timeout': self.config.connection_timeout,
                'sql_mode': 'STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE,ERROR_FOR_DIVISION_BY_ZERO'
            }
            if self.config.ssl_disabled:
                # Forward the flag; without it the setting had no effect on
                # the connections the pool creates.
                pool_config['ssl_disabled'] = True
            else:
                for option in ('ssl_ca', 'ssl_cert', 'ssl_key'):
                    value = getattr(self.config, option)
                    if value:
                        pool_config[option] = value
            self._connection_pool = pooling.MySQLConnectionPool(**pool_config)
            logger.info(f"Secure connection pool initialized: {self.config.pool_name}")
        except Error as e:
            raise CryptoValidationError(
                f"Failed to initialize secure connection pool: {e}",
                "POOL_INIT_FAILED",
                {'config': self.config.pool_name}
            ) from e

    @contextmanager
    def get_connection(self):
        """Yield a pooled connection and always hand it back afterwards.

        Raises:
            CryptoValidationError: ``CONNECTION_FAILED`` when the checked-out
                connection is not connected, ``CONNECTION_ERROR`` when the
                connector raises while the connection is in use.
        """
        connection = None
        try:
            connection = self._connection_pool.get_connection()
            if not connection.is_connected():
                raise CryptoValidationError("Connection not established", "CONNECTION_FAILED")
            yield connection
        except Error as e:
            logger.error(f"Database connection error: {e}")
            raise CryptoValidationError(f"Connection failed: {e}", "CONNECTION_ERROR") from e
        finally:
            if connection is not None:
                # Always call close(): for a pooled connection this is what
                # returns it to the pool, even if the link has dropped.
                try:
                    connection.close()
                except Error as close_error:
                    # Do not let a cleanup failure mask the original error.
                    logger.warning(f"Failed to release pooled connection cleanly: {close_error}")

    def fetch_crypto_test_data(self, table_name: str,
                               algorithm: Optional[str] = None,
                               key_length: Optional[int] = None,
                               test_case_id: Optional[str] = None,
                               order_by: str = "id",
                               limit: Optional[int] = None) -> List[Tuple]:
        """Fetch rows from ``table_name``, optionally filtered and ordered.

        Args:
            table_name: Plain or schema-qualified table identifier.
            algorithm: When truthy, filter on ``algorithm``.
            key_length: When not None, filter on ``key_length``.
            test_case_id: When truthy, filter on ``test_case_id``.
            order_by: Comma-separated column identifiers, each optionally
                followed by ASC or DESC; a falsy value disables ordering.
            limit: Positive row limit; None or 0 means no limit.

        Returns:
            All matching rows as a list of tuples.

        Raises:
            CryptoValidationError: On an invalid identifier, ORDER BY clause
                or limit, or when the query fails (``QUERY_FAILED``).
        """
        table = self._require_identifier(table_name, "table_name")
        query_parts = [f"SELECT * FROM {table}"]
        params: List[Any] = []
        where_conditions = []
        if algorithm:
            where_conditions.append("algorithm = %s")
            params.append(algorithm)
        if key_length is not None:
            where_conditions.append("key_length = %s")
            params.append(key_length)
        if test_case_id:
            where_conditions.append("test_case_id = %s")
            params.append(test_case_id)
        if where_conditions:
            query_parts.append("WHERE " + " AND ".join(where_conditions))
        if order_by:
            if not isinstance(order_by, str) or not self._ORDER_BY_RE.fullmatch(order_by):
                raise CryptoValidationError(
                    f"Invalid order_by: {order_by!r}",
                    "INVALID_ORDER_BY",
                    {'order_by': order_by}
                )
            query_parts.append(f"ORDER BY {order_by}")
        if limit:
            # bool is an int subclass; reject it along with non-integers.
            if isinstance(limit, bool) or not isinstance(limit, int) or limit < 0:
                raise CryptoValidationError(
                    f"Invalid limit: {limit!r}",
                    "INVALID_LIMIT",
                    {'limit': limit}
                )
            query_parts.append(f"LIMIT {limit}")
        query = " ".join(query_parts)

        with self.get_connection() as connection:
            cursor = connection.cursor(buffered=False)
            try:
                logger.debug(f"Executing crypto query: {query}")
                start_time = time.perf_counter()
                cursor.execute(query, params)
                data = []
                chunk_size = 5000  # Smaller chunks for crypto data
                while True:
                    chunk = cursor.fetchmany(size=chunk_size)
                    if not chunk:
                        break
                    data.extend(chunk)
                execution_time = time.perf_counter() - start_time
                logger.info(f"Crypto query completed: {len(data)} records in {execution_time:.3f}s")
                return data
            except Error as e:
                logger.error(f"Crypto query execution failed: {e}")
                raise CryptoValidationError(f"Query failed: {e}", "QUERY_FAILED") from e
            finally:
                cursor.close()

    def get_crypto_table_info(self, table_name: str) -> Dict[str, Any]:
        """Return row count plus per-algorithm and per-key-length counts.

        Best effort: when the table name is not a valid identifier or a
        query fails, a warning is logged and a dict with ``record_count`` 0
        and ``table_name`` is returned.
        """
        fallback = {'record_count': 0, 'table_name': table_name}
        if not isinstance(table_name, str) or not self._IDENTIFIER_RE.fullmatch(table_name):
            logger.warning(f"Could not retrieve crypto table info: invalid table name {table_name!r}")
            return fallback

        with self.get_connection() as connection:
            cursor = connection.cursor(dictionary=True)
            try:
                cursor.execute(f"SELECT COUNT(*) as record_count FROM {table_name}")
                stats = cursor.fetchone()
                cursor.execute(f"""
                    SELECT algorithm, COUNT(*) as count
                    FROM {table_name}
                    WHERE algorithm IS NOT NULL
                    GROUP BY algorithm
                    ORDER BY count DESC
                """)
                algorithms = cursor.fetchall()
                cursor.execute(f"""
                    SELECT key_length, COUNT(*) as count
                    FROM {table_name}
                    WHERE key_length IS NOT NULL
                    GROUP BY key_length
                    ORDER BY key_length
                """)
                key_lengths = cursor.fetchall()
                return {
                    'record_count': stats['record_count'],
                    'algorithms': algorithms,
                    'key_lengths': key_lengths,
                    'table_name': table_name
                }
            except Error as e:
                logger.warning(f"Could not retrieve crypto table info: {e}")
                return fallback
            finally:
                cursor.close()
class CryptoLibValidator:
    """Fetches the same test table from two databases and compares the rows."""

    def __init__(self, primary_config: CryptoDBConfig, secondary_config: CryptoDBConfig):
        """Create one database handle per config plus a shared comparator."""
        self.primary_db = CryptoDatabase(primary_config)
        self.secondary_db = CryptoDatabase(secondary_config)
        self.comparator = SecureBinaryComparator()

    def validate_crypto_implementation(self,
                                       test_table: str,
                                       algorithm: Optional[str] = None,
                                       validation_level: ValidationLevel = ValidationLevel.STANDARD) -> Dict[str, Any]:
        """Load ``test_table`` from both databases concurrently and compare.

        Returns:
            The comparator's report extended with ``test_table``,
            ``algorithm``, ``validation_level`` and ``timestamp``. Any
            exception is logged and turned into a report with
            ``error_code`` ``VALIDATION_EXCEPTION``.
        """
        try:
            logger.info(f"Starting crypto validation: table={test_table}, algorithm={algorithm}, level={validation_level}")

            # Two workers: one fetch per database, primary submitted first.
            with ThreadPoolExecutor(max_workers=2) as executor:
                pending = [
                    executor.submit(database.fetch_crypto_test_data, test_table, algorithm)
                    for database in (self.primary_db, self.secondary_db)
                ]
                # Results are collected in submission order.
                primary_data, secondary_data = (task.result() for task in pending)

            report = self.comparator.validate_crypto_arrays(
                primary_data, secondary_data, validation_level
            )
            report.update(
                test_table=test_table,
                algorithm=algorithm,
                validation_level=validation_level.value,
                timestamp=time.time(),
            )
            return report
        except Exception as e:
            logger.error(f"Crypto validation failed: {e}")
            return {
                'valid': False,
                'error_code': 'VALIDATION_EXCEPTION',
                'message': str(e),
                'test_table': test_table,
                'algorithm': algorithm
            }

    def cleanup(self) -> None:
        """Log that cleanup was requested; no resources are released here."""
        logger.info("Cleaning up crypto validation resources")
def enhanced_compare(arr1: List[Tuple], arr2: List[Tuple]) -> int:
    """Compare two record lists at STANDARD level and print the outcome.

    Returns:
        0 when the datasets validate, -1 otherwise. On failure, up to five
        mismatches are printed and the whole report is logged.
    """
    result = SecureBinaryComparator().validate_crypto_arrays(arr1, arr2, ValidationLevel.STANDARD)

    # Failure path first so the success path reads straight through.
    if not result['valid']:
        print(f"Cryptographic validation failed: {result['error_code']}")
        if 'detailed_mismatches' in result:
            print(f"Found {result['total_mismatches']} mismatches out of {result['total_records']} records")
            preview = result['detailed_mismatches'][:5]  # Show first 5
            for mismatch in preview:
                print(f" Index {mismatch['index']}: Expected {mismatch['expected']} != Actual {mismatch['actual']}")
        logger.error(f"Validation failed: {result}")
        return -1

    print("Cryptographic test vectors match successfully!")
    logger.info(f"Validation passed: {result.get('message', 'Arrays match')}")
    return 0
class Database:
    """Thin wrapper that binds one table to a ``CryptoDatabase`` handle."""

    def __init__(self, host: str, port: int, db_name: str, table_name: str, user: str, password: str):
        """Build the connection config and open the underlying pool."""
        self.config = CryptoDBConfig(host=host, port=port, database=db_name,
                                     user=user, password=password)
        self.table = table_name
        self.crypto_db = CryptoDatabase(self.config)
        self.data = []

    def fetch_all_data(self) -> List[Tuple]:
        """Load every row of the bound table and cache it in ``self.data``.

        Returns:
            The fetched rows, or an empty list when the fetch raises
            ``CryptoValidationError`` (the error is logged and printed).
        """
        try:
            rows = self.crypto_db.fetch_crypto_test_data(self.table)
        except CryptoValidationError as e:
            logger.error(f"Failed to fetch crypto test data: {e}")
            print(f"Error: {e}")
            return []
        self.data = rows
        return rows

    def close_connection(self) -> None:
        """Do nothing; connections are released by the pool's context manager."""
        return None
def compare(arr1: List[Tuple], arr2: List[Tuple]) -> int:
    """Compare two record lists; returns 0 on a match and -1 otherwise.

    Kept as an alias that forwards to ``enhanced_compare``.
    """
    status = enhanced_compare(arr1, arr2)
    return status
if __name__ == "__main__":
    # Script entry point: fetch the same table through two handles, compare
    # the rows, and exit with status 1 on a mismatch or on any error.
    try:
        primary_db = Database(
            "localhost", 3306, "cryptolib_test", "aes_test_vectors",
            "crypto_user", "crypto_pass",
        )
        secondary_db = Database(
            "localhost", 3306, "cryptolib_test", "aes_test_vectors",
            "crypto_user", "crypto_pass",
        )

        primary_data = primary_db.fetch_all_data()
        secondary_data = secondary_db.fetch_all_data()

        result = compare(primary_data, secondary_data)
        if result != 0:
            logger.error("Cryptographic validation failed")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            sys.exit(1)
        logger.info("Cryptographic validation completed successfully")
    except Exception as e:
        logger.error(f"Critical error in crypto validation: {e}")
        sys.exit(1)
# Daily replication check
comparator = DatasetComparator(production_primary, production_replica)
result = comparator.compare_tables_intelligent("crypto_inventory", "else")
Use Case
NIST Test Vector Validation:
validator = CryptoLibValidator(reference_db_config, implementation_db_config)
result = validator.validate_crypto_implementation("aes_gcm_vectors", "AES-GCM-256", ValidationLevel.FIPS_140)
Metadata
Metadata
Assignees
Labels
No labels