Merge pull request #84 from sundy-li/uv
package to uv
hantmac authored Dec 9, 2024
2 parents 1bffb20 + c5c0976 commit dec102e
Showing 28 changed files with 1,036 additions and 364 deletions.
35 changes: 11 additions & 24 deletions .github/workflows/ci.yaml
@@ -9,39 +9,26 @@ on:
 jobs:
   test:
     runs-on: ubuntu-latest
-    services:
-      databend:
-        # image: datafuselabs/databend-query
-        image: datafuselabs/databend
-        env:
-          QUERY_DEFAULT_USER: databend
-          QUERY_DEFAULT_PASSWORD: databend
-          MINIO_ENABLED: true
-        ports:
-          - 8000:8000
-          - 9000:9000
 
     steps:
       - name: Checkout
        uses: actions/checkout@v2
 
-      - name: Setup Python-3.10
-        uses: actions/setup-python@v4
-        with:
-          python-version: '3.10'
+      - name: Install uv
+        uses: astral-sh/setup-uv@v4
 
-      - name: Pip Install
-        run: |
-          make install
+      - name: Set up Python
+        run: uv python install
 
-      - name: Verify Service Running
-        run: |
-          cid=$(docker ps -a | grep databend | cut -d' ' -f1)
-          docker logs ${cid}
-          curl -v http://localhost:8000/v1/health
+      - name: Install the project
+        run: uv sync --all-extras --dev
+
+      - name: Start databend-server
+        run: make up
 
       - name: Test
         env:
-          TEST_DATABEND_DSN: "http://databend:databend@localhost:8000/default"
+          TEST_DATABEND_DSN: "http://root:@localhost:8000/default"
         run: |
           make lint
           make ci
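
Note: the workflow now starts the server through make up (defined in the Makefile below) rather than a GitHub Actions service container, and the tests authenticate as the default root user with an empty password. A minimal sketch of how a test could consume TEST_DATABEND_DSN; the actual test code is not part of this diff, and the import path is assumed:

    # Hypothetical sketch; the repository's real tests live in tests/ and
    # are not shown in this commit.
    import os

    from databend_py import Client  # import path assumed

    dsn = os.environ.get("TEST_DATABEND_DSN", "http://root:@localhost:8000/default")
    client = Client.from_url(dsn)
    _, rows = client.execute("select version()")  # execute returns (column_types, rows)
    print(rows)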
19 changes: 7 additions & 12 deletions .github/workflows/release.yaml
@@ -14,19 +14,14 @@ jobs:
       - name: Checkout Repository
         uses: actions/checkout@v3
 
-      - name: Set up Python
-        uses: actions/setup-python@v3
-        with:
-          python-version: 3.9
+      - name: Install uv
+        uses: astral-sh/setup-uv@v4
 
-      - name: Install Dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install setuptools wheel twine
+      - name: Set up Python
+        run: uv python install
 
-      - name: Determine Version Change
-        run: |
-          export VERSION=$(cat databend_py/VERSION)
+      - name: Install the project
+        run: uv sync --all-extras --dev
 
       - name: Release Package and Tag
         env:
@@ -38,7 +33,7 @@
           git config user.email "[email protected]"
           git tag -a "v$VERSION" -m "Release Version $VERSION"
           git push origin "v$VERSION"
-          python setup.py sdist bdist_wheel
+          uv publish
           echo "show user name:"
           echo ${{ secrets.TWINE_USERNAME }}
           twine upload -u ${{ secrets.TWINE_USERNAME }} -p ${{ secrets.TWINE_PASSWORD }} dist/*
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
+upload.csv
 .envrc
 # Byte-compiled / optimized / DLL files
 __pycache__/
18 changes: 12 additions & 6 deletions Makefile
@@ -1,12 +1,18 @@
+prepare:
+	mkdir -p data/databend
+
+up: prepare
+	docker compose -f docker-compose.yaml up --quiet-pull -d databend --wait
+	curl -u root: -XPOST "http://localhost:8000/v1/query" -H 'Content-Type: application/json' -d '{"sql": "select version()", "pagination": { "wait_time_secs": 10}}'
+
+start: up
+
 test:
-	python tests/test_client.py
+	uv run pytest .
 
 ci:
-	python tests/test_client.py
+	uv run pytest .
 
 lint:
-	pyflakes .
-
-install:
-	pip install -r requirements.txt
-	pip install -e .
+	uv run ruff check
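
Note: the new up target blocks until the server answers a query over the HTTP handler. A rough Python equivalent of that curl readiness probe, assuming the third-party requests package is available:

    # Rough equivalent of the Makefile's curl probe, for illustration only.
    import requests

    resp = requests.post(
        "http://localhost:8000/v1/query",
        auth=("root", ""),  # same credentials as `curl -u root:`
        json={"sql": "select version()", "pagination": {"wait_time_secs": 10}},
    )
    resp.raise_for_status()
    print(resp.json())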
141 changes: 82 additions & 59 deletions databend_py/client.py
@@ -15,14 +15,19 @@ class Client(object):
     """
 
     def __init__(self, *args, **kwargs):
-        self.settings = (kwargs.pop('settings', None) or {}).copy()
-        self.result_config = (kwargs.pop('result_config', None) or {}).copy()
+        self.settings = (kwargs.pop("settings", None) or {}).copy()
+        self.result_config = (kwargs.pop("result_config", None) or {}).copy()
         self.connection = Connection(*args, **kwargs)
         self.query_result_cls = QueryResult
         self.helper = Helper
-        self._debug = asbool(self.settings.get('debug', False))
-        self._uploader = DataUploader(self, self.connection, self.settings, debug=self._debug,
-                                      compress=self.settings.get('compress', False))
+        self._debug = asbool(self.settings.get("debug", False))
+        self._uploader = DataUploader(
+            self,
+            self.connection,
+            self.settings,
+            debug=self._debug,
+            compress=self.settings.get("compress", False),
+        )
 
     def __enter__(self):
         return self
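
Note: beyond the Black-style reformatting, behaviour is unchanged: settings and result_config are popped before the remaining arguments reach Connection. A usage sketch based only on keys visible in this file (host, port, user, and password are forwarded to Connection, as from_url below shows):

    from databend_py import Client  # import path assumed

    client = Client(
        "localhost",
        port=8000,
        user="root",
        password="",
        settings={"debug": True, "compress": False},  # read via self.settings.get(...)
        result_config={"null_to_none": True},         # forwarded to QueryResult
    )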
@@ -34,9 +39,9 @@ def disconnect_connection(self):
         self.connection.disconnect()
 
     def _data_generator(self, raw_data):
-        while raw_data['next_uri'] is not None:
+        while raw_data["next_uri"] is not None:
             try:
-                raw_data = self._receive_data(raw_data['next_uri'])
+                raw_data = self._receive_data(raw_data["next_uri"])
                 yield raw_data
             except (Exception, KeyboardInterrupt):
                 self.disconnect()
@@ -57,7 +62,8 @@ def _receive_result(self, query, query_id=None, with_column_types=False):
         helper.check_error()
         gen = self._data_generator(raw_data)
         result = self.query_result_cls(
-            gen, raw_data, with_column_types=with_column_types, **self.result_config)
+            gen, raw_data, with_column_types=with_column_types, **self.result_config
+        )
         return result.get_result()
 
     def _iter_receive_result(self, query, query_id=None, with_column_types=False):
@@ -67,14 +73,16 @@ def _iter_receive_result(self, query, query_id=None, with_column_types=False):
         helper.check_error()
         gen = self._data_generator(raw_data)
         result = self.query_result_cls(
-            gen, raw_data, with_column_types=with_column_types, **self.result_config)
+            gen, raw_data, with_column_types=with_column_types, **self.result_config
+        )
         _, rows = result.get_result()
         for row in rows:
             for r in row:
                 yield r
 
-    def execute(self, query, params=None, with_column_types=False,
-                query_id=None, settings=None):
+    def execute(
+        self, query, params=None, with_column_types=False, query_id=None, settings=None
+    ):
         """
         Executes query.
         :param query: query that will be send to server.
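
Note: execute only changes shape here. Parameterised queries still flow through _substitute_params (below), which requires a dict and applies printf-style substitution, so placeholders use %(name)s. A sketch against a hypothetical users table:

    # escape_params escapes the values; `query % escaped` does the substitution.
    column_types, rows = client.execute(
        "SELECT id, name FROM users WHERE id = %(id)s",
        params={"id": 42},
        with_column_types=True,
    )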
@@ -112,52 +120,63 @@ def execute(self, query, params=None, with_column_types=False,
             return [], rv
 
         column_types, rv = self._process_ordinary_query(
-            query, params=params, with_column_types=with_column_types,
-            query_id=query_id)
+            query, params=params, with_column_types=with_column_types, query_id=query_id
+        )
         return column_types, rv
 
     # params = [(1,),(2,)] or params = [(1,2),(2,3)]
     def _process_insert_query(self, query, params):
         insert_rows = 0
         if "values" in query:
-            query = query.split("values")[0] + 'values'
+            query = query.split("values")[0] + "values"
         elif "VALUES" in query:
-            query = query.split("VALUES")[0] + 'VALUES'
-        if len(query.split(' ')) < 3:
+            query = query.split("VALUES")[0] + "VALUES"
+        if len(query.split(" ")) < 3:
             raise Exception("Not standard insert/replace statement")
-        table_name = query.split(' ')[2]
-        batch_size = query.count(',') + 1
+        table_name = query.split(" ")[2]
+        batch_size = query.count(",") + 1
         if params is not None and len(params) > 0:
             if isinstance(params[0], tuple):
                 tuple_ls = params
             else:
-                tuple_ls = [tuple(params[i:i + batch_size]) for i in range(0, len(params), batch_size)]
+                tuple_ls = [
+                    tuple(params[i : i + batch_size])
+                    for i in range(0, len(params), batch_size)
+                ]
             insert_rows = len(tuple_ls)
             self._uploader.upload_to_table_by_copy(table_name, tuple_ls)
         return insert_rows
 
-    def _process_ordinary_query(self, query, params=None, with_column_types=False,
-                                query_id=None):
+    def _process_ordinary_query(
+        self, query, params=None, with_column_types=False, query_id=None
+    ):
         if params is not None:
-            query = self._substitute_params(
-                query, params, self.connection.context
-            )
-        return self._receive_result(query, query_id=query_id, with_column_types=with_column_types, )
-
-    def execute_iter(self, query, params=None, with_column_types=False,
-                     query_id=None, settings=None):
+            query = self._substitute_params(query, params, self.connection.context)
+        return self._receive_result(
+            query,
+            query_id=query_id,
+            with_column_types=with_column_types,
+        )
+
+    def execute_iter(
+        self, query, params=None, with_column_types=False, query_id=None, settings=None
+    ):
         if params is not None:
-            query = self._substitute_params(
-                query, params, self.connection.context
-            )
-        return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)
-
-    def _iter_process_ordinary_query(self, query, with_column_types=False, query_id=None):
-        return self._iter_receive_result(query, query_id=query_id, with_column_types=with_column_types)
+            query = self._substitute_params(query, params, self.connection.context)
+        return self._iter_receive_result(
+            query, query_id=query_id, with_column_types=with_column_types
+        )
+
+    def _iter_process_ordinary_query(
+        self, query, with_column_types=False, query_id=None
+    ):
+        return self._iter_receive_result(
+            query, query_id=query_id, with_column_types=with_column_types
+        )
 
     def _substitute_params(self, query, params, context):
         if not isinstance(params, dict):
-            raise ValueError('Parameters are expected in dict form')
+            raise ValueError("Parameters are expected in dict form")
 
         escaped = escape_params(params, context)
         return query % escaped
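
Note: insert statements take a separate path (the dispatch sits in context hidden above). _process_insert_query truncates the query at values/VALUES, takes the third space-separated token as the table name, derives a batch size from the comma count, and bulk-loads via upload_to_table_by_copy. A sketch with an illustrative table t:

    # With "INSERT INTO t (id, name) VALUES", table_name == "t" and
    # batch_size == query.count(",") + 1 == 2, so the flat list below is
    # re-chunked into [(1, "a"), (2, "b")].
    client.execute("INSERT INTO t (id, name) VALUES", [(1, "a"), (2, "b")])
    client.execute("INSERT INTO t (id, name) VALUES", [1, "a", 2, "b"])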
Expand Down Expand Up @@ -186,59 +205,59 @@ def from_url(cls, url):
continue

timeouts = {
'connect_timeout',
'read_timeout',
'send_receive_timeout',
'sync_request_timeout'
"connect_timeout",
"read_timeout",
"send_receive_timeout",
"sync_request_timeout",
}

value = value[0]

if name == 'client_name':
if name == "client_name":
kwargs[name] = value
elif name == 'tenant':
elif name == "tenant":
kwargs[name] = value
elif name == 'warehouse':
elif name == "warehouse":
kwargs[name] = value
elif name == 'secure':
elif name == "secure":
kwargs[name] = asbool(value)
elif name == 'copy_purge':
elif name == "copy_purge":
kwargs[name] = asbool(value)
settings[name] = asbool(value)
elif name == 'debug':
elif name == "debug":
settings[name] = asbool(value)
elif name == 'compress':
elif name == "compress":
settings[name] = asbool(value)
elif name in timeouts:
kwargs[name] = float(value)
elif name == 'persist_cookies':
elif name == "persist_cookies":
kwargs[name] = asbool(value)
elif name == 'null_to_none':
elif name == "null_to_none":
result_config[name] = asbool(value)
else:
settings[name] = value # settings={'copy_purge':False}
secure = kwargs.get("secure", False)
kwargs['secure'] = secure
kwargs["secure"] = secure

host = parsed_url.hostname

if parsed_url.port is not None:
kwargs['port'] = parsed_url.port
kwargs["port"] = parsed_url.port

path = parsed_url.path.replace('/', '', 1)
path = parsed_url.path.replace("/", "", 1)
if path:
kwargs['database'] = path
kwargs["database"] = path

if parsed_url.username is not None:
kwargs['user'] = unquote(parsed_url.username)
kwargs["user"] = unquote(parsed_url.username)

if parsed_url.password is not None:
kwargs['password'] = unquote(parsed_url.password)
kwargs["password"] = unquote(parsed_url.password)

if settings:
kwargs['settings'] = settings
kwargs["settings"] = settings
if result_config:
kwargs['result_config'] = result_config
kwargs["result_config"] = result_config

return cls(host, **kwargs)
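
Note: every branch above now uses double quotes but parses the same way: connection fields (user, password, host, port, database) come from the URL itself, boolean flags go through asbool, timeouts become floats, and unrecognised names land in settings. For example:

    # secure/debug -> asbool, connect_timeout -> float,
    # null_to_none -> result_config, database taken from the path.
    client = Client.from_url(
        "http://root:@localhost:8000/default"
        "?secure=false&debug=true&connect_timeout=10&null_to_none=true"
    )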

@@ -250,7 +269,9 @@ def insert(self, database_name, table_name, data):
         data: the data which write into, it's a list of tuple
         """
         # TODO: escape the database & table name
-        self._uploader.upload_to_table_by_copy("%s.%s" % (database_name, table_name), data)
+        self._uploader.upload_to_table_by_copy(
+            "%s.%s" % (database_name, table_name), data
+        )
 
     def replace(self, database_name, table_name, conflict_keys, data):
         """
@@ -260,7 +281,9 @@ def replace(self, database_name, table_name, conflict_keys, data):
         conflict_keys: the key that use to replace into
         data: the data which write into, it's a list of tuple
         """
-        self._uploader.replace_into_table("%s.%s" % (database_name, table_name), conflict_keys, data)
+        self._uploader.replace_into_table(
+            "%s.%s" % (database_name, table_name), conflict_keys, data
+        )
 
     def upload_to_stage(self, stage_dir, file_name, data):
         """
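
Note: insert and replace keep their signatures; both join database and table with "%s.%s" (escaping is still a TODO, per the comment). A sketch with illustrative names; passing conflict_keys as a list of column names is an assumption:

    # data is a list of tuples, one tuple per row.
    client.insert("default", "users", [(1, "alice"), (2, "bob")])
    # conflict_keys selects the columns used to match existing rows.
    client.replace("default", "users", ["id"], [(1, "alice2")])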
(Diff for the remaining 23 of the 28 changed files not shown.)