1
1
import { randomBytes } from 'crypto' ;
2
2
import { getMissingProps } from '@cordis/common' ;
3
- import { encode , decode } from '@msgpack/msgpack' ;
3
+ import { encode , decode , ExtensionCodec } from '@msgpack/msgpack' ;
4
4
import { CordisBrokerTypeError } from '../error' ;
5
5
import type * as amqp from 'amqplib' ;
6
6
import type { Broker } from './Broker' ;
@@ -88,7 +88,21 @@ export class BrokerUtil {
88
88
* Broker instance
89
89
*/
90
90
public readonly broker : Broker
91
- ) { }
91
+ ) {
92
+ this . packBigintExtensionCodec . register ( {
93
+ type : 0 ,
94
+ encode : ( input : unknown ) => {
95
+ if ( typeof input === 'bigint' ) {
96
+ return encode ( input . toString ( ) ) ;
97
+ }
98
+
99
+ return null ;
100
+ } ,
101
+ decode : ( data : Uint8Array ) => BigInt ( decode ( data ) as string )
102
+ } ) ;
103
+ }
104
+
105
+ public readonly packBigintExtensionCodec = new ExtensionCodec ( ) ;
92
106
93
107
/**
94
108
* Generates a base64 string with the given length using Node.js' Crypto
@@ -118,7 +132,7 @@ export class BrokerUtil {
118
132
}
119
133
120
134
try {
121
- await cb ( decode ( msg . content ) as T , msg ) ;
135
+ await cb ( decode ( msg . content , { extensionCodec : this . packBigintExtensionCodec } ) as T , msg ) ;
122
136
} catch ( e ) {
123
137
this . broker . emit ( 'error' , e ) ;
124
138
this . broker . channel . reject ( msg , true ) ;
@@ -145,7 +159,7 @@ export class BrokerUtil {
145
159
public sendToQueue < T > ( options : SendToQueueOptions < T > ) {
146
160
const { to, content, options : amqpOptions } = options ;
147
161
148
- const encoded = encode ( content ) ;
162
+ const encoded = encode ( content , { extensionCodec : this . packBigintExtensionCodec } ) ;
149
163
const data = Buffer . from ( encoded . buffer , encoded . byteOffset , encoded . byteLength ) ;
150
164
return this . broker . channel . sendToQueue ( to , data , amqpOptions ) ;
151
165
}
@@ -158,7 +172,7 @@ export class BrokerUtil {
158
172
public sendToExchange < T > ( options : SendToExchangeOptions < T > ) {
159
173
const { to, content, key, options : amqpOptions } = options ;
160
174
161
- const encoded = encode ( content ) ;
175
+ const encoded = encode ( content , { extensionCodec : this . packBigintExtensionCodec } ) ;
162
176
const data = Buffer . from ( encoded . buffer , encoded . byteOffset , encoded . byteLength ) ;
163
177
return this . broker . channel . publish ( to , key , data , amqpOptions ) ;
164
178
}
0 commit comments