Skip to content

Commit f691335

Browse files
committed
Example code
1 parent 48d7c78 commit f691335

File tree

7 files changed

+9153
-2
lines changed

7 files changed

+9153
-2
lines changed

LICENSE

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
MIT License
22

3-
Copyright (c) 2019 Aerospike Examples
3+
Copyright (c) 2019 Ronen Botzer and Aerospike, Inc.
44

55
Permission is hereby granted, free of charge, to any person obtaining a copy
66
of this software and associated documentation files (the "Software"), to deal

README.md

+4-1
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
1-
# modeling-iot-sensors
1+
# Modeling IoT Sensors in Aerospike
2+
3+
This is companion code for my Medium article.
4+

populate_sensor_data.py

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import print_function
3+
import argparse
4+
import aerospike
5+
from aerospike import exception as e
6+
7+
try:
8+
from aerospike_helpers.operations import list_operations as lh
9+
except:
10+
pass # Needs Aerospike client >= 3.4.0
11+
import sys
12+
13+
argparser = argparse.ArgumentParser(add_help=False)
14+
argparser.add_argument("data_file", nargs="?", default=None, help="Path to data file")
15+
argparser.add_argument(
16+
"--help", dest="help", action="store_true", help="Displays this message."
17+
)
18+
argparser.add_argument(
19+
"-U",
20+
"--username",
21+
dest="username",
22+
metavar="<USERNAME>",
23+
help="Username to connect to database.",
24+
)
25+
argparser.add_argument(
26+
"-P",
27+
"--password",
28+
dest="password",
29+
metavar="<PASSWORD>",
30+
help="Password to connect to database.",
31+
)
32+
argparser.add_argument(
33+
"-h",
34+
"--host",
35+
dest="host",
36+
default="127.0.0.1",
37+
metavar="<ADDRESS>",
38+
help="Address of Aerospike server.",
39+
)
40+
argparser.add_argument(
41+
"-p",
42+
"--port",
43+
dest="port",
44+
type=int,
45+
default=3000,
46+
metavar="<PORT>",
47+
help="Port of the Aerospike server.",
48+
)
49+
argparser.add_argument(
50+
"-n",
51+
"--namespace",
52+
dest="namespace",
53+
default="test",
54+
metavar="<NS>",
55+
help="Port of the Aerospike server.",
56+
)
57+
argparser.add_argument(
58+
"-s",
59+
"--set",
60+
dest="set",
61+
default="sensor_data",
62+
metavar="<SET>",
63+
help="Port of the Aerospike server.",
64+
)
65+
argparser.add_argument(
66+
"-i",
67+
"--sensor",
68+
dest="sensor_id",
69+
type=int,
70+
default=1,
71+
metavar="<SENSOR-ID>",
72+
help="Sensor ID",
73+
)
74+
argparser.add_argument(
75+
"-q", "--quiet", dest="quiet", action="store_true", help="Quiet Mode"
76+
)
77+
options = argparser.parse_args()
78+
if options.help or not options.data_file:
79+
argparser.print_help()
80+
print()
81+
sys.exit(1)
82+
83+
84+
def version_tuple(version):
85+
return tuple(int(i) for i in version.split("."))
86+
87+
88+
if options.namespace and options.namespace != "None":
89+
namespace = options.namespace
90+
else:
91+
namespace = None
92+
set = options.set if options.set and options.set != "None" else None
93+
94+
config = {"hosts": [(options.host, options.port)]}
95+
try:
96+
client = aerospike.client(config).connect(options.username, options.password)
97+
policy = {"key": aerospike.POLICY_KEY_SEND}
98+
except e.ClientError as e:
99+
if not options.quiet:
100+
print("Error: {0} [{1}]".format(e.msg, e.code))
101+
sys.exit(2)
102+
103+
version = client.info_all("version")
104+
release = list(version.values())[0][1].split(" ")[-1]
105+
if (version_tuple(aerospike.__version__) < version_tuple("3.4.0")
106+
or version_tuple(release) < version_tuple("4.0")):
107+
if not options.quiet:
108+
print(
109+
"\nPlease use Python client >= 3.4.0, ",
110+
"Aerospike database >= 4.0 for this example.",
111+
)
112+
sys.exit(3)
113+
114+
sensor_id = options.sensor_id
115+
spacer = "=" * 30
116+
minute = 0
117+
f = open(options.data_file, "r")
118+
for line in f:
119+
try:
120+
try:
121+
_, h, t = line.split(",")
122+
try:
123+
prev_hour
124+
except:
125+
prev_temp = int(float(t.strip()[1:-1]) * 10)
126+
prev_day = h[1:6]
127+
prev_hour = int(h[7:9])
128+
continue
129+
temp = int(float(t.strip()[1:-1]) * 10)
130+
day = h[1:6]
131+
hour = int(h[7:9])
132+
readings = []
133+
step = (temp - prev_temp) / 60.0
134+
for i in range(0, 60):
135+
prev_temp = prev_temp + step
136+
readings.append([minute, int(prev_temp)])
137+
minute = minute + 1
138+
key = (namespace, set, "sensor{}-{}".format(sensor_id, prev_day))
139+
if not options.quiet:
140+
print(spacer)
141+
print("Day {0} hour {1}".format(prev_day, prev_hour))
142+
print(readings)
143+
client.operate(key, [lh.list_append_items("t", readings)], policy=policy)
144+
if day != prev_day:
145+
minute = 0
146+
prev_temp = temp
147+
prev_day = day
148+
prev_hour = hour
149+
except ValueError as e:
150+
if not options.quiet:
151+
print(e)
152+
pass
153+
except IndexError:
154+
pass
155+
f.close()
156+
if not options.quiet:
157+
print(spacer)
158+
print("Sensor {} data for December 31".format(sensor_id))
159+
k, m, b = client.get((namespace, set, "sensor{}-12-31".format(sensor_id)))
160+
print(b)
161+
print("Above is Sensor {} data for December 31".format(sensor_id))
162+
print(spacer)
163+
client.close()

