pyspark_metastore_test.py
#!/usr/bin/python
"""Verifies that Hive tables created through Spark are recorded in the
Cloud SQL-backed Hive metastore, read back through the Cloud SQL proxy."""

import random
import string

import pyspark

# Constants that must match the constants in cloud-sql-proxy.sh.
METASTORE_DB = 'hive_metastore'
HIVE_USER = 'hive'
HIVE_USER_PASSWORD = 'hive-password'

# HiveContext gives SQL queries access to the cluster's Hive metastore.
sc = pyspark.SparkContext()
sqlContext = pyspark.sql.HiveContext(sc)

# Find a table name that is not already used in the metastore.
table_names = sqlContext.tableNames()
test_table_name = None
while not test_table_name or test_table_name in table_names:
    test_table_name = 'table_' + ''.join(
        random.choice(string.ascii_lowercase) for _ in range(4))

# Create a table; Spark records its metadata in the Hive metastore.
sqlContext.range(10).write.saveAsTable(test_table_name)

# Read the table metadata back from the metastore database through the
# Cloud SQL proxy. The empty host in the JDBC URL means localhost, where the
# proxy listens; TBLS is the metastore schema's table of tables.
tables = sqlContext.read.jdbc(
    'jdbc:mysql:///{}?user={}&password={}'.format(
        METASTORE_DB, HIVE_USER, HIVE_USER_PASSWORD),
    'TBLS')
test_table = tables.where(tables.TBL_NAME == test_table_name).collect()[0]
print('Successfully found table {} in Cloud SQL Hive metastore'.format(
    test_table.TBL_NAME))

# Clean up the test table.
sqlContext.sql('DROP TABLE ' + test_table_name)
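

# The HiveContext API used above has been deprecated since Spark 2.0 in favor
# of SparkSession. The function below is a minimal sketch of the same check
# written against SparkSession; it is illustrative only, is never called by
# this script, and assumes the same cloud-sql-proxy.sh constants and a Spark
# build that includes Hive support.
def run_with_spark_session():
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Pick a table name not already present in the metastore.
    existing = {t.name for t in spark.catalog.listTables()}
    name = None
    while not name or name in existing:
        name = 'table_' + ''.join(
            random.choice(string.ascii_lowercase) for _ in range(4))

    # Create the table, then confirm it appears in the metastore's TBLS
    # table when read back through the Cloud SQL proxy.
    spark.range(10).write.saveAsTable(name)
    tbls = spark.read.jdbc(
        'jdbc:mysql:///{}?user={}&password={}'.format(
            METASTORE_DB, HIVE_USER, HIVE_USER_PASSWORD),
        'TBLS')
    row = tbls.where(tbls.TBL_NAME == name).collect()[0]
    print('Successfully found table {} in Cloud SQL Hive metastore'.format(
        row.TBL_NAME))
    spark.sql('DROP TABLE ' + name)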