
Commit 6165638

CartesianPartition format and cleanup
1 parent accc681 commit 6165638

3 files changed: +54 −40 lines changed

3 files changed

+54
-40
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
@@ -7,7 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
 ### Added
-- Add ZStd compression support for GTiff
+- Add ZStd compression support for GTiff [#3580](https://github.com/locationtech/geotrellis/pull/3580)
+- Do not depend on private Spark API, avoids sealing violation [#3586](https://github.com/locationtech/geotrellis/pull/3586)
 
 ## [3.8.0] - 2025-04-23
 

spark/src/main/scala/geotrellis/spark/join/CartesianPartition.scala

Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+package geotrellis.spark.join
+
+import org.apache.spark.Partition
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+
+import java.io.{IOException, ObjectOutputStream}
+import scala.util.control.NonFatal
+
+// https://github.com/apache/spark/blob/686d84453610e463df7df95395ce6ed36a6efacd/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala#L29
+private[join] class CartesianPartition(
+  idx: Int,
+  @transient private val rdd1: RDD[_],
+  @transient private val rdd2: RDD[_],
+  s1Index: Int,
+  s2Index: Int
+) extends Partition {
+
+  var s1 = rdd1.partitions(s1Index)
+  var s2 = rdd2.partitions(s2Index)
+  override val index: Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream): Unit = CartesianPartition.tryOrIOException {
+    // Update the reference to parent split at the time of task serialization
+    s1 = rdd1.partitions(s1Index)
+    s2 = rdd2.partitions(s2Index)
+    oos.defaultWriteObject()
+  }
+}
+
+object CartesianPartition extends Logging {
+  /**
+   * Execute a block of code that returns a value, re-throwing any non-fatal uncaught
+   * exceptions as IOException. This is used when implementing Externalizable and Serializable's
+   * read and write methods, since Java's serializer will not report non-IOExceptions properly;
+   * see SPARK-4080 for more context.
+   */
+  // https://github.com/apache/spark/blob/686d84453610e463df7df95395ce6ed36a6efacd/common/utils/src/main/scala/org/apache/spark/util/SparkErrorUtils.scala#L35
+  private def tryOrIOException[T](block: => T): T = {
+    try {
+      block
+    } catch {
+      case e: IOException =>
+        logError("Exception encountered", e)
+        throw e
+      case NonFatal(e) =>
+        logError("Exception encountered", e)
+        throw new IOException(e)
+    }
+  }
+}
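Note: Java serialization invokes a private writeObject(ObjectOutputStream) hook via reflection, which is how the override above gets a chance to refresh s1 and s2 at task-serialization time. A minimal sketch of that round trip, assuming plain JDK serialization and a hypothetical serialize helper (not code from this commit):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.spark.Partition

// Hypothetical helper: serializes a partition the way task serialization would,
// which triggers CartesianPartition.writeObject and re-resolves the parent
// splits s1 and s2 just before the bytes are written.
def serialize(partition: Partition): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(partition) // calls the private writeObject hook via reflection
  oos.close()
  bytes.toByteArray
}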

spark/src/main/scala/geotrellis/spark/join/FilteredCartesianRDD.scala

Lines changed: 0 additions & 39 deletions
@@ -23,47 +23,8 @@ package geotrellis.spark.join
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.log4s.getLogger
 
-import java.io.{IOException, ObjectOutputStream}
 import scala.reflect.ClassTag
-import scala.util.control.NonFatal
-
-private class CartesianPartition(
-  idx: Int,
-  @transient private val rdd1: RDD[_],
-  @transient private val rdd2: RDD[_],
-  s1Index: Int,
-  s2Index: Int
-) extends Partition {
-
-  @transient private[this] lazy val logger = getLogger
-
-  var s1 = rdd1.partitions(s1Index)
-  var s2 = rdd2.partitions(s2Index)
-  override val index: Int = idx
-
-  private def tryOrIOException[T](block: => T): T = {
-    try {
-      block
-    } catch {
-      case e: IOException =>
-        logger.error(e)("Exception encountered")
-        throw e
-      case NonFatal(e) =>
-        logger.error(e)("Exception encountered")
-        throw new IOException(e)
-    }
-  }
-
-  @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream): Unit = tryOrIOException {
-    // Update the reference to parent split at the time of task serialization
-    s1 = rdd1.partitions(s1Index)
-    s2 = rdd2.partitions(s2Index)
-    oos.defaultWriteObject()
-  }
-}
 
 /** Performs a cartesian join of two RDDs using filter and refine pattern.
  *

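For context on how the extracted class is consumed: a cartesian-style RDD builds one output partition per pair of parent partitions. The sketch below mirrors Spark's CartesianRDD.getPartitions and is an assumption about how FilteredCartesianRDD uses CartesianPartition, not code from this commit; rdd1 and rdd2 stand for the two parent RDDs of an RDD subclass in geotrellis.spark.join (where private[join] CartesianPartition is visible):

// Sketch of a getPartitions override inside an RDD subclass in the
// geotrellis.spark.join package.
override def getPartitions: Array[Partition] = {
  // One output partition per (left, right) parent-partition pair.
  val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
  for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
    val idx = s1.index * rdd2.partitions.length + s2.index
    array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
  }
  array
}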