Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#26 add create checkpoint function #79

Merged
merged 15 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
18 changes: 5 additions & 13 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.agent.model.Checkpoint
import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}

/**
* Place holder for the agent that communicate with the API.
* Entity that communicates with the API, primarily focused on spawning Atum Context(s).
*/
class AtumAgent private[agent] () {

Expand All @@ -36,23 +35,16 @@ class AtumAgent private[agent] () {

/**
* Sends `CheckpointDTO` to the AtumService API
* @param checkpoint
*/
def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Sends `Checkpoint` to the AtumService API
*
* @param checkpoint
* @param checkpoint Already initialized Checkpoint object to store
*/
def saveCheckpoint(checkpoint: Checkpoint): Unit = {
dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO)
private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Provides an AtumContext given an `AtumPartitions` instance. Retrieves the data from AtumService API.
*
* @param atumPartitions
* @return
*/
Expand Down
57 changes: 42 additions & 15 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasuresMapper}
import za.co.absa.atum.model.dto.{AtumContextDTO, PartitionDTO}
import za.co.absa.atum.agent.model.Measurement.MeasurementByAtum
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

import java.time.OffsetDateTime
import java.util.UUID
import scala.collection.immutable.ListMap

/**
Expand All @@ -43,24 +45,48 @@ class AtumContext private[agent] (
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame) = {
??? // TODO #26
/**
 * Runs the function of every measure held by this context against the given data.
 *
 * @param df data the measure functions are applied to
 * @return one `MeasurementByAtum` per measure, built from the measure itself plus the `result`
 *         and `resultType` its function returned
 */
private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = {
  for (measure <- measures) yield {
    val outcome = measure.function(df)
    MeasurementByAtum(measure, outcome.result, outcome.resultType)
  }
}

/**
 * Takes measurements of the provided data, wraps them in a `CheckpointDTO` and passes it to the
 * agent for saving.
 *
 * @param checkpointName name stored on the checkpoint
 * @param author         author stored on the checkpoint
 * @param dataToMeasure  data the measurements are taken on
 * @return this context, so that calls can be chained
 */
def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = {
  // The two timestamps bracket only the measuring itself, so they are captured right around it.
  val measuringStartedAt = OffsetDateTime.now()
  val takenMeasurements = takeMeasurements(dataToMeasure)
  val measuringFinishedAt = OffsetDateTime.now()

  agent.saveCheckpoint(
    CheckpointDTO(
      id = UUID.randomUUID(),
      name = checkpointName,
      author = author,
      measuredByAtumAgent = true,
      partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
      processStartTime = measuringStartedAt,
      processEndTime = Some(measuringFinishedAt),
      measurements = takenMeasurements.map(MeasurementBuilder.buildMeasurementDTO).toSeq
    )
  )

  this
}

def createCheckpointOnProvidedData(
checkpointName: String,
author: String,
measurements: Seq[Measurement]
): Checkpoint = {
def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = {
val offsetDateTimeNow = OffsetDateTime.now()
Checkpoint(

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = author,
atumPartitions = this.atumPartitions,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = offsetDateTimeNow,
processEndTime = Some(offsetDateTimeNow),
measurements = measurements
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO)
)

agent.saveCheckpoint(checkpointDTO)
this
}

def addAdditionalData(key: String, value: String): Unit = {
Expand Down Expand Up @@ -121,21 +147,22 @@ object AtumContext {
new AtumContext(
AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
agent,
MeasuresMapper.mapToMeasures(atumContextDTO.measures),
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
atumContextDTO.additionalData.additionalData
)
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Set a point in the pipeline to execute calculation.
* Set a point in the pipeline to execute calculation and store it.
* @param checkpointName The key assigned to this checkpoint
* @param author Author of the checkpoint
* @param atumContext Contains the calculations to be done and publishes the result
* @return
*/
def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
// Delegates to the implicit context; the AtumContext it returns is not used here.
atumContext.createCheckpoint(checkpointName, author, df)
// The wrapped DataFrame itself is handed back to the caller.
df
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.agent.core

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

trait MeasurementProcessor {

Expand All @@ -26,6 +27,13 @@ trait MeasurementProcessor {
}

object MeasurementProcessor {
type MeasurementFunction = DataFrame => String
/**
* The raw result of a measurement is always going to be a string, because we want to avoid some floating point issues
* (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more),
* but the actual type is stored alongside the computation because we don't want to lose this information.
*/
final case class ResultOfMeasurement(result: String, resultType: ResultValueType.ResultValueType)

type MeasurementFunction = DataFrame => ResultOfMeasurement

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ class ConsoleDispatcher extends Dispatcher with Logging {
/**
 * Prints the checkpoint to standard output.
 *
 * @param checkpoint checkpoint whose string form is printed
 */
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit =
  println(s"Saving checkpoint to server. $checkpoint")

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
.post(serverUri)
.send(backend)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@

package za.co.absa.atum.agent.exception

case class UnsupportedMeasureException(msg: String) extends Exception(msg)
/**
 * Common parent of the exceptions declared for the Atum agent.
 *
 * @param message text forwarded to `Exception`, so that `getMessage`, stack traces and logs
 *                carry it instead of `null`
 */
sealed abstract class AtumAgentException(message: String) extends Exception(message)

/**
 * Agent exception carrying `msg` as its exception message.
 *
 * @param msg description of the problem
 */
case class MeasurementProvidedException(msg: String) extends AtumAgentException(msg)

/**
 * Agent exception carrying `msg` as its exception message.
 *
 * @param msg description of the problem
 */
case class MeasureException(msg: String) extends AtumAgentException(msg)

This file was deleted.

This file was deleted.

Loading