Skip to content

Commit

Permalink
Storm dbmanager now makes use of the db_lock internally.
Browse files Browse the repository at this point in the history
Becuase the @transact decorator does not work on inline functions, I created private subfunctions to be called. These will be removed once we get rid of the lock.
  • Loading branch information
lfdversluis committed May 30, 2016
1 parent f3fc4ec commit 30ea219
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 48 deletions.
75 changes: 46 additions & 29 deletions StormDBManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def on_error(failure):
self._logger.exception(u"Failed to load database version: %s", failure.getTraceback())

# Schedule the query and add a callback and errback to the deferred.
return self.schedule_query(self.fetch_one, u"SELECT value FROM MyInfo WHERE entry == 'version'").addCallbacks(on_result, on_error)
return self.fetchone(u"SELECT value FROM MyInfo WHERE entry == 'version'").addCallbacks(on_result, on_error)

def schedule_query(self, callable, *args, **kwargs):
"""
Expand All @@ -68,58 +68,72 @@ def schedule_query(self, callable, *args, **kwargs):

return self.db_lock.run(callable, *args, **kwargs)

@transact
def execute_query(self, query, arguments=None):
def execute(self, query, arguments=None):
"""
Executes a query on the twisted thread-pool using the storm framework.
:param query: The sql query to be executed
:param arguments: Optional arguments that go with the sql query
:return: None as this is function is executed on the thread-pool, database objects
such as cursors cannot be returned.
:return: A deferred that fires once the execution is done, the result will be None.
"""
connection = Connection(self._database)
connection.execute(query, arguments, noresult=True)
connection.close()

@transact
def fetch_one(self, query, arguments=None):
@transact
def _execute(self, query, arguments=None):
connection = Connection(self._database)
connection.execute(query, arguments, noresult=True)
connection.close()

return self.db_lock.run(_execute, self, query, arguments)

def fetchone(self, query, arguments=None):
"""
Executes a query on the twisted thread-pool using the storm framework and returns the first result.
:param query: The sql query to be executed.
:param arguments: Optional arguments that go with the sql query.
:return: A deferred that fires with the first tuple that matches the query or None.
The result would be the same as using execute and calling the next() function on it.
"""
connection = Connection(self._database)
result = connection.execute(query, arguments).get_one()
connection.close()
return result

@transact
def fetch_all(self, query, arguments=None):
@transact
def _fetchone(self, query, arguments=None):
connection = Connection(self._database)
result = connection.execute(query, arguments).get_one()
connection.close()
return result

return self.db_lock.run(_fetchone, self, query, arguments)

def fetchall(self, query, arguments=None):
"""
Executes a query on the twisted thread-pool using the storm framework and
returns all results as a list of tuples.
:param query: The sql query to be executed.
:param arguments: Optional arguments that go with the sql query.
:return: A deferred that fires with a list of tuple results that matches the query or an empty list.
"""
connection = Connection(self._database)
return connection.execute(query, arguments).get_all()

@transact
@transact
def _fetchall(self, query, arguments=None):
connection = Connection(self._database)
return connection.execute(query, arguments).get_all()

return self.db_lock.run(_fetchall, self, query, arguments)

def insert(self, table_name, **kwargs):
"""
Inserts data provided as keyword arguments into the table provided as an argument.
:param table_name: The name of the table the data has to be inserted into.
:param argv: A dictionary where the key represents the column and the value the value to be inserted.
:return: A deferred that fires when the data has been inserted.
"""
return self.db_lock.run(self._insert, table_name, **kwargs)

@transact
def _insert(self, table_name, **kwargs):
connection = Connection(self._database)
self._insert(connection, table_name, **kwargs)
self.__insert(connection, table_name, **kwargs)
connection.close()

def _insert(self, connection, table_name, **kwargs):
def __insert(self, connection, table_name, **kwargs):
"""
Utility function to insert data which is not decorated by the @transact to prevent
a loop calling this function to create many threads.
Expand All @@ -138,7 +152,6 @@ def _insert(self, connection, table_name, **kwargs):

connection.execute(sql, kwargs.values(), noresult=True)

@transact
def insert_many(self, table_name, arg_list):
"""
Inserts many items into the database
Expand All @@ -147,12 +160,16 @@ def insert_many(self, table_name, arg_list):
the value the value to be inserted into this column.
:return: A deferred that fires once the bulk insertion is done.
"""
if len(arg_list) == 0: return
connection = Connection(self._database)
for args in arg_list:
self._insert(connection, table_name, **args)

connection.close()
@transact
def _insertmany(self, table_name, arg_list):
if len(arg_list) == 0: return
connection = Connection(self._database)
for args in arg_list:
self.__insert(connection, table_name, **args)
connection.close()

return self.db_lock.run(_insertmany, self, table_name, arg_list)

def delete(self, table_name, **kwargs):
"""
Expand All @@ -174,7 +191,7 @@ def delete(self, table_name, **kwargs):
sql += u'%s=? AND ' % k
arg.append(v)
sql = sql[:-5] # Remove the last AND
return self.execute_query(sql, arg)
return self.execute(sql, arg)

def count(self, table_name):
"""
Expand All @@ -183,5 +200,5 @@ def count(self, table_name):
:return: A deferred that fires with the number of rows in the table.
"""
sql = u"SELECT count(*) FROM %s LIMIT 1" % table_name
result = self.fetch_one(sql)
result = self.fetchone(sql)
return result
38 changes: 19 additions & 19 deletions tests/test_storm_db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create_car_database(self):
:return: A deferred that fires once the table has been made.
"""
sql = u"CREATE TABLE car(brand);"
return self.storm_db.schedule_query(self.storm_db.execute_query, sql)
return self.storm_db.execute(sql)

