Skip to content

Commit

Permalink
Timestamp Tolerance (#208)
Browse files Browse the repository at this point in the history
* Add tolerance for timestamp comparisons. The tolerance is specified in milliseconds.

* Add tolerance for timestamp comparisons. The tolerance is specified in milliseconds.

* Added Timestamp comparison for pre-2.0

* Added test cases for Timestamp tolerance

* Fixed miscalculations in test cases

* Codacy improvements: shorten overly long lines
  • Loading branch information
chiefmanc authored and holdenk committed Jan 18, 2018
1 parent ccebb55 commit 0ebf978
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.holdenkarau.spark.testing

import java.io.File
import java.sql.Timestamp

import org.scalatest.Suite

Expand Down Expand Up @@ -231,6 +232,11 @@ object DataFrameSuiteBase {
return false
}

// Timestamps are compared on their epoch-millisecond values (getTime), so for
// timestamp fields tol is interpreted as a number of milliseconds. The fields
// are treated as different only when the gap is strictly greater than tol.
case t1: Timestamp =>
// NOTE(review): the cast assumes o2 is also a Timestamp — confirm the caller guarantees it.
if (abs(t1.getTime - o2.asInstanceOf[Timestamp].getTime) > tol) {
return false
}

case _ =>
if (o1 != o2) return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ object DataFrameSuiteBase {
case d1: java.math.BigDecimal =>
if (d1.compareTo(o2.asInstanceOf[java.math.BigDecimal]) != 0) return false

// Timestamps are compared on their epoch-millisecond values (getTime), so for
// timestamp fields tol is interpreted as a number of milliseconds. The fields
// are treated as different only when the gap is strictly greater than tol.
case t1: Timestamp =>
// NOTE(review): the cast assumes o2 is also a Timestamp — confirm the caller guarantees it.
if (abs(t1.getTime - o2.asInstanceOf[Timestamp].getTime) > tol) {
return false
}

case _ =>
if (o1 != o2) return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package com.holdenkarau.spark.testing

import java.sql.Timestamp

import org.apache.spark.sql.Row
import org.scalatest.FunSuite

Expand Down Expand Up @@ -64,6 +66,9 @@ class SampleDataFrameTest extends FunSuite with DataFrameSuiteBase {
val row6 = Row("1")
val row6a = Row("2")
val row7 = Row(1.toFloat)
val row8 = Row(Timestamp.valueOf("2018-01-12 20:22:13"))
val row9 = Row(Timestamp.valueOf("2018-01-12 20:22:18"))
val row10 = Row(Timestamp.valueOf("2018-01-12 20:23:13"))
assert(false === approxEquals(row, row2, 1E-7))
assert(true === approxEquals(row, row2, 1E-5))
assert(true === approxEquals(row3, row3, 1E-5))
Expand All @@ -74,6 +79,10 @@ class SampleDataFrameTest extends FunSuite with DataFrameSuiteBase {
assert(false === approxEquals(row6, row4, 1E-5))
assert(false === approxEquals(row6, row7, 1E-5))
assert(false === approxEquals(row6, row6a, 1E-5))
assert(true === approxEquals(row8, row9, 5000))
assert(false === approxEquals(row9, row8, 3000))
assert(true === approxEquals(row9, row10, 60000))
assert(false === approxEquals(row9, row10, 53000))
}

test("verify hive function support") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.Test;

import java.io.Serializable;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -97,6 +98,51 @@ public void approximateEqualLowTolerance() {
assertDatasetApproximateEquals(dataset1, dataset2, 0.2);
}

/**
 * Two single-row datasets carrying the same name and the same timestamp must
 * compare as approximately equal even when the tolerance is zero.
 */
@Test
public void approximateEqualTime() {
  // NOTE(review): presumably required because Time is an inner class of this test — confirm.
  OuterScopes.addOuterScope(this);

  Dataset<Time> expected = sqlContext().createDataset(
      Arrays.asList(createTime("Holden", Timestamp.valueOf("2018-01-12 22:21:23"))),
      Encoders.bean(Time.class));

  Dataset<Time> actual = sqlContext().createDataset(
      Arrays.asList(createTime("Holden", Timestamp.valueOf("2018-01-12 22:21:23"))),
      Encoders.bean(Time.class));

  assertDatasetApproximateEquals(expected, actual, 0);
}

/**
 * Timestamps that are 20 seconds apart must compare as approximately equal
 * when the tolerance is 30000 (milliseconds).
 */
@Test
public void approximateEqualTimeAcceptableTolerance() {
  // NOTE(review): presumably required because Time is an inner class of this test — confirm.
  OuterScopes.addOuterScope(this);

  Dataset<Time> earlier = sqlContext().createDataset(
      Arrays.asList(createTime("Shakanti", Timestamp.valueOf("2018-01-12 22:21:23"))),
      Encoders.bean(Time.class));

  Dataset<Time> later = sqlContext().createDataset(
      Arrays.asList(createTime("Shakanti", Timestamp.valueOf("2018-01-12 22:21:43"))),
      Encoders.bean(Time.class));

  assertDatasetApproximateEquals(earlier, later, 30000);
}

/**
 * Timestamps that are 80 seconds apart must NOT compare as approximately
 * equal when the tolerance is 60000 (milliseconds); the assertion is expected
 * to fail with an AssertionError.
 */
@Test(expected = AssertionError.class)
public void approximateEqualTimeLowTolerance() {
  // NOTE(review): presumably required because Time is an inner class of this test — confirm.
  OuterScopes.addOuterScope(this);

  Dataset<Time> earlier = sqlContext().createDataset(
      Arrays.asList(createTime("Shakanti", Timestamp.valueOf("2018-01-12 22:21:23"))),
      Encoders.bean(Time.class));

  Dataset<Time> later = sqlContext().createDataset(
      Arrays.asList(createTime("Shakanti", Timestamp.valueOf("2018-01-12 22:22:43"))),
      Encoders.bean(Time.class));

  assertDatasetApproximateEquals(earlier, later, 60000);
}

private Person createPerson(String name, int age, double weight) {
Person person = new Person();
person.setName(name);
Expand All @@ -106,6 +152,14 @@ private Person createPerson(String name, int age, double weight) {
return person;
}

/**
 * Builds a {@code Time} bean populated with the given values.
 *
 * @param name value for the bean's name property
 * @param time value for the bean's time property
 * @return a new bean holding both values
 */
private Time createTime(String name, Timestamp time) {
  Time bean = new Time();
  bean.setTime(time);
  bean.setName(name);
  return bean;
}

public class Person implements Serializable {
private String name;
private int age;
Expand Down Expand Up @@ -151,4 +205,39 @@ public int hashCode() {
}
}

public class Time implements Serializable {
private String name;
private Timestamp time;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Timestamp getTime() {
return time;
}

public void setTime(Timestamp time) {
this.time = time;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Time) {
Time objTime = (Time) obj;
return objTime.getName().equals(this.name) && objTime.getTime().equals(this.time);
}

return false;
}

@Override
public int hashCode() {
return name.hashCode() + time.hashCode();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.holdenkarau.spark.testing

import java.sql.Timestamp

import org.scalatest.FunSuite

class SampleDatasetTest extends FunSuite with DatasetSuiteBase {
Expand Down Expand Up @@ -138,6 +140,52 @@ class SampleDatasetTest extends FunSuite with DatasetSuiteBase {
assertDatasetApproximateEquals(persons1, persons2, 0.2)
}
}

test("approximate time equal") {
  import sqlContext.implicits._

  // Both datasets are built from identical rows, so a tolerance of 0 must pass.
  val expectedRows = List(
    MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:41:32")),
    MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:32:18")))
  val actualRows = List(
    MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:41:32")),
    MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:32:18")))

  assertDatasetApproximateEquals(
    sc.parallelize(expectedRows).toDS,
    sc.parallelize(actualRows).toDS,
    0)
}

test("approximate time not equal acceptable tolerance") {
  import sqlContext.implicits._

  // The timestamps differ by 17 s (Holden) and 4 s (Shakanti); a tolerance of
  // 17000 ms is therefore just large enough for the comparison to pass.
  val baselineRows = List(
    MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:41:32")),
    MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:32:18")))
  val shiftedRows = List(
    MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:41:49")),
    MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:32:22")))

  assertDatasetApproximateEquals(
    sc.parallelize(baselineRows).toDS,
    sc.parallelize(shiftedRows).toDS,
    17000)
}

test("approximate time not equal low tolerance") {
  import sqlContext.implicits._

  // The timestamps differ by 17 s (Holden) and 4 s (Shakanti), both above the
  // 2000 ms tolerance, so the assertion is expected to fail.
  val baselineRows = List(
    MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:41:32")),
    MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:32:18")))
  val shiftedRows = List(
    MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:41:49")),
    MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:32:22")))

  val baseline = sc.parallelize(baselineRows).toDS
  val shifted = sc.parallelize(shiftedRows).toDS

  intercept[org.scalatest.exceptions.TestFailedException] {
    assertDatasetApproximateEquals(baseline, shifted, 2000)
  }
}


}

case class Person(name: String, age: Int, weight: Double)
Expand All @@ -149,3 +197,5 @@ case class CustomPerson(name: String, age: Int, weight: Double) {
case _ => false
}
}

/**
 * Row type for the timestamp-tolerance dataset tests.
 *
 * @param name label identifying the row
 * @param t    timestamp compared (within a tolerance) by the tests
 */
case class MagicTime(name: String, t: Timestamp)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.junit.Test;

import java.io.Serializable;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -82,9 +83,51 @@ public void testApproximateEqualRows() {
assertFalse(approxEquals(row1, row2, 0));
}

/**
 * Data frames whose timestamps differ by 3 s and 68 s must compare as
 * approximately equal under a tolerance of 75000 (milliseconds).
 */
@Test
public void testApproximateEqualTimestamp() {
  List<MagicTime> magics1 = Arrays.asList(
      new MagicTime("Holden", Timestamp.valueOf("2018-01-12 19:17:32")),
      new MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:17:32")));

  List<MagicTime> magics2 = Arrays.asList(
      new MagicTime("Holden", Timestamp.valueOf("2018-01-12 19:17:35")),
      new MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:18:40")));

  // Built directly from MagicTime beans: toDF only accepts List<BasicMagic>.
  DataFrame df1 = sqlContext().createDataFrame(jsc().parallelize(magics1), MagicTime.class);
  DataFrame df2 = sqlContext().createDataFrame(jsc().parallelize(magics2), MagicTime.class);

  assertDataFrameApproximateEquals(df1, df2, 75000);
}

/**
 * Data frames whose timestamps differ by up to 68 s must NOT compare as
 * approximately equal under a tolerance of 59000 (milliseconds); the
 * assertion is expected to fail with an AssertionError.
 */
@Test(expected = java.lang.AssertionError.class)
public void testApproximateNotEqualTimestamp() {
  List<MagicTime> magics1 = Arrays.asList(
      new MagicTime("Holden", Timestamp.valueOf("2018-01-12 19:17:32")),
      new MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:17:32")));

  List<MagicTime> magics2 = Arrays.asList(
      new MagicTime("Holden", Timestamp.valueOf("2018-01-12 19:17:35")),
      new MagicTime("Shakanti", Timestamp.valueOf("2018-01-12 19:18:40")));

  // Built directly from MagicTime beans: toDF only accepts List<BasicMagic>.
  DataFrame df1 = sqlContext().createDataFrame(jsc().parallelize(magics1), MagicTime.class);
  DataFrame df2 = sqlContext().createDataFrame(jsc().parallelize(magics2), MagicTime.class);

  assertDataFrameApproximateEquals(df1, df2, 59000);
}

/**
 * Rows whose timestamps are 30 seconds apart are approximately equal under a
 * 58000 ms tolerance and different under a zero tolerance.
 */
@Test
public void testApproximateEqualTimeRows() {
  // Both rows share the same name: non-timestamp values are compared exactly,
  // so the timestamp must be the only column that differs for the tolerance
  // to decide the outcome.
  List<MagicTime> magics = Arrays.asList(
      new MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:49:32")),
      new MagicTime("Holden", Timestamp.valueOf("2018-01-12 20:50:02")));

  DataFrame df = sqlContext().createDataFrame(jsc().parallelize(magics), MagicTime.class);

  // Collect once so both rows come from the same evaluation of the frame.
  Row[] rows = df.collect();
  Row row1 = rows[0];
  Row row2 = rows[1];

  assertTrue(approxEquals(row1, row1, 0));
  assertTrue(approxEquals(row1, row2, 58000));
  assertFalse(approxEquals(row1, row2, 0));
}

private DataFrame timeDF(List<MagicTime> list) {
JavaRDD<MagicTime> rdd = jsc().parallelize(list);
return sqlContext().createDataFrame(
}

/**
 * Builds a DataFrame from the given beans using {@code BasicMagic} as the
 * bean class.
 *
 * @param list beans to turn into rows
 * @return a DataFrame with one row per bean
 */
private DataFrame toDF(List<BasicMagic> list) {
  return sqlContext().createDataFrame(jsc().parallelize(list), BasicMagic.class);
}

/**
 * Java bean pairing a name with a timestamp; the element type of the
 * timestamp-tolerance data frame tests. Exposes getters and setters so it can
 * be passed as the bean class when creating a data frame.
 *
 * <p>NOTE(review): as a non-static inner class every instance keeps a
 * reference to the enclosing test object; make it {@code static} if the
 * enclosing class is not Serializable.
 */
public class MagicTime implements Serializable {
  private String name;
  private Timestamp time;

  /** Creates a bean with both properties unset. */
  public MagicTime() {
  }

  /**
   * @param name label identifying the row
   * @param time timestamp carried by the row
   */
  public MagicTime(String name, Timestamp time) {
    this.name = name;
    this.time = time;
  }

  /** @return the name, or {@code null} if it has not been set */
  public String getName() {
    return name;
  }

  /** @param name the name to store */
  public void setName(String name) {
    this.name = name;
  }

  /** @return the timestamp, or {@code null} if it has not been set */
  public Timestamp getTime() {
    return time;
  }

  /** @param time the timestamp to store */
  public void setTime(Timestamp time) {
    this.time = time;
  }
}
}

0 comments on commit 0ebf978

Please sign in to comment.