Skip to content

Commit 6ee66d3

Browse files
committed
Transformer Stream: don't use hard-coded window timestamps in the tests
Some of the tests are failing because the windows don't have the expected timestamps. To solve this problem, this commit makes the necessary changes to read the window timestamps from the shredding-complete message instead of using hard-coded values.
1 parent 8691ece commit 6ee66d3

File tree

17 files changed

+109
-111
lines changed

17 files changed

+109
-111
lines changed

modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/tsv/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "SHREDDED",
77
"types": [
@@ -113,8 +113,8 @@
113113
]
114114
},
115115
"timestamps": {
116-
"jobStarted": "1970-01-01T10:30:00Z",
117-
"jobCompleted": "1970-01-01T10:30:00Z",
116+
"jobStarted": "job_started_placeholder",
117+
"jobCompleted": "job_completed_placeholder",
118118
"min": "2021-10-13T20:21:47.595072674Z",
119119
"max": "2021-10-15T00:51:57.521746512Z"
120120
},

modules/common-transformer-stream/src/test/resources/processing-spec/1/output/good/widerow/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "WIDEROW",
77
"fileFormat": "JSON",
@@ -93,8 +93,8 @@
9393
]
9494
},
9595
"timestamps": {
96-
"jobStarted": "1970-01-01T10:30:00Z",
97-
"jobCompleted": "1970-01-01T10:30:00Z",
96+
"jobStarted": "job_started_placeholder",
97+
"jobCompleted": "job_completed_placeholder",
9898
"min": "2021-10-13T20:21:47.595072674Z",
9999
"max": "2021-10-15T00:51:57.521746512Z"
100100
},
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaCriterion":"iglu:org.schema/some_unknown_name/jsonschema/1-*-*","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":"1970-01-01T10:30:00Z"}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a 
tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple 
Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}
1+
{"schema":"iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0","data":{"processor":{"artifact":"snowplow-transformer-kinesis","version":"version_placeholder"},"failure":[{"schemaKey":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","error":{"error":"ResolutionError","lookupHistory":[{"repository":"Iglu Client Embedded","errors":[{"error":"NotFound"}],"attempts":1,"lastAttempt":""}]}}],"payload":{"app_id":"snowplowweb","platform":"web","etl_tstamp":"2014-06-01T14:04:11.639Z","collector_tstamp":"2014-05-29T18:16:35Z","dvce_created_tstamp":"2014-05-29T18:04:11.639Z","event":"page_view","event_id":"2b1b25a4-c0df-4859-8201-cf21492ad61b","txn_id":836413,"name_tracker":"clojure","v_tracker":"js-2.0.0-M2","v_collector":"clj-0.6.0-tom-0.0.4","v_etl":"hadoop-0.5.0-common-0.4.0","user_id":null,"user_ipaddress":"216.207.42.134","user_fingerprint":"3499345421","domain_userid":"3b1d1a375044eede","domain_sessionidx":3,"network_userid":"2bad2a4e-aae4-4bea-8acd-399e7fe0366a","geo_country":"US","geo_region":"CA","geo_city":"South San Francisco","geo_zipcode":null,"geo_latitude":37.654694,"geo_longitude":-122.4077,"geo_region_name":null,"ip_isp":null,"ip_organization":null,"ip_domain":null,"ip_netspeed":null,"page_url":"http://snowplowanalytics.com/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_title":"Writing Hive UDFs - a 
tutorial","page_referrer":null,"page_urlscheme":"http","page_urlhost":"snowplowanalytics.com","page_urlport":80,"page_urlpath":"/blog/2013/02/08/writing-hive-udfs-and-serdes/","page_urlquery":null,"page_urlfragment":null,"refr_urlscheme":null,"refr_urlhost":null,"refr_urlport":null,"refr_urlpath":null,"refr_urlquery":null,"refr_urlfragment":null,"refr_medium":null,"refr_source":null,"refr_term":null,"mkt_medium":null,"mkt_source":null,"mkt_term":null,"mkt_content":null,"mkt_campaign":null,"contexts":{},"se_category":null,"se_action":null,"se_label":null,"se_property":null,"se_value":null,"unstruct_event":null,"tr_orderid":null,"tr_affiliation":null,"tr_total":null,"tr_tax":null,"tr_shipping":null,"tr_city":null,"tr_state":null,"tr_country":null,"ti_orderid":null,"ti_sku":null,"ti_name":null,"ti_category":null,"ti_price":null,"ti_quantity":null,"pp_xoffset_min":null,"pp_xoffset_max":null,"pp_yoffset_min":null,"pp_yoffset_max":null,"useragent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/537.75.14","br_name":"Safari","br_family":"Safari","br_version":null,"br_type":"Browser","br_renderengine":"WEBKIT","br_lang":"en-us","br_features_pdf":false,"br_features_flash":false,"br_features_java":false,"br_features_director":false,"br_features_quicktime":false,"br_features_realplayer":false,"br_features_windowsmedia":false,"br_features_gears":false,"br_features_silverlight":false,"br_cookies":true,"br_colordepth":"24","br_viewwidth":1440,"br_viewheight":1845,"os_name":"Mac OS","os_family":"Mac OS","os_manufacturer":"Apple 
Inc.","os_timezone":"America/Los_Angeles","dvce_type":"Computer","dvce_ismobile":false,"dvce_screenwidth":1440,"dvce_screenheight":900,"doc_charset":"UTF-8","doc_width":1440,"doc_height":6015,"tr_currency":null,"tr_total_base":null,"tr_tax_base":null,"tr_shipping_base":null,"ti_currency":null,"ti_price_base":null,"base_currency":null,"geo_timezone":null,"mkt_clickid":null,"mkt_network":null,"etl_tags":null,"dvce_sent_tstamp":null,"refr_domain_userid":null,"refr_dvce_tstamp":null,"derived_contexts":{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0","data":[{"schema":"iglu:org.schema/some_unknown_name/jsonschema/1-0-0","data":{"datePublished":"2014-07-23T00:00:00Z","author":"Jonathan Almeida","inLanguage":"en-US","genre":"blog","breadcrumb":["blog","releases"],"keywords":["snowplow","analytics","java","jvm","tracker"]}}]},"domain_sessionid":null,"derived_tstamp":null,"event_vendor":null,"event_name":null,"event_format":null,"event_version":null,"event_fingerprint":null,"true_tstamp":null}}}

