forked from supergarotinho/ambari-mongodb
-
Notifications
You must be signed in to change notification settings - Fork 1
/
service_advisor.py
480 lines (392 loc) · 23.9 KB
/
service_advisor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
#!/usr/bin/env ambari-python-wrap
import re
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
"""
The naming convention for ServiceAdvisor subclasses depends on whether they are
in common-services or are part of the stack version's services.
In common-services, the naming convention is <service_name><service_version>ServiceAdvisor.
In the stack, the naming convention is <stack_name><stack_version><service_name>ServiceAdvisor.
Unlike the StackAdvisor, the ServiceAdvisor does NOT provide any inheritance.
If you want to use inheritance to augment a previous version of a service's
advisor you can use the following code to dynamically load the previous advisor.
Some changes will be need to provide the correct path and class names.
"""
import os
import imp
import traceback
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PARENT_DIR = os.path.join(SCRIPT_DIR, '../../../../../stacks/')
PARENT_FILE = os.path.join(PARENT_DIR, 'service_advisor.py')
try:
with open(PARENT_FILE, 'rb') as fp:
service_advisor = imp.load_module('service_advisor', fp, PARENT_FILE, ('.py', 'rb', imp.PY_SOURCE))
except Exception as e:
traceback.print_exc()
print "Failed to load parent"
from stack_advisor import DefaultStackAdvisor
from resource_management.core.logger import Logger
from collections import namedtuple
ShardNumbers = namedtuple('ShardNumbers', 'numberOfInstances numberOfArbiters')
# HDP23StackAdvisor
# DefaultStackAdvisor
# service_advisor.ServiceAdvisor
class HDP23MongoDBServiceAdvisor(service_advisor.ServiceAdvisor):
"""
"""
MONGOS_COMPONENT_NAME = 'MONGOS'
MONGODC_COMPONENT_NAME = 'MONGODC'
MONGOD_COMPONENT_NAME = 'MONGODB'
CLUSTER_DEFINITION_CONF_NAME = 'cluster_definition'
PORTS_CONF_NAME = 'ports'
def __init__(self, *args, **kwargs):
Logger.initialize_logger()
Logger.info("HDP23MongoDBServiceAdvisor has been created!")
self.as_super = super(HDP23MongoDBServiceAdvisor, self)
self.as_super.__init__(*args, **kwargs)
"""
If any components of the service should be colocated with other services,
this is where you should set up that layout. Example:
# colocate HAWQSEGMENT with DATANODE, if no hosts have been allocated for HAWQSEGMENT
hawqSegment = [component for component in serviceComponents if component["StackServiceComponents"]["component_name"] == "HAWQSEGMENT"][0]
if not self.isComponentHostsPopulated(hawqSegment):
for hostName in hostsComponentsMap.keys():
hostComponents = hostsComponentsMap[hostName]
if {"name": "DATANODE"} in hostComponents and {"name": "HAWQSEGMENT"} not in hostComponents:
hostsComponentsMap[hostName].append( { "name": "HAWQSEGMENT" } )
if {"name": "DATANODE"} not in hostComponents and {"name": "HAWQSEGMENT"} in hostComponents:
hostComponents.remove({"name": "HAWQSEGMENT"})
"""
def colocateService(self, hostsComponentsMap, serviceComponents):
pass
"""
Any configuration recommendations for the service should be defined in this function.
This should be similar to any of the recommendXXXXConfigurations functions in the stack_advisor.py
such as recommendYARNConfigurations().
"""
def getServiceConfigurationRecommendations(self, configurations, clusterSummary, services, hosts):
return []
"""
Returns an array of Validation objects about issues with the hostnames to which components are assigned.
This should detect validation issues which are different than those the stack_advisor.py detects.
The default validations are in stack_advisor.py getComponentLayoutValidations function.
"""
def getServiceComponentLayoutValidations(self, services, hosts):
Logger.info("Initiating MongoDB layout validation ...")
componentsListList = [service["components"] for service in services["services"]]
componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
mongodHosts = self.getHosts(componentsList, self.MONGOD_COMPONENT_NAME)
mongoConfHosts = self.getHosts(componentsList, self.MONGODC_COMPONENT_NAME)
mongosHosts = self.getHosts(componentsList, self.MONGOS_COMPONENT_NAME)
Logger.info("Mongod hosts: "+str(mongodHosts))
Logger.info("MongoConf hosts: "+str(mongoConfHosts))
Logger.info("Mongos hosts: "+str(mongosHosts))
items = []
if (len(mongosHosts) > 0) and (len(mongoConfHosts) == 0):
message = "For a sharding cluster, it must have Mongo Config Instances"
items.append({"type": 'host-component', "level": 'ERROR', "message": message,
"component-name": self.MONGODC_COMPONENT_NAME})
if (len(mongoConfHosts) > 0) and (len(mongosHosts) == 0):
message = "For a sharding cluster, it must have one or more Mongos Query Router services"
items.append({"type": 'host-component', "level": 'ERROR', "message": message,
"component-name": self.MONGOS_COMPONENT_NAME})
return items
def getWarnItem(self,siteName,prop_name,message):
return {'config-type': siteName,
'message': message,
'type': 'configuration',
'config-name': prop_name,
'level': 'WARN'}
def getErrorItem(self,siteName,prop_name,message):
return {'config-type': siteName,
'message': message,
'type': 'configuration',
'config-name': prop_name,
'level': 'ERROR'}
def validateIfRootDir(self, siteName, properties, validationItems, prop_name, display_name):
root_dir = '/'
if prop_name in properties and properties[prop_name].strip() == root_dir:
message = "It is not advisable to have " + display_name + " at " + root_dir + ". Consider creating a sub " \
"directory for it"
validationItems.append(self.getWarnItem(siteName,prop_name,message))
def checkForMultipleDirs(self, siteName, properties, validationItems, prop_name, display_name):
# check for delimiters space, comma, colon and semi-colon
if prop_name in properties and len(re.sub(r'[,;:]', ' ', properties[prop_name]).split(' ')) > 1:
message = "Multiple directories for " + display_name + " are not allowed."
validationItems.append(self.getWarnItem(siteName,prop_name,message))
def parsePortsConfig(self, ports_string):
"""
Parse the user ports configuration
:param ports_string: The ports configuration string
:type ports_string: str
:return: A list of ports
:rtype list[str]
"""
ports_string_list = ports_string.split(",")
ports = []
for spec in ports_string_list:
if spec.find("-") > -1:
limits = spec.split("-")
for i in range(int(limits[0]), int(limits[1]) + 1):
ports.append(str(i))
else:
ports.append(spec)
return ports
def validatePortConfig(self, siteName, properties, validationItems, prop_name):
"""
Check if the specified in the correct format
"""
if prop_name in properties:
ports = self.parsePortsConfig(properties[prop_name])
for port in ports:
try:
port_str = int(port)
except:
message = "The ports configuration is not valid. Please follow the recommended format."
validationItems.append(self.getErrorItem(siteName,prop_name,message))
break
def checkForInexistentNodes(self, siteName, properties, validationItems, prop_name, ambari_hosts, component_display_name):
Logger.info("Checking for inexistent nodes on " + component_display_name)
Logger.info("Configuration name: " + prop_name)
if prop_name in properties:
cluster_definition = properties[prop_name]
Logger.info("Cluster definition: " + cluster_definition)
if len(cluster_definition.strip()) > 0:
nodes_instances = {}
cluster_shards = cluster_definition.split(";")
for shard in cluster_shards:
shard_nodes = shard.split(",")
for node in shard_nodes:
if node.find("/arbiter") > -1:
node_name = node[0:node.find("/arbiter")]
else:
node_name = node
if node_name not in ambari_hosts:
message = "The node " + node_name + " in the shard \"" + shard + "\" does not have the " + \
component_display_name + " component installed on Ambari. The Ambari nodes " \
"are: " + str.join(",", ambari_hosts)
validationItems.append(self.getErrorItem(siteName,prop_name,message))
# This will be used to check if some node does not have instances
if nodes_instances.has_key(node_name):
nodes_instances[node_name] += 1
else:
nodes_instances[node_name] = 1
for node in ambari_hosts:
if not nodes_instances.has_key(node):
message = "The node " + node + " in ambari does not have any instances of " + \
component_display_name + " configured. You must remove the node in ambari install or " \
"add some instances for it in the cluster configuration"
validationItems.append(self.getWarnItem(siteName, prop_name, message))
def getMinimumNumberOfPorts(self, cluster_definition):
"""
Returns the minimum number of ports needed for a Node given the cluster definition
:param cluster_definition: The config string for the cluster
:type cluster_definition: str
:return: int
"""
nodes_ports = {}
if len(cluster_definition.strip()) > 0:
cluster_shards = cluster_definition.split(";")
for shard in cluster_shards:
shard_nodes = shard.split(",")
for node in shard_nodes:
if node.find("/arbiter") > -1:
node_name = node[0:node.find("/arbiter")]
else:
node_name = node
if nodes_ports.has_key(node_name):
nodes_ports[node_name] += 1
else:
nodes_ports[node_name] = 1
if len(nodes_ports) > 0:
return max(nodes_ports.values())
else:
return 0
def validateMongoDBConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
siteName = "mongodb"
validationItems = []
# 1. Check in some of directories are the root dir or if they have with special chars
directories = {
'db_path': 'MongoDB DB Path Prefix',
'log_path': 'MongoDB Log Path',
'pid_db_path': 'MongoDB PID Path'
}
for property_name, display_name in directories.iteritems():
self.validateIfRootDir(siteName, properties, validationItems, property_name, display_name)
self.checkForMultipleDirs(siteName, properties, validationItems, property_name, display_name)
return validationItems
def validateMinNumberOfPorts(self, siteName, properties, validationItems, prop_name, cluster_definition,
component_display_name):
min_ports = self.getMinimumNumberOfPorts(cluster_definition)
supplied_ports = len((self.parsePortsConfig(properties[prop_name])))
if min_ports > supplied_ports:
message = "As your cluster has more than one instance of " + component_display_name + \
" per node, you need to supply more ports. You have supplied " + str(supplied_ports) + \
" ports. But there is an instance that requires " + str(min_ports) + " ports."
validationItems.append(self.getErrorItem(siteName, prop_name, message))
def getClusterNumbers(self, cluster_definition):
"""
:param cluster_definition: The config string for the cluster
:type cluster_definition: str
:return: Cluster numbers for each shard
:rtype: list[ShardNumbers]
"""
shards = []
if len(cluster_definition.strip()) > 0:
cluster_shards = cluster_definition.split(";")
for shard in cluster_shards:
shard_nodes = shard.split(",")
numberOfInstances = len(shard_nodes)
numberOfArbiters = len(filter(lambda node: node.find("/arbiter") > -1, shard_nodes))
shards.append(ShardNumbers(numberOfInstances, numberOfArbiters))
return shards
def validateMongoDInstancesConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
siteName = 'mongod'
mongod_configs = properties
validationItems = []
componentsListList = [service["components"] for service in services["services"]]
componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
mongodHosts = self.getHosts(componentsList, self.MONGOD_COMPONENT_NAME)
mongoConfHosts = self.getHosts(componentsList, self.MONGODC_COMPONENT_NAME)
# 1. Check if all nodes in the cluster_configuration has the component installed on ambari
self.checkForInexistentNodes(siteName, properties, validationItems, self.CLUSTER_DEFINITION_CONF_NAME,
mongodHosts, "Mongo DB Server")
# 2. Check the port configurations
self.validatePortConfig(siteName, properties, validationItems, self.PORTS_CONF_NAME)
# 3. Check if we have met the minimum number of needed ports
self.validateMinNumberOfPorts(siteName, properties, validationItems, self.PORTS_CONF_NAME,
mongod_configs[self.CLUSTER_DEFINITION_CONF_NAME], "Mongo DB Server")
# 4. Check if we have more than one arbiter per shard
shardsNumbers = self.getClusterNumbers(mongod_configs[self.CLUSTER_DEFINITION_CONF_NAME])
if len(filter(lambda shard: shard.numberOfArbiters > 1, shardsNumbers)) > 0:
message = "It is not advisable to have more than one arbiter per shard. Consider changing the cluster " \
"configuration."
validationItems.append(self.getWarnItem(siteName,self.CLUSTER_DEFINITION_CONF_NAME,message))
# 5. Check if we have an arbiter as the first node of a shard
cluster_shards = mongod_configs[self.CLUSTER_DEFINITION_CONF_NAME].split(";")
for shard in cluster_shards:
shard_nodes = shard.split(",")
if shard_nodes[0].find("/arbiter") > -1:
message = "The first node of the shard must not be an arbiter. You must change the position of the " + \
shard_nodes[0] + " in the shard \"" + shard + "\""
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
# 6. Check if we have more than one shard without mongoconf instances
if (len(cluster_shards) > 0) and (len(mongoConfHosts) == 0):
message = "You can not have more than one shard without mongoconf and mongos instances. You have " \
"configurated " + str(len(cluster_shards)) + " shards. Reduce the number of shards ro add " \
"mongoconf instances."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
return validationItems
def validateMongoConfigInstancesConfigurations(self, properties, recommendedDefaults, configurations, services,
hosts):
Logger.info("Initiating mongo-conf configuration validation...")
siteName = "mongo-conf"
mongoconf_configs = properties
validationItems = []
componentsListList = [service["components"] for service in services["services"]]
componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
mongoConfHosts = self.getHosts(componentsList, self.MONGODC_COMPONENT_NAME)
# 1. Check if all nodes in the cluster_configuration has the component installed on ambari
# 1.1 It also check if some node in ambari does not have any instances
self.checkForInexistentNodes(siteName, properties, validationItems, self.CLUSTER_DEFINITION_CONF_NAME, mongoConfHosts,
"Mongo Config Server")
# 2. Check the port configurations
self.validatePortConfig(siteName, properties, validationItems, self.PORTS_CONF_NAME)
# 3. Check if we have met the minimum number of needed ports
self.validateMinNumberOfPorts(siteName, properties, validationItems, self.PORTS_CONF_NAME,
mongoconf_configs[self.CLUSTER_DEFINITION_CONF_NAME], "Mongo Config Server")
# 4. Check if we have just one shard
shardsNumbers = self.getClusterNumbers(mongoconf_configs[self.CLUSTER_DEFINITION_CONF_NAME])
if len(shardsNumbers) > 1:
message = "You can't have more than one shard for mongo db config instances. You have configured " + \
str(len(shardsNumbers)) + " shards for it."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
# 5. Check if we have exactly 3 mongo config instances
if len(shardsNumbers) == 1:
if shardsNumbers[0].numberOfInstances != 3:
message = "You must have 3 Mongo Config Instances. You have configured " + \
str(shardsNumbers[0].numberOfInstances) + " instances for it."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
elif len(shardsNumbers) == 0:
if len(mongoConfHosts) < 3:
message = "You must have 3 Mongo Config Instances. As you have only " + str(len(mongoConfHosts)) + \
" nodes available for Mongo Config Instances. You have two options. 1-Change the cluster " \
"definition property in mongo-conf configuration specifying more instances per node in " \
"order to archieve 3 mongo config instances. 2-Add more nodes with mongo config in ambari."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
# 6. Check if we have any arbiters (mongoconf does not support arbiters)
if len(filter(lambda shard: shard.numberOfArbiters > 0, shardsNumbers)) > 0:
message = "You can't have any arbiters in mongo config replicaset."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
return validationItems
def validateMongoSInstancesConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
siteName = 'mongos'
mongos_configs = properties
validationItems = []
componentsListList = [service["components"] for service in services["services"]]
componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
mongoConfHosts = self.getHosts(componentsList, self.MONGODC_COMPONENT_NAME)
# 1. Check if all nodes in the cluster_configuration has the component installed on ambari
# 1.1 It also check if some node in ambari does not have any instances
self.checkForInexistentNodes(siteName, properties, validationItems, self.CLUSTER_DEFINITION_CONF_NAME, mongoConfHosts,
"Mongo Query Router")
# 2. Check the port configurations
self.validatePortConfig(siteName, properties, validationItems, self.PORTS_CONF_NAME)
# 3. Check if we have met the minimum number of needed ports
self.validateMinNumberOfPorts(siteName, properties, validationItems, self.PORTS_CONF_NAME,
mongos_configs[self.CLUSTER_DEFINITION_CONF_NAME], "Mongo Query Router")
# 4. Check if we have just one shard
shardsNumbers = self.getClusterNumbers(mongos_configs[self.CLUSTER_DEFINITION_CONF_NAME])
if len(shardsNumbers) > 1:
message = "You can't have more than one shard for Mongo Query Router instances. You have configured " + \
str(len(shardsNumbers)) + " shards for it."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
# 5. Check if we have any arbiters (mongoconf does not support arbiters)
if len(filter(lambda shard: shard.numberOfArbiters > 0, shardsNumbers)) > 0:
message = "You can't have any arbiters in Mongo Query Router configuration."
validationItems.append(self.getErrorItem(siteName, self.CLUSTER_DEFINITION_CONF_NAME, message))
return validationItems
"""
Any configuration validations for the service should be defined in this function.
This should be similar to any of the validateXXXXConfigurations functions in the stack_advisor.py
such as validateHDFSConfigurations.
"""
def getServiceConfigurationsValidationItems(self, configurations, recommendedDefaults, services, hosts):
Logger.info("Initiating MongoDb Configuration Check!")
siteName = "mongodb"
items = self.validateMongoDBConfigurations(configurations[siteName]['properties'],None,configurations,services,
hosts)
siteName = "mongod"
resultItems = self.validateMongoDInstancesConfigurations(configurations[siteName]['properties'],None,
configurations,services, hosts)
items.extend(resultItems)
siteName = "mongo-conf"
resultItems = self.validateMongoConfigInstancesConfigurations(configurations[siteName]['properties'],None,
configurations,services, hosts)
items.extend(resultItems)
siteName = "mongos"
resultItems = self.validateMongoSInstancesConfigurations(configurations[siteName]['properties'],None,
configurations,services, hosts)
items.extend(resultItems)
return items
class HDP24MongoDBServiceAdvisor(HDP23MongoDBServiceAdvisor):
"""
"""
def __init__(self, *args, **kwargs):
Logger.initialize_logger()
Logger.info("HDP24MongoDBServiceAdvisor has been created!")
self.as_super = super(HDP23MongoDBServiceAdvisor, self)
self.as_super.__init__(*args, **kwargs)