@@ -22,7 +22,7 @@ export type {
22
22
* Bindings for Kafka transport
23
23
* @implements {@linkcode Binding }
24
24
*/
25
- const Kafka : Binding = {
25
+ const Kafka : Binding < KafkaMessage < unknown > , KafkaMessage < string > > = {
26
26
binary : toBinaryKafkaMessage ,
27
27
structured : toStructuredKafkaMessage ,
28
28
toEvent : deserializeKafkaMessage ,
@@ -35,9 +35,9 @@ type Key = string | Buffer;
35
35
* Extends the base Message type to include
36
36
* Kafka-specific fields
37
37
*/
38
- interface KafkaMessage < T = string > extends Message {
38
+ interface KafkaMessage < T = string | Buffer | unknown > extends Message {
39
39
key : Key
40
- value : T | string | Buffer | unknown
40
+ value : T
41
41
timestamp ?: string
42
42
}
43
43
@@ -61,7 +61,7 @@ interface KafkaEvent<T> extends CloudEventV1<T> {
61
61
* @param {KafkaEvent<T> } event The event to serialize
62
62
* @returns {KafkaMessage<T> } a KafkaMessage instance
63
63
*/
64
- function toBinaryKafkaMessage < T > ( event : CloudEventV1 < T > ) : KafkaMessage < T > {
64
+ function toBinaryKafkaMessage < T > ( event : CloudEventV1 < T > ) : KafkaMessage < T | undefined > {
65
65
// 3.2.1. Content Type
66
66
// For the binary mode, the header content-type property MUST be mapped directly
67
67
// to the CloudEvents datacontenttype attribute.
@@ -86,7 +86,7 @@ function toBinaryKafkaMessage<T>(event: CloudEventV1<T>): KafkaMessage<T> {
86
86
* @param {CloudEvent<T> } event the CloudEvent to be serialized
87
87
* @returns {KafkaMessage<T> } a KafkaMessage instance
88
88
*/
89
- function toStructuredKafkaMessage < T > ( event : CloudEventV1 < T > ) : KafkaMessage < T > {
89
+ function toStructuredKafkaMessage < T > ( event : CloudEventV1 < T > ) : KafkaMessage < string > {
90
90
if ( ( event instanceof CloudEvent ) && event . data_base64 ) {
91
91
// The event's data is binary - delete it
92
92
event = event . cloneWith ( { data : undefined } ) ;
@@ -130,9 +130,9 @@ function deserializeKafkaMessage<T>(message: Message): CloudEvent<T> | CloudEven
130
130
case Mode . BINARY :
131
131
return parseBinary ( m ) ;
132
132
case Mode . STRUCTURED :
133
- return parseStructured ( m ) ;
133
+ return parseStructured ( m as unknown as KafkaMessage < string > ) ;
134
134
case Mode . BATCH :
135
- return parseBatched ( m ) ;
135
+ return parseBatched ( m as unknown as KafkaMessage < string > ) ;
136
136
default :
137
137
throw new ValidationError ( "Unknown Message mode" ) ;
138
138
}
@@ -212,14 +212,14 @@ function parseBinary<T>(message: KafkaMessage<T>): CloudEvent<T> {
212
212
* @param {KafkaMessage<T> } message the message
213
213
* @returns {CloudEvent<T> } a KafkaEvent<T>
214
214
*/
215
- function parseStructured < T > ( message : KafkaMessage < T > ) : CloudEvent < T > {
215
+ function parseStructured < T > ( message : KafkaMessage < string > ) : CloudEvent < T > {
216
216
// Although the format of a structured encoded event could be something
217
217
// other than JSON, e.g. XML, we currently only support JSON
218
218
// encoded structured events.
219
219
if ( ! message . headers [ CONSTANTS . HEADER_CONTENT_TYPE ] ?. startsWith ( CONSTANTS . MIME_CE_JSON ) ) {
220
220
throw new ValidationError ( `Unsupported event encoding ${ message . headers [ CONSTANTS . HEADER_CONTENT_TYPE ] } ` ) ;
221
221
}
222
- const eventObj = JSON . parse ( message . value as string ) ;
222
+ const eventObj = JSON . parse ( message . value ) ;
223
223
eventObj . time = new Date ( eventObj . time ) . toISOString ( ) ;
224
224
return new CloudEvent ( {
225
225
...eventObj ,
@@ -232,14 +232,14 @@ function parseStructured<T>(message: KafkaMessage<T>): CloudEvent<T> {
232
232
* @param {KafkaMessage<T> } message the message
233
233
* @returns {CloudEvent<T>[] } an array of KafkaEvent<T>
234
234
*/
235
- function parseBatched < T > ( message : KafkaMessage < T > ) : CloudEvent < T > [ ] {
235
+ function parseBatched < T > ( message : KafkaMessage < string > ) : CloudEvent < T > [ ] {
236
236
// Although the format of batch encoded events could be something
237
237
// other than JSON, e.g. XML, we currently only support JSON
238
238
// encoded structured events.
239
239
if ( ! message . headers [ CONSTANTS . HEADER_CONTENT_TYPE ] ?. startsWith ( CONSTANTS . MIME_CE_BATCH ) ) {
240
240
throw new ValidationError ( `Unsupported event encoding ${ message . headers [ CONSTANTS . HEADER_CONTENT_TYPE ] } ` ) ;
241
241
}
242
- const events = JSON . parse ( message . value as string ) as Record < string , unknown > [ ] ;
242
+ const events = JSON . parse ( message . value ) as Record < string , unknown > [ ] ;
243
243
return events . map ( ( e ) => new CloudEvent ( { ...e , partitionkey : message . key } , false ) ) ;
244
244
}
245
245
0 commit comments