modules/common-transformer-stream/src/test/resources/processing-spec/3/output/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "SHREDDED",
77
"types": []
88
},
99
"timestamps": {
10-
"jobStarted": "1970-01-01T10:30:00Z",
11-
"jobCompleted": "1970-01-01T10:30:00Z",
10+
"jobStarted": "job_started_placeholder",
11+
"jobCompleted": "job_completed_placeholder",
1212
"min": "2014-05-29T18:16:35Z",
1313
"max": "2014-05-29T18:16:35Z"
1414
},

modules/common-transformer-stream/src/test/resources/processing-spec/4/output/good/parquet/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "WIDEROW",
77
"fileFormat": "PARQUET",
@@ -93,8 +93,8 @@
9393
]
9494
},
9595
"timestamps": {
96-
"jobStarted": "1970-01-01T10:30:00Z",
97-
"jobCompleted": "1970-01-01T10:30:00Z",
96+
"jobStarted": "job_started_placeholder",
97+
"jobCompleted": "job_completed_placeholder",
9898
"min": "2021-09-17T09:05:28.590000001Z",
9999
"max": "2021-10-15T09:06:27.101185600Z"
100100
},

modules/common-transformer-stream/src/test/resources/processing-spec/5/output/good/parquet/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "WIDEROW",
77
"fileFormat": "PARQUET",
@@ -105,8 +105,8 @@
105105
]
106106
},
107107
"timestamps": {
108-
"jobStarted": "1970-01-01T10:30:00Z",
109-
"jobCompleted": "1970-01-01T10:30:00Z",
108+
"jobStarted": "job_started_placeholder",
109+
"jobCompleted": "job_completed_placeholder",
110110
"min": "2022-02-01T22:14:21.648Z",
111111
"max": "2022-02-02T01:01:01.648Z"
112112
},

modules/common-transformer-stream/src/test/resources/processing-spec/6/output/good/parquet/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "WIDEROW",
77
"fileFormat": "PARQUET",
@@ -101,8 +101,8 @@
101101
]
102102
},
103103
"timestamps": {
104-
"jobStarted": "1970-01-01T10:30:00Z",
105-
"jobCompleted": "1970-01-01T10:30:00Z",
104+
"jobStarted": "job_started_placeholder",
105+
"jobCompleted": "job_completed_placeholder",
106106
"min": "2022-02-01T22:14:21.648Z",
107107
"max": "2022-02-02T01:01:01.648Z"
108108
},

modules/common-transformer-stream/src/test/resources/processing-spec/7/output/good/parquet/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "WIDEROW",
77
"fileFormat": "PARQUET",
@@ -21,8 +21,8 @@
2121
]
2222
},
2323
"timestamps": {
24-
"jobStarted": "1970-01-01T10:30:00Z",
25-
"jobCompleted": "1970-01-01T10:30:00Z",
24+
"jobStarted": "job_started_placeholder",
25+
"jobCompleted": "job_completed_placeholder",
2626
"min": "2022-02-01T22:32:41.069Z",
2727
"max": "2022-02-02T01:01:01.648Z"
2828
},

modules/common-transformer-stream/src/test/resources/processing-spec/8/output/completion.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"schema": "iglu:com.snowplowanalytics.snowplow.storage.rdbloader/shredding_complete/jsonschema/2-0-1",
33
"data": {
4-
"base": "output_path_placeholder/run=1970-01-01-10-30-00/",
4+
"base": "output_path_placeholder",
55
"typesInfo": {
66
"transformation": "WIDEROW",
77
"fileFormat": "JSON",
@@ -93,8 +93,8 @@
9393
]
9494
},
9595
"timestamps": {
96-
"jobStarted": "1970-01-01T10:30:00Z",
97-
"jobCompleted": "1970-01-01T10:31:00Z",
96+
"jobStarted": "job_started_placeholder",
97+
"jobCompleted": "job_completed_placeholder",
9898
"min": "2021-10-13T20:21:47.595072674Z",
9999
"max": "2021-10-15T00:51:57.521746512Z"
100100
},

modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/BaseProcessingSpec.scala

+26-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.proce
1313
import cats.effect.{IO, Resource}
1414
import cats.effect.kernel.Ref
1515

16+
import io.circe.optics.JsonPath._
17+
import io.circe.parser.{parse => parseCirce}
18+
1619
import com.snowplowanalytics.snowplow.rdbloader.generated.BuildInfo
1720
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
1821
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.FileUtils
@@ -58,11 +61,15 @@ trait BaseProcessingSpec extends Specification {
5861
}
5962
.reduce(_ and _)
6063

61-
protected def readMessageFromResource(resource: String, outputRootDirectory: Path) =
64+
protected def readMessageFromResource(resource: String, outputRootDirectory: Path): IO[String] = ???
65+
66+
protected def readMessageFromResource(resource: String, completionMessageVars: BaseProcessingSpec.CompletionMessageVars) =
6267
readLinesFromResource(resource)
6368
.map(_.mkString)
6469
.map(
65-
_.replace("output_path_placeholder", outputRootDirectory.toNioPath.toUri.toString.replaceAll("/+$", ""))
70+
_.replace("output_path_placeholder", completionMessageVars.base.toNioPath.toUri.toString)
71+
.replace("job_started_placeholder", completionMessageVars.jobStarted)
72+
.replace("job_completed_placeholder", completionMessageVars.jobCompleted)
6673
.replace("version_placeholder", BuildInfo.version)
6774
.replace(" ", "")
6875
)
@@ -86,6 +93,15 @@ trait BaseProcessingSpec extends Specification {
8693
new String(encoder.encode(config.app.replace("file:/", "s3:/").getBytes))
8794
)
8895
}
96+
97+
def extractCompletionMessageVars(processingOutput: BaseProcessingSpec.ProcessingOutput): BaseProcessingSpec.CompletionMessageVars = {
98+
val message = processingOutput.completionMessages.head
99+
val json = parseCirce(message).toOption.get
100+
val base = root.data.base.string.getOption(json).get.stripPrefix("file://")
101+
val jobStarted = root.data.timestamps.jobStarted.string.getOption(json).get
102+
val jobCompleted = root.data.timestamps.jobCompleted.string.getOption(json).get
103+
BaseProcessingSpec.CompletionMessageVars(Path(base), jobStarted, jobCompleted)
104+
}
89105
}
90106

91107
object BaseProcessingSpec {
@@ -96,5 +112,13 @@ object BaseProcessingSpec {
96112
badrowsFromQueue: Vector[String],
97113
checkpointed: Int
98114
)
115+
final case class CompletionMessageVars(
116+
base: Path,
117+
jobStarted: String,
118+
jobCompleted: String
119+
) {
120+
def goodPath: Path = Path(s"$base/output=good")
121+
def badPath: Path = Path(s"$base/output=bad")
122+
}
99123

100124
}

modules/common-transformer-stream/src/test/scala/com/snowplowanalytics/snowplow/rdbloader/transformer/stream/common/processing/QueueBadSinkSpec.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
package com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing
1212

1313
import cats.effect.unsafe.implicits.global
14-
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.AppId
1514
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.QueueBadSinkSpec._
1615
import com.snowplowanalytics.snowplow.rdbloader.transformer.stream.common.processing.BaseProcessingSpec.TransformerConfig
1716
import fs2.io.file.Path
@@ -39,12 +38,12 @@ class QueueBadSinkSpec extends BaseProcessingSpec {
3938
inputEventsPath = "/processing-spec/1/input/events"
4039
)
4140

42-
val config = TransformerConfig(configFromPath(outputDirectory), igluConfig)
43-
val badDirectory = outputDirectory.resolve(s"run=1970-01-01-10-30-00-${AppId.appId}/output=bad")
41+
val config = TransformerConfig(configFromPath(outputDirectory), igluConfig)
4442

4543
for {
4644
output <- process(inputStream, config)
47-
badDirectoryExists <- pathExists(badDirectory)
45+
compVars = extractCompletionMessageVars(output)
46+
badDirectoryExists <- pathExists(compVars.badPath)
4847
expectedBadRows <- readLinesFromResource("/processing-spec/1/output/bad")
4948
} yield {
5049
val actualBadRows = output.badrowsFromQueue.toList
@@ -98,7 +97,8 @@ object QueueBadSinkSpec {
9897
| "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-0",
9998
| "data": {
10099
| "cacheSize": 500,
101-
| "repositories": []
100+
| "repositories": [
101+
| ]
102102
| }
103103
|}""".stripMargin
104104
}

0 commit comments

Comments
 (0)