Skip to content

Commit 1eedf38

Browse files
author
Christian Herrera
committed
Add lessons course
1 parent 995037c commit 1eedf38

File tree

98 files changed

+4264
-3
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

98 files changed

+4264
-3
lines changed

.gitignore

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
dist/*
2+
target/
3+
spark-warehouse/
4+
project/boot/
5+
project/plugins/project/
6+
.cache
7+
8+
### Scala ###
9+
*.class
10+
*.log
11+
12+
# Embedded metastore
13+
derby.log
14+
metastore_db/
15+

.scalafmt.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
version=2.5.2
2+
align.preset = more
3+
maxColumn = 80
4+
importSelectors = singleLine

README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<p align="center">
2+
<a href="https://codely.com">
3+
<img src="https://user-images.githubusercontent.com/10558907/170513882-a09eee57-7765-4ca4-b2dd-3c2e061fdad0.png" width="300px" height="92px" alt="Codely logo"/>
4+
</a>
5+
</p>
6+
7+
<h1 align="center">
8+
🎇 Spark for programmers
9+
</h1>
10+
11+
<p align="center">
12+
<a href="https://github.com/CodelyTV"><img src="https://img.shields.io/badge/Codely-OS-green.svg?style=flat-square" alt="Codely Open Source projects"/></a>
13+
<a href="https://pro.codely.com"><img src="https://img.shields.io/badge/Codely-Pro-black.svg?style=flat-square" alt="Codely Pro courses"/></a>
14+
</p>
15+
16+
<p align="center">
17+
Learn Apache Spark from scratch with a practical approach
18+
</p>
19+
20+
<p align="center">
21+
<a href="https://github.com/CodelyTV/spark_for_devs-course/stargazers">Stars are welcome 😊</a><br><br>
22+
Course (Spanish): <a href="#">Spark for programmers</a>
23+
</p>

build.sbt

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
scalaVersion := "2.12.12"
version      := "0.1.0-SNAPSHOT"
name         := "spark-for-programmers-course"
organization := "com.codely"

// Fixed typo: this val was previously spelled "sparkVesion".
val sparkVersion = "3.5.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % sparkVersion,
  "org.apache.spark" %% "spark-sql"             % sparkVersion,
  "org.apache.spark" %% "spark-hive"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming"       % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10"  % sparkVersion,
  "io.delta"         %% "delta-spark"           % "3.1.0",
  // "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.375",
  "org.apache.hadoop" % "hadoop-aws"   % "3.2.2",
  "com.rabbitmq"      % "amqp-client"  % "5.12.0",
  "com.typesafe"      % "config"       % "1.4.1",
  //"org.apache.hadoop" % "hadoop-common" % "3.3.1",
  "org.scalatest" %% "scalatest"          % "3.2.18" % Test,
  "org.scalatest" %% "scalatest-flatspec" % "3.2.18" % Test,
  // NOTE(review): testcontainers-scala versions are mixed (0.40.12 vs
  // 0.41.4 below); consider aligning them to a single version.
  "com.dimafeng" %% "testcontainers-scala"            % "0.40.12" % Test,
  "com.dimafeng" %% "testcontainers-scala-kafka"      % "0.40.12" % Test,
  "com.dimafeng" %% "testcontainers-scala-postgresql" % "0.41.4"  % Test,
  // NOTE(review): 9.4.1207 is a very old JDBC driver; a current
  // org.postgresql release is recommended for the test containers.
  "org.postgresql" % "postgresql"      % "9.4.1207" % Test,
  "org.mockito"   %% "mockito-scala"   % "1.16.42"  % Test
)

// Entry point baked into the fat JAR produced by sbt-assembly
// (used when deploying with spark-submit).
assembly / mainClass := Some(
  "com.codely.lesson_07_spark_optimize_and_monitoring.video_01__deploy_application.DeploySparkApp"
)

// Spark and its transitive deps ship overlapping resources; discard
// META-INF signatures and take the first copy of everything else so the
// merged assembly JAR is valid.
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
    MergeStrategy.first
  case _ => MergeStrategy.first
}

project/build.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.version = 1.9.8

project/plugins.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
// sbt-assembly: builds the fat ("uber") JAR referenced by
// `assembly / mainClass` in build.sbt, for spark-submit deployment.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
// sbt-scalafmt: enforces the formatting rules declared in .scalafmt.conf.
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")

src/main/scala/com/codely/lesson_01__discover_apache_spark/video_01__from_excel_to_sql/FromCSVToSQL.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ object FromCSVToSQL extends App {
1919
.csv(pathNetflixFile)
2020
.createOrReplaceTempView("netflix")
2121

22-
spark.sql("select * from netflix")
22+
spark
23+
.sql("select * from netflix")
2324
.show()
2425

2526
// Making use of the csv data source options
@@ -32,7 +33,8 @@ object FromCSVToSQL extends App {
3233
.csv(pathNetflixFile)
3334
.createOrReplaceTempView("netflix")
3435

35-
spark.sql("SELECT * FROM netflix LIMIT 10")
36+
spark
37+
.sql("SELECT * FROM netflix LIMIT 10")
3638
.show()
3739

3840
spark
@@ -60,7 +62,8 @@ object FromCSVToSQL extends App {
6062
|""".stripMargin)
6163
.show()
6264

63-
spark.sql("""
65+
spark
66+
.sql("""
6467
| SELECT lower(word), count(*) AS count
6568
| FROM (
6669
| SELECT explode(split(title, ' ')) as word from netflix
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.codely.lesson_01__discover_apache_spark.video_02__trafffic_bottleneck_detection

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col}
import org.apache.spark.sql.streaming.Trigger

/** Structured Streaming job that detects congested road segments.
  *
  * Reads "segmentID,speed" CSV lines from a local socket (fed by
  * [[TrafficDataGenerator]]), maintains a running average speed per
  * segment, and every 5 seconds prints the segments whose average speed
  * has dropped below [[SpeedThreshold]] — i.e. likely bottlenecks.
  */
