Skip to content

Commit 05eff99

Browse files
committed
CLIENT-3365 Metrics changes
1 parent 3d64de8 commit 05eff99

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1365
-107
lines changed

base_read_command.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,3 +98,10 @@ func (cmd *baseReadCommand) Execute() Error {
9898
func (cmd *baseReadCommand) commandType() commandType {
9999
return ttGet
100100
}
101+
102+
func (cmd *baseReadCommand) getNamespace() *map[string]uint64 {
103+
response := make(map[string]uint64, 1)
104+
response[cmd.key.namespace] = 1
105+
106+
return &response
107+
}

base_write_command.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,3 +100,10 @@ func (cmd *baseWriteCommand) parseHeader() (types.ResultCode, Error) {
100100

101101
return rp.resultCode, nil
102102
}
103+
104+
func (cmd *baseWriteCommand) getNamespace() *map[string]uint64 {
105+
response := make(map[string]uint64)
106+
response[cmd.key.namespace] = 1
107+
108+
return &response
109+
}

batch_command_delete.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,13 @@ func (cmd *batchCommandDelete) parseRecordResults(ifc command, receiveSize int)
8181
return false, err
8282
}
8383
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
84+
8485
generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
8586
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
8687
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
8788
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
8889
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
90+
8991
err := cmd.parseFieldsWrite(resultCode, fieldCount, cmd.keys[batchIndex])
9092
if err != nil {
9193
return false, err
@@ -112,6 +114,12 @@ func (cmd *batchCommandDelete) parseRecordResults(ifc command, receiveSize int)
112114
return false, nil
113115
}
114116

