Skip to content

Commit

Permalink
feat: add watch and unwatch commands
Browse files Browse the repository at this point in the history
  • Loading branch information
MiniGod committed Jun 4, 2024
1 parent 6a58955 commit d485615
Show file tree
Hide file tree
Showing 13 changed files with 249 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/commands/discard.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export function discard() {
if (!this.batch) {
throw new Error('ERR DISCARD without MULTI')
}
this.dirty = false
this.watching.clear()
this.batch = undefined
return 'OK'
}
Expand Down
2 changes: 2 additions & 0 deletions src/commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ export * from './ttl'
export * from './type'
export * from './unlink'
export * from './unsubscribe'
export * from './unwatch'
export * from './watch'
export * from './xadd'
export * from './xlen'
export * from './xrange'
Expand Down
12 changes: 12 additions & 0 deletions src/commands/unwatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'

export function unwatch() {
this.dirty = false
this.watching.clear()
return 'OK'
}

export function unwatchBuffer() {
const val = unwatch.call(this)
return convertStringToBuffer(val)
}
16 changes: 16 additions & 0 deletions src/commands/watch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'

export function watch(...keys) {
if (!this.dirty) {
for (const key of keys) {
this.watching.add(key)
}
}

return 'OK'
}

export function watchBuffer(...keys) {
const val = watch.apply(this, keys)
return convertStringToBuffer(val)
}
4 changes: 3 additions & 1 deletion src/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ export default contextMap

export function createContext(keyPrefix) {
const expires = createSharedExpires()
const modifiedKeyEvents = new EventEmitter()

return {
channels: new EventEmitter(),
expires,
data: createSharedData(expires),
data: createSharedData(expires, modifiedKeyEvents),
patternChannels: new EventEmitter(),
keyPrefix,
modifiedKeyEvents,
}
}
9 changes: 8 additions & 1 deletion src/data.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
export function createSharedData(sharedExpires) {
import { EventEmitter } from 'events'

export function createSharedData(
sharedExpires,
modifiedKeyEvents = new EventEmitter()
) {
let raw = {}

return Object.freeze({
Expand All @@ -10,6 +15,7 @@ export function createSharedData(sharedExpires) {
sharedExpires.delete(key)
}
delete raw[key]
modifiedKeyEvents.emit('modified', key)
},
get(key) {
if (sharedExpires.has(key) && sharedExpires.isExpired(key)) {
Expand Down Expand Up @@ -70,6 +76,7 @@ export function createSharedData(sharedExpires) {
}

raw[key] = item
modifiedKeyEvents.emit('modified', key)
},
})
}
Expand Down
18 changes: 15 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,21 @@ class RedisMock extends EventEmitter {
contextMap.set(this.keyData, context)
}

const context = contextMap.get(this.keyData)
this.context = contextMap.get(this.keyData)

this.expires = createExpires(context.expires, optionsWithDefault.keyPrefix)
this.expires = createExpires(this.context.expires, optionsWithDefault.keyPrefix)
this.data = createData(
context.data,
this.context.data,
this.expires,
optionsWithDefault.data,
optionsWithDefault.keyPrefix
)

this.dirty = false
this.watching = new Set()
this._signalModifiedKey = this._signalModifiedKey.bind(this) // re-assign bound method to remove listener on disconnect
this.context.modifiedKeyEvents.on('modified', this._signalModifiedKey)

this._initCommands()

this.keyspaceEvents = parseKeyspaceEvents(
Expand Down Expand Up @@ -201,6 +206,7 @@ class RedisMock extends EventEmitter {

removeFrom(this.channels)
removeFrom(this.patternChannels)
this.context.modifiedKeyEvents.off('modified', this._signalModifiedKey)
// no-op
}

Expand Down Expand Up @@ -240,6 +246,12 @@ class RedisMock extends EventEmitter {
}
})
}

_signalModifiedKey(key) {
if (!this.dirty && this.watching.has(key)) {
this.dirty = true
}
}
}

RedisMock.Command = Command
Expand Down
24 changes: 24 additions & 0 deletions src/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,34 @@ class Pipeline {
this._transactions += 1
}

_isDirty() {
if (this.redis.dirty) {
return true
}

// dirty if some watched keys have expired
const watchingKeys = this.redis.watching.values()
for (const key of watchingKeys) {
if (this.redis.expires.has(key) && this.redis.expires.isExpired(key)) {
return true
}
}

return false
}

