|
330 | 330 | === "Python"
|
331 | 331 | ```python
|
332 | 332 | # Python 示例
|
333 |
| - # pip install requests drf-httpsig |
334 |
| - import requests, datetime, json |
335 |
| - from httpsig.requests_auth import HTTPSignatureAuth |
336 |
| - |
337 |
| - def get_auth(KeyID, SecretID): |
| 333 | + |
| 334 | + # 安装依赖 |
| 335 | + # pip install requests |
| 336 | + |
| 337 | + import requests |
| 338 | + import datetime |
| 339 | + import json |
| 340 | + import hmac |
| 341 | + import hashlib |
| 342 | + import base64 |
| 343 | + |
| 344 | + def generate_signature(secret, string_to_sign): |
| 345 | + """ |
| 346 | + 生成 HMAC-SHA256 签名 |
| 347 | + :param secret: 密钥 |
| 348 | + :param string_to_sign: 待签名的字符串 |
| 349 | + :return: 签名后的字符串 |
| 350 | + """ |
| 351 | + h = hmac.new(secret.encode('utf-8'), string_to_sign.encode('utf-8'), hashlib.sha256) |
| 352 | + return base64.b64encode(h.digest()).decode('utf-8') |
| 353 | + |
| 354 | + def get_auth_header(key_id, secret, method, path, headers): |
| 355 | + """ |
| 356 | + 生成签名认证头部 |
| 357 | + :param key_id: 访问密钥 ID |
| 358 | + :param secret: 访问密钥 |
| 359 | + :param method: 请求方法 |
| 360 | + :param path: 请求路径 |
| 361 | + :param headers: 请求头部 |
| 362 | + :return: 包含签名的认证头部 |
| 363 | + """ |
| 364 | + # 定义需要签名的头部 |
338 | 365 | signature_headers = ['(request-target)', 'accept', 'date']
|
339 |
| - auth = HTTPSignatureAuth(key_id=KeyID, secret=SecretID, algorithm='hmac-sha256', headers=signature_headers) |
340 |
| - return auth |
341 |
| - |
342 |
| - def get_user_info(jms_url, auth): |
343 |
| - url = jms_url + '/api/v1/users/users/' |
| 366 | + # 构建待签名的字符串 |
| 367 | + request_target = f"{method.lower()} {path}" |
| 368 | + string_to_sign = f"(request-target): {request_target}\n" |
| 369 | + for header in signature_headers[1:]: |
| 370 | + string_to_sign += f"{header}: {headers[header]}\n" |
| 371 | + string_to_sign = string_to_sign.rstrip() |
| 372 | + # 生成签名 |
| 373 | + signature = generate_signature(secret, string_to_sign) |
| 374 | + # 构建认证头部 |
| 375 | + auth_header = f'Signature keyId="{key_id}",algorithm="hmac-sha256",headers="{" ".join(signature_headers)}",signature="{signature}"' |
| 376 | + return auth_header |
| 377 | + |
| 378 | + def get_user_info(jms_url, key_id, secret): |
| 379 | + """ |
| 380 | + 获取用户信息 |
| 381 | + :param jms_url: Jumpserver 的 URL |
| 382 | + :param key_id: 访问密钥 ID |
| 383 | + :param secret: 访问密钥 |
| 384 | + """ |
| 385 | + url = jms_url + '/api/v1/users/profile/' |
| 386 | + path = url.replace(jms_url, '') |
344 | 387 | gmt_form = '%a, %d %b %Y %H:%M:%S GMT'
|
345 | 388 | headers = {
|
346 |
| - 'Accept': 'application/json', |
| 389 | + 'accept': 'application/json', |
347 | 390 | 'X-JMS-ORG': '00000000-0000-0000-0000-000000000002',
|
348 |
| - 'Date': datetime.datetime.utcnow().strftime(gmt_form) |
| 391 | + 'date': datetime.datetime.utcnow().strftime(gmt_form) |
349 | 392 | }
|
350 |
| - |
351 |
| - response = requests.get(url, auth=auth, headers=headers) |
352 |
| - print(json.loads(response.text)) |
353 |
| - |
| 393 | + # 生成认证头部 |
| 394 | + auth_header = get_auth_header(key_id, secret, 'GET', path, headers) |
| 395 | + headers['Authorization'] = auth_header |
| 396 | + # 发送请求 |
| 397 | + response = requests.get(url, headers=headers, verify=False) |
| 398 | + try: |
| 399 | + print(json.loads(response.text)) |
| 400 | + except json.JSONDecodeError: |
| 401 | + print(f"Failed to decode JSON response: {response.text}") |
| 402 | + |
354 | 403 | if __name__ == '__main__':
|
355 | 404 | jms_url = 'https://demo.jumpserver.org'
|
356 | 405 | KeyID = 'AccessKeyID'
|
357 | 406 | SecretID = 'AccessKeySecret'
|
358 |
| - auth = get_auth(KeyID, SecretID) |
359 |
| - get_user_info(jms_url, auth) |
| 407 | + get_user_info(jms_url, KeyID, SecretID) |
360 | 408 | ```
|
361 | 409 |
|
362 | 410 | === "Golang"
|
|
0 commit comments