From c8a42e821df0946786957fa7f79ad2f822f9d0be Mon Sep 17 00:00:00 2001 From: Josh Kim Date: Mon, 1 Nov 2021 11:49:45 -0700 Subject: [PATCH] fix: REST bridge performance (#39) * fix: REST bridge performance During a benchmark test using `wrk` it was revealed that a REST bridged service was significantly slower than comparable servers such as Spring Boot. Profiling the server revealed that the ListenOnce() method was causing a significant amount of overheads as it was being called per HTTP request, thus generating a new message handler and invoking once.Do() call involving mutex every single time. The fix involves using the ListenStream() method to stop generating excessive time and space overheads during the bridge process. See the attached screenshots for comparsion between before and after fix. Signed-off-by: Josh Kim * new: support noop io.Writer for logs Signed-off-by: Josh Kim * fix broken unit tests Signed-off-by: Josh Kim * isolate bus instance throughout platformServer Signed-off-by: Josh Kim --- plank/pkg/server/core_models.go | 13 +-- .../pkg/server/endpointer_handler_factory.go | 35 ++------ .../server/endpointer_handler_factory_test.go | 65 ++++++++++---- plank/pkg/server/initialize.go | 23 +++-- .../initialize_rest_bridge_override_test.go | 9 +- plank/pkg/server/server.go | 56 +++++++++--- plank/pkg/server/server_smoke_test.go | 16 +++- plank/pkg/server/server_test.go | 87 +++++++++++++++++-- plank/pkg/server/test_suite_harness.go | 7 +- plank/pkg/server/test_suite_harness_test.go | 3 +- plank/utils/cli.go | 6 +- plank/utils/logger.go | 30 +++++-- plank/utils/paths.go | 2 +- 13 files changed, 254 insertions(+), 98 deletions(-) diff --git a/plank/pkg/server/core_models.go b/plank/pkg/server/core_models.go index 1c1d771..2e5bd2f 100644 --- a/plank/pkg/server/core_models.go +++ b/plank/pkg/server/core_models.go @@ -12,6 +12,7 @@ import ( "github.com/vmware/transport-go/plank/utils" "github.com/vmware/transport-go/service" "github.com/vmware/transport-go/stompserver" + "io" "net/http" "os" "sync" @@ -66,22 +67,24 @@ type PlatformServer interface { type platformServer struct { HttpServer *http.Server // Http server instance SyscallChan chan os.Signal // syscall channel to receive SIGINT, SIGKILL events + eventbus bus.EventBus // event bus pointer serverConfig *PlatformServerConfig // server config instance middlewareManager middleware.MiddlewareManager // middleware maanger instance router *mux.Router // *mux.Router instance routerConcurrencyProtection *int32 // atomic int32 to protect the main router being concurrently written to - out *os.File // platform log output pointer + out io.Writer // platform log output pointer endpointHandlerMap map[string]http.HandlerFunc // internal map to store rest endpoint -handler mappings serviceChanToBridgeEndpoints map[string][]string // internal map to store service channel - endpoint handler key mappings fabricConn stompserver.RawConnectionListener // WebSocket listener instance ServerAvailability *ServerAvailability // server availability (not much used other than for internal monitoring for now) lock sync.Mutex // lock + messageBridgeMap map[string]*MessageBridge } -// TransportChannelResponse wraps Transport *Message.Message with an error object for easier transfer -type TransportChannelResponse struct { - Message *model.Message // wrapper object that contains the payload - Err error // error object if there is any +// MessageBridge is a conduit used for returning service responses as HTTP responses +type MessageBridge struct { + ServiceListenStream bus.MessageHandler // message handler returned by bus.ListenStream responsible for relaying back messages as HTTP responses + payloadChannel chan *model.Message // internal golang channel used for passing bus responses/errors across goroutines } // ServerAvailability contains boolean fields to indicate what components of the system are available or not diff --git a/plank/pkg/server/endpointer_handler_factory.go b/plank/pkg/server/endpointer_handler_factory.go index 3f7978d..19e0156 100644 --- a/plank/pkg/server/endpointer_handler_factory.go +++ b/plank/pkg/server/endpointer_handler_factory.go @@ -4,8 +4,6 @@ import ( "context" "encoding/json" "fmt" - "github.com/sirupsen/logrus" - "github.com/vmware/transport-go/bus" "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/plank/utils" "github.com/vmware/transport-go/service" @@ -16,7 +14,7 @@ import ( // buildEndpointHandler builds a http.HandlerFunc that wraps Transport Bus operations in an HTTP request-response cycle. // service channel, request builder and rest bridge timeout are passed as parameters. -func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, restBridgeTimeout time.Duration) http.HandlerFunc { +func (ps *platformServer) buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, restBridgeTimeout time.Duration, msgChan chan *model.Message) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { defer func() { if r := recover(); r != nil { @@ -28,23 +26,10 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, // set context that expires after the provided amount of time in restBridgeTimeout to prevent requests from hanging forever ctx, cancelFn := context.WithTimeout(context.Background(), restBridgeTimeout) defer cancelFn() - h, err := bus.GetBus().ListenOnce(svcChannel) - if err != nil { - panic(err) - } - - // set up a channel through which to receive the raw response from transport channel - // handler function runs in another thread so we need to utilize channel to use the correct writer. - chanReturn := make(chan *TransportChannelResponse) - h.Handle(func(message *model.Message) { - chanReturn <- &TransportChannelResponse{Message: message} - }, func(err error) { - chanReturn <- &TransportChannelResponse{Err: err} - }) // relay the request to transport channel reqModel := reqBuilder(w, r) - err = bus.GetBus().SendRequestMessage(svcChannel, reqModel, reqModel.Id) + err := ps.eventbus.SendRequestMessage(svcChannel, reqModel, reqModel.Id) // get a response from the channel, render the results using ResponseWriter and log the data/error // to the console as well. @@ -53,14 +38,14 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, http.Error( w, fmt.Sprintf("No response received from service channel in %s, request timed out", restBridgeTimeout.String()), 500) - case chanResponse := <-chanReturn: - if chanResponse.Err != nil { - utils.Log.WithError(chanResponse.Err).Errorf( + case msg := <-msgChan: + if msg.Error != nil { + utils.Log.WithError(msg.Error).Errorf( "Error received from channel %s:", svcChannel) - http.Error(w, chanResponse.Err.Error(), 500) + http.Error(w, msg.Error.Error(), 500) } else { - // only send the actual user payload not wrapper information - response := chanResponse.Message.Payload.(*model.Response) + // only send the actual user payloadChannel not wrapper information + response := msg.Payload.(*model.Response) var respBody interface{} if response.Error { if response.Payload != nil { @@ -72,10 +57,6 @@ func buildEndpointHandler(svcChannel string, reqBuilder service.RequestBuilder, respBody = response.Payload } - utils.Log.WithFields(logrus.Fields{ - //"payload": respBody, // don't show this, we may be sending around big byte arrays - }).Debugf("Response received from channel %s:", svcChannel) - // if our Message is an error and it has a code, lets send that back to the client. if response.Error { diff --git a/plank/pkg/server/endpointer_handler_factory_test.go b/plank/pkg/server/endpointer_handler_factory_test.go index c7b2497..b4fb94e 100644 --- a/plank/pkg/server/endpointer_handler_factory_test.go +++ b/plank/pkg/server/endpointer_handler_factory_test.go @@ -9,6 +9,7 @@ import ( "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/service" "net/http" + "os" "testing" "time" ) @@ -16,53 +17,73 @@ import ( func TestBuildEndpointHandler_Timeout(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { return model.Request{ Payload: nil, Request: "test-request", } - }, 5*time.Millisecond), "GET", "http://localhost", nil, "request timed out") + }, 5*time.Millisecond, msgChan), "GET", "http://localhost", nil, "request timed out") } func TestBuildEndpointHandler_ChanResponseErr(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPErrorf(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPErrorf(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { uId := &uuid.UUID{} - _ = b.SendErrorMessage("test-chan", fmt.Errorf("test error"), uId) + msgChan <- &model.Message{Error: fmt.Errorf("test error")} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "test error") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "test error") } func TestBuildEndpointHandler_SuccessResponse(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { uId := &uuid.UUID{} - _ = b.SendResponseMessage("test-chan", &model.Response{ + msgChan <- &model.Message{Payload: &model.Response{ Id: uId, Payload: []byte("{\"error\": false}"), - }, uId) + }} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "{\"error\": false}") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "{\"error\": false}") } func TestBuildEndpointHandler_ErrorResponse(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() _ = b.GetChannelManager().CreateChannel("test-chan") + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + msgChan := make(chan *model.Message, 1) uId := &uuid.UUID{} rsp := &model.Response{ Id: uId, @@ -72,21 +93,26 @@ func TestBuildEndpointHandler_ErrorResponse(t *testing.T) { } expected, _ := json.Marshal(rsp.Payload) - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { - _ = b.SendResponseMessage("test-chan", rsp, uId) + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + msgChan <- &model.Message{Payload: rsp} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, string(expected)) + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, string(expected)) } func TestBuildEndpointHandler_ErrorResponseAlternative(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b uId := &uuid.UUID{} rsp := &model.Response{ @@ -95,26 +121,31 @@ func TestBuildEndpointHandler_ErrorResponseAlternative(t *testing.T) { Error: true, } - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { - _ = b.SendResponseMessage("test-chan", rsp, uId) + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + msgChan <- &model.Message{Payload: rsp} return model.Request{ Id: uId, Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "418") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "418") } func TestBuildEndpointHandler_CatchPanic(t *testing.T) { b := bus.ResetBus() service.ResetServiceRegistry() + msgChan := make(chan *model.Message, 1) _ = b.GetChannelManager().CreateChannel("test-chan") - assert.HTTPBodyContains(t, buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config).(*platformServer) + ps.eventbus = b + assert.HTTPBodyContains(t, ps.buildEndpointHandler("test-chan", func(w http.ResponseWriter, r *http.Request) model.Request { panic("peekaboo") return model.Request{ Payload: nil, Request: "test-request", } - }, 5*time.Second), "GET", "http://localhost", nil, "Internal Server Error") + }, 5*time.Second, msgChan), "GET", "http://localhost", nil, "Internal Server Error") } diff --git a/plank/pkg/server/initialize.go b/plank/pkg/server/initialize.go index 7bb2742..276af09 100644 --- a/plank/pkg/server/initialize.go +++ b/plank/pkg/server/initialize.go @@ -14,8 +14,10 @@ import ( "github.com/vmware/transport-go/stompserver" "log" "net/http" + _ "net/http/pprof" "path/filepath" "reflect" + "runtime" "time" ) @@ -25,12 +27,11 @@ func (ps *platformServer) initialize() { var err error // initialize core components - var busInstance = bus.GetBus() var serviceRegistryInstance = service.GetServiceRegistry() var svcLifecycleManager = service.GetServiceLifecycleManager() // create essential bus channels - busInstance.GetChannelManager().CreateChannel(PLANK_SERVER_ONLINE_CHANNEL) + ps.eventbus.GetChannelManager().CreateChannel(PLANK_SERVER_ONLINE_CHANNEL) // initialize HTTP endpoint handlers map ps.endpointHandlerMap = map[string]http.HandlerFunc{} @@ -49,10 +50,18 @@ func (ps *platformServer) initialize() { utils.Log.SetFormatter(formatter) utils.Log.SetOutput(ps.out) - // if debug flag is provided enable extra logging + // if debug flag is provided enable extra logging. also, enable profiling at port 6060 if ps.serverConfig.Debug { utils.Log.SetLevel(logrus.DebugLevel) - utils.Log.Debugln("Debug logging enabled") + go func() { + runtime.SetBlockProfileRate(1) // capture traces of all possible contended mutex holders + profilerRouter := mux.NewRouter() + profilerRouter.PathPrefix("/debug/pprof/").Handler(http.DefaultServeMux) + if err := http.ListenAndServe(":6060", profilerRouter); err != nil { + panic(err) + } + }() + utils.Log.Debugln("Debug logging and profiling enabled. Available types of profiles at http://localhost:6060/debug/pprof") } // set a new route handler @@ -92,7 +101,7 @@ func (ps *platformServer) initialize() { } // set up a listener to receive REST bridge configs for services and set them up according to their specs - lcmChanHandler, err := busInstance.ListenStreamForDestination(service.LifecycleManagerChannelName, busInstance.GetId()) + lcmChanHandler, err := ps.eventbus.ListenStreamForDestination(service.LifecycleManagerChannelName, ps.eventbus.GetId()) if err != nil { utils.Log.Fatalln(err) } @@ -104,7 +113,7 @@ func (ps *platformServer) initialize() { } fabricSvc, _ := serviceRegistryInstance.GetService(request.ServiceChannel) - svcReadyStore := busInstance.GetStoreManager().GetStore(service.ServiceReadyStore) + svcReadyStore := ps.eventbus.GetStoreManager().GetStore(service.ServiceReadyStore) hooks := svcLifecycleManager.GetServiceHooks(request.ServiceChannel) if request.Override { @@ -135,7 +144,7 @@ func (ps *platformServer) initialize() { // create an internal bus channel to notify significant changes in sessions such as disconnect if ps.serverConfig.FabricConfig != nil { - channelManager := busInstance.GetChannelManager() + channelManager := ps.eventbus.GetChannelManager() channelManager.CreateChannel(bus.STOMP_SESSION_NOTIFY_CHANNEL) } diff --git a/plank/pkg/server/initialize_rest_bridge_override_test.go b/plank/pkg/server/initialize_rest_bridge_override_test.go index 4658d04..204e9b6 100644 --- a/plank/pkg/server/initialize_rest_bridge_override_test.go +++ b/plank/pkg/server/initialize_rest_bridge_override_test.go @@ -37,7 +37,7 @@ func TestInitialize_DebugLogging(t *testing.T) { func TestInitialize_RestBridgeOverride(t *testing.T) { // arrange - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) @@ -47,6 +47,7 @@ func TestInitialize_RestBridgeOverride(t *testing.T) { cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", GetTestPort(), true) baseUrl, _, testServerInterface := CreateTestServer(cfg) testServer := testServerInterface.(*platformServer) + testServer.eventbus = newBus // register ping pong service with default bridge points of /rest/ping-pong, /rest/ping-pong2 and /rest/ping-pong/{from}/{to}/{message} testServerInterface.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) @@ -62,8 +63,8 @@ func TestInitialize_RestBridgeOverride(t *testing.T) { oldRouter := testServer.router // assert - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { - _ = bus.GetBus().SendResponseMessage(service.LifecycleManagerChannelName, &service.SetupRESTBridgeRequest{ + RunWhenServerReady(t, newBus, func(t2 *testing.T) { + _ = newBus.SendResponseMessage(service.LifecycleManagerChannelName, &service.SetupRESTBridgeRequest{ ServiceChannel: services.PingPongServiceChan, Override: true, Config: []*service.RESTBridgeConfig{ @@ -76,7 +77,7 @@ func TestInitialize_RestBridgeOverride(t *testing.T) { }, }, }, - }, bus.GetBus().GetId()) + }, newBus.GetId()) // router instance should have been swapped time.Sleep(1 * time.Second) diff --git a/plank/pkg/server/server.go b/plank/pkg/server/server.go index 5c79fe2..82f02a3 100644 --- a/plank/pkg/server/server.go +++ b/plank/pkg/server/server.go @@ -11,6 +11,7 @@ import ( "fmt" "github.com/spf13/cobra" "github.com/spf13/pflag" + "github.com/vmware/transport-go/model" "io/ioutil" "net" "net/http" @@ -46,6 +47,8 @@ func NewPlatformServer(config *PlatformServerConfig) PlatformServer { ps.serverConfig = config ps.ServerAvailability = &ServerAvailability{} ps.routerConcurrencyProtection = new(int32) + ps.messageBridgeMap = make(map[string]*MessageBridge) + ps.eventbus = bus.GetBus() ps.initialize() return ps } @@ -73,6 +76,7 @@ func NewPlatformServerFromConfig(configPath string) (PlatformServer, error) { } ps := new(platformServer) + ps.eventbus = bus.GetBus() sanitizeConfigRootPath(&config) // ensure references to file system paths are relative to config.RootDir @@ -165,7 +169,7 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { utils.Log.Infof("[plank] Starting Transport broker at %s:%d%s", ps.serverConfig.Host, ps.serverConfig.Port, ps.serverConfig.FabricConfig.FabricEndpoint) ps.ServerAvailability.Fabric = true - if err := bus.GetBus().StartFabricEndpoint(ps.fabricConn, *ps.serverConfig.FabricConfig.EndpointConfig); err != nil { + if err := ps.eventbus.StartFabricEndpoint(ps.fabricConn, *ps.serverConfig.FabricConfig.EndpointConfig); err != nil { panic(err) } }() @@ -175,7 +179,7 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { go func() { <-ps.SyscallChan // notify subscribers that the server is shutting down - _ = bus.GetBus().SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, false, nil) + _ = ps.eventbus.SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, false, nil) ps.StopServer() close(connClosed) }() @@ -190,7 +194,7 @@ func (ps *platformServer) StartServer(syschan chan os.Signal) { utils.Log.Debugln("waiting for http server to be ready to accept connections") continue } - _ = bus.GetBus().SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, true, nil) + _ = ps.eventbus.SendResponseMessage(PLANK_SERVER_ONLINE_CHANNEL, true, nil) break } @@ -284,7 +288,7 @@ func (ps *platformServer) RegisterService(svc service.FabricService, svcChannel var hooks service.ServiceLifecycleHookEnabled if hooks = svcLifecycleManager.GetServiceHooks(svcChannel); hooks == nil { // if service has no lifecycle hooks mark the channel as ready straight up - storeManager := bus.GetBus().GetStoreManager() + storeManager := ps.eventbus.GetStoreManager() store := storeManager.GetStore(service.ServiceReadyStore) store.Put(svcChannel, true, service.ServiceInitStateChange) utils.Log.Infof("[plank] Service '%s' initialized successfully", svcType.String()) @@ -293,7 +297,7 @@ func (ps *platformServer) RegisterService(svc service.FabricService, svcChannel return err } -// SetHttpChannelBridge establishes a conduit between the the transport service channel and an HTTP endpoint +// SetHttpChannelBridge establishes a conduit between the transport service channel and an HTTP endpoint // that allows a client to invoke the service via REST. func (ps *platformServer) SetHttpChannelBridge(bridgeConfig *service.RESTBridgeConfig) { ps.lock.Lock() @@ -312,11 +316,24 @@ func (ps *platformServer) SetHttpChannelBridge(bridgeConfig *service.RESTBridgeC ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = make([]string, 0) } + if _, exists := ps.messageBridgeMap[bridgeConfig.ServiceChannel]; !exists { + handler, _ := ps.eventbus.ListenStream(bridgeConfig.ServiceChannel) + handler.Handle(func(message *model.Message) { + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel <- message + }, func(err error) {}) + + ps.messageBridgeMap[bridgeConfig.ServiceChannel] = &MessageBridge{ + ServiceListenStream: handler, + payloadChannel: make(chan *model.Message, 100), + } + } + // build endpoint handler - ps.endpointHandlerMap[endpointHandlerKey] = buildEndpointHandler( + ps.endpointHandlerMap[endpointHandlerKey] = ps.buildEndpointHandler( bridgeConfig.ServiceChannel, bridgeConfig.FabricRequestBuilder, - ps.serverConfig.RestBridgeTimeout) + ps.serverConfig.RestBridgeTimeout, + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel) ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = append( ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel], endpointHandlerKey) @@ -349,7 +366,7 @@ func (ps *platformServer) SetHttpChannelBridge(bridgeConfig *service.RESTBridgeC bridgeConfig.ServiceChannel, bridgeConfig.Uri, bridgeConfig.Method) } -// SetHttpPathPrefixChannelBridge establishes a conduit between the the transport service channel and a path prefix +// SetHttpPathPrefixChannelBridge establishes a conduit between the transport service channel and a path prefix // every request on this prefix will be sent through to the target service, all methods, all sub paths, lock, stock and barrel. func (ps *platformServer) SetHttpPathPrefixChannelBridge(bridgeConfig *service.RESTBridgeConfig) { ps.lock.Lock() @@ -368,11 +385,24 @@ func (ps *platformServer) SetHttpPathPrefixChannelBridge(bridgeConfig *service.R ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = make([]string, 0) } + if _, exists := ps.messageBridgeMap[bridgeConfig.ServiceChannel]; !exists { + handler, _ := ps.eventbus.ListenStream(bridgeConfig.ServiceChannel) + handler.Handle(func(message *model.Message) { + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel <- message + }, func(err error) {}) + + ps.messageBridgeMap[bridgeConfig.ServiceChannel] = &MessageBridge{ + ServiceListenStream: handler, + payloadChannel: make(chan *model.Message, 100), + } + } + // build endpoint handler - ps.endpointHandlerMap[endpointHandlerKey] = buildEndpointHandler( + ps.endpointHandlerMap[endpointHandlerKey] = ps.buildEndpointHandler( bridgeConfig.ServiceChannel, bridgeConfig.FabricRequestBuilder, - ps.serverConfig.RestBridgeTimeout) + ps.serverConfig.RestBridgeTimeout, + ps.messageBridgeMap[bridgeConfig.ServiceChannel].payloadChannel) ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel] = append( ps.serviceChanToBridgeEndpoints[bridgeConfig.ServiceChannel], endpointHandlerKey) @@ -492,3 +522,9 @@ func (ps *platformServer) checkPortAvailability() { ps.serverConfig.Host, ps.serverConfig.Port) } } + +func (ps *platformServer) setEventBusRef(evtBus bus.EventBus) { + ps.lock.Lock() + ps.eventbus = evtBus + ps.lock.Unlock() +} diff --git a/plank/pkg/server/server_smoke_test.go b/plank/pkg/server/server_smoke_test.go index 45476ad..ed65144 100644 --- a/plank/pkg/server/server_smoke_test.go +++ b/plank/pkg/server/server_smoke_test.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/stretchr/testify/assert" "github.com/vmware/transport-go/bus" + "github.com/vmware/transport-go/service" "io/ioutil" "net/http" "os" @@ -13,6 +14,8 @@ import ( ) func TestSmokeTests(t *testing.T) { + newBus := bus.ResetBus() + service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") //testOutFile := filepath.Join(testRoot, "plank-server-tests.log") _ = os.MkdirAll(testRoot, 0755) @@ -32,6 +35,7 @@ func TestSmokeTests(t *testing.T) { }, } baseUrl, _, testServer := CreateTestServer(cfg) + testServer.(*platformServer).eventbus = newBus assert.EqualValues(t, fmt.Sprintf("http://localhost:%d", port), baseUrl) @@ -39,7 +43,7 @@ func TestSmokeTests(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go testServer.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t *testing.T) { + RunWhenServerReady(t, newBus, func(t *testing.T) { // root url - 404 t.Run("404 on root", func(t2 *testing.T) { cl := http.DefaultClient @@ -63,6 +67,8 @@ func TestSmokeTests(t *testing.T) { } func TestSmokeTests_NoFabric(t *testing.T) { + newBus := bus.ResetBus() + service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) defer os.RemoveAll(testRoot) @@ -71,6 +77,7 @@ func TestSmokeTests_NoFabric(t *testing.T) { cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", port, true) cfg.FabricConfig = nil baseUrl, _, testServer := CreateTestServer(cfg) + testServer.(*platformServer).eventbus = newBus assert.EqualValues(t, fmt.Sprintf("http://localhost:%d", port), baseUrl) @@ -78,7 +85,7 @@ func TestSmokeTests_NoFabric(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go testServer.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t *testing.T) { + RunWhenServerReady(t, newBus, func(t *testing.T) { // fabric - 404 t.Run("404 on fabric endpoint", func(t2 *testing.T) { cl := http.DefaultClient @@ -94,6 +101,8 @@ func TestSmokeTests_NoFabric(t *testing.T) { } func TestSmokeTests_HealthEndpoint(t *testing.T) { + newBus := bus.ResetBus() + service.ResetServiceRegistry() testRoot := filepath.Join(os.TempDir(), "plank-tests") _ = os.MkdirAll(testRoot, 0755) defer os.RemoveAll(testRoot) @@ -102,6 +111,7 @@ func TestSmokeTests_HealthEndpoint(t *testing.T) { cfg := GetBasicTestServerConfig(testRoot, "stdout", "stdout", "stderr", port, true) cfg.FabricConfig = nil baseUrl, _, testServer := CreateTestServer(cfg) + testServer.(*platformServer).eventbus = newBus assert.EqualValues(t, fmt.Sprintf("http://localhost:%d", port), baseUrl) @@ -109,7 +119,7 @@ func TestSmokeTests_HealthEndpoint(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go testServer.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(*testing.T) { + RunWhenServerReady(t, newBus, func(*testing.T) { t.Run("/health returns OK", func(t2 *testing.T) { cl := http.DefaultClient rsp, err := cl.Get(fmt.Sprintf("%s/health", baseUrl)) diff --git a/plank/pkg/server/server_test.go b/plank/pkg/server/server_test.go index 992f828..1459da8 100644 --- a/plank/pkg/server/server_test.go +++ b/plank/pkg/server/server_test.go @@ -4,6 +4,7 @@ package server import ( + "context" "fmt" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -11,10 +12,12 @@ import ( "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/plank/services" "github.com/vmware/transport-go/service" + "golang.org/x/net/context/ctxhttp" "io/ioutil" "net/http" "os" "path/filepath" + "strings" "sync" "testing" ) @@ -43,25 +46,27 @@ func TestNewPlatformServer_FileLog(t *testing.T) { _ = os.Remove(filepath.Join(os.TempDir(), "testlog.log")) }() - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() newConfig := GetBasicTestServerConfig(os.TempDir(), filepath.Join(os.TempDir(), "testlog.log"), "stdout", "stderr", port, true) - NewPlatformServer(newConfig) + ps := NewPlatformServer(newConfig) + ps.(*platformServer).eventbus = newBus assert.FileExists(t, filepath.Join(os.TempDir(), "testlog.log")) } func TestPlatformServer_StartServer(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus syschan := make(chan os.Signal, 1) wg := sync.WaitGroup{} wg.Add(1) go ps.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { + RunWhenServerReady(t, newBus, func(t2 *testing.T) { rsp, err := http.Get(fmt.Sprintf("http://localhost:%d", port)) assert.Nil(t, err) @@ -76,29 +81,92 @@ func TestPlatformServer_StartServer(t *testing.T) { } func TestPlatformServer_RegisterService(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus err := ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) assert.Nil(t, err) } +func TestPlatformServer_SetHttpPathPrefixChannelBridge(t *testing.T) { + // get a new bus instance and create a new platform server instance + newBus := bus.ResetBus() + service.ResetServiceRegistry() + port := GetTestPort() + config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) + ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus + + // register a service + _ = ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) + + // set PathPrefix bridge + bridgeConfig := &service.RESTBridgeConfig{ + ServiceChannel: services.PingPongServiceChan, + Uri: "/ping-pong", + FabricRequestBuilder: func(w http.ResponseWriter, r *http.Request) model.Request { + return model.Request{ + Payload: "hello", + Request: "ping-get", + } + }, + } + ps.SetHttpPathPrefixChannelBridge(bridgeConfig) + + syschan := make(chan os.Signal, 1) + wg := sync.WaitGroup{} + wg.Add(1) + go ps.StartServer(syschan) + RunWhenServerReady(t, newBus, func(t2 *testing.T) { + // GET + rsp, err := http.Get(fmt.Sprintf("http://localhost:%d/ping-pong", port)) + assert.Nil(t, err) + + body, err := ioutil.ReadAll(rsp.Body) + assert.Nil(t, err) + assert.Contains(t, string(body), "hello") + + // POST + rsp, err = http.Post(fmt.Sprintf("http://localhost:%d/ping-pong", port), "application/json", strings.NewReader("")) + assert.Nil(t, err) + body, err = ioutil.ReadAll(rsp.Body) + assert.Nil(t, err) + assert.Contains(t, string(body), "hello") + + // DELETE + req, _ := http.NewRequest("DELETE", fmt.Sprintf("http://localhost:%d/ping-pong", port), strings.NewReader("")) + rsp, err = ctxhttp.Do(context.Background(), http.DefaultClient, req) + assert.Nil(t, err) + body, err = ioutil.ReadAll(rsp.Body) + assert.Nil(t, err) + assert.Contains(t, string(body), "hello") + + ps.StopServer() + service.GetServiceRegistry().UnregisterService(services.PingPongServiceChan) + wg.Done() + }) + + wg.Wait() +} + func TestPlatformServer_SetHttpChannelBridge(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus _ = ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) syschan := make(chan os.Signal, 1) wg := sync.WaitGroup{} wg.Add(1) go ps.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { + RunWhenServerReady(t, newBus, func(t2 *testing.T) { rsp, err := http.Get(fmt.Sprintf("http://localhost:%d/rest/ping-pong2?message=hello", port)) assert.Nil(t, err) @@ -114,11 +182,12 @@ func TestPlatformServer_SetHttpChannelBridge(t *testing.T) { } func TestPlatformServer_UnknownRequest(t *testing.T) { - bus.ResetBus() + newBus := bus.ResetBus() service.ResetServiceRegistry() port := GetTestPort() config := GetBasicTestServerConfig(os.TempDir(), "stdout", "stdout", "stderr", port, true) ps := NewPlatformServer(config) + ps.(*platformServer).eventbus = newBus _ = ps.RegisterService(services.NewPingPongService(), services.PingPongServiceChan) defer service.GetServiceRegistry().UnregisterService(services.PingPongServiceChan) setupBridge(ps, "/ping", "GET", services.PingPongServiceChan, "bubble") @@ -127,7 +196,7 @@ func TestPlatformServer_UnknownRequest(t *testing.T) { wg := sync.WaitGroup{} wg.Add(1) go ps.StartServer(syschan) - RunWhenServerReady(t, bus.GetBus(), func(t2 *testing.T) { + RunWhenServerReady(t, newBus, func(t2 *testing.T) { rsp, err := http.Get(fmt.Sprintf("http://localhost:%d/ping?msg=hello", port)) assert.Nil(t, err) diff --git a/plank/pkg/server/test_suite_harness.go b/plank/pkg/server/test_suite_harness.go index e60883a..f839131 100644 --- a/plank/pkg/server/test_suite_harness.go +++ b/plank/pkg/server/test_suite_harness.go @@ -12,7 +12,7 @@ import ( "github.com/vmware/transport-go/bus" "github.com/vmware/transport-go/model" "github.com/vmware/transport-go/plank/utils" - "github.com/vmware/transport-go/service" + svc "github.com/vmware/transport-go/service" "io/ioutil" "net" "os" @@ -81,7 +81,7 @@ func GetBasicTestServerConfig(rootDir, outLog, accessLog, errLog string, port in // SetupPlankTestSuite will boot a new instance of plank on your chosen port and will also fire up your service // Ready to be tested. This always runs on localhost. -func SetupPlankTestSuite(service service.FabricService, serviceChannel string, port int, +func SetupPlankTestSuite(service svc.FabricService, serviceChannel string, port int, config *PlatformServerConfig) (*PlankIntegrationTestSuite, error) { s := &PlankIntegrationTestSuite{} @@ -103,7 +103,8 @@ func SetupPlankTestSuite(service service.FabricService, serviceChannel string, p s.Syschan = make(chan os.Signal, 1) go s.PlatformServer.StartServer(s.Syschan) - s.EventBus = bus.GetBus() + s.EventBus = bus.ResetBus() + svc.ResetServiceRegistry() // get a pointer to the channel manager s.ChannelManager = s.EventBus.GetChannelManager() diff --git a/plank/pkg/server/test_suite_harness_test.go b/plank/pkg/server/test_suite_harness_test.go index fdf7063..ab4927d 100644 --- a/plank/pkg/server/test_suite_harness_test.go +++ b/plank/pkg/server/test_suite_harness_test.go @@ -35,7 +35,8 @@ func (m *testPlankTestIntegration) SetBus(eventBus bus.EventBus) { } func TestSetupPlankTestSuiteForTest(t *testing.T) { - b := bus.GetBus() + b := bus.ResetBus() + service.ResetServiceRegistry() cm := b.GetChannelManager() pit := &PlankIntegrationTestSuite{ Suite: suite.Suite{}, diff --git a/plank/utils/cli.go b/plank/utils/cli.go index 2b486ea..b7c2a82 100644 --- a/plank/utils/cli.go +++ b/plank/utils/cli.go @@ -71,17 +71,17 @@ var PlatformServerFlagConstants = map[string]map[string]string{ "OutputLog": { "FlagName": "output-log", "ShortFlag": "l", - "Description": "Platform log output", + "Description": "Platform log output. Possible values: stdout, stderr, null, or path to a file", }, "AccessLog": { "FlagName": "access-log", "ShortFlag": "a", - "Description": "HTTP server access log output", + "Description": "HTTP server access log output. Possible values: stdout, stderr, null, or path to a file", }, "ErrorLog": { "FlagName": "error-log", "ShortFlag": "e", - "Description": "HTTP server error log output", + "Description": "HTTP server error log output. Possible values: stdout, stderr, null, or path to a file", }, "Debug": { "FlagName": "debug", diff --git a/plank/utils/logger.go b/plank/utils/logger.go index 493168f..f986e0d 100644 --- a/plank/utils/logger.go +++ b/plank/utils/logger.go @@ -5,6 +5,7 @@ package utils import ( "github.com/sirupsen/logrus" + "io" "os" "path" ) @@ -17,9 +18,9 @@ type LogConfig struct { OutputLog string `json:"output_log"` FormatOptions *LogFormatOption `json:"format_options"` Root string `json:"root"` - accessLogFp *os.File `json:"-"` - errorLogFp *os.File `json:"-"` - outputLogFp *os.File `json:"-"` + accessLogFp io.Writer `json:"-"` + errorLogFp io.Writer `json:"-"` + outputLogFp io.Writer `json:"-"` } // LogFormatOption is merely a wrapper of logrus.TextFormatter because TextFormatter does not allow serializing @@ -82,7 +83,7 @@ type LogFormatOption struct { } func (lc *LogConfig) PrepareLogFiles() error { - var fp *os.File + var fp io.Writer var err error if fp, err = lc.prepareLogFilePointer(lc.AccessLog); err != nil { return err @@ -102,23 +103,25 @@ func (lc *LogConfig) PrepareLogFiles() error { return nil } -func (lc *LogConfig) GetAccessLogFilePointer() *os.File { +func (lc *LogConfig) GetAccessLogFilePointer() io.Writer { return lc.accessLogFp } -func (lc *LogConfig) GetErrorLogFilePointer() *os.File { +func (lc *LogConfig) GetErrorLogFilePointer() io.Writer { return lc.errorLogFp } -func (lc *LogConfig) GetPlatformLogFilePointer() *os.File { +func (lc *LogConfig) GetPlatformLogFilePointer() io.Writer { return lc.outputLogFp } -func (lc *LogConfig) prepareLogFilePointer(target string) (fp *os.File, err error) { +func (lc *LogConfig) prepareLogFilePointer(target string) (fp io.Writer, err error) { if target == "stdout" { fp = os.Stdout } else if target == "stderr" { fp = os.Stderr + } else if target == "null" { + fp = &noopWriter{} } else { logFilePath := JoinBasePathIfRelativeRegularFilePath(lc.Root, target) fp, err = GetNewLogFilePointer(logFilePath) @@ -213,6 +216,17 @@ func (l *PlankLogger) Panicf(format string, args ...interface{}) { l.setCommonFields().Panicf(format, args...) } +// noopWriter does absolutely nothing and return immediately. for references +// to those who wonder why this is here then in the first place, this is so +// this no-op writer instance can be passed as an access logger to not bother +// logging HTTP access logs which may not be necessary for some applications +// that want as lowest IO bottlenecks as possible. +type noopWriter struct{} + +func (noopWriter *noopWriter) Write(p []byte) (n int, err error) { + return 0, nil +} + func init() { Log = &PlankLogger{logrus.New()} } diff --git a/plank/utils/paths.go b/plank/utils/paths.go index bc20ee0..ecc3bda 100644 --- a/plank/utils/paths.go +++ b/plank/utils/paths.go @@ -55,7 +55,7 @@ func DeriveStaticURIFromPath(input string) (string, string) { func JoinBasePathIfRelativeRegularFilePath(base string, in string) (out string) { out = in - if in == "stdout" || in == "stderr" { + if in == "stdout" || in == "stderr" || in == "null" { return }