Description
When querying data from Cosmos DB through the Spark connector, the number of round trips is sometimes higher than expected. For example, with 20k items in total and maxItemCount set to 5k in the client code, the query should take 4 round trips. However, it often triggers 5 round trips, which consumes extra RUs.
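As a quick check of the expected count: maxItemCount is an upper bound on the items per page, and the service may return fewer items in a page. So ceil(total / maxItemCount) is the minimum number of data-carrying round trips. The helper below is a hypothetical illustration and is not part of the connector.

```python
def expected_round_trips(total_items: int, max_item_count: int) -> int:
    """Minimum number of pages needed to return total_items when each
    page holds at most max_item_count items (integer ceiling division)."""
    if max_item_count <= 0:
        raise ValueError("max_item_count must be positive")
    return (total_items + max_item_count - 1) // max_item_count


print(expected_round_trips(20_000, 5_000))  # 4
print(expected_round_trips(20_001, 5_000))  # 5
```

A fifth round trip for exactly 20k items is therefore one more request than the page arithmetic requires.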
We found that after commenting out the 'cancelOn' call and recompiling the jar for testing, the issue never reoccurred: the round-trip count was always 4, which is the correct count. Is this a bug, or is there a particular purpose for the code below?
override def close(): Unit = {
  lastPagedFlux.getAndSet(None) match {
    case Some(oldPagedFlux) => oldPagedFlux.cancelOn(Schedulers.boundedElastic()).onErrorComplete().subscribe().dispose()
    case None =>
  }
}
This is line 283 in commit 429ad86.
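One hypothesis consistent with the observation has not been confirmed against the connector or Reactor source. If the paged flux is cold, subscribe() in close() starts a brand-new query. Because cancelOn(Schedulers.boundedElastic()) delivers the cancel signal asynchronously, that new subscription may send its first page request before dispose() takes effect. The pure-Python simulation below models only that idea. ColdPagedSource, read_all and close_by_resubscribing are hypothetical names, not connector or Reactor APIs.

```python
class ColdPagedSource:
    """Hypothetical stand-in for a cold paged query: every subscription
    restarts the query, and each page fetch counts as one round trip."""

    def __init__(self, total_items: int, page_size: int):
        self.total_items = total_items
        self.page_size = page_size
        self.round_trips = 0

    def subscribe(self):
        # A generator models a lazy subscription: a page is only fetched
        # (and a round trip counted) when the consumer asks for it.
        def pages():
            offset = 0
            while offset < self.total_items:
                self.round_trips += 1  # one request to the service
                size = min(self.page_size, self.total_items - offset)
                yield list(range(offset, offset + size))
                offset += size
        return pages()


def read_all(source: ColdPagedSource) -> int:
    # Normal read: drain one subscription and count the items.
    return sum(len(page) for page in source.subscribe())


def close_by_resubscribing(source: ColdPagedSource) -> None:
    # Models subscribe().dispose() with an asynchronous cancel: the new
    # subscription's first request goes out before cancellation lands.
    sub = source.subscribe()
    next(sub, None)
    sub.close()


src = ColdPagedSource(20_000, 5_000)
print(read_all(src), src.round_trips)  # 20000 4
close_by_resubscribing(src)
print(src.round_trips)  # 5
```

In this model, the extra request comes from the cleanup subscription rather than from the read itself. Whether the real connector behaves this way would need to be confirmed in its request diagnostics.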
Below is the code I use in Databricks:
from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema (schema inference is disabled in the config below)
cosmos_schema = StructType([
    StructField("createddatetime", StringType(), True),
    StructField("closeddatetime", StringType(), True),
    StructField("id", StringType(), False),
    StructField("owner", StringType(), True),
    StructField("ownerregion", StringType(), True),
    StructField("ownermanager", StringType(), True),
    StructField("casenumber", StringType(), True),
])

# COSMOS_ENDPOINT, CosmosMasterKey, COSMOS_DATABASE, COSMOS_CONTAINER and
# customQuery are defined earlier in the notebook; spark and sc are
# provided by Databricks.
cosmos_config = {
    "spark.cosmos.accountEndpoint": COSMOS_ENDPOINT,
    "spark.cosmos.accountKey": CosmosMasterKey,
    "spark.cosmos.database": COSMOS_DATABASE,
    "spark.cosmos.container": COSMOS_CONTAINER,
    "spark.cosmos.diagnostics": "simple",
    "spark.cosmos.read.inferSchema.enabled": "false",
    "spark.cosmos.read.customQuery": customQuery,
    # At most 5000 items per page: 20k items should take 4 round trips
    "spark.cosmos.read.maxItemCount": "5000",
}

# Raise log verbosity to DEBUG
sc.setLogLevel("DEBUG")

df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)
    .schema(cosmos_schema)
    .load()
)
total_item_count = df.count()
print(f"Total documents retrieved: {total_item_count}")