import unittest
import time
import uuid
import tempfile
import os

from pyspark.sql import SparkSession
from splicemachine.spark.context import PySpliceContext

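# Integration tests for PySpliceContext. The suite assumes a reachable
# Splice Machine instance (see the JDBC URL in PySpliceTest below) and a
# SPLICE schema in which the tests may create and drop scratch tables.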

class PySpliceTest(unittest.TestCase):

    @classmethod
    def create_spark_session(cls):
        spark_session = SparkSession.builder.getOrCreate()
        # Silence noisy Spark and Akka logging during the test run.
        log4j = spark_session.sparkContext._jvm.org.apache.log4j
        log4j.LogManager.getLogger("org").setLevel(log4j.Level.OFF)
        log4j.LogManager.getLogger("akka").setLevel(log4j.Level.OFF)
        return spark_session

    @classmethod
    def create_testing_pysplice_session(cls, spark_session):
        db_url = 'jdbc:splice://localhost:1527/splicedb;user=splice;password=admin'
        return PySpliceContext(db_url, spark_session)

    def setUp(self):
        # unittest runs setUp/tearDown once per test, so they should be
        # instance methods rather than classmethods.
        self.spark_session = self.create_spark_session()
        self.splice_context = self.create_testing_pysplice_session(spark_session=self.spark_session)

    def tearDown(self):
        self.spark_session.stop()


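# Each test below provisions its own scratch table (dropping any leftover
# copy first), exercises one PySpliceContext method, and checks the result
# with a count query, so the tests do not depend on execution order.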
class Test(PySpliceTest):

    def test_analyzeSchema(self):
        # Passes as long as analyzeSchema does not raise.
        self.splice_context.analyzeSchema("splice")

    def test_analyzeTable(self):
        # Passes as long as analyzeTable does not raise.
        self.splice_context.analyzeTable("sys.systables")

    def test_executeUpdate(self):
        self.splice_context.executeUpdate("drop table if exists splice.systables")
        self.splice_context.executeUpdate("create table splice.systables as select * from sys.systables")
        assert self.splice_context.tableExists("SPLICE.SYSTABLES")

    def test_dropTable(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test_droptable")
        self.splice_context.executeUpdate("create table splice.pysplice_test_droptable (COL1 int primary key)")
        self.splice_context.dropTable("splice.pysplice_test_droptable")
        # Verify the table is gone from the system catalog.
        cnt = self.splice_context.df(
            "select count(*) as cnt from sys.sysschemas a "
            "join sys.systables b on a.SCHEMAID = b.SCHEMAID "
            "where a.SCHEMANAME = 'SPLICE' and b.TABLENAME = 'PYSPLICE_TEST_DROPTABLE'"
        ).collect()[0]['CNT']
        assert cnt == 0

    def test_df(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test_df")
        test_df_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        self.splice_context.executeUpdate("create table splice.pysplice_test_df (COL1 int primary key)")
        self.splice_context.insert(test_df_df, "splice.pysplice_test_df")
        cnt = self.splice_context.df("select count(*) as cnt from splice.pysplice_test_df").collect()[0]['CNT']
        assert cnt == 2

    def test_delete(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test")
        test_delete_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        self.splice_context.executeUpdate("create table splice.pysplice_test (COL1 int primary key)")
        self.splice_context.insert(test_delete_df, "splice.pysplice_test")
        self.splice_context.delete(test_delete_df, "splice.pysplice_test")
        cnt = self.splice_context.df("select count(*) as cnt from splice.pysplice_test").collect()[0]['CNT']
        self.splice_context.dropTable("splice.pysplice_test")
        assert cnt == 0

    def test_execute(self):
        # Passes as long as execute does not raise.
        self.splice_context.execute("select count(*) from sys.systables")

    def test_export(self):
        test_export_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        temp_dir = tempfile.gettempdir()
        file = os.path.join(temp_dir, str(uuid.uuid4()) + '.csv')
        self.splice_context.export(test_export_df, file)
        test_export_load_df = self.spark_session.read.option(
            "timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ"
        ).csv(file, inferSchema=True)
        assert test_export_df.count() == test_export_load_df.count()

    def test_exportBinary(self):
        test_exportBinary_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        temp_dir = tempfile.gettempdir()
        file = os.path.join(temp_dir, str(uuid.uuid4()) + '.parquet')
        self.splice_context.exportBinary(test_exportBinary_df, file, False, "parquet")
        load_df = self.spark_session.read.parquet(file)
        assert test_exportBinary_df.count() == load_df.count()

    def test_getSchema(self):
        systables_schema_from_df = self.splice_context.df("select * from sys.systables").schema
        systables_schema = self.splice_context.getSchema("sys.systables")
        assert systables_schema_from_df == systables_schema

    def test_insert(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test_insert")
        test_insert_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        self.splice_context.executeUpdate("create table splice.pysplice_test_insert (COL1 int primary key)")
        # Brief pause, presumably to let the newly created table become
        # visible before the insert runs.
        time.sleep(10)
        self.splice_context.insert(test_insert_df, "splice.pysplice_test_insert")
        cnt = self.splice_context.df("select count(*) as cnt from splice.pysplice_test_insert").collect()[0]['CNT']
        assert cnt == 2

    def test_internalDf(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test")
        test_internalDf_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        self.splice_context.executeUpdate("create table splice.pysplice_test (COL1 int primary key)")
        self.splice_context.insert(test_internalDf_df, "splice.pysplice_test")
        cnt = self.splice_context.internalDf("select count(*) as cnt from splice.pysplice_test").collect()[0]['CNT']
        assert cnt == 2

    def test_tableExists(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test")
        self.splice_context.executeUpdate("create table splice.pysplice_test (COL1 int primary key)")
        # A test must assert its result, not return it.
        assert self.splice_context.tableExists("splice.pysplice_test")

    def test_truncateTable(self):
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test")
        test_truncateTable_df = self.spark_session.createDataFrame([[1], [2]], "COL1: int")
        self.splice_context.executeUpdate("create table splice.pysplice_test (COL1 int primary key)")
        self.splice_context.insert(test_truncateTable_df, "splice.pysplice_test")
        self.splice_context.truncateTable("splice.pysplice_test")
        cnt = self.splice_context.df("select count(*) as cnt from splice.pysplice_test").collect()[0]['CNT']
        assert cnt == 0

    def test_update(self):
        test_update_df = self.spark_session.createDataFrame([[1, 2], [2, 3]], "COL1: int, COL2: int")
        test_update_update_df = self.spark_session.createDataFrame([[1, 2], [2, 4]], "COL1: int, COL2: int")
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test_update")
        self.splice_context.executeUpdate("create table splice.pysplice_test_update (COL1 int primary key, COL2 int)")
        self.splice_context.insert(test_update_df, "splice.pysplice_test_update")
        self.splice_context.update(test_update_update_df, "splice.pysplice_test_update")
        # The update should change the row with COL1 = 2 from COL2 = 3 to COL2 = 4.
        cnt = self.splice_context.df("select count(*) as cnt from splice.pysplice_test_update where col2 = 4").collect()[0]['CNT']
        assert cnt == 1

    def test_upsert(self):
        test_upsert_df = self.spark_session.createDataFrame([[1, 2], [2, 3]], "COL1: int, COL2: int")
        test_upsert_upsert_df = self.spark_session.createDataFrame([[1, 2], [2, 4], [3, 3]], "COL1: int, COL2: int")
        self.splice_context.executeUpdate("drop table if exists splice.pysplice_test_upsert")
        self.splice_context.executeUpdate("create table splice.pysplice_test_upsert (COL1 int primary key, COL2 int)")
        self.splice_context.insert(test_upsert_df, "splice.pysplice_test_upsert")
        self.splice_context.upsert(test_upsert_upsert_df, "splice.pysplice_test_upsert")
        # Expect the untouched row (COL1 = 1, COL2 = 2) plus the newly inserted row (COL1 = 3).
        cnt = self.splice_context.df("select count(*) as cnt from splice.pysplice_test_upsert where col2 = 2 or col1 = 3").collect()[0]['CNT']
        assert cnt == 2
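

# Standard unittest entry point, so the suite can also be run directly as a
# script in addition to an external test runner.
if __name__ == '__main__':
    unittest.main()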