Skip to content

Commit

Permalink
feat: Improves transaction suport on a per-driver level
Browse files Browse the repository at this point in the history
  • Loading branch information
thecodedrift committed Oct 18, 2022
1 parent 07a5c60 commit 66a523a
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 172 deletions.
11 changes: 10 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,19 @@ Written in AVA. Please see /test/\*.spec.ts for examples. When writing driver te

- By default, only the Loki driver tests will run. This is because not every architecture can run every DB
- Setting `process.env.MONGO_URI` will enable MongoDB tests using an external MongoDB instance. Please ensure it supports Replica Sets
- Setting `process.env.POSTGRES_URI` will enable Postgres tests using an external Postgres instance
- Setting `process.env.POSTGRES_URL` will enable Postgres tests using an external Postgres instance

Until [this ava issue](https://github.com/avajs/ava/issues/2979) is resolved, we work around this by selecting `test`/`test.skip` as a runtime evaluation.

#### 🐋🌿 Docker + Mongo

These tests work with the docker image for mongodb, with the following caveats:

1. After pulling, you must log into the instance and run `rs.initiate()` to enable the replica set
2. You can then connect via Direct Connection with a URI such as `mongodb://127.0.0.1:27017/?replicaSet=rs0&directConnection=true`

You may also use a free Atlas instance from MongoDB to test this driver, as all Atlas instances run replica sets by default.

### 🏁 E2E Testing

End to End tests are accepted. Please use the `LokiAdapter` for any tests, as it does not mandate the external dependencies to be loaded.
Expand Down
50 changes: 25 additions & 25 deletions src/driver/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ const asynced = (...args: unknown[]) =>
r(args);
});

export class BaseDriver<Schema = unknown, Table = unknown>
implements Driver<Schema, Table>
export class BaseDriver<Schema = unknown, Table = unknown, TxInfo = unknown>
implements Driver<Schema, Table, TxInfo>
{
events: DriverEmitter;
private conn: unknown;
Expand Down Expand Up @@ -67,74 +67,74 @@ export class BaseDriver<Schema = unknown, Table = unknown>
}

/** Bookend a transaction with driver specific handling */
async transaction(body: () => Promise<unknown>) {
async transaction(body: (txn: TxInfo) => Promise<unknown>) {
await asynced(body);
throw new DriverNotImplementedError();
}

/** Take N items from the queue for processing */
async take(visibility: number, limit = 1): Promise<QueueDoc[]> {
await asynced(visibility, limit);
async take(visibility: number, limit = 1, tx?: TxInfo): Promise<QueueDoc[]> {
await asynced(visibility, limit, tx);
throw new DriverNotImplementedError();
}

/** Ack a job, removing it from the queue */
async ack(ack: string) {
await asynced(ack);
async ack(ack: string, tx?: TxInfo) {
await asynced(ack, tx);
throw new DriverNotImplementedError();
}

/** Promote a job, making it immediately available for running */
async promote(ref: string) {
await asynced(ref);
async promote(ref: string, tx?: TxInfo) {
await asynced(ref, tx);
throw new DriverNotImplementedError();
}

/** Delay a job, pushing its visibility window out */
async delay(ref: string, delayBy: number) {
await asynced(ref, delayBy);
async delay(ref: string, delayBy: number, tx?: TxInfo) {
await asynced(ref, delayBy, tx);
throw new DriverNotImplementedError();
}

/** Replay a job, copying and inserting a new job to run immediately */
async replay(ref: string) {
await asynced(ref);
async replay(ref: string, tx?: TxInfo) {
await asynced(ref, tx);
throw new DriverNotImplementedError();
}

/** Fail a job, shifting the next run ahead to a retry time */
async fail(ack: string, retryIn: number, attempt: number) {
await asynced(ack, retryIn, attempt);
async fail(ack: string, retryIn: number, attempt: number, tx?: TxInfo) {
await asynced(ack, retryIn, attempt, tx);
throw new DriverNotImplementedError();
}

/** Place an item into the dead letter queue and ack it */
async dead(doc: QueueDoc) {
await asynced(doc);
async dead(doc: QueueDoc, tx?: TxInfo) {
await asynced(doc, tx);
throw new DriverNotImplementedError();
}

/** Extend the runtime of a job */
async ping(ack: string, extendBy = 15) {
await asynced(ack, extendBy);
async ping(ack: string, extendBy = 15, tx?: TxInfo) {
await asynced(ack, extendBy, tx);
throw new DriverNotImplementedError();
}

/** Remove any jobs that are before a certain date */
async clean(before: Date) {
await asynced(before);
async clean(before: Date, tx?: TxInfo) {
await asynced(before, tx);
throw new DriverNotImplementedError();
}

/** Replace any upcoming instances of a doc with new data */
async replaceUpcoming(doc: QueueDoc): Promise<QueueDoc> {
await asynced(doc);
async replaceUpcoming(doc: QueueDoc, tx?: TxInfo): Promise<QueueDoc> {
await asynced(doc, tx);
throw new DriverNotImplementedError();
}

/** Remove all upcoming instances of a job by its ref */
async removeUpcoming(ref: string) {
await asynced(ref);
async removeUpcoming(ref: string, tx?: TxInfo) {
await asynced(ref, tx);
throw new DriverNotImplementedError();
}

Expand Down
10 changes: 7 additions & 3 deletions src/driver/loki.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,11 @@ export const getClient = (identifier: string) => {
* LokiJS Driver Class. Creates a connection that allows DocMQ to talk to
* an in-memory LokiJS instance
*/
export class LokiDriver extends BaseDriver<Loki, Collection<LokiDoc>> {
export class LokiDriver extends BaseDriver<
Loki,
Collection<LokiDoc>,
undefined
> {
protected _db: Loki | undefined;
protected _jobs: Collection<LokiDoc> | undefined;

Expand Down Expand Up @@ -159,15 +163,15 @@ export class LokiDriver extends BaseDriver<Loki, Collection<LokiDoc>> {
});
}

async transaction(body: () => Promise<unknown>): Promise<void> {
async transaction(body: (tx: undefined) => Promise<unknown>): Promise<void> {
await this.ready();

if (!this._jobs) {
throw new DriverInitializationError();
}

this._jobs.startTransaction();
await body();
await body(undefined);
this._jobs.commit();
}

Expand Down
Loading

0 comments on commit 66a523a

Please sign in to comment.