This repository was archived by the owner on Apr 15, 2022. It is now read-only.

Commit 100cb95

Amrit Baveja authored and committed
initial commit of pysplicecontext.
0 parents  commit 100cb95

File tree

5 files changed: +239 −0 lines changed

setup.py

Lines changed: 18 additions & 0 deletions
from setuptools import setup, find_packages

dependencies = [
    "atomicwrites==1.1.5",
    "attrs==18.1.0",
    "more-itertools==4.2.0",
    "pluggy==0.6.0",
    "py==1.5.3",
    "py4j==0.10.7",
    "pyspark==2.3.1",
    "pytest==3.6.1",
    "six==1.11.0"
]

setup(
    name="splicemachine",
    version="0.2.2",
    install_requires=dependencies,
    packages=['splicemachine'],
)
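
For reference, a minimal sketch of installing the package from a local checkout and importing the module it ships; the pip command is the standard setuptools workflow and is not stated anywhere in this commit:

    # from the repository root (standard setuptools/pip workflow, not part of this commit):
    #   pip install .
    # afterwards the single declared package and its main class are importable:
    import splicemachine
    from splicemachine.context import PySpliceContext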

splicemachine/__init__.py

Whitespace-only changes.

splicemachine/context.py

Lines changed: 104 additions & 0 deletions
from __future__ import print_function

from pyspark.sql import DataFrame
from py4j.java_gateway import java_import


class PySpliceContext:
    """
    This class implements a SpliceMachineContext object (similar to the SparkContext object)
    """

    def __init__(self, JDBC_URL, sparkSQLContext, _unitTesting=False):
        """
        :param JDBC_URL: (string) The JDBC URL connection string for your Splice Machine cluster
        :param sparkSQLContext: (SQLContext) A Spark SQLContext object for executing Spark queries
        """
        self.jdbcurl = JDBC_URL
        self._unitTesting = _unitTesting

        if not _unitTesting:  # private internal argument to bypass the JVM during unit tests
            self.sparkSQLContext = sparkSQLContext
            self.jvm = self.sparkSQLContext._sc._jvm
            java_import(self.jvm, "com.splicemachine.spark.splicemachine.*")
            java_import(self.jvm, "org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}")
            java_import(self.jvm, "scala.collection.JavaConverters._")
            self.context = self.jvm.com.splicemachine.spark.splicemachine.SplicemachineContext(self.jdbcurl)

        else:
            from .utils import FakeJContext
            self.sparkSQLContext = sparkSQLContext
            self.jvm = ''
            self.context = FakeJContext(self.jdbcurl)

    def getConnection(self):
        """
        Return a connection to the database
        """
        return self.context.getConnection()

    def tableExists(self, schemaTableName):
        """
        Check whether a table exists

        :param schemaTableName: (string) Table name
        """
        return self.context.tableExists(schemaTableName)

    def dropTable(self, schemaTableName):
        """
        Drop a specified table.

        :param schemaTableName: (string) schemaName.tableName
        """
        return self.context.dropTable(schemaTableName)

    def df(self, sql):
        """
        Return a Spark DataFrame from the results of a Splice Machine SQL query

        :param sql: (string) SQL query (e.g. SELECT * FROM table1 WHERE column2 > 3)
        :return: A Spark DataFrame containing the results
        """
        if self._unitTesting:
            return self.context.df(sql)
        return DataFrame(self.context.df(sql), self.sparkSQLContext)

    def insert(self, dataFrame, schemaTableName):
        """
        Insert a DataFrame into a table. The schema name is required as part of the table name (schema.table).

        :param dataFrame: (DataFrame) The DataFrame you would like to insert
        :param schemaTableName: (string) The table into which you would like to insert the DataFrame
        """
        return self.context.insert(dataFrame._jdf, schemaTableName)

    def delete(self, dataFrame, schemaTableName):
        """
        Delete records in a table by joining on the primary keys of the supplied DataFrame.
        Be careful with column naming and case sensitivity.

        :param dataFrame: (DataFrame) The DataFrame whose matching rows you would like to delete
        :param schemaTableName: (string) Splice Machine table
        """
        return self.context.delete(dataFrame._jdf, schemaTableName)

    def update(self, dataFrame, schemaTableName):
        """
        Update data from a DataFrame for a specified schemaTableName (schema.table).
        The primary keys are required for the update, and any other columns provided will be updated in the matching rows.

        :param dataFrame: (DataFrame) The DataFrame you would like to update
        :param schemaTableName: (string) Splice Machine table
        """
        return self.context.update(dataFrame._jdf, schemaTableName)

    def getSchema(self, schemaTableName):
        """
        Return the schema of a table via JDBC.

        :param schemaTableName: (string) Table name
        """
        return self.context.getSchema(schemaTableName)
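
A minimal usage sketch for PySpliceContext, assuming a reachable Splice Machine cluster and a local Spark context; the JDBC URL and table names below are placeholders for illustration, not values from this commit:

    import pyspark
    from splicemachine.context import PySpliceContext

    sc = pyspark.SparkContext(conf=pyspark.SparkConf().setAppName('PySpliceContext example'))
    sqlContext = pyspark.sql.SQLContext(sc)

    # hypothetical connection string -- substitute your own cluster's JDBC URL
    jdbc_url = 'jdbc:splice://localhost:1527/splicedb;user=splice;password=admin'
    splice = PySpliceContext(jdbc_url, sqlContext)

    if splice.tableExists('SPLICE.MYTABLE'):                    # hypothetical table name
        df = splice.df('SELECT * FROM SPLICE.MYTABLE WHERE COL2 > 3')
        splice.insert(df, 'SPLICE.MYTABLE_COPY')                # write the DataFrame back to another table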

splicemachine/test_context.py

Lines changed: 61 additions & 0 deletions
import pyspark
from .context import PySpliceContext
from .utils import fakeDf

conf = pyspark.SparkConf().setAppName('Unit Test Python SpliceContext API')
sc = pyspark.SparkContext(conf=conf)
sqlContext = pyspark.sql.SQLContext(sc)
spliceContext = PySpliceContext('', sqlContext, _unitTesting=True)


class TestContext:
    def test_getConnection(self):
        out = spliceContext.getConnection()
        assert out['event'] == 'get connection'

    def test_tableExists(self):
        out = spliceContext.tableExists('schema1.table1')
        assert out['event'] == 'table exists'
        assert out['schemaTableName'] == 'schema1.table1'
        assert out['schemaName'] == 'schema1'
        assert out['tableName'] == 'table1'

    def test_dropTable(self):
        out = spliceContext.dropTable('schema2.table3')
        assert out['event'] == 'drop table'
        assert out['schemaTableName'] == 'schema2.table3'
        assert out['schemaName'] == 'schema2'
        assert out['tableName'] == 'table3'

    def test_df(self):
        out = spliceContext.df('SELECT * FROM table1')
        assert out['sql'] == 'SELECT * FROM table1'
        assert out['event'] == 'df'

    def test_insert(self):
        out = spliceContext.insert(fakeDf(), 'schema.table94')
        assert out['tableName'] == 'table94'
        assert out['schemaTableName'] == 'schema.table94'
        assert out['schemaName'] == 'schema'
        assert out['event'] == 'insert'

    def test_delete(self):
        out = spliceContext.delete(fakeDf(), 'schema4.table4')
        assert out['tableName'] == 'table4'
        assert out['schemaTableName'] == 'schema4.table4'
        assert out['schemaName'] == 'schema4'
        assert out['event'] == 'delete'

    def test_update(self):
        out = spliceContext.update(fakeDf(), 'schema0.table390')
        assert out['tableName'] == 'table390'
        assert out['schemaTableName'] == 'schema0.table390'
        assert out['schemaName'] == 'schema0'
        assert out['event'] == 'update'

    def test_getSchema(self):
        out = spliceContext.getSchema('schema41.table12')
        assert out['event'] == 'getSchema'
        assert out['schemaTableName'] == 'schema41.table12'
        assert out['schemaName'] == 'schema41'
        assert out['tableName'] == 'table12'
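
Because spliceContext is created with _unitTesting=True, every call above is routed to the FakeJContext stub in utils.py, so the suite runs without a Splice Machine cluster or JVM. A standard pytest invocation (the exact command is not stated in this commit, but pytest is pinned in setup.py) would be:

    python -m pytest splicemachine/test_context.py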

splicemachine/utils.py

Lines changed: 56 additions & 0 deletions
def _generateOperationsTable(**kwargs):
    """
    Usage: _generateOperationsTable(event='get connection')
    --> {'event': 'get connection'}
    """
    return kwargs


class fakeDf(object):
    def __init__(self):
        self._jdf = ''


class FakeJContext(object):
    """
    This class is a fake representation of the Scala SpliceMachineContext API for unit testing
    """

    def __init__(self, JDBC_URL):
        print("Class Initialized")

    def getConnection(self):
        return _generateOperationsTable(event='get connection')

    def tableExists(self, schemaTableName):
        schemaName, tableName = schemaTableName.split('.')
        return _generateOperationsTable(event='table exists', schemaTableName=schemaTableName,
                                        schemaName=schemaName, tableName=tableName)

    def dropTable(self, schemaTableName):
        schemaName, tableName = schemaTableName.split('.')
        return _generateOperationsTable(event='drop table', schemaTableName=schemaTableName,
                                        schemaName=schemaName, tableName=tableName)

    def df(self, sql):
        return _generateOperationsTable(event='df', sql=sql)

    def insert(self, dataFrame, schemaTableName):
        schemaName, tableName = schemaTableName.split('.')
        return _generateOperationsTable(event='insert', schemaTableName=schemaTableName,
                                        schemaName=schemaName, tableName=tableName, dataFrame=dataFrame)

    def delete(self, dataFrame, schemaTableName):
        schemaName, tableName = schemaTableName.split('.')
        return _generateOperationsTable(event='delete', schemaTableName=schemaTableName,
                                        schemaName=schemaName, tableName=tableName, dataFrame=dataFrame)

    def update(self, dataFrame, schemaTableName):
        schemaName, tableName = schemaTableName.split('.')
        return _generateOperationsTable(event='update', schemaTableName=schemaTableName,
                                        schemaName=schemaName, tableName=tableName, dataFrame=dataFrame)

    def getSchema(self, schemaTableName):
        schemaName, tableName = schemaTableName.split('.')
        return _generateOperationsTable(event='getSchema', schemaTableName=schemaTableName,
                                        schemaName=schemaName, tableName=tableName)
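
For illustration, _generateOperationsTable simply echoes its keyword arguments back as a dict, which is exactly what the assertions in test_context.py inspect. A small sketch of the stub used in isolation (no Spark or JVM required):

    from splicemachine.utils import FakeJContext

    ctx = FakeJContext('')                      # prints "Class Initialized"
    out = ctx.tableExists('schema1.table1')
    # out == {'event': 'table exists', 'schemaTableName': 'schema1.table1',
    #         'schemaName': 'schema1', 'tableName': 'table1'}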