117+
// Aggregate metrics
118+
metricsEnabled := cmd.node.cluster.metricsEnabled.Load()
119+
if metricsEnabled {
120+
cmd.node.stats.updateOrInsert(ifc, resultCode)
121+
}
122+
115123
if resultCode == 0 {
116124
if err = cmd.parseRecord(cmd.records[batchIndex], cmd.keys[batchIndex], opCount, generation, expiration); err != nil {
117125
return false, err
@@ -211,3 +219,11 @@ func (cmd *batchCommandDelete) Execute() Error {
211219
func (cmd *batchCommandDelete) generateBatchNodes(cluster *Cluster) ([]*batchNode, Error) {
212220
return newBatchNodeListKeys(cluster, cmd.policy, cmd.keys, nil, cmd.sequenceAP, cmd.sequenceSC, cmd.batch, false)
213221
}
222+
223+
func (cmd *batchCommandDelete) getNamespace() *map[string]uint64 {
224+
response := make(map[string]uint64, len(cmd.keys))
225+
for _, key := range cmd.keys {
226+
response[key.namespace]++
227+
}
228+
return &response
229+
}

batch_command_exists.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,18 +72,17 @@ func (cmd *batchCommandExists) writeBuffer(ifc command) Error {
7272
func (cmd *batchCommandExists) parseRecordResults(ifc command, receiveSize int) (bool, Error) {
7373
//Parse each message response and add it to the result array
7474
cmd.dataOffset = 0
75-
7675
for cmd.dataOffset < receiveSize {
7776
if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
7877
return false, err
7978
}
8079

8180
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
82-
// generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
83-
// expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
81+
8482
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
8583
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
8684
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
85+
8786
if len(cmd.keys) > batchIndex {
8887
err := cmd.parseFieldsRead(fieldCount, cmd.keys[batchIndex])
8988
if err != nil {
@@ -108,6 +107,12 @@ func (cmd *batchCommandExists) parseRecordResults(ifc command, receiveSize int)
108107
return false, nil
109108
}
110109

110+
// Aggregate metrics
111+
metricsEnabled := cmd.node.cluster.metricsEnabled.Load()
112+
if metricsEnabled {
113+
cmd.node.stats.updateOrInsert(ifc, resultCode)
114+
}
115+
111116
if opCount > 0 {
112117
return false, newCustomNodeError(cmd.node, types.PARSE_ERROR, "Received bins that were not requested!")
113118
}
@@ -163,3 +168,12 @@ func (cmd *batchCommandExists) Execute() Error {
163168
func (cmd *batchCommandExists) generateBatchNodes(cluster *Cluster) ([]*batchNode, Error) {
164169
return newBatchNodeListKeys(cluster, cmd.policy, cmd.keys, nil, cmd.sequenceAP, cmd.sequenceSC, cmd.batch, false)
165170
}
171+
172+
func (cmd *batchCommandExists) getNamespace() *map[string]uint64 {
173+
response := make(map[string]uint64, len(cmd.keys))
174+
for _, key := range cmd.keys {
175+
response[key.namespace]++
176+
}
177+
178+
return &response
179+
}

batch_command_get.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,23 +116,17 @@ func (cmd *batchCommandGet) writeBuffer(ifc command) Error {
116116
func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, Error) {
117117
//Parse each message response and add it to the result array
118118
cmd.dataOffset = 0
119-
120119
for cmd.dataOffset < receiveSize {
121120
if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
122121
return false, err
123122
}
124123
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
124+
125125
generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
126126
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
127127
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
128128
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
129129
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
130-
if len(cmd.keys) > batchIndex {
131-
err := cmd.parseFieldsRead(fieldCount, cmd.keys[batchIndex])
132-
if err != nil {
133-
return false, err
134-
}
135-
}
136130

137131
// The only valid server return codes are "ok" and "not found" and "filtered out".
138132
// If other return codes are received, then abort the batch.
@@ -151,6 +145,12 @@ func (cmd *batchCommandGet) parseRecordResults(ifc command, receiveSize int) (bo
151145
return false, nil
152146
}
153147

148+
// Aggregate metrics
149+
metricsEnabled := cmd.node.cluster.metricsEnabled.Load()
150+
if metricsEnabled {
151+
cmd.node.stats.updateOrInsert(ifc, resultCode)
152+
}
153+
154154
var err Error
155155
if cmd.indexRecords != nil {
156156
if len(cmd.indexRecords) > 0 {
@@ -274,3 +274,12 @@ func (cmd *batchCommandGet) Execute() Error {
274274
func (cmd *batchCommandGet) generateBatchNodes(cluster *Cluster) ([]*batchNode, Error) {
275275
return newBatchNodeListKeys(cluster, cmd.policy, cmd.keys, nil, cmd.sequenceAP, cmd.sequenceSC, cmd.batch, false)
276276
}
277+
278+
func (cmd *batchCommandGet) getNamespace() *map[string]uint64 {
279+
response := make(map[string]uint64, len(cmd.keys))
280+
for _, key := range cmd.keys {
281+
response[key.namespace]++
282+
}
283+
284+
return &response
285+
}

batch_command_operate.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ func (cmd *batchCommandOperate) parseRecordResults(ifc command, receiveSize int)
162162
continue
163163
}
164164

165+
// Aggregate metrics
166+
metricsEnabled := cmd.node.cluster.metricsEnabled.Load()
167+
if metricsEnabled {
168+
cmd.node.stats.updateOrInsert(ifc, resultCode)
169+
}
170+
165171
if resultCode == 0 {
166172
if cmd.objects == nil {
167173
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
@@ -301,3 +307,20 @@ func (cmd *batchCommandOperate) commandType() commandType {
301307
func (cmd *batchCommandOperate) generateBatchNodes(cluster *Cluster) ([]*batchNode, Error) {
302308
return newBatchOperateNodeListIfcRetry(cluster, cmd.policy, cmd.records, cmd.sequenceAP, cmd.sequenceSC, cmd.batch)
303309
}
310+
311+
func (cmd *batchCommandOperate) getNamespace() *map[string]uint64 {
312+
response := make(map[string]uint64, len(cmd.records))
313+
for _, br := range cmd.records {
314+
switch br := br.(type) {
315+
case *BatchRead:
316+
response[br.Key.namespace]++
317+
case *BatchWrite:
318+
response[br.Key.namespace]++
319+
case *BatchDelete:
320+
response[br.Key.namespace]++
321+
case *BatchUDF:
322+
response[br.Key.namespace]++
323+
}
324+
}
325+
return &response
326+
}

batch_command_udf.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,24 @@ func (cmd *batchCommandUDF) parseRecordResults(ifc command, receiveSize int) (bo
9090
return false, err
9191
}
9292
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
93+
9394
generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
9495
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
9596
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
9697
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
9798
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
9899
err := cmd.parseFieldsWrite(resultCode, fieldCount, cmd.keys[batchIndex])
100+
99101
if err != nil {
100102
return false, err
101103
}
102104

105+
// Aggregate metrics
106+
metricsEnabled := cmd.node.cluster.metricsEnabled.Load()
107+
if metricsEnabled {
108+
cmd.node.stats.updateOrInsert(ifc, resultCode)
109+
}
110+
103111
// The only valid server return codes are "ok" and "not found" and "filtered out".
104112
// If other return codes are received, then abort the batch.
105113
if resultCode != 0 {
@@ -221,3 +229,11 @@ func (cmd *batchCommandUDF) Execute() Error {
221229
func (cmd *batchCommandUDF) generateBatchNodes(cluster *Cluster) ([]*batchNode, Error) {
222230
return newBatchNodeListKeys(cluster, cmd.policy, cmd.keys, nil, cmd.sequenceAP, cmd.sequenceSC, cmd.batch, false)
223231
}
232+
233+
func (cmd *batchCommandUDF) getNamespace() *map[string]uint64 {
234+
response := make(map[string]uint64, len(cmd.keys))
235+
for _, key := range cmd.keys {
236+
response[key.namespace]++
237+
}
238+
return &response
239+
}

batch_index_command_get.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ func (cmd *batchIndexCommandGet) parseRecordResults(ifc command, receiveSize int
132132
return false, err
133133
}
134134

135+
// Aggregate metrics
136+
metricsEnabled := cmd.node.cluster.metricsEnabled.Load()
137+
if metricsEnabled {
138+
cmd.node.stats.updateOrInsert(ifc, resultCode)
139+
}
140+
135141
if resultCode != 0 {
136142
if resultCode == types.FILTERED_OUT {
137143
cmd.filteredOutCnt++
@@ -248,3 +254,11 @@ func (cmd *batchIndexCommandGet) parseRecord(key *Key, opCount int, generation,
248254

249255
return newRecord(cmd.node, key, bins, generation, expiration), nil
250256
}
257+
258+
func (cmd *batchIndexCommandGet) getNamespace() *map[string]uint64 {
259+
response := make(map[string]uint64, len(cmd.records))
260+
for _, br := range cmd.records {
261+
response[br.Key.namespace]++
262+
}
263+
return &response
264+
}

0 commit comments

Comments
 (0)