Skip to content

Commit

Permalink
Feat/pbserver (#59)
Browse files Browse the repository at this point in the history
* Protobuf-based plugin RPC
  • Loading branch information
javierguerragiraldez authored Apr 13, 2021
1 parent 35746ae commit a04730d
Show file tree
Hide file tree
Showing 37 changed files with 5,457 additions and 1,005 deletions.
220 changes: 124 additions & 96 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,146 +4,174 @@ Used internally for the RPC protocol.
package bridge

import (
"encoding/binary"
"errors"
"io"
"log"
"net"

"github.com/Kong/go-pdk/server/kong_plugin_protocol"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
)

type PdkBridge struct {
ch chan interface{}
conn net.Conn
}

type StepData struct {
Method string
Args []interface{}
}

func New(ch chan interface{}) PdkBridge {
return PdkBridge{ch: ch}
func New(conn net.Conn) PdkBridge {
return PdkBridge{
conn: conn,
}
}

func (b PdkBridge) Ask(method string, args ...interface{}) (interface{}, error) {
b.ch <- StepData{method, args}
func readPbFrame(conn net.Conn) (data []byte, err error) {
var len uint32
err = binary.Read(conn, binary.LittleEndian, &len)
if err != nil {
return
}

reply := <-b.ch
data = make([]byte, len)
if data == nil {
return nil, errors.New("no memory")
}

err, ok := reply.(error)
if ok {
_, err = io.ReadFull(conn, data)
if err != nil {
return nil, err
}

return reply, nil
}

func (b PdkBridge) AskClose(method string, args ...interface{}) {
b.ch <- StepData{ method, args }
close(b.ch)
return
}

func (b PdkBridge) AskInt(method string, args ...interface{}) (i int, err error) {
val, err := b.Ask(method, args...)
func writePbFrame(conn net.Conn, data []byte) (err error) {
var len uint32 = uint32(len(data))
err = binary.Write(conn, binary.LittleEndian, len)
if err != nil {
return
}
if val == nil {
err = errors.New("null response")
return
}

switch val := val.(type) {
case int:
i = int(val)
case int8:
i = int(val)
case int16:
i = int(val)
case int32:
i = int(val)
case int64:
i = int(val)
case uint:
i = int(val)
case uint8:
i = int(val)
case uint16:
i = int(val)
case uint32:
i = int(val)
case uint64:
i = int(val)
default:
err = ReturnTypeError("integer")
if len > 0 {
_, err = conn.Write(data)
}

return
}

func (b PdkBridge) AskFloat(method string, args ...interface{}) (f float64, err error) {
val, err := b.Ask(method, args...)
if err != nil {
return
func WrapString(s string) *kong_plugin_protocol.String {
return &kong_plugin_protocol.String{V: s}
}

func WrapHeaders(h map[string][]string) (*structpb.Struct, error) {
h2 := make(map[string]interface{}, len(h))
for k, v := range h {
l := make([]interface{}, len(v))
for i, v2 := range v {
l[i] = v2
}
h2[k] = l
}
if val == nil {
err = errors.New("null response")
return

st, err := structpb.NewStruct(h2)
if err != nil {
return nil, err
}

switch val := val.(type) {
case int:
f = float64(val)
case int8:
f = float64(val)
case int16:
f = float64(val)
case int32:
f = float64(val)
case int64:
f = float64(val)
case uint:
f = float64(val)
case uint8:
f = float64(val)
case uint16:
f = float64(val)
case uint32:
f = float64(val)
case uint64:
f = float64(val)
case float32:
f = float64(val)
case float64:
f = float64(val)
default:
err = ReturnTypeError("float")
return st, nil
}

func UnwrapHeaders(st *structpb.Struct) map[string][]string {
m := st.AsMap()
m2 := make(map[string][]string)
for k, v := range m {
switch v2 := v.(type) {
case string:
m2[k] = []string{v2}
case []string:
m2[k] = v2
case []interface{}:
m2[k] = make([]string, len(v2))
for i, v3 := range v2 {
if s, ok := v3.(string); ok {
m2[k][i] = s
}
}
default:
log.Printf("unexpected type %T on header %s:%v", v2, k, v2)
}
}
return

return m2
}

func (b PdkBridge) AskString(method string, args ...interface{}) (s string, err error) {
val, err := b.Ask(method, args...)
func (b PdkBridge) Ask(method string, args proto.Message, out proto.Message) error {
err := writePbFrame(b.conn, []byte(method))
if err != nil {
return
return err
}
if val == nil {
err = errors.New("null response")
return

var args_d []byte

if args != nil {
args_d, err = proto.Marshal(args)
if err != nil {
return err
}
}

var ok bool
if s, ok = val.(string); !ok {
err = ReturnTypeError("string")
err = writePbFrame(b.conn, args_d)
if err != nil {
return err
}
return
}

func (b PdkBridge) AskMap(method string, args ...interface{}) (m map[string][]string, err error) {
val, err := b.Ask(method, args...)
out_d, err := readPbFrame(b.conn)
if err != nil {
return
return err
}

var ok bool
if m, ok = val.(map[string][]string); !ok {
err = ReturnTypeError("map[string][]string")
if out != nil {
err = proto.Unmarshal(out_d, out)
}
return

return err
}

func (b PdkBridge) AskString(method string, args proto.Message) (string, error) {
out := new(kong_plugin_protocol.String)
err := b.Ask(method, args, out)
return out.V, err
}

func (b PdkBridge) AskInt(method string, args proto.Message) (int, error) {
out := new(kong_plugin_protocol.Int)
err := b.Ask(method, args, out)
return int(out.V), err
}

func (b PdkBridge) AskNumber(method string, args proto.Message) (float64, error) {
out := new(kong_plugin_protocol.Number)
err := b.Ask(method, args, out)
return out.V, err
}

func (b PdkBridge) AskValue(method string, args proto.Message) (interface{}, error) {
out := new(structpb.Value)
err := b.Ask(method, args, out)
if err != nil {
return nil, err
}

return out.AsInterface(), nil
}

func (b PdkBridge) Close() error {
return b.conn.Close()
}

func ReturnTypeError(expected string) error {
Expand Down
63 changes: 29 additions & 34 deletions bridge/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,38 @@ package bridge
import (
"testing"

"github.com/Kong/go-pdk/entities"
"github.com/stretchr/testify/assert"
"github.com/Kong/go-pdk/bridge/bridgetest"
"github.com/Kong/go-pdk/server/kong_plugin_protocol"
)

var ch chan interface{}
var bridge PdkBridge

func init() {
ch = make(chan interface{})
bridge = New(ch)
func TestAsk(t *testing.T) {
b := New(bridgetest.Mock(t, []bridgetest.MockStep{
{"foo.bar", WrapString("first"), WrapString("resp")},
}))


out := new(kong_plugin_protocol.String)
err := b.Ask("foo.bar", WrapString("first"), out)
if err != nil {
t.Fatalf("got this: %s", err)
}
if out.V != "resp" {
t.Fatalf("no 'resp': %v", out.V)
}
b.Close()
}

func TestAsk(t *testing.T) {
go func() {
bridge.Ask("foo.bar", 1, 2, 3, 1.23, false)
}()

call := <-ch
ch <- ""

assert.Equal(t, call, StepData{
Method: "foo.bar",
Args: []interface{}{1, 2, 3, 1.23, false},
})

go func() {
n := "gs"
bridge.Ask("foo.bar", entities.Consumer{Username: n})
}()

call = <-ch
ch <- ""

n := "gs"
consumer := []interface{}{entities.Consumer{Username: n}}
assert.Equal(t, StepData{
Method: "foo.bar",
Args: consumer,
}, call)
func TestAskString(t *testing.T) {
b := New(bridgetest.Mock(t, []bridgetest.MockStep{
{"foo.bar", WrapString("first"), WrapString("resp")},
}))

ret, err := b.AskString("foo.bar", WrapString("first"))
if err != nil {
t.Fatalf("got this: %s", err)
}
if ret != "resp" {
t.Fatalf("no 'resp': %v", ret)
}
}
Loading

0 comments on commit a04730d

Please sign in to comment.