-
Notifications
You must be signed in to change notification settings - Fork 601
Add multihost support for PostgreSQL collector #646
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,25 @@ | |
|
||
* psycopg2 | ||
|
||
#### Example Configuration | ||
|
||
#Section with defaults | ||
enabled=True | ||
password=default_password | ||
port = 5432 | ||
|
||
#Instance specific configs | ||
[instances] | ||
|
||
[[postgres_a]] | ||
host = db1.loc | ||
password = instance_password_a | ||
|
||
[[postgres_b]] | ||
host = db2.loc | ||
port = 5433 | ||
password = instance_password_b | ||
|
||
""" | ||
|
||
import diamond.collector | ||
|
@@ -30,22 +49,25 @@ def get_default_config_help(self): | |
""" | ||
config_help = super(PostgresqlCollector, | ||
self).get_default_config_help() | ||
|
||
config_help.update({ | ||
'host': 'Hostname', | ||
'dbname': 'DB to connect to in order to get list of DBs in PgSQL', | ||
'user': 'Username', | ||
'password': 'Password', | ||
'port': 'Port number', | ||
'password_provider': "Whether to auth with supplied password or" | ||
" .pgpass file <password|pgpass>", | ||
" .pgpass file <password|pgpass>", | ||
'sslmode': 'Whether to use SSL - <disable|allow|require|...>', | ||
'underscore': 'Convert _ to .', | ||
'underscore': 'Convert _ to .', | ||
'extended': 'Enable collection of extended database stats.', | ||
'metrics': 'List of enabled metrics to collect', | ||
'pg_version': "The version of postgres that you'll be monitoring" | ||
" eg. in format 9.2", | ||
'pg_version': "The version of postgres that you'll be monitoring" | ||
"eg. in format 9.2", | ||
'has_admin': 'Admin privileges are required to execute some' | ||
' queries.', | ||
' queries.', | ||
'instances': 'A subcategory of postgres instances with a host ' | ||
'and port. Optionally all variables can be ' | ||
'overridden per instance (see example).', | ||
}) | ||
return config_help | ||
|
||
|
@@ -54,6 +76,7 @@ def get_default_config(self): | |
Return default config. | ||
""" | ||
config = super(PostgresqlCollector, self).get_default_config() | ||
|
||
config.update({ | ||
'path': 'postgres', | ||
'host': 'localhost', | ||
|
@@ -68,6 +91,7 @@ def get_default_config(self): | |
'metrics': [], | ||
'pg_version': 9.2, | ||
'has_admin': True, | ||
'instances': {}, | ||
}) | ||
return config | ||
|
||
|
@@ -79,53 +103,81 @@ def collect(self): | |
self.log.error('Unable to import module psycopg2') | ||
return {} | ||
|
||
# Get list of databases | ||
dbs = self._get_db_names() | ||
if len(dbs) == 0: | ||
self.log.error("I have 0 databases!") | ||
return {} | ||
|
||
if self.config['metrics']: | ||
metrics = self.config['metrics'] | ||
elif str_to_bool(self.config['extended']): | ||
metrics = registry['extended'] | ||
if str_to_bool(self.config['has_admin']) \ | ||
and 'WalSegmentStats' not in metrics: | ||
metrics.append('WalSegmentStats') | ||
instances = self.config.get('instances') | ||
|
||
# HACK: setting default with subcategory messes up merging of configs, | ||
# so we only set the default if one wasn't provided. | ||
if not instances: | ||
instances = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You need to set to self.instances since you use it in _get_fig() |
||
'default': { | ||
'host': self.config['host'], | ||
} | ||
} | ||
|
||
for instance in instances: | ||
# Get list of databases | ||
dbs = self._get_db_names(instance) | ||
if len(dbs) == 0: | ||
self.log.error("I have 0 databases!") | ||
return {} | ||
|
||
if self._get_config(instance, 'metrics'): | ||
metrics = self._get_config(instance, 'metrics') | ||
elif str_to_bool(self._get_config(instance, 'extended')): | ||
metrics = registry['extended'] | ||
if str_to_bool(self._get_config(instance, 'has_admin')) \ | ||
and 'WalSegmentStats' not in metrics: | ||
metrics.append('WalSegmentStats') | ||
else: | ||
metrics = registry['basic'] | ||
|
||
# Iterate every QueryStats class | ||
for metric_name in set(metrics): | ||
if metric_name not in metrics_registry: | ||
self.log.error( | ||
'metric_name %s not found in metric registry' | ||
% metric_name) | ||
continue | ||
|
||
for dbase in dbs: | ||
conn = self._connect(instance, database=dbase) | ||
try: | ||
klass = metrics_registry[metric_name] | ||
stat = klass(dbase, conn, | ||
underscore=self._get_config(instance, | ||
'underscore')) | ||
stat.fetch(self._get_config(instance, 'pg_version')) | ||
for metric, value in stat: | ||
if value is not None: | ||
self.publish("%s.%s" % (instance, metric), | ||
value) | ||
|
||
# Setting multi_db to True will run this query on all | ||
# known databases. This is bad for queries that hit | ||
# views like pg_database, which are shared | ||
# across databases. | ||
# | ||
# If multi_db is False, bail early after the first query | ||
# iteration. Otherwise, continue to remaining databases. | ||
if stat.multi_db is False: | ||
break | ||
finally: | ||
conn.close() | ||
|
||
def _get_config(self, instance, name): | ||
""" | ||
Return instance config value or value from default section | ||
if it is overriden or None | ||
""" | ||
instance_config = self.config['instances'].get(instance) | ||
|
||
if instance_config: | ||
return instance_config.get(name, self.config.get(name, None) | ||
if name != 'host' else None) | ||
else: | ||
metrics = registry['basic'] | ||
|
||
# Iterate every QueryStats class | ||
for metric_name in set(metrics): | ||
if metric_name not in metrics_registry: | ||
self.log.error( | ||
'metric_name %s not found in metric registry' % metric_name) | ||
continue | ||
|
||
for dbase in dbs: | ||
conn = self._connect(database=dbase) | ||
try: | ||
klass = metrics_registry[metric_name] | ||
stat = klass(dbase, conn, | ||
underscore=self.config['underscore']) | ||
stat.fetch(self.config['pg_version']) | ||
for metric, value in stat: | ||
if value is not None: | ||
self.publish(metric, value) | ||
|
||
# Setting multi_db to True will run this query on all known | ||
# databases. This is bad for queries that hit views like | ||
# pg_database, which are shared across databases. | ||
# | ||
# If multi_db is False, bail early after the first query | ||
# iteration. Otherwise, continue to remaining databases. | ||
if stat.multi_db is False: | ||
break | ||
finally: | ||
conn.close() | ||
|
||
def _get_db_names(self): | ||
return None | ||
|
||
def _get_db_names(self, instance): | ||
""" | ||
Try to get a list of db names | ||
""" | ||
|
@@ -134,7 +186,8 @@ def _get_db_names(self): | |
WHERE datallowconn AND NOT datistemplate | ||
AND NOT datname='postgres' AND NOT datname='rdsadmin' ORDER BY 1 | ||
""" | ||
conn = self._connect(self.config['dbname']) | ||
conn = self._connect(instance, | ||
self._get_config(instance, 'dbname')) | ||
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor) | ||
cursor.execute(query) | ||
datnames = [d['datname'] for d in cursor.fetchall()] | ||
|
@@ -147,16 +200,16 @@ def _get_db_names(self): | |
|
||
return datnames | ||
|
||
def _connect(self, database=None): | ||
def _connect(self, instance, database=None): | ||
""" | ||
Connect to given database | ||
""" | ||
conn_args = { | ||
'host': self.config['host'], | ||
'user': self.config['user'], | ||
'password': self.config['password'], | ||
'port': self.config['port'], | ||
'sslmode': self.config['sslmode'], | ||
'host': self._get_config(instance, 'host'), | ||
'user': self._get_config(instance, 'user'), | ||
'password': self._get_config(instance, 'password'), | ||
'port': self._get_config(instance, 'port'), | ||
'sslmode': self._get_config(instance, 'sslmode'), | ||
} | ||
|
||
if database: | ||
|
@@ -165,7 +218,7 @@ def _connect(self, database=None): | |
conn_args['database'] = 'postgres' | ||
|
||
# libpq will use ~/.pgpass only if no password supplied | ||
if self.config['password_provider'] == 'pgpass': | ||
if self._get_config(instance, 'password_provider') == 'pgpass': | ||
del conn_args['password'] | ||
|
||
try: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,43 +14,60 @@ class TestPostgresqlCollector(CollectorTestCase): | |
def setUp(self, allowed_names=None): | ||
if not allowed_names: | ||
allowed_names = [] | ||
|
||
default_config = get_collector_config('PostgresqlCollector', {}) | ||
self.default_collector = PostgresqlCollector(default_config, None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't look like you ever used this. Can you add tests using it to test the default config behavior? |
||
|
||
config = get_collector_config('PostgresqlCollector', { | ||
'password': 'default_password', | ||
'port': 5432, | ||
'instances': { | ||
'postgres_a': { | ||
'host': 'db1.loc', | ||
'password_provider': 'pgpass', | ||
}, | ||
'postgres_b': { | ||
'host': 'db2.loc', | ||
'port': 5433, | ||
'password': 'instance_password_b', | ||
} | ||
} | ||
}) | ||
self.collector = PostgresqlCollector(config, None) | ||
|
||
def test_import(self): | ||
self.assertTrue(PostgresqlCollector) | ||
|
||
def test_config_override(self): | ||
self.assertEqual(self.collector._get_config('postgres_a', 'port'), 5432) | ||
|
||
self.assertEqual(self.collector._get_config('postgres_b', 'port'), 5433) | ||
|
||
@patch('postgres.psycopg2') | ||
def test_connect_with_password(self, psycopg2_mock): | ||
conn_mock = Mock() | ||
psycopg2_mock.connect.return_value = conn_mock | ||
|
||
ret = self.collector._connect('test_db') | ||
ret = self.collector._connect('postgres_b', 'test_db') | ||
|
||
self.assertTrue(conn_mock.set_isolation_level.called) | ||
self.assertEqual(ret, conn_mock) | ||
psycopg2_mock.connect.assert_called_once_with( | ||
database='test_db', host='localhost', password='postgres', | ||
port=5432, sslmode='disable', user='postgres' | ||
database='test_db', host='db2.loc', password='instance_password_b', | ||
port=5433, sslmode='disable', user='postgres' | ||
) | ||
|
||
@patch('postgres.psycopg2') | ||
def test_connect_with_pgpass(self, psycopg2_mock): | ||
config = get_collector_config('PostgresqlCollector', { | ||
'password_provider': 'pgpass' | ||
}) | ||
self.collector = PostgresqlCollector(config, None) | ||
|
||
conn_mock = Mock() | ||
psycopg2_mock.connect.return_value = conn_mock | ||
|
||
ret = self.collector._connect('test_db') | ||
ret = self.collector._connect('postgres_a', 'test_db') | ||
|
||
self.assertTrue(conn_mock.set_isolation_level.called) | ||
self.assertEqual(ret, conn_mock) | ||
psycopg2_mock.connect.assert_called_once_with( | ||
database='test_db', host='localhost', | ||
database='test_db', host='db1.loc', | ||
port=5432, sslmode='disable', user='postgres' | ||
) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this code never run since you defined a default for instances that is not None?