private object TrafficBottleneckDetection extends App {

  val SpeedThreshold = 60 // average speed below this marks a segment as jammed
  val SocketPort     = 9999

  val spark = SparkSession.builder
    .appName("TrafficBottleneckDetection")
    .master("local[*]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("WARN")

  // Unbounded stream of raw text lines, one record per line.
  val lines = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", SocketPort)
    .load()

  import spark.implicits._

  // "3,87" -> ("3", 87).
  // NOTE(review): assumes well-formed input; a malformed line would fail
  // the whole query with a NumberFormatException / ArrayIndexOutOfBounds.
  val trafficData = lines
    .as[String]
    .map(line => line.split(","))
    .map(arr => (arr(0), arr(1).toInt))
    .toDF("segmentID", "speed")

  // Alias the aggregate explicitly instead of relying on the generated
  // "avg(speed)" column name, which is brittle across Spark versions.
  val averageSpeeds =
    trafficData.groupBy("segmentID").agg(avg("speed").as("avgSpeed"))

  val trafficJams = averageSpeeds.filter(col("avgSpeed") < SpeedThreshold)

  // "complete" output mode re-emits the full aggregation table on every
  // 5-second trigger, so the console always shows the current averages.
  val query = trafficJams.writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .outputMode("complete")
    .start()

  query.awaitTermination()
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.codely.lesson_01__discover_apache_spark.video_02__trafffic_bottleneck_detection

import java.io.PrintStream
import java.net.ServerSocket
import scala.util.Random
import scala.util.control.NonFatal

/** Tiny single-client TCP server that feeds [[TrafficBottleneckDetection]].
  *
  * Waits for one connection on [[Port]], then emits one random
  * "segmentId,speed" line every [[RefreshRate]] milliseconds until the
  * client disconnects or the process is killed.
  */
private object TrafficDataGenerator extends App {
  val Port        = 9999
  val MinSpeed    = 10
  val MaxSpeed    = 110
  val NumSegments = 5
  val RefreshRate = 500 // Milliseconds

  val server = new ServerSocket(Port)
  println(s"Data generator server started at port $Port...")

  // Blocks until the streaming job connects; only one client is served.
  val client = server.accept()
  println("Client connected.")

  val out = new PrintStream(client.getOutputStream)

  try {
    // Runs until the client drops the connection (write fails) or the
    // process is interrupted.
    while (true) {
      sendTrafficData(generateTrafficData)
      Thread.sleep(RefreshRate)
    }
  } catch {
    // NonFatal instead of Exception: lets truly fatal errors
    // (OutOfMemoryError, etc.) propagate instead of being swallowed here.
    case NonFatal(e) => e.printStackTrace()
  } finally {
    out.close()
    client.close()
    server.close()
    println("Server stopped.")
  }

  /** Writes one record to the client and flushes so it is delivered
    * immediately rather than buffered.
    */
  private def sendTrafficData(data: String): Unit = {
    println(s"Sending data: $data")
    out.println(data)
    out.flush()
  }

  /** Builds one random "segmentId,speed" record. */
  private def generateTrafficData = {
    val segmentId = Random.nextInt(NumSegments) + 1               // 1 to 5
    val speed     = Random.nextInt(MaxSpeed - MinSpeed + 1) + MinSpeed // 10 to 110
    s"$segmentId,$speed"
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.codely.lesson_01__discover_apache_spark.video_03__intro_domain_events_analysis

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, desc, explode, lit, month}
import org.apache.spark.sql.types._

/** Walkthrough of DataFrame basics over "PurchaseCompleted" domain events:
  * reading JSON with inferred vs explicit schemas, selecting/filtering
  * columns, exploding a nested product array, and finally listing the five
  * most expensive distinct Electronics/Gaming products bought in February.
  */
private object HighPriceProductsPurchased extends App {

  val spark = SparkSession
    .builder()
    .appName("HighPriceProductsPurchased")
    .master("local[8]")
    .getOrCreate()

  spark.sparkContext.setLogLevel("WARN")

  val purchasedCompletedFilePath =
    "src/main/scala/com/codely/lesson_01__discover_apache_spark/video_03__intro_domain_events_analysis/data/purchasecompleted.json"

  // First read: let Spark infer the schema by scanning the JSON file.
  // (A redundant spark.read.format("json").load(...) whose result was
  // discarded has been removed — it only re-read the same file for nothing.)
  val productPurchasedDF: DataFrame = spark.read
    .json(purchasedCompletedFilePath)

  productPurchasedDF.show()
  productPurchasedDF.printSchema()

  // Explicit schema for the event: avoids the inference scan and pins the
  // field types (e.g. timestamp as TimestampType, price as FloatType).
  val productPurchasedSchema: StructType = StructType(
    Array(
      StructField("eventId", StringType),
      StructField("eventType", StringType),
      StructField(
        "products",
        ArrayType(
          StructType(
            Array(
              StructField("productId", StringType),
              StructField("quantity", IntegerType),
              StructField("description", StringType),
              StructField("category", StringType),
              StructField("price", FloatType)
            )
          )
        )
      ),
      StructField("timestamp", TimestampType),
      StructField("transactionId", StringType),
      StructField("userId", StringType)
    )
  )

  // Second read: same file, now with the explicit schema applied.
  val productPurchasedWithSchemaDF = spark.read
    .schema(productPurchasedSchema)
    .json(purchasedCompletedFilePath)

  productPurchasedWithSchemaDF.printSchema()

  // Column projection.
  productPurchasedWithSchemaDF
    .select(col("transactionId"))
    .show()

  // Row filtering: keep only events whose timestamp falls in February.
  productPurchasedWithSchemaDF
    .filter(month(col("timestamp")) === 2)
    .show()

  // Adding a literal column.
  productPurchasedWithSchemaDF
    .withColumn("new_column", lit("codely"))
    .show(false)

  val februaryTransactions =
    productPurchasedWithSchemaDF.filter(month(col("timestamp")) === 2)

  // explode turns each element of the products array into its own row;
  // withColumn already names the result, so the former no-op
  // .as("product") alias has been dropped.
  val explodedTransactions = februaryTransactions
    .withColumn("product", explode(col("products")))
    .select(
      col("timestamp"),
      col("transactionId"),
      col("product.description"),
      col("product.category"),
      col("product.price")
    )

  explodedTransactions.show(false)

  // Top 5 most expensive distinct products in the two target categories.
  // NOTE(review): dropDuplicates runs after orderBy, which keeps the
  // highest-priced row per description only if ordering is preserved —
  // confirm this is the intended semantics.
  explodedTransactions
    .filter(col("category").isin("Electronics", "Gaming"))
    .orderBy(desc("price"))
    .dropDuplicates("description")
    .limit(5)
    .show(false)
}

0 commit comments

Comments
 (0)