Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add watch and unwatch commands #1347

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/commands/discard.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ export function discard() {
if (!this.batch) {
throw new Error('ERR DISCARD without MULTI')
}
this.dirty = false
this.watching.clear()
this.batch = undefined
return 'OK'
}
Expand Down
2 changes: 2 additions & 0 deletions src/commands/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ export * from './ttl'
export * from './type'
export * from './unlink'
export * from './unsubscribe'
export * from './unwatch'
export * from './watch'
export * from './xadd'
export * from './xlen'
export * from './xrange'
Expand Down
12 changes: 12 additions & 0 deletions src/commands/unwatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'

export function unwatch() {
this.dirty = false
this.watching.clear()
return 'OK'
}

export function unwatchBuffer() {
const val = unwatch.call(this)
return convertStringToBuffer(val)
}
16 changes: 16 additions & 0 deletions src/commands/watch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'

export function watch(...keys) {
if (!this.dirty) {
for (const key of keys) {
this.watching.add(key)
}
}

return 'OK'
}

export function watchBuffer(...keys) {
const val = watch.apply(this, keys)
return convertStringToBuffer(val)
}
4 changes: 3 additions & 1 deletion src/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ export default contextMap

export function createContext(keyPrefix) {
const expires = createSharedExpires()
const modifiedKeyEvents = new EventEmitter()

return {
channels: new EventEmitter(),
expires,
data: createSharedData(expires),
data: createSharedData(expires, modifiedKeyEvents),
patternChannels: new EventEmitter(),
keyPrefix,
modifiedKeyEvents,
}
}
9 changes: 8 additions & 1 deletion src/data.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
export function createSharedData(sharedExpires) {
import { EventEmitter } from 'events'

export function createSharedData(
sharedExpires,
modifiedKeyEvents = new EventEmitter()
) {
let raw = {}

return Object.freeze({
Expand All @@ -10,6 +15,7 @@ export function createSharedData(sharedExpires) {
sharedExpires.delete(key)
}
delete raw[key]
modifiedKeyEvents.emit('modified', key)
},
get(key) {
if (sharedExpires.has(key) && sharedExpires.isExpired(key)) {
Expand Down Expand Up @@ -70,6 +76,7 @@ export function createSharedData(sharedExpires) {
}

raw[key] = item
modifiedKeyEvents.emit('modified', key)
},
})
}
Expand Down
18 changes: 15 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,21 @@ class RedisMock extends EventEmitter {
contextMap.set(this.keyData, context)
}

const context = contextMap.get(this.keyData)
this.context = contextMap.get(this.keyData)

this.expires = createExpires(context.expires, optionsWithDefault.keyPrefix)
this.expires = createExpires(this.context.expires, optionsWithDefault.keyPrefix)
this.data = createData(
context.data,
this.context.data,
this.expires,
optionsWithDefault.data,
optionsWithDefault.keyPrefix
)

this.dirty = false
this.watching = new Set()
this._signalModifiedKey = this._signalModifiedKey.bind(this) // re-assign bound method to remove listener on disconnect
this.context.modifiedKeyEvents.on('modified', this._signalModifiedKey)

this._initCommands()

this.keyspaceEvents = parseKeyspaceEvents(
Expand Down Expand Up @@ -201,6 +206,7 @@ class RedisMock extends EventEmitter {

removeFrom(this.channels)
removeFrom(this.patternChannels)
this.context.modifiedKeyEvents.off('modified', this._signalModifiedKey)
// no-op
}

Expand Down Expand Up @@ -240,6 +246,12 @@ class RedisMock extends EventEmitter {
}
})
}

_signalModifiedKey(key) {
if (!this.dirty && this.watching.has(key)) {
this.dirty = true
}
}
}

RedisMock.Command = Command
Expand Down
24 changes: 24 additions & 0 deletions src/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,31 @@ class Pipeline {
this._transactions += 1
}

_isDirty() {
if (this.redis.dirty) {
return true
}

// dirty if some watched keys have expired
const watchingKeys = this.redis.watching.values()
for (const key of watchingKeys) {
if (this.redis.expires.has(key) && this.redis.expires.isExpired(key)) {
return true
}
}

return false
}

