-
Notifications
You must be signed in to change notification settings - Fork 922
SPARKC-403: Add CLUSTERING ORDER in cql statement #981
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 3 commits
4ae3ff9
5bc7cc6
4dd085c
bae67de
697dd0e
0aa10be
dfa0382
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
package com.datastax.spark.connector.cql | ||
|
||
import java.io.IOException | ||
|
||
import com.datastax.spark.connector._ | ||
import com.datastax.spark.connector.mapper.{DataFrameColumnMapper, ColumnMapper} | ||
import org.apache.spark.Logging | ||
|
@@ -8,7 +10,6 @@ import org.apache.spark.sql.DataFrame | |
import scala.collection.JavaConversions._ | ||
import scala.language.existentials | ||
import scala.util.{Properties, Try} | ||
|
||
import com.datastax.driver.core._ | ||
import com.datastax.spark.connector.types.{CounterType, ColumnType} | ||
import com.datastax.spark.connector.util.Quote._ | ||
|
@@ -92,7 +93,8 @@ case object RegularColumn extends ColumnRole | |
case class ColumnDef( | ||
columnName: String, | ||
columnRole: ColumnRole, | ||
columnType: ColumnType[_]) extends FieldDef { | ||
columnType: ColumnType[_], | ||
clusteringOrder: ClusteringOrder = ClusteringOrder.ASC) extends FieldDef { | ||
|
||
def ref: ColumnRef = ColumnName(columnName) | ||
def isStatic = columnRole == StaticColumn | ||
|
@@ -128,6 +130,15 @@ object ColumnDef { | |
val columnType = ColumnType.fromDriverType(column.getType) | ||
ColumnDef(column.getName, columnRole, columnType) | ||
} | ||
|
||
/** Builds a [[ColumnDef]] from driver column metadata, attaching an explicit
  * clustering order instead of the default ascending one.
  *
  * @param column          driver-level column metadata (name and type are read from it)
  * @param columnRole      role of the column (partition key, clustering column, …)
  * @param clusteringOrder sort direction to record for this column
  */
def apply(
  column: ColumnMetadata,
  columnRole: ColumnRole,
  clusteringOrder: ClusteringOrder): ColumnDef =
  ColumnDef(
    column.getName,
    columnRole,
    ColumnType.fromDriverType(column.getType),
    clusteringOrder)
} | ||
|
||
/** A Cassandra table metadata that can be serialized. */ | ||
|
@@ -138,7 +149,9 @@ case class TableDef( | |
clusteringColumns: Seq[ColumnDef], | ||
regularColumns: Seq[ColumnDef], | ||
indexes: Seq[IndexDef] = Seq.empty, | ||
isView: Boolean = false) extends StructDef { | ||
isView: Boolean = false, | ||
clusteringOrder: Option[Seq[ClusteringOrder]] = None, | ||
options: String = "") extends StructDef { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I still don't like being able to just pass a string here. If you really think we need this, I think it should at least be Seq[String], and we should just require that the entries not contain "AND" or "WITH". Then we can convert the append code below into a string join instead of having the more complicated logic. This removes the need for the "appendOptions" function and replaces it with
|
||
|
||
require(partitionKey.forall(_.isPartitionKeyColumn), "All partition key columns must have role PartitionKeyColumn") | ||
require(clusteringColumns.forall(_.isClusteringColumn), "All clustering columns must have role ClusteringColumn") | ||
|
@@ -185,12 +198,36 @@ case class TableDef( | |
val clusteringColumnNames = clusteringColumns.map(_.columnName).map(quote) | ||
val primaryKeyClause = (partitionKeyClause +: clusteringColumnNames).mkString(", ") | ||
|
||
s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} ( | ||
val stmt = s"""CREATE TABLE ${quote(keyspaceName)}.${quote(tableName)} ( | ||
| $columnList, | ||
| PRIMARY KEY ($primaryKeyClause) | ||
|)""".stripMargin | ||
val ordered = if (clusteringColumns.nonEmpty) | ||
s"$stmt${Properties.lineSeparator}WITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns,clusteringOrder)})" | ||
else stmt | ||
appendOptions(ordered, options) | ||
} | ||
/** Renders the body of a `CLUSTERING ORDER BY (...)` clause, e.g. `"c1" ASC, "c2" DESC`.
  *
  * When an explicit order list is supplied it must have exactly one entry per
  * clustering column; each column is re-labelled with its requested direction.
  * With no explicit list, each column's own `clusteringOrder` (ASC by default)
  * is used.
  *
  * @throws IOException if the explicit order list size does not match the
  *                     number of clustering columns
  */
private[this] def clusteringColumnOrder(
  clusteringColumns: Seq[ColumnDef],
  clusteringOrder: Option[Seq[ClusteringOrder]]): String = {

  val orderedColumns = clusteringOrder match {
    case Some(orders) if orders.size == clusteringColumns.size =>
      // Override each column's recorded order with the explicitly requested one.
      clusteringColumns.zip(orders).map { case (col, order) => col.copy(clusteringOrder = order) }
    case Some(_) =>
      throw new IOException("clusteringOrder size is not matching with Clustering Columns")
    case None =>
      clusteringColumns
  }

  orderedColumns.map { col =>
    val direction = if (col.clusteringOrder == ClusteringOrder.DESC) "DESC" else "ASC"
    s"${quote(col.columnName)} $direction"
  }.mkString(", ")
}
|
||
/** Appends user-supplied table options to a CQL CREATE TABLE statement.
  *
  * Normalizes the leading keyword of `opts` so the combined statement is valid:
  *   - statement already has WITH and opts starts with "WITH"  -> rewrite opts to "AND …"
  *   - statement has no WITH yet and opts starts with "AND"    -> rewrite opts to "WITH …"
  *   - empty opts                                              -> statement unchanged
  *   - anything else                                           -> opts appended verbatim
  *
  * Bug fix: the original "AND…"-to-"WITH…" branch dropped `stmt` entirely,
  * discarding the whole CREATE TABLE statement. The substring(4)/substring(3)
  * results keep their leading space, so no extra space is inserted after the
  * replacement keyword (the original produced a double space).
  */
private[this] def appendOptions(stmt: String, opts: String) =
  if (opts == "") stmt
  else if (stmt.contains("WITH") && opts.startsWith("WITH"))
    s"$stmt${Properties.lineSeparator}AND${opts.substring(4)}"
  else if (!stmt.contains("WITH") && opts.startsWith("AND"))
    s"$stmt${Properties.lineSeparator}WITH${opts.substring(3)}"
  else
    s"$stmt${Properties.lineSeparator}$opts"
|
||
type ValueRepr = CassandraRow | ||
|
||
def newInstance(columnValues: Any*): CassandraRow = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be a property of the ColumnDef now? I.e., do you really need to specify it separately?