def create_myinfo_table(self):
"""
Expand All @@ -48,7 +48,7 @@ def create_myinfo_table(self):
value text
);
"""
return self.storm_db.schedule_query(self.storm_db.execute_query, sql)
return self.storm_db.execute(sql)

@deferred(timeout=5)
def test_execute_function(self):
Expand All @@ -71,10 +71,10 @@ def assert_result(result):

def fetch_inserted(_):
sql = u"SELECT * FROM car"
return self.storm_db.schedule_query(self.storm_db.fetch_one, sql)
return self.storm_db.fetchone(sql)

def insert_into_db(_):
return self.storm_db.schedule_query(self.storm_db.insert, "car", brand="BMW")
return self.storm_db.insert( "car", brand="BMW")

result_deferred = self.create_car_database() # Create the car table
result_deferred.addCallback(insert_into_db) # Insert one value
Expand All @@ -96,13 +96,13 @@ def assert_result(result):

def fetch_inserted(_):
sql = u"SELECT * FROM car"
return self.storm_db.schedule_query(self.storm_db.fetch_all, sql)
return self.storm_db.fetchall(sql)

def insert_into_db(_):
list = []
list.append({"brand": "BMW"})
list.append({"brand": "Volvo"})
return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list)
return self.storm_db.insert_many( "car", list)

result_deferred = self.create_car_database() # Create the car table
result_deferred.addCallback(insert_into_db) # Insert two value
Expand All @@ -123,16 +123,16 @@ def assert_result(result):

def fetch_inserted(_):
sql = u"SELECT * FROM car"
return self.storm_db.schedule_query(self.storm_db.fetch_all, sql)
return self.storm_db.fetchall(sql)

def delete_one(_):
return self.storm_db.schedule_query(self.storm_db.delete, "car", brand="BMW")
return self.storm_db.delete( "car", brand="BMW")

def insert_into_db(_):
list = []
list.append({"brand": "BMW"})
list.append({"brand": "Volvo"})
return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list)
return self.storm_db.insert_many("car", list)

result_deferred = self.create_car_database() # Create the car table
result_deferred.addCallback(insert_into_db) # Insert two value
Expand All @@ -154,16 +154,16 @@ def assert_result(result):

def fetch_inserted(_):
sql = u"SELECT * FROM car"
return self.storm_db.schedule_query(self.storm_db.fetch_all, sql)
return self.storm_db.fetchall(sql)

def delete_one(_):
return self.storm_db.schedule_query(self.storm_db.delete, "car", brand=("LIKE", "BMW"))
return self.storm_db.delete("car", brand=("LIKE", "BMW"))

def insert_into_db(_):
list = []
list.append({"brand": "BMW"})
list.append({"brand": "Volvo"})
return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list)
return self.storm_db.insert_many("car", list)

result_deferred = self.create_car_database() # Create the car table
result_deferred.addCallback(insert_into_db) # Insert two value
Expand All @@ -184,13 +184,13 @@ def assert_result(result):
self.assertEquals(result[0], 2, "Result was not 2")

def get_size(_):
return self.storm_db.schedule_query(self.storm_db.count, "car")
return self.storm_db.count("car")

def insert_into_db(_):
list = []
list.append({"brand": "BMW"})
list.append({"brand": "Volvo"})
return self.storm_db.schedule_query(self.storm_db.insert_many, "car", list)
return self.storm_db.insert_many("car", list)

result_deferred = self.create_car_database() # Create the car table
result_deferred.addCallback(insert_into_db) # Insert two value
Expand All @@ -211,7 +211,7 @@ def assert_result(_):
self.assertEqual(self.storm_db._version, 0, "Version was not 0 but: %r" % self.storm_db._version)

def get_size(_):
return self.storm_db.schedule_query(self.storm_db.count, "car")
return self.storm_db.count("car")

result_deferred = self.create_car_database() # Create the car table
result_deferred.addCallback(get_size) # Get the version
Expand All @@ -233,7 +233,7 @@ def get_version(_):
return self.storm_db._retrieve_version()

def insert_version(_):
return self.storm_db.schedule_query(self.storm_db.insert, "MyInfo", entry="version", value="2")
return self.storm_db.insert("MyInfo", entry="version", value="2")

result_deferred = self.create_myinfo_table() # Create the database
result_deferred.addCallback(insert_version) # Get the version
Expand All @@ -258,13 +258,13 @@ def assert_sequence(result):

def fetch_all(_):
sql = u"SELECT * FROM numtest"
return self.storm_db.schedule_query(self.storm_db.fetch_all, sql)
return self.storm_db.fetchall(sql)

defer_list = []

def schedule_tree_inserts(_):
for i in xrange(1, 4):
defer_list.append(self.storm_db.schedule_query(self.storm_db.insert, "numtest", num=i))
defer_list.append(self.storm_db.insert( "numtest", num=i))

return DeferredList(defer_list)

Expand All @@ -275,7 +275,7 @@ def create_numtest_db():
num INTEGER
);
"""
return self.storm_db.schedule_query(self.storm_db.execute_query, sql)
return self.storm_db.execute(sql)

result_deferred = create_numtest_db()
result_deferred.addCallback(schedule_tree_inserts)
Expand Down

0 comments on commit 30ea219

Please sign in to comment.