@@ -102,16 +102,17 @@ static ngx_str_t conf_zero_value = ngx_string("0");
102
102
/* data structures */
103
103
104
104
struct ngx_http_log_json_format_s {
105
- ngx_str_t name ; /* the format name */
106
- ngx_str_t config ; /* value at config files */
107
- ngx_array_t * items ; /* format items */
105
+ ngx_str_t name ; /* the format name */
106
+ ngx_str_t config ; /* value at config files */
107
+ ngx_array_t * items ; /* format items */
108
108
ngx_http_complex_value_t * filter ; /* filter output */
109
109
};
110
110
typedef struct ngx_http_log_json_format_s ngx_http_log_json_format_t ;
111
111
112
112
struct ngx_http_log_json_loc_kafka_conf_s {
113
- rd_kafka_topic_t * rkt ; /* kafka topic */
114
- rd_kafka_topic_conf_t * rktc ; /* kafka topic configuration */
113
+ rd_kafka_topic_t * rkt ; /* kafka topic */
114
+ rd_kafka_topic_conf_t * rktc ; /* kafka topic configuration*/
115
+ ngx_http_complex_value_t * msg_id_var ; /* variable for message id */
115
116
};
116
117
117
118
/* configuration data structures */
@@ -447,6 +448,8 @@ static ngx_int_t ngx_http_log_json_log_handler(ngx_http_request_t *r) {
447
448
ngx_http_log_json_output_location_t * arr ;
448
449
ngx_http_log_json_output_location_t * location ;
449
450
451
+ ngx_str_t msg_id ;
452
+
450
453
lc = ngx_http_get_module_loc_conf (r , ngx_http_log_json_module );
451
454
452
455
/*FIXME: Try to discard local upstream requests */
@@ -526,6 +529,13 @@ static ngx_int_t ngx_http_log_json_log_handler(ngx_http_request_t *r) {
526
529
/* Write to kafka */
527
530
if (location -> type == NGX_HTTP_LOG_JSON_SINK_KAFKA ) {
528
531
532
+ ngx_http_complex_value (r , location -> kafka .msg_id_var , & msg_id );
533
+ #if (NGX_DEBUG )
534
+ ngx_log_error (NGX_LOG_DEBUG , r -> pool -> log , 0 ,
535
+ "http_log_json: kafka msg-id:[%v] msg:[%s]" ,
536
+ & msg_id , txt );
537
+ #endif
538
+
529
539
/* don't do anything if no kafka brokers to send */
530
540
if (! mcf -> kafka .valid_brokers ) {
531
541
continue ;
@@ -556,7 +566,8 @@ static ngx_int_t ngx_http_log_json_log_handler(ngx_http_request_t *r) {
556
566
/* Payload and length */
557
567
txt , strlen (txt ),
558
568
/* Optional key and its length */
559
- NULL , 0 ,
569
+ msg_id .data ? (const char * ) msg_id .data : NULL ,
570
+ msg_id .len ,
560
571
/* Message opaque, provided in
561
572
* delivery report callback as
562
573
* msg_opaque. */
@@ -573,6 +584,7 @@ static ngx_int_t ngx_http_log_json_log_handler(ngx_http_request_t *r) {
573
584
} else {
574
585
575
586
#if (NGX_DEBUG )
587
+
576
588
if (mcf ) {
577
589
ngx_log_error (NGX_LOG_DEBUG , r -> pool -> log , 0 ,
578
590
"http_log_json: kafka msg:[%s] ERR:[%d] QUEUE:[%d]" ,
@@ -957,6 +969,10 @@ ngx_http_log_json_loc_output(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
957
969
size_t i ;
958
970
ngx_uint_t found = 0 ;
959
971
ngx_http_log_json_main_conf_t * mcf ;
972
+ ngx_http_compile_complex_value_t ccv ;
973
+
974
+ /*FIXME: Change this to an user's configured variable */
975
+ ngx_str_t msg_id_variable = ngx_string ("$request_id" );
960
976
961
977
if (! args ) {
962
978
ngx_conf_log_error (NGX_LOG_EMERG , cf , 0 ,
@@ -1043,6 +1059,21 @@ ngx_http_log_json_loc_output(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
1043
1059
1044
1060
/* Set global variable */
1045
1061
http_log_json_has_kafka_locations = NGX_OK ;
1062
+
1063
+ /* Set variable for message id */
1064
+ ngx_memzero (& ccv , sizeof (ngx_http_compile_complex_value_t ));
1065
+
1066
+ ccv .cf = cf ;
1067
+ ccv .value = & msg_id_variable ;
1068
+ ccv .complex_value = ngx_pcalloc (cf -> pool ,
1069
+ sizeof (ngx_http_complex_value_t ));
1070
+ if (ccv .complex_value == NULL ) {
1071
+ return NGX_CONF_ERROR ;
1072
+ }
1073
+ if (ngx_http_compile_complex_value (& ccv ) != NGX_OK ) {
1074
+ return NGX_CONF_ERROR ;
1075
+ }
1076
+ new_location -> kafka .msg_id_var = ccv .complex_value ;
1046
1077
}
1047
1078
1048
1079
return NGX_CONF_OK ;
0 commit comments