Skip to content

RxJava Support

Siim Kinks edited this page Jan 21, 2018 · 4 revisions

Big thanks to Square's SQLBrite from where a great majority of this functionality's idea and inspiration comes from.

SqliteMagic has built-in support for RxJava with reactive stream semantics on queries.

By default all RxJava observed queries are notified in the Schedulers.io() scheduler. This way queries can run without blocking the main thread or the thread which caused the trigger.

This behavior can be changed during the library initialization:

SqliteMagic.builder(applicationContext)
    .sqliteFactory(new FrameworkSQLiteOpenHelperFactory())
    .scheduleRxQueriesOn(myDesiredScheduler)
    .openDefaultConnection();

Use observe() method on operation builders and queries to enter the reactive world.

For queries the observe() method returns QueryObservable, which when subscribed to will immediately notify subscriber with a Query to run.

Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .subscribe(query -> {
      List<Author> authors = query.runBlocking();
      // Do smth with author objects
    });

The returned Query object can be run with runBlocking() method which executes and parses the underlying query synchronously; or can be passed back to reactive world with run() method which returns Observable (when subscribed to executes and parses the underlying query).

Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .flatMap(Query::run)
    .subscribe(authors -> {
      // Do smth with author objects
    });

SqliteMagic also has query specific operators on QueryObservable, which are the suggested way to modify streams for running queries.

Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .runQuery() // modifies the stream to run queries on notifications
    .subscribe(authors -> {
      // Do smth with author objects
    });
    
Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .runQueryOnce() // modifies the stream to run only the first query and then completes
    .subscribe(authors -> {
      // Do smth with author objects
    });

Implemented operators are the following:

  • runQuery() Runs each emitted query and propagates its result(s) to downstream as a single item. This operator ignores null values returned from running queries. A query can only return null when selecting a single row and query returns no result. When multiple rows are selected and query returns no results, an empty List is returned and it is not ignored by this operator.

  • runQueryOrDefault(T) Runs each emitted query and propagates its result(s) to downstream as a single item. This operator emits defaultValue if null is returned from running query. A query can only return null when selecting a single row and query returns no result. When multiple rows are selected and query returns no results, an empty List is returned and defaultValue is not emitted by this operator.

  • runQueryOnce() Runs only the first emitted query and propagates its result(s) to downstream as a single item. This operator ignores nullvalue returned from running the query, in which case the observable just completes.

  • runQueryOnceOrDefault(T) Runs only the first emitted query and propagates its result(s) to downstream as a single item. This operator emits defaultValue if null is returned from running the query.

When using the count() method on query builder, the returned observable is of type CountQueryObservable, which adds the following operators:

  • isZero() Runs each emitted query and propagates true to downstream if query result is equal to zero and false otherwise.

  • isNotZero() Runs each emitted query and propagates true to downstream if query result is not equal to zero and false otherwise.

Table Change Notifications

In addition to the initial notification after subscribing to the query, updates to the tables defined in the query will trigger additional notifications for as long as you remain subscribed to the observable. This means that when you insert, update, or delete data, any subscribed queries will update with the new data instantly.

final AtomicInteger notifications = new AtomicInteger();
Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .runQuery()
    .subscribe(authors -> {
      notifications.getAndIncrement();
    });
System.out.println("Notifications: " + notifications.get()); // Prints 1

new Author("George R. R.", "Martin").insert().execute();
new Author("J. K.", "Rowling").insert().execute();
// It does not matter if operation is executed synchronously
// or with RxJava
new Author("Stephen", "King").insert()
    .observe()
    .subscribe();

System.out.println("Notifications: " + notifications.get()); // Prints 4

Unsubscribe from the returned Subscription to stop getting updates.

final AtomicInteger notifications = new AtomicInteger();
Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .runQuery()
    .subscribe(authors -> {
      notifications.getAndIncrement();
    });
System.out.println("Notifications: " + notifications.get()); // Prints 1

new Author("George R. R.", "Martin").insert().execute();
new Author("J. K.", "Rowling").insert().execute();
subscription.unsubscribe();

new Author("Stephen", "King").insert().execute();

System.out.println("Notifications: " + notifications.get()); // Prints 3

Use transactions to prevent large changes to the data from spamming your subscribers.

final AtomicInteger notifications = new AtomicInteger();
Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .runQuery()
    .subscribe(authors -> {
      notifications.getAndIncrement();
    });
System.out.println("Notifications: " + notifications.get()); // Prints 1

try (Transaction transaction = SqliteMagic.newTransaction()) {
  new Author("George R. R.", "Martin").insert().execute();
  new Author("J. K.", "Rowling").insert().execute();
  new Author("Stephen", "King").insert().execute();
  transaction.markSuccessful();
}

System.out.println("Notifications: " + notifications.get()); // Prints 2

Since queries are just regular RxJava Observable objects, operators can also be used to control the frequency of notifications to subscribers.

Subscription subscription = Select
    .from(AUTHOR)
    .observe()
    .runQuery()
    .debounce(500, MILLISECONDS)
    .subscribe(authors -> {
      // Do smth with author objects
    });

The full power of RxJava's operators are available for combining, filtering, and triggering any number of queries and data changes.

Clone this wiki locally