From 5da399c300098b485a492201ac28e595cb5cb485 Mon Sep 17 00:00:00 2001 From: lfdversluis Date: Sat, 28 May 2016 15:13:31 +0200 Subject: [PATCH] Storm dbmanager now makes use of the db_lock internally. --- StormDBManager.py | 75 +++++++++++++++++++++------------- tests/test_storm_db_manager.py | 38 ++++++++--------- 2 files changed, 65 insertions(+), 48 deletions(-) diff --git a/StormDBManager.py b/StormDBManager.py index b5a200f2..f92ae763 100644 --- a/StormDBManager.py +++ b/StormDBManager.py @@ -55,7 +55,7 @@ def on_error(failure): self._logger.exception(u"Failed to load database version: %s", failure.getTraceback()) # Schedule the query and add a callback and errback to the deferred. - return self.schedule_query(self.fetch_one, u"SELECT value FROM MyInfo WHERE entry == 'version'").addCallbacks(on_result, on_error) + return self.fetchone(u"SELECT value FROM MyInfo WHERE entry == 'version'").addCallbacks(on_result, on_error) def schedule_query(self, callable, *args, **kwargs): """ @@ -68,21 +68,23 @@ def schedule_query(self, callable, *args, **kwargs): return self.db_lock.run(callable, *args, **kwargs) - @transact - def execute_query(self, query, arguments=None): + def execute(self, query, arguments=None): """ Executes a query on the twisted thread-pool using the storm framework. :param query: The sql query to be executed :param arguments: Optional arguments that go with the sql query - :return: None as this is function is executed on the thread-pool, database objects - such as cursors cannot be returned. + :return: A deferred that fires once the execution is done, the result will be None. """ - connection = Connection(self._database) - connection.execute(query, arguments, noresult=True) - connection.close() - @transact - def fetch_one(self, query, arguments=None): + @transact + def _execute(self, query, arguments=None): + connection = Connection(self._database) + connection.execute(query, arguments, noresult=True) + connection.close() + + return self.db_lock.run(_execute, self, query, arguments) + + def fetchone(self, query, arguments=None): """ Executes a query on the twisted thread-pool using the storm framework and returns the first result. :param query: The sql query to be executed. @@ -90,13 +92,17 @@ def fetch_one(self, query, arguments=None): :return: A deferred that fires with the first tuple that matches the query or None. The result would be the same as using execute and calling the next() function on it. """ - connection = Connection(self._database) - result = connection.execute(query, arguments).get_one() - connection.close() - return result - @transact - def fetch_all(self, query, arguments=None): + @transact + def _fetchone(self, query, arguments=None): + connection = Connection(self._database) + result = connection.execute(query, arguments).get_one() + connection.close() + return result + + return self.db_lock.run(_fetchone, self, query, arguments) + + def fetchall(self, query, arguments=None): """ Executes a query on the twisted thread-pool using the storm framework and returns all results as a list of tuples. @@ -104,10 +110,14 @@ def fetch_all(self, query, arguments=None): :param arguments: Optional arguments that go with the sql query. :return: A deferred that fires with a list of tuple results that matches the query or an empty list. """ - connection = Connection(self._database) - return connection.execute(query, arguments).get_all() - @transact + @transact + def _fetchall(self, query, arguments=None): + connection = Connection(self._database) + return connection.execute(query, arguments).get_all() + + return self.db_lock.run(_fetchall, self, query, arguments) + def insert(self, table_name, **kwargs): """ Inserts data provided as keyword arguments into the table provided as an argument. @@ -115,11 +125,15 @@ def insert(self, table_name, **kwargs): :param argv: A dictionary where the key represents the column and the value the value to be inserted. :return: A deferred that fires when the data has been inserted. """ + return self.db_lock.run(self._insert, table_name, **kwargs) + + @transact + def _insert(self, table_name, **kwargs): connection = Connection(self._database) - self._insert(connection, table_name, **kwargs) + self.__insert(connection, table_name, **kwargs) connection.close() - def _insert(self, connection, table_name, **kwargs): + def __insert(self, connection, table_name, **kwargs): """ Utility function to insert data which is not decorated by the @transact to prevent a loop calling this function to create many threads. @@ -138,7 +152,6 @@ def _insert(self, connection, table_name, **kwargs): connection.execute(sql, kwargs.values(), noresult=True) - @transact def insert_many(self, table_name, arg_list): """ Inserts many items into the database @@ -147,12 +160,16 @@ def insert_many(self, table_name, arg_list): the value the value to be inserted into this column. :return: A deferred that fires once the bulk insertion is done. """ - if len(arg_list) == 0: return - connection = Connection(self._database) - for args in arg_list: - self._insert(connection, table_name, **args) - connection.close() + @transact + def _insertmany(self, table_name, arg_list): + if len(arg_list) == 0: return + connection = Connection(self._database) + for args in arg_list: + self.__insert(connection, table_name, **args) + connection.close() + + return self.db_lock.run(_insertmany, self, table_name, arg_list) def delete(self, table_name, **kwargs): """ @@ -174,7 +191,7 @@ def delete(self, table_name, **kwargs): sql += u'%s=? AND ' % k arg.append(v) sql = sql[:-5] # Remove the last AND - return self.execute_query(sql, arg) + return self.execute(sql, arg) def count(self, table_name): """ @@ -183,5 +200,5 @@ def count(self, table_name): :return: A deferred that fires with the number of rows in the table. """ sql = u"SELECT count(*) FROM %s LIMIT 1" % table_name - result = self.fetch_one(sql) + result = self.fetchone(sql) return result diff --git a/tests/test_storm_db_manager.py b/tests/test_storm_db_manager.py index af25b10a..4e9e2a61 100644 --- a/tests/test_storm_db_manager.py +++ b/tests/test_storm_db_manager.py @@ -34,7 +34,7 @@ def create_car_database(self): :return: A deferred that fires once the table has been made. """ sql = u"CREATE TABLE car(brand);" - return self.storm_db.schedule_query(self.storm_db.execute_query, sql) + return self.storm_db.execute(sql) def create_myinfo_table(self): """ @@ -48,7 +48,7 @@ def create_myinfo_table(self): value text ); """ - return self.storm_db.schedule_query(self.storm_db.execute_query, sql) + return self.storm_db.execute(sql) @deferred(timeout=5) def test_execute_function(self): @@ -71,10 +71,10 @@ def assert_result(result): def fetch_inserted(_): sql = u"SELECT * FROM car" - return self.storm_db.schedule_query(self.storm_db.fetch_one, sql) + return self.storm_db.fetchone(sql) def insert_into_db(_): - return self.storm_db.schedule_query(self.storm_db.insert, "car", brand="BMW") + return self.storm_db.insert( "car", brand="BMW") result_deferred = self.create_car_database() # Create the car table result_deferred.addCallback(insert_into_db) # Insert one value @@ -96,13 +96,13 @@ def assert_result(result): def fetch_inserted(_): sql = u"SELECT * FROM car" - return self.storm_db.schedule_query(self.storm_db.fetch_all, sql) + return self.storm_db.fetchall(sql) def insert_into_db(_): list = [] list.append({"brand": "BMW"}) list.append({"brand": "Volvo"}) - return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list) + return self.storm_db.insert_many( "car", list) result_deferred = self.create_car_database() # Create the car table result_deferred.addCallback(insert_into_db) # Insert two value @@ -123,16 +123,16 @@ def assert_result(result): def fetch_inserted(_): sql = u"SELECT * FROM car" - return self.storm_db.schedule_query(self.storm_db.fetch_all, sql) + return self.storm_db.fetchall(sql) def delete_one(_): - return self.storm_db.schedule_query(self.storm_db.delete, "car", brand="BMW") + return self.storm_db.delete( "car", brand="BMW") def insert_into_db(_): list = [] list.append({"brand": "BMW"}) list.append({"brand": "Volvo"}) - return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list) + return self.storm_db.insert_many("car", list) result_deferred = self.create_car_database() # Create the car table result_deferred.addCallback(insert_into_db) # Insert two value @@ -154,16 +154,16 @@ def assert_result(result): def fetch_inserted(_): sql = u"SELECT * FROM car" - return self.storm_db.schedule_query(self.storm_db.fetch_all, sql) + return self.storm_db.fetchall(sql) def delete_one(_): - return self.storm_db.schedule_query(self.storm_db.delete, "car", brand=("LIKE", "BMW")) + return self.storm_db.delete("car", brand=("LIKE", "BMW")) def insert_into_db(_): list = [] list.append({"brand": "BMW"}) list.append({"brand": "Volvo"}) - return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list) + return self.storm_db.insert_many("car", list) result_deferred = self.create_car_database() # Create the car table result_deferred.addCallback(insert_into_db) # Insert two value @@ -184,13 +184,13 @@ def assert_result(result): self.assertEquals(result[0], 2, "Result was not 2") def get_size(_): - return self.storm_db.schedule_query(self.storm_db.count, "car") + return self.storm_db.count("car") def insert_into_db(_): list = [] list.append({"brand": "BMW"}) list.append({"brand": "Volvo"}) - return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list) + return self.storm_db.insert_many("car", list) result_deferred = self.create_car_database() # Create the car table result_deferred.addCallback(insert_into_db) # Insert two value @@ -211,7 +211,7 @@ def assert_result(_): self.assertEqual(self.storm_db._version, 0, "Version was not 0 but: %r" % self.storm_db._version) def get_size(_): - return self.storm_db.schedule_query(self.storm_db.count, "car") + return self.storm_db.count("car") result_deferred = self.create_car_database() # Create the car table result_deferred.addCallback(get_size) # Get the version @@ -233,7 +233,7 @@ def get_version(_): return self.storm_db._retrieve_version() def insert_version(_): - return self.storm_db.schedule_query(self.storm_db.insert, "MyInfo", entry="version", value="2") + return self.storm_db.insert("MyInfo", entry="version", value="2") result_deferred = self.create_myinfo_table() # Create the database result_deferred.addCallback(insert_version) # Get the version @@ -258,13 +258,13 @@ def assert_sequence(result): def fetch_all(_): sql = u"SELECT * FROM numtest" - return self.storm_db.schedule_query(self.storm_db.fetch_all, sql) + return self.storm_db.fetchall(sql) defer_list = [] def schedule_tree_inserts(_): for i in xrange(1, 4): - defer_list.append(self.storm_db.schedule_query(self.storm_db.insert, "numtest", num=i)) + defer_list.append(self.storm_db.insert( "numtest", num=i)) return DeferredList(defer_list) @@ -275,7 +275,7 @@ def create_numtest_db(): num INTEGER ); """ - return self.storm_db.schedule_query(self.storm_db.execute_query, sql) + return self.storm_db.execute(sql) result_deferred = create_numtest_db() result_deferred.addCallback(schedule_tree_inserts)