diff --git a/internal/converter/converter.go b/internal/converter/converter.go index e0a3563a26..24180b15bc 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -23,6 +23,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/converter/binary" "github.com/lf-edge/ekuiper/v2/internal/converter/delimited" "github.com/lf-edge/ekuiper/v2/internal/converter/json" + "github.com/lf-edge/ekuiper/v2/internal/converter/urlencoded" "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/errorx" "github.com/lf-edge/ekuiper/v2/pkg/message" @@ -39,6 +40,9 @@ func init() { modules.RegisterConverter(message.FormatDelimited, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { return delimited.NewConverter(props) }) + modules.RegisterConverter(message.FormatUrlEncoded, func(_ api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, props map[string]any) (message.Converter, error) { + return urlencoded.NewConverter(props) + }) } func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (c message.Converter, err error) { diff --git a/internal/converter/urlencoded/converter.go b/internal/converter/urlencoded/converter.go new file mode 100644 index 0000000000..1b96ad8faf --- /dev/null +++ b/internal/converter/urlencoded/converter.go @@ -0,0 +1,77 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package urlencoded + +import ( + "fmt" + "net/url" + + "github.com/lf-edge/ekuiper/contract/v2/api" + + "github.com/lf-edge/ekuiper/v2/pkg/cast" + "github.com/lf-edge/ekuiper/v2/pkg/errorx" + "github.com/lf-edge/ekuiper/v2/pkg/message" +) + +type Converter struct{} + +func (c *Converter) Encode(_ api.StreamContext, d any) (b []byte, err error) { + defer func() { + if err != nil { + err = errorx.NewWithCode(errorx.CovnerterErr, err.Error()) + } + }() + switch m := d.(type) { + case map[string]any: + form := url.Values{} + for key, value := range m { + switch vt := value.(type) { + case []any: + for _, item := range vt { + ss, _ := cast.ToString(item, cast.CONVERT_ALL) + form.Add(key, ss) + } + default: + ss, _ := cast.ToString(value, cast.CONVERT_ALL) + form.Set(key, ss) + } + } + return []byte(form.Encode()), nil + default: + return nil, fmt.Errorf("unsupported type %v, must be a map", d) + } +} + +func (c *Converter) Decode(ctx api.StreamContext, b []byte) (any, error) { + values, err := url.ParseQuery(string(b)) + if err != nil { + return nil, fmt.Errorf("fail to parse url encoded value %s", string(b)) + } + queryMap := make(map[string]interface{}) + for key, value := range values { + if len(value) == 1 { + queryMap[key] = value[0] + } else { + queryMap[key] = value + } + } + return queryMap, nil +} + +var c = &Converter{} + +func NewConverter(props map[string]any) (message.Converter, error) { + return c, nil +} diff --git a/internal/converter/urlencoded/converter_test.go b/internal/converter/urlencoded/converter_test.go new file mode 100644 index 0000000000..af311602cb --- /dev/null +++ b/internal/converter/urlencoded/converter_test.go @@ -0,0 +1,82 @@ +// Copyright 2022-2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package urlencoded + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/lf-edge/ekuiper/v2/internal/topo/context" +) + +func TestEncodeDecode(t *testing.T) { + tt := []struct { + name string + m any + s string + nm map[string]any + err string + }{ + { + name: "normal", + m: map[string]any{ + "a": "b", + "c": 20, + }, + s: `a=b&c=20`, + nm: map[string]any{ + "a": "b", + "c": "20", + }, + }, + { + name: "nested", + m: map[string]any{ + "a": []any{10, 20, 40}, + "b": []map[string]any{{"a": "b"}}, + "c": map[string]any{"a": "b"}, + }, + s: `a=10&a=20&a=40&b=%5Bmap%5Ba%3Ab%5D%5D&c=map%5Ba%3Ab%5D`, + nm: map[string]any{ + "a": []string{"10", "20", "40"}, + "b": "[map[a:b]]", + "c": "map[a:b]", + }, + }, + { + name: "unsupport", + m: []map[string]any{{"a": 1}}, + err: "unsupported type [map[a:1]], must be a map", + }, + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + cc, err := NewConverter(nil) + require.NoError(t, err) + encode, err := cc.Encode(context.Background(), tc.m) + if tc.err != "" { + require.EqualError(t, err, tc.err) + return + } else { + require.NoError(t, err) + require.Equal(t, tc.s, string(encode)) + } + rr, err := cc.Decode(context.Background(), []byte(tc.s)) + require.NoError(t, err) + require.Equal(t, tc.nm, rr) + }) + } +} diff --git a/internal/io/http/client.go b/internal/io/http/client.go index ed58d5d940..48ff667625 100644 --- a/internal/io/http/client.go +++ b/internal/io/http/client.go @@ -69,6 +69,7 @@ type RawConf struct { Method string `json:"method"` Body string `json:"body"` BodyType string `json:"bodyType"` + Format string `json:"format"` Headers map[string]string `json:"headers"` Timeout cast.DurationConf `json:"timeout"` Incremental bool `json:"incremental"` diff --git a/internal/io/http/rest_sink.go b/internal/io/http/rest_sink.go index f074e3bed5..1be67b34e3 100644 --- a/internal/io/http/rest_sink.go +++ b/internal/io/http/rest_sink.go @@ -30,9 +30,24 @@ type RestSink struct { *ClientConf } +var bodyTypeFormat = map[string]string{ + "json": "json", + "form": "urlencoded", +} + func (r *RestSink) Provision(ctx api.StreamContext, configs map[string]any) error { r.ClientConf = &ClientConf{} - return r.InitConf("", configs) + err := r.InitConf("", configs) + if err != nil { + return err + } + if r.ClientConf.config.Format == "" { + r.ClientConf.config.Format = "json" + } + if rf, ok := bodyTypeFormat[r.ClientConf.config.BodyType]; ok && r.ClientConf.config.Format != rf { + return fmt.Errorf("format must be %s if bodyType is %s", rf, r.ClientConf.config.BodyType) + } + return nil } func (r *RestSink) Close(ctx api.StreamContext) error { diff --git a/internal/io/http/rest_sink_test.go b/internal/io/http/rest_sink_test.go index 9d927a46d0..458acfc5f1 100644 --- a/internal/io/http/rest_sink_test.go +++ b/internal/io/http/rest_sink_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/lf-edge/ekuiper/v2/internal/topo/context" "github.com/lf-edge/ekuiper/v2/internal/xsql" "github.com/lf-edge/ekuiper/v2/pkg/errorx" mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context" @@ -99,6 +100,7 @@ func TestRestSink_Apply(t *testing.T) { "method": "post", //"url": "http://localhost/test", //set dynamically to the test server "bodyType": "form", + "format": "urlencoded", "sendSingle": true, }, data: []map[string]interface{}{{ @@ -188,6 +190,16 @@ func TestRestSink_Apply(t *testing.T) { } } +func TestRestSinkProvision(t *testing.T) { + s := &RestSink{} + require.EqualError(t, s.Provision(context.Background(), map[string]any{ + "url": "http://localhost/test", + "method": "get", + "bodyType": "form", + "format": "json", + }), "format must be urlencoded if bodyType is form") +} + func TestRestSinkCollect(t *testing.T) { server := createServer() defer func() { diff --git a/internal/topo/node/props.go b/internal/topo/node/props.go index 52f98bcccf..378c1dff4b 100644 --- a/internal/topo/node/props.go +++ b/internal/topo/node/props.go @@ -21,7 +21,6 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/pkg/cast" - "github.com/lf-edge/ekuiper/v2/pkg/message" ) type SinkConf struct { @@ -62,9 +61,6 @@ func ParseConf(logger api.Logger, props map[string]any) (*SinkConf, error) { } if sconf.Format == "" { sconf.Format = "json" - } else if sconf.Format != message.FormatJson && sconf.Format != message.FormatProtobuf && sconf.Format != message.FormatBinary && sconf.Format != message.FormatCustom && sconf.Format != message.FormatDelimited { - logger.Warnf("invalid type for format property, should be json protobuf or binary but found %s", sconf.Format) - sconf.Format = "json" } err = cast.MapToStruct(props, &sconf.SinkConf) if err != nil { diff --git a/internal/xsql/format_type.go b/internal/xsql/format_type.go index 9b7e84be69..849543fe52 100644 --- a/internal/xsql/format_type.go +++ b/internal/xsql/format_type.go @@ -17,5 +17,5 @@ package xsql import "github.com/lf-edge/ekuiper/v2/pkg/message" func IsTextFormat(format string) bool { - return format == message.FormatJson || format == message.FormatDelimited + return format == message.FormatJson || format == message.FormatDelimited || format == message.FormatUrlEncoded } diff --git a/pkg/message/artifacts.go b/pkg/message/artifacts.go index 32463b4d6b..0fbfedb715 100644 --- a/pkg/message/artifacts.go +++ b/pkg/message/artifacts.go @@ -21,11 +21,12 @@ import ( ) const ( - FormatBinary = "binary" - FormatJson = "json" - FormatProtobuf = "protobuf" - FormatDelimited = "delimited" - FormatCustom = "custom" + FormatBinary = "binary" + FormatJson = "json" + FormatProtobuf = "protobuf" + FormatDelimited = "delimited" + FormatUrlEncoded = "urlencoded" + FormatCustom = "custom" DefaultField = "self" MetaKey = "__meta"