exec(callback) {
// eslint-disable-next-line prefer-destructuring
const batch = this.batch

// return null if WATCHed key was modified or expired
if (this._isDirty()) {
this.redis.dirty = false
this.redis.watching.clear()
this.batch = undefined
return asCallback(Promise.resolve(null), callback)
}

this.batch = []
return asCallback(
Promise.all(batch.map(cmd => cmd())).then(replies =>
Expand Down
24 changes: 23 additions & 1 deletion test/functional/data.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import EventEmitter from 'events'

import { createData, createSharedData } from '../../src/data'
import { createExpires, createSharedExpires } from '../../src/expires'

describe('createSharedData', () => {
const modifiedKeysEvents = new EventEmitter()
const sharedExpires = createSharedExpires()
const sharedData = createSharedData(sharedExpires)
const sharedData = createSharedData(sharedExpires, modifiedKeysEvents)
const expires = createExpires(sharedExpires)
const data1 = createData(sharedData, expires, { foo: 'bar1' }, 'data1:')
const data2 = createData(sharedData, expires, { foo: 'bar2' }, 'data2:')
Expand Down Expand Up @@ -44,6 +47,25 @@ describe('createSharedData', () => {
expect(sharedData.has('data2:baz')).toBe(true)
expect(sharedData.get('data2:baz')).toEqual('foo2')
})

it('should emit modified key events', () => {
const modifiedKeys = []
modifiedKeysEvents.on('modified', key => modifiedKeys.push(key))

// does not modify data
data1.get('bar')
data2.has('bar');
sharedData.has('data1:baz')
sharedData.keys('data1:baz')

// modifies data
data1.set('foo', 'bar1')
data2.set('foo', 'bar2')
sharedData.set('data1:foo', 'bar1')
sharedData.set('data2:foo', 'bar2')

expect(modifiedKeys).toEqual(['data1:foo', 'data2:foo', 'data1:foo', 'data2:foo'])
})
})

describe('createData', () => {
Expand Down
52 changes: 52 additions & 0 deletions test/functional/watching.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import Redis from 'ioredis'

describe('watching', () => {
let redis

beforeEach(() => {
redis = new Redis()
})

afterEach(() => {
redis.disconnect()
})

describe('watch', () => {
it('should set the watched keys', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
expect(redis.watching).toContain('foo', 'bar')
expect(redis.dirty).toBe(false)
})

it('should mark the client as dirty if a key is modified', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
await redis.set('foo', '')
expect(redis.dirty).toBe(true)
})

it('should should not add watched key if dirty', async () => {
expect(await redis.watch('foo')).toBe('OK')
await redis.set('foo', '')
expect(redis.dirty).toBe(true)
expect(await redis.watch('bar')).toBe('OK')
expect(redis.watching).toEqual(new Set(['foo']))
})
})

describe('unwatch', () => {
it('should unwatch all watched keys', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
expect(redis.watching).toContain('foo', 'bar')
expect(await redis.unwatch()).toBe('OK')
expect(redis.watching.size).toBe(0)
})

it('should mark the client as not dirty', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
await redis.set('foo', '')
expect(redis.dirty).toBe(true)
expect(await redis.unwatch()).toBe('OK')
expect(redis.dirty).toBe(false)
})
})
})
22 changes: 22 additions & 0 deletions test/integration/commands/unwatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Redis from 'ioredis'

// eslint-disable-next-line import/no-relative-parent-imports
import { runTwinSuite } from '../../../test-utils'

runTwinSuite('unwatch', (command, equals) => {
describe(command, () => {
let redis

beforeEach(() => {
redis = new Redis()
})

afterEach(() => {
redis.disconnect()
})

it('should return OK', async () => {
expect(equals(await redis[command](), 'OK')).toBe(true)
})
})
})
22 changes: 22 additions & 0 deletions test/integration/commands/watch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Redis from 'ioredis'

// eslint-disable-next-line import/no-relative-parent-imports
import { runTwinSuite } from '../../../test-utils'

runTwinSuite('watch', (command, equals) => {
describe(command, () => {
let redis

beforeEach(() => {
redis = new Redis()
})

afterEach(() => {
redis.disconnect()
})

it('should return OK', async () => {
expect(equals(await redis[command]('foo', 'bar'), 'OK')).toBe(true)
})
})
})
51 changes: 48 additions & 3 deletions test/integration/exec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ describe('exec', () => {
])
})

it('should support a callback function', done => {
redis
it('should support a callback function', async () => {
await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
Expand All @@ -35,7 +35,52 @@ describe('exec', () => {
[null, 2],
[null, 2],
])
done()
})
})

it('should return null if some watched keys have expired', async () => {
await redis.expire('user_next', 1)
await redis.watch('user_next')
await new Promise(resolve => setTimeout(resolve, 1500))
const results = await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
])
.exec()
expect(results).toEqual(null)
})

it('should return null if some watched keys have changed', async () => {
const redis2 = redis.duplicate()
await redis.watch('user_next')
await redis2.incr('user_next')
const results = await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
])
.exec()
expect(results).toEqual(null)
expect(await redis.get('user_next')).toBe('2')
redis2.disconnect()
})

it('should allow unwatching before calling exec', async () => {
const redis2 = redis.duplicate()
await redis.watch('user_next')
await redis2.incr('user_next') // dirty
await redis.unwatch() // no longer dirty
const results = await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
])
.exec()
expect(results).toEqual([
[null, 3],
[null, 2],
])
redis2.disconnect()
})
})

0 comments on commit d485615

Please sign in to comment.