#!/usr/bin/env python
# coding: utf-8
# # Assignment 3
#
# Welcome to Assignment 3. This will be even more fun. Now we will calculate statistical measures on the test data you have created.
#
# YOU ARE NOT ALLOWED TO USE ANY OTHER 3RD PARTY LIBRARIES LIKE PANDAS. PLEASE ONLY MODIFY CONTENT INSIDE THE FUNCTION SKELETONS
# Please read why: https://www.coursera.org/learn/exploring-visualizing-iot-data/discussions/weeks/3/threads/skjCbNgeEeapeQ5W6suLkA
# Just make sure you hit the play button on each cell from top to bottom. There are seven functions you have to implement. Please also make sure that whenever you change a function you hit the play button again on the corresponding cell to make it available to the rest of this notebook.
# Please also make sure to only implement the function bodies and DON'T add any additional code outside functions since this might confuse the autograder.
#
# So the function below is used to make it easy for you to create a data frame from a Cloudant database using the so-called "DataSource", a plugin mechanism that allows ApacheSpark to read from different data sources.
#
# All functions can be implemented using DataFrames, ApacheSparkSQL or RDDs. We are only interested in the result. You are given a reference to the data frame in the "df" parameter, and in case you want to use SQL just use the "spark" parameter, which is a reference to the global SparkSession object. Finally, if you want to use RDDs, just use "df.rdd" to obtain a reference to the underlying RDD object.
#
# Let's start with the first function. Please calculate the minimum temperature for the test data set you have created. We've provided a little skeleton for you in case you want to use SQL. You can use this skeleton for all subsequent functions. Everything can be implemented using SQL only, if you like.
# In[7]:
def minTemperature(df,spark):
    return spark.sql("SELECT min(temperature) as mintemp from washing").first().mintemp
# Please now do the same for the mean of the temperature
# In[8]:
def meanTemperature(df,spark):
    return spark.sql("SELECT avg(temperature) as meantemp from washing").first().meantemp
# Please now do the same for the maximum of the temperature
# In[9]:
def maxTemperature(df,spark):
    return spark.sql("SELECT max(temperature) as maxtemp from washing").first().maxtemp
# Please now do the same for the standard deviation of the temperature
# In[10]:
def sdTemperature(df,spark):
    return spark.sql("SELECT stddev(temperature) as stddevtemp from washing").first().stddevtemp
# Please now do the same for the skew of the temperature. Since the SQL statement for this is a bit more complicated we've provided a skeleton for you. You have to insert custom code at four positions in order to make the function work. Alternatively you can also remove everything and implement it on your own. Note that we are making use of two previously defined functions, so please make sure they are correct. Also note that we are making use of Python's string formatting capabilities where the results of the two function calls to "meanTemperature" and "sdTemperature" are inserted at the "%s" symbols in the SQL string.
# In[11]:
def skewTemperature(df,spark):
    return spark.sql("""
        SELECT
            (
                1/count(temperature)
            ) *
            SUM (
                POWER(temperature-%s,3)/POWER(%s,3)
            )
        as sktemperature from washing
        """ %(meanTemperature(df,spark),sdTemperature(df,spark))).first().sktemperature
# Kurtosis is the 4th statistical moment, so if you are smart you can make use of the code for skew, which is the 3rd statistical moment. Actually, only two things are different.
# In[12]:
def kurtosisTemperature(df,spark):
    return spark.sql("""
        SELECT
            (
                1/count(temperature)
            ) *
            SUM (
                POWER(temperature-%s,4)/POWER(%s,4)
            )
        as ktemperature from washing
        """ %(meanTemperature(df,spark),sdTemperature(df,spark))).first().ktemperature
# Just a hint: this can be solved easily using SQL as well, but, as shown in the lecture, also using RDDs.
# In[13]:
def correlationTemperatureHardness(df,spark):
    return df.stat.corr("temperature", "hardness")
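# As the hint above says, the same coefficient can also be computed with SQL (or on the RDD). The SQL variant below is only an illustrative sketch and not part of the graded solution.
def correlationTemperatureHardnessSQL(df,spark):
    return spark.sql("SELECT corr(temperature,hardness) as temphardcorr from washing").first().temphardcorr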
# ### PLEASE DON'T REMOVE THIS BLOCK - THE FOLLOWING CODE IS NOT GRADED
# #axx
# ### PLEASE DON'T REMOVE THIS BLOCK - THE FOLLOWING CODE IS NOT GRADED
# Now it is time to grab a PARQUET file and create a dataframe out of it. Using SparkSQL you can handle it like a database.
# In[14]:
get_ipython().system('wget https://github.com/IBM/coursera/blob/master/coursera_ds/washing.parquet?raw=true')
get_ipython().system('mv washing.parquet?raw=true washing.parquet')
# In[15]:
df = spark.read.parquet('washing.parquet')
df.createOrReplaceTempView('washing')
df.show()
# Now it is time to connect to the object store and read a PARQUET file and create a dataframe out of it. We've created that data for you already. Using SparkSQL you can handle it like a database.
# In[16]:
import ibmos2spark
# @hidden_cell
credentials = {
'endpoint': 'https://s3-api.us-geo.objectstorage.service.networklayer.com',
'api_key': 'PUJMZf9PLqN4y-6NUtVlEuq6zFoWhfuecFVMYLBrkxrT',
'service_id': 'iam-ServiceId-9cd8e66e-3bb4-495a-807a-588692cca4d0',
'iam_service_endpoint': 'https://iam.bluemix.net/oidc/token'}
configuration_name = 'os_b0f1407510994fd1b793b85137baafb8_configs'
cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Since JSON data can be semi-structured and contain additional metadata, it is possible that you might face issues with the DataFrame layout.
# Please read the documentation of 'SparkSession.read()' to learn more about the possibilities to adjust the data loading.
# PySpark documentation: http://spark.apache.org/docs/2.0.2/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.json
df = spark.read.parquet(cos.url('washing.parquet', 'courseradsnew-donotdelete-pr-1hffrnl2pprwut'))
df.createOrReplaceTempView('washing')
df.show()
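# Note on the JSON remark above: if the source were a semi-structured JSON file instead of PARQUET, the DataFrameReader could be adjusted via options, e.g. (hypothetical file name, shown only as an illustration and therefore left commented out):
# df_json = spark.read.option('mode', 'PERMISSIVE').json(cos.url('washing.json', 'courseradsnew-donotdelete-pr-1hffrnl2pprwut'))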
# In[17]:
minTemperature(df,spark)
# In[18]:
meanTemperature(df,spark)
# In[19]:
maxTemperature(df,spark)
# In[20]:
sdTemperature(df,spark)
# In[21]:
skewTemperature(df,spark)
# In[22]:
kurtosisTemperature(df,spark)
# In[23]:
correlationTemperatureHardness(df,spark)
# Congratulations, you are done. Please download this notebook as a Python file using the export function and submit it to the grader using the filename "assignment3.1.py".