-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathsql.py
65 lines (59 loc) · 1.87 KB
/
sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
This module provides the read_sql function.
It is a thin wrapper around the connectorx.read_sql function.
"""
from typing import Optional, Tuple, Union, List, Any
# connectorx is an optional dependency: remember whether the import succeeded
# so that read_sql can raise a clear error instead of failing on a missing name.
try:
    import connectorx as cx

    _WITH_CX = True
except ImportError:
    _WITH_CX = False


def read_sql(
    conn: str,
    query: Union[List[str], str],
    *,
    return_type: str = "pandas",
    protocol: str = "binary",
    partition_on: Optional[str] = None,
    partition_range: Optional[Tuple[int, int]] = None,
    partition_num: Optional[int] = None,
) -> Any:
    """
    Run the SQL query and download the data from the database into a dataframe.

    All arguments are forwarded unchanged to ``connectorx.read_sql``.
    Please check out https://github.com/sfu-db/connector-x for more details.

    Parameters
    ----------
    conn
        the connection string.
    query
        a SQL query or a list of SQL queries.
    return_type
        the return type of this function. It can be "arrow", "pandas", "modin",
        "dask" or "polars".
    protocol
        the protocol used to fetch data from source. Valid protocols are
        database dependent
        (https://github.com/sfu-db/connector-x/blob/main/Types.md).
    partition_on
        the column to partition the result.
    partition_range
        the value range of the partition column.
    partition_num
        how many partitions to generate.

    Returns
    -------
    Any
        whatever ``connectorx.read_sql`` returns for the requested
        ``return_type``.

    Raises
    ------
    ImportError
        if connectorx could not be imported when this module was loaded.

    Example
    --------
    >>> db_url = "postgresql://username:password@server:port/database"
    >>> query = "SELECT * FROM lineitem"
    >>> read_sql(db_url, query, partition_on="partition_col", partition_num=10)
    """
    # Fail early with an actionable message when the optional backend is absent.
    if not _WITH_CX:
        raise ImportError(
            "connectorx is not installed. Please run pip install connectorx"
        )

    return cx.read_sql(
        conn=conn,
        query=query,
        return_type=return_type,
        protocol=protocol,
        partition_on=partition_on,
        partition_range=partition_range,
        partition_num=partition_num,
    )