16
16
17
17
package org .springframework .cloud .stream .function ;
18
18
19
- import java .nio .charset .StandardCharsets ;
20
- import java .util .Collections ;
21
19
import java .util .Locale ;
22
- import java .util .Map ;
23
20
import java .util .function .Function ;
24
21
25
22
import org .junit .jupiter .api .BeforeAll ;
29
26
import org .springframework .boot .autoconfigure .EnableAutoConfiguration ;
30
27
import org .springframework .boot .builder .SpringApplicationBuilder ;
31
28
import org .springframework .cloud .function .context .message .MessageUtils ;
32
- import org .springframework .cloud .function .json .JsonMapper ;
33
- import org .springframework .cloud .stream .binder .BinderHeaders ;
34
- import org .springframework .cloud .stream .binder .test .EnableTestBinder ;
35
29
import org .springframework .cloud .stream .binder .test .InputDestination ;
36
30
import org .springframework .cloud .stream .binder .test .OutputDestination ;
37
31
import org .springframework .cloud .stream .binder .test .TestChannelBinderConfiguration ;
38
- import org .springframework .cloud .stream .utils .BuildInformationProvider ;
39
- import org .springframework .context .ApplicationContext ;
40
32
import org .springframework .context .ConfigurableApplicationContext ;
41
33
import org .springframework .context .annotation .Bean ;
42
34
import org .springframework .context .annotation .Configuration ;
51
43
/**
52
44
* @author Omer Celik
53
45
*/
46
+
54
47
public class HeaderTests {
55
48
56
49
@ BeforeAll
@@ -70,8 +63,10 @@ void checkWithEmptyPojo() {
70
63
71
64
OutputDestination outputDestination = context .getBean (OutputDestination .class );
72
65
Message <byte []> messageReceived = outputDestination .receive (1000 , "emptyConfigurationDestination" );
73
-
74
- checkCommonHeaders (messageReceived .getHeaders ());
66
+ MessageHeaders headers = messageReceived .getHeaders ();
67
+ assertThat (headers ).isNotNull ();
68
+ assertThat (headers .get (MessageUtils .TARGET_PROTOCOL )).isEqualTo ("kafka" );
69
+ assertThat (headers .get (MessageHeaders .CONTENT_TYPE )).isEqualTo ("application/json" );
75
70
}
76
71
}
77
72
@@ -80,20 +75,20 @@ void checkIfHeaderProvidedInData() {
80
75
try (ConfigurableApplicationContext context = new SpringApplicationBuilder (
81
76
TestChannelBinderConfiguration .getCompleteConfiguration (EmptyConfiguration .class ))
82
77
.web (WebApplicationType .NONE ).run ("--spring.jmx.enabled=false" )) {
83
-
84
78
StreamBridge streamBridge = context .getBean (StreamBridge .class );
85
79
String jsonPayload = "{\" name\" :\" Omer\" }" ;
86
80
streamBridge .send ("myBinding-out-0" ,
87
81
MessageBuilder .withPayload (jsonPayload .getBytes ())
88
82
.setHeader ("anyHeader" , "anyValue" )
89
83
.build (),
90
84
MimeTypeUtils .APPLICATION_JSON );
91
-
92
85
OutputDestination output = context .getBean (OutputDestination .class );
93
86
Message <byte []> result = output .receive (1000 , "myBinding-out-0" );
94
-
95
- checkCommonHeaders (result .getHeaders ());
96
- assertThat (result .getHeaders ().get ("anyHeader" )).isEqualTo ("anyValue" );
87
+ MessageHeaders headers = result .getHeaders ();
88
+ assertThat (headers ).isNotNull ();
89
+ assertThat (headers .get (MessageUtils .TARGET_PROTOCOL )).isEqualTo ("kafka" );
90
+ assertThat (headers .get (MessageHeaders .CONTENT_TYPE )).isEqualTo ("application/json" );
91
+ assertThat (headers .get ("anyHeader" )).isEqualTo ("anyValue" );
97
92
}
98
93
}
99
94
@@ -104,35 +99,16 @@ void checkGenericMessageSent() {
104
99
.web (WebApplicationType .NONE )
105
100
.run ("--spring.jmx.enabled=false" ,
106
101
"--spring.cloud.function.definition=uppercase" )) {
107
-
108
102
String jsonPayload = "{\" surname\" :\" Celik\" }" ;
109
103
InputDestination input = context .getBean (InputDestination .class );
110
104
input .send (new GenericMessage <>(jsonPayload .getBytes ()), "uppercase-in-0" );
111
-
112
105
OutputDestination output = context .getBean (OutputDestination .class );
113
- Message <byte []> result = output .receive (1000 , "uppercase-out-0" );
114
-
115
- checkCommonHeaders (result .getHeaders ());
116
- }
117
- }
118
106
119
- @ Test
120
- void checkGenericMessageSentUsingStreamBridge () {
121
- try (ConfigurableApplicationContext context = new SpringApplicationBuilder (
122
- TestChannelBinderConfiguration .getCompleteConfiguration (FunctionUpperCaseConfiguration .class ))
123
- .web (WebApplicationType .NONE )
124
- .run ("--spring.jmx.enabled=false" ,
125
- "--spring.cloud.function.definition=uppercase" )) {
126
-
127
- String jsonPayload = "{\" anyFieldName\" :\" anyValue\" }" ;
128
- final StreamBridge streamBridge = context .getBean (StreamBridge .class );
129
- GenericMessage <String > message = new GenericMessage <>(jsonPayload );
130
- streamBridge .send ("uppercase-in-0" , message );
131
-
132
- OutputDestination output = context .getBean (OutputDestination .class );
133
107
Message <byte []> result = output .receive (1000 , "uppercase-out-0" );
134
-
135
- checkCommonHeaders (result .getHeaders ());
108
+ MessageHeaders headers = result .getHeaders ();
109
+ assertThat (headers ).isNotNull ();
110
+ assertThat (headers .get (MessageUtils .TARGET_PROTOCOL )).isEqualTo ("kafka" );
111
+ assertThat (headers .get (MessageHeaders .CONTENT_TYPE )).isEqualTo ("application/json" );
136
112
}
137
113
}
138
114
@@ -151,96 +127,11 @@ void checkMessageWrappedFunctionalConsumer() {
151
127
152
128
OutputDestination target = context .getBean (OutputDestination .class );
153
129
Message <byte []> message = target .receive (5 , "uppercase-out-0" );
154
-
155
- checkCommonHeaders (message .getHeaders ());
156
- }
157
-
158
- @ Test
159
- void checkStringToMapMessageStreamListener () {
160
- ApplicationContext context = new SpringApplicationBuilder (
161
- StringToMapMessageConfiguration .class ).web (WebApplicationType .NONE )
162
- .run ("--spring.jmx.enabled=false" );
163
- InputDestination source = context .getBean (InputDestination .class );
164
- String jsonPayload = "{\" name\" :\" Omer\" }" ;
165
- source .send (new GenericMessage <>(jsonPayload .getBytes ()));
166
- OutputDestination target = context .getBean (OutputDestination .class );
167
- Message <byte []> outputMessage = target .receive ();
168
- checkCommonHeaders (outputMessage .getHeaders ());
169
- }
170
-
171
- @ Test
172
- void checkPojoToPojo () {
173
- ApplicationContext context = new SpringApplicationBuilder (
174
- PojoToPojoConfiguration .class ).web (WebApplicationType .NONE )
175
- .run ("--spring.jmx.enabled=false" );
176
- InputDestination source = context .getBean (InputDestination .class );
177
- String jsonPayload = "{\" name\" :\" Omer\" }" ;
178
- source .send (new GenericMessage <>(jsonPayload .getBytes ()));
179
- OutputDestination target = context .getBean (OutputDestination .class );
180
- Message <byte []> outputMessage = target .receive ();
181
- checkCommonHeaders (outputMessage .getHeaders ());
182
- }
183
-
184
- @ Test
185
- void checkPojoToString () {
186
- ApplicationContext context = new SpringApplicationBuilder (
187
- PojoToStringConfiguration .class ).web (WebApplicationType .NONE )
188
- .run ("--spring.jmx.enabled=false" );
189
- InputDestination source = context .getBean (InputDestination .class );
190
- OutputDestination target = context .getBean (OutputDestination .class );
191
- String jsonPayload = "{\" name\" :\" Neso\" }" ;
192
- source .send (new GenericMessage <>(jsonPayload .getBytes ()));
193
- Message <byte []> outputMessage = target .receive ();
194
- checkCommonHeaders (outputMessage .getHeaders ());
195
- }
196
-
197
- @ Test
198
- void checkPojoToByteArray () {
199
- ApplicationContext context = new SpringApplicationBuilder (
200
- PojoToByteArrayConfiguration .class ).web (WebApplicationType .NONE )
201
- .run ("--spring.jmx.enabled=false" );
202
- InputDestination source = context .getBean (InputDestination .class );
203
- OutputDestination target = context .getBean (OutputDestination .class );
204
- String jsonPayload = "{\" name\" :\" Neptune\" }" ;
205
- source .send (new GenericMessage <>(jsonPayload .getBytes ()));
206
- Message <byte []> outputMessage = target .receive ();
207
- checkCommonHeaders (outputMessage .getHeaders ());
208
- }
209
-
210
- @ Test
211
- void checkStringToPojoInboundContentTypeHeader () {
212
- ApplicationContext context = new SpringApplicationBuilder (
213
- StringToPojoConfiguration .class ).web (WebApplicationType .NONE )
214
- .run ("--spring.jmx.enabled=false" );
215
- InputDestination source = context .getBean (InputDestination .class );
216
- OutputDestination target = context .getBean (OutputDestination .class );
217
- String jsonPayload = "{\" name\" :\" Mercury\" }" ;
218
- source .send (new GenericMessage <>(jsonPayload .getBytes (),
219
- new MessageHeaders (Collections .singletonMap (MessageHeaders .CONTENT_TYPE ,
220
- MimeTypeUtils .APPLICATION_JSON_VALUE ))));
221
- Message <byte []> outputMessage = target .receive ();
222
- checkCommonHeaders (outputMessage .getHeaders ());
223
- }
224
-
225
- @ Test
226
- void checkPojoMessageToStringMessage () {
227
- ApplicationContext context = new SpringApplicationBuilder (
228
- PojoMessageToStringMessageConfiguration .class )
229
- .web (WebApplicationType .NONE ).run ("--spring.jmx.enabled=false" );
230
- InputDestination source = context .getBean (InputDestination .class );
231
- OutputDestination target = context .getBean (OutputDestination .class );
232
- String jsonPayload = "{\" name\" :\" Earth\" }" ;
233
- source .send (new GenericMessage <>(jsonPayload .getBytes ()));
234
- Message <byte []> outputMessage = target .receive ();
235
- MessageHeaders headers = outputMessage .getHeaders ();
236
- assertThat (BuildInformationProvider .isVersionValid ((String ) headers .get (BinderHeaders .SCST_VERSION ))).isTrue ();
237
- }
238
-
239
- private void checkCommonHeaders (MessageHeaders headers ) {
130
+ MessageHeaders headers = message .getHeaders ();
131
+ assertThat (headers ).isNotNull ();
240
132
assertThat (headers ).isNotNull ();
241
- assertThat (headers .get (MessageUtils .TARGET_PROTOCOL )).isEqualTo ("kafka" );
242
133
assertThat (headers .get (MessageHeaders .CONTENT_TYPE )).isEqualTo ("application/json" );
243
- assertThat (BuildInformationProvider . isVersionValid (( String ) headers .get (BinderHeaders . SCST_VERSION ))). isTrue ( );
134
+ assertThat (headers .get (MessageUtils . TARGET_PROTOCOL )). isEqualTo ( "kafka" );
244
135
}
245
136
246
137
@ EnableAutoConfiguration
@@ -265,97 +156,6 @@ public Function<String, String> uppercase() {
265
156
}
266
157
}
267
158
268
- @ EnableTestBinder
269
- @ EnableAutoConfiguration
270
- public static class StringToMapMessageConfiguration {
271
- @ Bean
272
- public Function <Message <Map <?, ?>>, String > echo () {
273
- return value -> {
274
- assertThat (value .getPayload () instanceof Map ).isTrue ();
275
- return (String ) value .getPayload ().get ("name" );
276
- };
277
- }
278
- }
279
-
280
- @ EnableTestBinder
281
- @ EnableAutoConfiguration
282
- public static class PojoToPojoConfiguration {
283
-
284
- @ Bean
285
- public Function <Planet , Planet > echo () {
286
- return value -> value ;
287
- }
288
- }
289
-
290
- @ EnableTestBinder
291
- @ EnableAutoConfiguration
292
- public static class PojoToStringConfiguration {
293
-
294
- @ Bean
295
- public Function <Planet , String > echo () {
296
- return Planet ::toString ;
297
- }
298
- }
299
-
300
- @ EnableTestBinder
301
- @ EnableAutoConfiguration
302
- public static class PojoToByteArrayConfiguration {
303
-
304
- @ Bean
305
- public Function <Planet , byte []> echo () {
306
- return value -> value .toString ().getBytes (StandardCharsets .UTF_8 );
307
- }
308
- }
309
-
310
- @ EnableTestBinder
311
- @ EnableAutoConfiguration
312
- public static class StringToPojoConfiguration {
313
-
314
- @ Bean
315
- public Function <String , Planet > echo (JsonMapper mapper ) {
316
- return value -> mapper .fromJson (value , Planet .class );
317
- }
318
- }
319
-
320
- @ EnableTestBinder
321
- @ EnableAutoConfiguration
322
- public static class PojoMessageToStringMessageConfiguration {
323
-
324
- @ Bean
325
- public Function <Message <Planet >, Message <String >> echo () {
326
- return value -> MessageBuilder .withPayload (value .getPayload ().toString ())
327
- .setHeader ("expected-content-type" , MimeTypeUtils .TEXT_PLAIN_VALUE )
328
- .build ();
329
- }
330
- }
331
-
332
- public static class Planet {
333
-
334
- private String name ;
335
-
336
- Planet () {
337
- this (null );
338
- }
339
-
340
- Planet (String name ) {
341
- this .name = name ;
342
- }
343
-
344
- public String getName () {
345
- return this .name ;
346
- }
347
-
348
- public void setName (String name ) {
349
- this .name = name ;
350
- }
351
-
352
- @ Override
353
- public String toString () {
354
- return this .name ;
355
- }
356
-
357
- }
358
-
359
159
public static class EmptyPojo {
360
160
361
161
}
0 commit comments