|
1 | | -from generateJWT import JWTGenerator |
2 | | - |
3 | 1 | from flask import Flask |
4 | 2 | from flask import request |
5 | 3 | from flask import jsonify |
6 | 4 | from flask import make_response |
7 | 5 | from flask import render_template |
8 | | -from cryptography.hazmat.primitives.serialization import load_pem_private_key |
9 | | -from cryptography.hazmat.backends import default_backend |
10 | 6 | from datetime import timedelta, timezone, datetime |
11 | 7 | import logging |
12 | 8 | import requests |
|
16 | 12 | SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0') |
17 | 13 | SERVICE_PORT = os.getenv('SERVER_PORT', 8888) |
18 | 14 | CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') |
| 15 | +SNOWFLAKE_HOST = os.getenv('SNOWFLAKE_HOST') |
| 16 | +SNOWFLAKE_ACCOUNT = os.getenv('SNOWFLAKE_ACCOUNT') |
19 | 17 |
|
20 | 18 |
|
21 | 19 | def get_logger(logger_name): |
@@ -48,7 +46,7 @@ def readiness_probe(): |
48 | 46 | @app.get("/get") |
49 | 47 | def get_func(): |
50 | 48 | response = make_response("GET success!") |
51 | | - response.headers['Access-Control-Allow-Origin'] = 'https://b3efa44-sfengineering-prod2-snowservices-test2.snowflakecomputing.app' |
| 49 | + response.headers['Access-Control-Allow-Origin'] = 'https://localhost:9999' |
52 | 50 | response.headers['Access-Control-Allow-Credentials'] = True |
53 | 51 | response.headers['Access-Control-Allow-Headers'] = 'Fake-Header-X,Fake-Header-Y,Fake-Header-Z' |
54 | 52 |
|
@@ -88,74 +86,6 @@ def ui(): |
88 | 86 |
|
89 | 87 | return render_template("basic_ui.html") |
90 | 88 |
|
91 | | -def get_p8(): |
92 | | - p8 = "" |
93 | | - with open("./rsa_key.p8", 'rb') as pem_in: |
94 | | - pemlines = pem_in.read() |
95 | | - try: |
96 | | - # Try to access the private key without a passphrase. |
97 | | - p8 = load_pem_private_key(pemlines, None, default_backend()) |
98 | | - logger.debug(f"p8: {p8}") |
99 | | - except TypeError: |
100 | | - logger.error("Failed getting private key. ") |
101 | | - return p8 |
102 | | - |
103 | | -def token_exchange(token, role, endpoint, snowflake_account_url, isPat): |
104 | | - scope_role = f'session:role:{role}' |
105 | | - scope = f'{scope_role} {endpoint}' |
106 | | - |
107 | | - if isPat: |
108 | | - data = { |
109 | | - 'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange', |
110 | | - 'scope': scope, |
111 | | - 'subject_token': token, |
112 | | - 'subject_token_type': 'programmatic_access_token' |
113 | | - } |
114 | | - else: |
115 | | - data = { |
116 | | - 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', |
117 | | - 'scope': scope, |
118 | | - 'assertion': token, |
119 | | - } |
120 | | - |
121 | | - logger.info(data) |
122 | | - response = _do_token_exchange(data, snowflake_account_url) |
123 | | - return response.text |
124 | | - |
125 | | -def _do_token_exchange(data, snowflake_account_url) -> requests.Response: |
126 | | - url = f'{snowflake_account_url}/oauth/token' |
127 | | - response = requests.post(url, data=data, verify=False) |
128 | | - logger.info("snowflake jwt response code : %s" % response.status_code) |
129 | | - assert 200 == response.status_code, "unable to get snowflake token" |
130 | | - return response |
131 | | - |
132 | | - |
133 | | -@app.route('/jwt', methods=['POST']) |
134 | | -def handle_data(): |
135 | | - if request.method == 'POST': |
136 | | - data = request.json |
137 | | - logger.debug(f'data when calling /jwt: {data}') |
138 | | - # Load the private key from the specified file. |
139 | | - user = data.get("user") |
140 | | - role = data.get("role") |
141 | | - isPat = data.get("isPat") |
142 | | - snowflake_account_url = data.get("snowflake_account_url") |
143 | | - snowflake_account_hostname = snowflake_account_url[8:] |
144 | | - account = snowflake_account_hostname.split('.')[0] |
145 | | - logger.debug(f'Account from Snowflake Account URL: {account}') |
146 | | - endpoint = data.get("endpoint") |
147 | | - key = data.get("key") |
148 | | - if isPat: |
149 | | - snowflake_jwt = token_exchange(key, role=role, endpoint=endpoint, |
150 | | - snowflake_account_url=snowflake_account_url, isPat=isPat) |
151 | | - else: |
152 | | - token = JWTGenerator(account, user, key, timedelta(minutes=59), |
153 | | - timedelta(minutes=54)).get_token() |
154 | | - snowflake_jwt = token_exchange(token, role=role, endpoint=endpoint, |
155 | | - snowflake_account_url=snowflake_account_url, isPat=isPat) |
156 | | - |
157 | | - return snowflake_jwt |
158 | | - |
159 | 89 | @app.after_request |
160 | 90 | def apply_csp(response): |
161 | 91 | csp = ( |
|
0 commit comments