exec(callback) {
// return null if WATCHed key was modified or expired
if (this._isDirty()) {
this.redis.dirty = false
this.redis.watching.clear()
this.batch = undefined
return asCallback(Promise.resolve(null), callback)
}

// eslint-disable-next-line prefer-destructuring
const batch = this.batch

Expand Down
24 changes: 23 additions & 1 deletion test/functional/data.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import EventEmitter from 'events'

import { createData, createSharedData } from '../../src/data'
import { createExpires, createSharedExpires } from '../../src/expires'

describe('createSharedData', () => {
const modifiedKeysEvents = new EventEmitter()
const sharedExpires = createSharedExpires()
const sharedData = createSharedData(sharedExpires)
const sharedData = createSharedData(sharedExpires, modifiedKeysEvents)
const expires = createExpires(sharedExpires)
const data1 = createData(sharedData, expires, { foo: 'bar1' }, 'data1:')
const data2 = createData(sharedData, expires, { foo: 'bar2' }, 'data2:')
Expand Down Expand Up @@ -44,6 +47,25 @@ describe('createSharedData', () => {
expect(sharedData.has('data2:baz')).toBe(true)
expect(sharedData.get('data2:baz')).toEqual('foo2')
})

it('should emit modified key events', () => {
const modifiedKeys = []
modifiedKeysEvents.on('modified', key => modifiedKeys.push(key))

// does not modify data
data1.get('bar')
data2.has('bar');
sharedData.has('data1:baz')
sharedData.keys('data1:baz')

// modifies data
data1.set('foo', 'bar1')
data2.set('foo', 'bar2')
sharedData.set('data1:foo', 'bar1')
sharedData.set('data2:foo', 'bar2')

expect(modifiedKeys).toEqual(['data1:foo', 'data2:foo', 'data1:foo', 'data2:foo'])
})
})

describe('createData', () => {
Expand Down
52 changes: 52 additions & 0 deletions test/functional/watching.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import Redis from 'ioredis'

describe('watching', () => {
let redis

beforeEach(() => {
redis = new Redis()
})

afterEach(() => {
redis.disconnect()
})

describe('watch', () => {
it('should set the watched keys', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
expect(redis.watching).toContain('foo', 'bar')
expect(redis.dirty).toBe(false)
})

it('should mark the client as dirty if a key is modified', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
await redis.set('foo', '')
expect(redis.dirty).toBe(true)
})

it('should should not add watched key if dirty', async () => {
expect(await redis.watch('foo')).toBe('OK')
await redis.set('foo', '')
expect(redis.dirty).toBe(true)
expect(await redis.watch('bar')).toBe('OK')
expect(redis.watching).toEqual(new Set(['foo']))
})
})

describe('unwatch', () => {
it('should unwatch all watched keys', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
expect(redis.watching).toContain('foo', 'bar')
expect(await redis.unwatch()).toBe('OK')
expect(redis.watching.size).toBe(0)
})

it('should mark the client as not dirty', async () => {
expect(await redis.watch('foo', 'bar')).toBe('OK')
await redis.set('foo', '')
expect(redis.dirty).toBe(true)
expect(await redis.unwatch()).toBe('OK')
expect(redis.dirty).toBe(false)
})
})
})
22 changes: 22 additions & 0 deletions test/integration/commands/unwatch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Redis from 'ioredis'

// eslint-disable-next-line import/no-relative-parent-imports
import { runTwinSuite } from '../../../test-utils'

runTwinSuite('unwatch', (command, equals) => {
describe(command, () => {
let redis

beforeEach(() => {
redis = new Redis()
})

afterEach(() => {
redis.disconnect()
})

it('should return OK', async () => {
expect(equals(await redis[command](), 'OK')).toBe(true)
})
})
})
22 changes: 22 additions & 0 deletions test/integration/commands/watch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Redis from 'ioredis'

// eslint-disable-next-line import/no-relative-parent-imports
import { runTwinSuite } from '../../../test-utils'

runTwinSuite('watch', (command, equals) => {
describe(command, () => {
let redis

beforeEach(() => {
redis = new Redis()
})

afterEach(() => {
redis.disconnect()
})

it('should return OK', async () => {
expect(equals(await redis[command]('foo', 'bar'), 'OK')).toBe(true)
})
})
})
51 changes: 48 additions & 3 deletions test/integration/exec.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ describe('exec', () => {
])
})

it('should support a callback function', done => {
redis
it('should support a callback function', async () => {
await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
Expand All @@ -35,7 +35,52 @@ describe('exec', () => {
[null, 2],
[null, 2],
])
done()
})
})

it('should return null if some watched keys have expired', async () => {
await redis.expire('user_next', 1)
await redis.watch('user_next')
await new Promise(resolve => setTimeout(resolve, 1500))
const results = await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
])
.exec()
expect(results).toEqual(null)
})

it('should return null if some watched keys have changed', async () => {
const redis2 = redis.duplicate()
await redis.watch('user_next')
await redis2.incr('user_next')
const results = await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
])
.exec()
expect(results).toEqual(null)
expect(await redis.get('user_next')).toBe('2')
redis2.disconnect()
})

it('should allow unwatching before calling exec', async () => {
const redis2 = redis.duplicate()
await redis.watch('user_next')
await redis2.incr('user_next') // dirty
await redis.unwatch() // no longer dirty
const results = await redis
.multi([
['incr', 'user_next'],
['incr', 'post_next'],
])
.exec()
expect(results).toEqual([
[null, 3],
[null, 2],
])
redis2.disconnect()
})
})