Skip to content

Commit da61bd2

Browse files
authored
Merge pull request #590 from waitinfuture/support-complex-type
support spark3
2 parents 4909dba + 18c8781 commit da61bd2

File tree

8 files changed

+41
-16
lines changed

8 files changed

+41
-16
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,13 @@
2323
2424
```
2525

26+
## Build emr-maxcompute with Spark 3.2.0
27+
```
28+
git clone https://github.com/aliyun/aliyun-emapreduce-datasources.git
29+
cd aliyun-emapreduce-datasources/emr-maxcompute/
30+
mvn clean package -Pspark3 -DskipTests
31+
```
32+
2633
#### Use SDK in Eclipse project directly
2734

2835
- copy sdk jar to your project

emr-maxcompute/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@
4646
<type>test-jar</type>
4747
</dependency>
4848

49+
<dependency>
50+
<groupId>org.apache.commons</groupId>
51+
<artifactId>commons-lang3</artifactId>
52+
<version>${commons.lang3.version}</version>
53+
<scope>test</scope>
54+
</dependency>
55+
4956
<dependency>
5057
<groupId>org.apache.spark</groupId>
5158
<artifactId>spark-sql_${scala.binary.version}</artifactId>

emr-maxcompute/src/main/scala/org/apache/spark/aliyun/odps/OdpsRDD.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,17 @@
1717
package org.apache.spark.aliyun.odps
1818

1919
import java.io.EOFException
20-
2120
import scala.collection.mutable.ArrayBuffer
2221
import scala.reflect.ClassTag
23-
2422
import com.aliyun.odps.{Odps, PartitionSpec, TableSchema}
2523
import com.aliyun.odps.account.AliyunAccount
2624
import com.aliyun.odps.data.Record
2725
import com.aliyun.odps.tunnel.TableTunnel
2826
import com.aliyun.odps.tunnel.io.TunnelRecordReader
29-
3027
import org.apache.spark._
3128
import org.apache.spark.internal.Logging
3229
import org.apache.spark.rdd.RDD
33-
import org.apache.spark.util.NextIterator
30+
import org.apache.spark.util.{NextIterator, TaskCompletionListener}
3431

3532
class OdpsPartition(rddId: Int,
3633
idx: Int,
@@ -90,9 +87,11 @@ class OdpsRDD[T: ClassTag](@transient sc: SparkContext,
9087
val reader = downloadSession.openRecordReader(split.start, split.count)
9188
val inputMetrics = context.taskMetrics.inputMetrics
9289

93-
context.addTaskCompletionListener {
94-
context => closeIfNeeded()
95-
}
90+
context.addTaskCompletionListener(new TaskCompletionListener {
91+
override def onTaskCompletion(context: TaskContext): Unit = {
92+
closeIfNeeded()
93+
}
94+
})
9695

9796
override def getNext() = {
9897
var ret = null.asInstanceOf[T]

emr-maxcompute/src/main/scala/org/apache/spark/aliyun/odps/datasource/ODPSRDD.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import org.apache.spark.rdd.RDD
2929
import org.apache.spark.sql.catalyst.InternalRow
3030
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
3131
import org.apache.spark.sql.types._
32-
import org.apache.spark.util.NextIterator
32+
import org.apache.spark.util.{NextIterator, TaskCompletionListener}
3333

3434
class ODPSRDD(
3535
sc: SparkContext,
@@ -66,9 +66,11 @@ class ODPSRDD(
6666
val reader = downloadSession.openRecordReader(split.start, split.count)
6767
val inputMetrics = context.taskMetrics.inputMetrics
6868

69-
context.addTaskCompletionListener {
70-
context => closeIfNeeded()
71-
}
69+
context.addTaskCompletionListener(new TaskCompletionListener {
70+
override def onTaskCompletion(context: TaskContext): Unit = {
71+
closeIfNeeded()
72+
}
73+
})
7274

7375
val mutableRow = new SpecificInternalRow(schema.fields.map(x => x.dataType))
7476

emr-maxcompute/src/main/scala/org/apache/spark/aliyun/odps/datasource/ODPSWriter.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,15 @@ class ODPSWriter(
151151
writer.close()
152152
}
153153

154-
data.foreachPartition {
155-
iterator =>
154+
data.foreachPartition((iterator: Iterator[Row]) => {
156155
val account_ = new AliyunAccount(accessKeyId, accessKeySecret)
157156
val odps = new Odps(account_)
158157
odps.setDefaultProject(project)
159158
odps.setEndpoint(odpsUrl)
160159
val odpsUtils = new OdpsUtils(odps)
161160
val dataSchema = odpsUtils.getTableSchema(project, table, false)
162161
writeToFile(odps, dataSchema, iterator)
163-
}
162+
})
164163
val arr = Array.tabulate(data.rdd.partitions.length)(l => Long.box(l))
165164
uploadSession.commit(arr)
166165
}

emr-maxcompute/src/test/scala/org/apache/spark/sql/aliyun/odps/OdpsOpsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class OdpsOpsSuite extends SparkFunSuite {
9494
odpsUtils.runSQL(project, "TRUNCATE TABLE odps_basic_types;")
9595
}
9696

97-
test("support basic types") {
97+
test("[OdpsOpsSuite] support basic types") {
9898
val table = "odps_basic_types"
9999
val struct = StructType(
100100
StructField("a", BooleanType, true) ::

emr-maxcompute/src/test/scala/org/apache/spark/sql/aliyun/odps/types/OdpsDataTypeSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class OdpsDataTypeSuite extends SparkFunSuite {
9090
odpsUtils.runSQL(project, "TRUNCATE TABLE odps_basic_types;")
9191
}
9292

93-
test("support basic types") {
93+
test("[OdpsDataTypeSuite] support basic types") {
9494
val table = "odps_basic_types"
9595
val struct = StructType(
9696
StructField("a", BooleanType, true) ::

pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,17 @@
795795
<activeByDefault>true</activeByDefault>
796796
</activation>
797797
</profile>
798+
<profile>
799+
<id>spark3</id>
800+
<properties>
801+
<scala.version>2.12.15</scala.version>
802+
<scala.binary.version>2.12</scala.binary.version>
803+
<scalatest.version>3.2.9</scalatest.version>
804+
<spark.version>3.2.0</spark.version>
805+
<hadoop.version>3.3.1</hadoop.version>
806+
<commons.lang3.version>3.12.0</commons.lang3.version>
807+
</properties>
808+
</profile>
798809

799810
<profile>
800811
<id>ds</id>

0 commit comments

Comments (0)