query_iot_data.py

+204
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import print_function
3+
import argparse
4+
import aerospike
5+
from aerospike import exception as e
6+
try:
7+
from aerospike_helpers.operations import list_operations as lh
8+
except:
9+
pass # Needs Aerospike client >= 3.4.0
10+
from aerospike import predexp as pxp
11+
import datetime
12+
from datetime import timedelta
13+
import pprint
14+
import sys
15+
16+
argparser = argparse.ArgumentParser(add_help=False)
17+
argparser.add_argument(
18+
"--help", dest="help", action="store_true", help="Displays this message."
19+
)
20+
argparser.add_argument(
21+
"-U",
22+
"--username",
23+
dest="username",
24+
metavar="<USERNAME>",
25+
help="Username to connect to database.",
26+
)
27+
argparser.add_argument(
28+
"-P",
29+
"--password",
30+
dest="password",
31+
metavar="<PASSWORD>",
32+
help="Password to connect to database.",
33+
)
34+
argparser.add_argument(
35+
"-h",
36+
"--host",
37+
dest="host",
38+
default="127.0.0.1",
39+
metavar="<ADDRESS>",
40+
help="Address of Aerospike server.",
41+
)
42+
argparser.add_argument(
43+
"-p",
44+
"--port",
45+
dest="port",
46+
type=int,
47+
default=3000,
48+
metavar="<PORT>",
49+
help="Port of the Aerospike server.",
50+
)
51+
argparser.add_argument(
52+
"-n",
53+
"--namespace",
54+
dest="namespace",
55+
default="test",
56+
metavar="<NS>",
57+
help="Port of the Aerospike server.",
58+
)
59+
argparser.add_argument(
60+
"-s",
61+
"--set",
62+
dest="set",
63+
default="sensor_data",
64+
metavar="<SET>",
65+
help="Port of the Aerospike server.",
66+
)
67+
argparser.add_argument(
68+
"--sensor",
69+
dest="sensor_id",
70+
type=int,
71+
default=1,
72+
metavar="<SENSOR-ID>",
73+
help="Sensor ID",
74+
)
75+
argparser.add_argument(
76+
"-i",
77+
"--interactive",
78+
dest="interactive",
79+
action="store_true",
80+
help="Interactive Mode",
81+
)
82+
options = argparser.parse_args()
83+
if options.help:
84+
argparser.print_help()
85+
print()
86+
sys.exit(1)
87+
88+
89+
def version_tuple(version):
90+
return tuple(int(i) for i in version.split("."))
91+
92+
93+
def pause():
94+
input("Hit return to continue")
95+
96+
97+
def print_sensor_data(rec, pp):
98+
k, _, b = rec
99+
print(k[2])
100+
print(b['t'])
101+
print("=" * 30)
102+
103+
104+
if options.namespace and options.namespace != "None":
105+
namespace = options.namespace
106+
else:
107+
namespace = None
108+
set = options.set if options.set and options.set != "None" else None
109+
110+
config = {"hosts": [(options.host, options.port)]}
111+
try:
112+
client = aerospike.client(config).connect(options.username, options.password)
113+
policy = {"key": aerospike.POLICY_KEY_SEND}
114+
except e.ClientError as e:
115+
if not options.quiet:
116+
print("Error: {0} [{1}]".format(e.msg, e.code))
117+
sys.exit(2)
118+
119+
version = client.info_all("version")
120+
release = list(version.values())[0][1].split(" ")[-1]
121+
if version_tuple(aerospike.__version__) < version_tuple("3.4.0") or version_tuple(
122+
release
123+
) < version_tuple("4.0"):
124+
print(
125+
"\nPlease use Python client >= 3.4.0, ",
126+
"Aerospike database >= 4.0 for this example.",
127+
)
128+
sys.exit(3)
129+
130+
pp = pprint.PrettyPrinter(indent=2)
131+
sensor_id = options.sensor_id
132+
spacer = "=" * 30
133+
minute = 0
134+
key = (namespace, set, "sensor{}-12-31".format(sensor_id))
135+
print("\nRetrieve sensor{} data for 8-11am, December 31st".format(sensor_id))
136+
if options.interactive:
137+
pause()
138+
starts = 8 * 60
139+
ends = 11 * 60
140+
try:
141+
ops = [
142+
lh.list_get_by_value_range(
143+
"t",
144+
aerospike.LIST_RETURN_VALUE,
145+
[starts, aerospike.null()],
146+
[ends, aerospike.null()],
147+
)
148+
]
149+
_, _, b = client.operate(key, ops)
150+
pp.pprint(b["t"])
151+
print(spacer)
152+
153+
print("\nGet sensor{} data for April 2nd".format(sensor_id))
154+
key = (namespace, set, "sensor{}-04-02".format(sensor_id))
155+
if options.interactive:
156+
pause()
157+
_, _, b = client.get(key)
158+
pp.pprint(b["t"])
159+
print(spacer)
160+
161+
print("\nGet a year's data for sensor{}".format(sensor_id))
162+
if options.interactive:
163+
pause()
164+
dt = datetime.datetime(2018, 1, 1, 0, 0, 0)
165+
keys = []
166+
for i in range(1, 366):
167+
keys.append((namespace, set,"sensor{}-{:02d}-{:02d}".format(sensor_id, dt.month, dt.day)))
168+
dt = dt + timedelta(days=1)
169+
sensor_year = client.get_many(keys)
170+
for rec in sensor_year:
171+
k, _, b = rec
172+
print(k[2])
173+
pp.pprint(b["t"])
174+
print(spacer)
175+
176+
print("\nGet the data from all sensors for June 19")
177+
if options.interactive:
178+
pause()
179+
dt = datetime.datetime(2018, 1, 1, 0, 0, 0)
180+
keys = []
181+
for i in range(1, 1001):
182+
keys.append((namespace, set,"sensor{}-06-19".format(i)))
183+
one_day_all_sensors = client.get_many(keys)
184+
for rec in one_day_all_sensors:
185+
k, _, b = rec
186+
print(k[2])
187+
pp.pprint(b["t"])
188+
print(spacer)
189+
190+
print("\nScan for a random sampling (about 0.25%) of all the sensor data")
191+
if options.interactive:
192+
pause()
193+
predexp = [
194+
pxp.rec_digest_modulo(365),
195+
pxp.integer_value(1),
196+
pxp.integer_equal()
197+
]
198+
query = client.query(namespace, set)
199+
query.predexp(predexp)
200+
query.foreach(print_sensor_data)
201+
202+
except e.RecordError as e:
203+
print("Error: {0} [{1}]".format(e.msg, e.code))
204+
client.close()

0 commit comments

Comments
 (0)