Skip to content

Commit 6dcc7c3

Browse files
authored
Acdc count metrics on DB table records (#52)
1 parent 9c300ef commit 6dcc7c3

File tree

9 files changed

+95
-4
lines changed

9 files changed

+95
-4
lines changed

acdc-core/src/main/scala/com/salesforce/mce/acdc/db/DatasetInstanceQuery.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ object DatasetInstanceQuery {
3535

3636
def forDataset(dataset: String) = DatasetInstanceTable().filter(_.dataset === dataset).result
3737

38+
def count() = DatasetInstanceTable().length.result
39+
3840
def filter(
3941
dataset: Option[String],
4042
like: Option[String],
@@ -59,7 +61,8 @@ object DatasetInstanceQuery {
5961
(t, d) => t.filter(r => r.dataset === d)
6062
)
6163

62-
like.foldLeft(filteredByDataset)((t, l) => t.filter(r => r.name.like(l)))
64+
like
65+
.foldLeft(filteredByDataset)((t, l) => t.filter(r => r.name.like(l)))
6366
.sortBy(sortByColumn)
6467
.drop(offset)
6568
.take(limit)

acdc-core/src/main/scala/com/salesforce/mce/acdc/db/DatasetLineageQuery.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import slick.jdbc.PostgresProfile.api._
1313

1414
object DatasetLineageQuery {
1515

16+
def count() = DatasetLineageTable().length.result
17+
1618
case class ForDestination(dest: String) {
1719

1820
def table = DatasetLineageTable().filter(_.toDataset === dest)

acdc-core/src/main/scala/com/salesforce/mce/acdc/db/DatasetQuery.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ object DatasetQuery {
2929

3030
def now() = LocalDateTime.now()
3131

32+
def count() = DatasetTable().length.result
33+
3234
case class ForName(name: String) {
3335

3436
def table = DatasetTable().filter(_.name === name)

acdc-ws/app/Module.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import com.google.inject.AbstractModule
99

1010
import services.{Metric, PrometheusMetric}
11-
import tasks.{AuthSettingReloadTask, DataInstExpirationTask}
11+
import tasks.{AuthSettingReloadTask, DataCountTask, DataInstExpirationTask}
1212
import utils.{Authorization, AuthorizationSettings}
1313

1414
class Module extends AbstractModule {
@@ -22,6 +22,8 @@ class Module extends AbstractModule {
2222
bind(classOf[Metric]).to(classOf[PrometheusMetric])
2323
// Data forget task
2424
bind(classOf[DataInstExpirationTask]).asEagerSingleton()
25+
// Data count task
26+
bind(classOf[DataCountTask]).asEagerSingleton()
2527
}
2628

2729
}

acdc-ws/app/services/Metric.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ trait Metric {
5454

5555
def clear(): Unit
5656

57+
def countDB(table: String, cnt: Double): Unit
58+
5759
}
5860

5961
@Singleton
@@ -77,6 +79,17 @@ class PrometheusMetric @Inject() (implicit ec: ExecutionContext) extends Metric
7779
.quantile(0.99d, 0.001d)
7880
.register
7981

82+
private val dbCountGauge = Gauge
83+
.build()
84+
.name("acdc_db_record_count")
85+
.labelNames("table")
86+
.help("acdc DB table records count")
87+
.register
88+
89+
override def countDB(table: String, cnt: Double): Unit = {
90+
dbCountGauge.labels(table).set(cnt)
91+
}
92+
8093
override def incrementStatusCount(status: String): Unit = {
8194
httpStatusCount.labels(status).inc()
8295
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package tasks
2+
3+
import javax.inject.Inject
4+
5+
import scala.concurrent.ExecutionContext
6+
import scala.concurrent.duration.DurationInt
7+
8+
import akka.actor.ActorSystem
9+
import play.api.Logging
10+
11+
import com.salesforce.mce.acdc.db.{
12+
AcdcDatabase,
13+
DatasetInstanceQuery,
14+
DatasetLineageQuery,
15+
DatasetQuery
16+
}
17+
import services.{DatabaseService, Metric}
18+
import utils.DbConfig
19+
20+
class DataCountTask @Inject() (
21+
actorSystem: ActorSystem,
22+
dbService: DatabaseService,
23+
dbConfig: DbConfig,
24+
metric: Metric
25+
)(implicit
26+
ec: ExecutionContext
27+
) extends Logging {
28+
29+
val countFrequency: Int = dbConfig.countTaskFrequencyMinute
30+
def db: AcdcDatabase = dbService.db
31+
32+
def refresh(): Unit = {
33+
actorSystem.scheduler.scheduleOnce(countFrequency.minutes) {
34+
val datasetCount = db.sync(DatasetQuery.count())
35+
val datasetInstanceCount = db.sync(DatasetInstanceQuery.count())
36+
val datasetLineageCount = db.sync(DatasetLineageQuery.count())
37+
38+
logger.debug(s"Counted $datasetCount records from dataset ...")
39+
logger.debug(s"Counted $datasetInstanceCount records from dataset_instance ...")
40+
logger.debug(s"Counted $datasetLineageCount records from dataset_lineage ...")
41+
42+
metric.countDB("dataset", datasetCount.toDouble)
43+
metric.countDB("dataset-instance", datasetInstanceCount.toDouble)
44+
metric.countDB("dataset-lineage", datasetLineageCount.toDouble)
45+
46+
refresh()
47+
}
48+
}
49+
50+
if (countFrequency > 0) {
51+
refresh()
52+
} else {
53+
logger.info("data count frequency is not a positive integer, data count task skipped.")
54+
}
55+
}

acdc-ws/app/utils/DbConfig.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ import scala.util.{Failure, Success, Try}
1111

1212
import com.typesafe.config.ConfigException
1313
import com.typesafe.config.ConfigFactory
14+
import play.api.Logging
1415

15-
class DbConfig() {
16+
class DbConfig() extends Logging {
1617

1718
val config = ConfigFactory.load().getConfig("acdc.db")
1819

@@ -22,4 +23,12 @@ class DbConfig() {
2223
case Failure(e) => throw e
2324
}
2425

26+
def countTaskFrequencyMinute: Int = Try(config.getInt(s"count-task-frequency-minute")) match {
27+
case Success(d) => d
28+
case Failure(e: ConfigException.Missing) =>
29+
logger.warn("No config found for count-task-frequency-minute, defaulting to 0.")
30+
0
31+
case Failure(e) => throw e
32+
}
33+
2534
}

acdc-ws/conf/application.conf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ acdc.auth = {
3232
acdc.db {
3333
# maximum length in days to keep
3434
ttl = ${?DATA_TTL}
35+
36+
# count and report records in dataset, datasetInstance, and datasetLineage tables
37+
# 0 is to skip data count task, set it to a positive Int for effective count frequency
38+
count-task-frequency-minute = 0
39+
count-task-frequency-minute = ${?COUNT_TASK_FREQUENCY_MINUTE}
3540
}
3641

3742
## Akka

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ThisBuild / version := "0.8.11"
1+
ThisBuild / version := "0.8.12"

0 commit comments

Comments
 (0)