Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

Add a cross-plank example (WIP) #52

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions plank/cmd/cross_plank_communication/server_one/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"debug": false,
"no_banner": false,
"root_dir": "./",
"spa_config": {
"root_folder": "public/",
"base_uri": "/",
"static_assets": [
"public/assets:/assets"
],
"cache_control_rules": {
"*.{js,css}": "public, max-age=86400",
"*.{ico,jpg,jpeg,svg,png,gif,tiff}": "public, max-age=604800"
}
},
"host": "localhost",
"port": 30081,
"log_config": {
"root": ".",
"access_log": "access.log",
"error_log": "errors.log",
"output_log": "stdout",
"format_options": {
"force_colors": false,
"disable_colors": true,
"force_quote": false,
"disable_quote": false,
"environment_override_colors": false,
"disable_timestamp": false,
"full_timestamp": true,
"timestamp_format": "",
"disable_sorting": false,
"disable_level_truncation": false,
"pad_level_text": false,
"quote_empty_fields": false,
"is_terminal": true
}
},
"shutdown_timeout_in_minutes": 1,
"rest_bridge_timeout_in_minutes": 1,
"fabric_config": {
"fabric_endpoint": "/ws",
"use_tcp": false,
"tcp_port": 61613,
"endpoint_config": {
"TopicPrefix": "/topic",
"UserQueuePrefix": "/queue",
"AppRequestPrefix": "/pub",
"AppRequestQueuePrefix": "/pub/queue",
"Heartbeat": 60000
}
},
"enable_prometheus": true
}
26 changes: 26 additions & 0 deletions plank/cmd/cross_plank_communication/server_one/server_one.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright 2022 VMware, Inc.
// SPDX-License-Identifier: BSD-2-Clause

package main

import (
"github.com/vmware/transport-go/plank/pkg/server"
"github.com/vmware/transport-go/plank/services"
"github.com/vmware/transport-go/plank/utils"
"os"
)

// configure flags
func main() {
serverConfig, err := server.CreateServerConfig()
if err != nil {
utils.Log.Fatalln(err)
}
platformServer := server.NewPlatformServer(serverConfig)
if err = platformServer.RegisterService(services.NewPingPongService(), services.PingPongServiceChan); err != nil {
utils.Log.Fatalln(err)
}

syschan := make(chan os.Signal, 1)
platformServer.StartServer(syschan)
}
54 changes: 54 additions & 0 deletions plank/cmd/cross_plank_communication/server_two/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"debug": false,
"no_banner": false,
"root_dir": "./",
"spa_config": {
"root_folder": "public/",
"base_uri": "/",
"static_assets": [
"public/assets:/assets"
],
"cache_control_rules": {
"*.{js,css}": "public, max-age=86400",
"*.{ico,jpg,jpeg,svg,png,gif,tiff}": "public, max-age=604800"
}
},
"host": "localhost",
"port": 30080,
"log_config": {
"root": ".",
"access_log": "access.log",
"error_log": "errors.log",
"output_log": "stdout",
"format_options": {
"force_colors": false,
"disable_colors": true,
"force_quote": false,
"disable_quote": false,
"environment_override_colors": false,
"disable_timestamp": false,
"full_timestamp": true,
"timestamp_format": "",
"disable_sorting": false,
"disable_level_truncation": false,
"pad_level_text": false,
"quote_empty_fields": false,
"is_terminal": true
}
},
"shutdown_timeout_in_minutes": 1,
"rest_bridge_timeout_in_minutes": 1,
"fabric_config": {
"fabric_endpoint": "/ws",
"use_tcp": false,
"tcp_port": 61613,
"endpoint_config": {
"TopicPrefix": "/topic",
"UserQueuePrefix": "/queue",
"AppRequestPrefix": "/pub",
"AppRequestQueuePrefix": "/pub/queue",
"Heartbeat": 60000
}
},
"enable_prometheus": true
}
38 changes: 38 additions & 0 deletions plank/cmd/cross_plank_communication/server_two/server_two.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 VMware, Inc.
// SPDX-License-Identifier: BSD-2-Clause

package main

import (
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/plank/pkg/server"
"github.com/vmware/transport-go/plank/services"
"github.com/vmware/transport-go/plank/utils"
"os"
"time"
)

// configure flags
func main() {
serverConfig, err := server.CreateServerConfig()
if err != nil {
utils.Log.Fatalln(err)
}
platformServer := server.NewPlatformServer(serverConfig)
brokerConfigForAnotherWs := &bridge.BrokerConnectorConfig{
Username: "guest",
Password: "guest",
ServerAddr: "localhost:30081",
UseWS: true,
WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/ws"},
HeartBeatOut: 30 * time.Second,
STOMPHeader: map[string]string{},
}
if err = platformServer.RegisterService(services.NewExternalBrokerExampleService(bus.GetBus(), brokerConfigForAnotherWs), services.ExternalBrokerExampleServiceChannel); err != nil {
utils.Log.Fatalln(err)
}

syschan := make(chan os.Signal, 1)
platformServer.StartServer(syschan)
}
136 changes: 136 additions & 0 deletions plank/services/cross_plank_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package services

import (
"encoding/json"
"github.com/google/uuid"
"github.com/vmware/transport-go/bridge"
"github.com/vmware/transport-go/bus"
"github.com/vmware/transport-go/model"
"github.com/vmware/transport-go/plank/utils"
"github.com/vmware/transport-go/service"
"sync"
"time"
)

const ExternalBrokerExampleServiceChannel = "external-broker-example-service"
const localSyncBusChannel string = "ext-svc"

type ExternalBrokerExampleService struct {
targetBroker *bridge.BrokerConnectorConfig
conn bridge.Connection
bus bus.EventBus
mu sync.Mutex
}

func NewExternalBrokerExampleService(bus bus.EventBus, config *bridge.BrokerConnectorConfig) *ExternalBrokerExampleService {
service := &ExternalBrokerExampleService{
bus: bus,
targetBroker: config,
}
return service
}

func (s *ExternalBrokerExampleService) HandleServiceRequest(request *model.Request, core service.FabricServiceCore) {
// in this example, I wrote it so that "ping" request assumes the broker session is established already, but in real
// scenarios you may wish to ensure the connection is alive and healthy. maybe you will want to create a few more
// service requests to handle the lifecycle of the WS connection to the other broker. for example, you could create
// a request "connect" for connecting to the external broker and "disconnect" for closing the connection explicitly.
switch request.Request {
case "ping":
// we create a single-fire listener for the channel that's linked to the other broker
handler, err := s.bus.ListenOnce(localSyncBusChannel)
if err != nil {
// failed to set up the listener. this rarely happens though but still it helps to make sure all errors are tracked.
core.SendErrorResponse(request, 500, err.Error())
return
}

// we will take the payload from the local request and use it to send a remote request in L67
utils.Log.Infoln("RECEIVED LOCAL REQUEST PAYLOAD", request.Payload)

// set up the handler for responses returned back from the external broker
handler.Handle(func(message *model.Message) {
var payload model.Response
if err := message.CastPayloadToType(&payload); err != nil {
utils.Log.Errorln("RECEIVED RESPONSE BUT FAILED TO CAST", err)
core.SendErrorResponse(request, 500, err.Error())
}
// pass back the success response from the external broker back to the requester.
core.SendResponse(request, payload.Payload)
utils.Log.Infof("RECEIVED RESPONSE: %v\n", payload.Payload)
}, func(err error) {
// external broker sent an error. pass it back to the requester.
core.SendErrorResponse(request, 400, err.Error())
utils.Log.Errorln("RECEIVED ERROR RESPONSE FROM EXTERNAL BROKER", err)
return
})

// create a request object for the remote service "ping-pong-service" with request named "ping-get" and pass it
// through the external broker using the connection we established when the service started. see ping-pong-service
// L60 for the data structure for the request payload that the service accepts
req := &model.Request{Request: "ping-get", Payload: request.Payload.(string)}
reqMarshalled, _ := json.Marshal(req)
if err = s.conn.SendJSONMessage("/pub/queue/ping-pong-service", reqMarshalled); err != nil {
core.SendErrorResponse(request, 400, err.Error())
return
}

default:
core.SendErrorResponse(request, 400, "unknown service")
}
}

func (s *ExternalBrokerExampleService) OnServiceReady() chan bool {
ready := make(chan bool, 1)
s.mu.Lock()
defer s.mu.Unlock()

// connect to the external broker
var err error
if s.conn, err = s.bus.ConnectBroker(s.targetBroker); err != nil {
utils.Log.Errorln("[external-broker-example-service] could not connect to the broker. Service failed to start")
ready <- false
return ready
} else {
utils.Log.Infoln("[external-broker-example-service] connected to external broker!")

// create a local channel named "ext-svc" and bridge it to the external broker
busChannelManager := s.bus.GetChannelManager()
busChannelManager.CreateChannel(localSyncBusChannel)
if err = s.bus.GetChannelManager().MarkChannelAsGalactic(localSyncBusChannel, "/queue/ping-pong-service", s.conn); err != nil {
utils.Log.Errorln(err)
ready <- false
return ready
}
}

// this is just to demonstrate that a message travels from this Plank process to another.
// you can use any WebSocket client instead to send a request message to this service and get a response back.
go func() {
t := time.NewTicker(5 * time.Second)
<-t.C
_ = s.bus.SendRequestMessage(ExternalBrokerExampleServiceChannel, model.Request{Id: &uuid.UUID{}, Request: "ping", Payload: "HI!"}, nil)
}()

// at this point, a connection between this Plank and the other broker is established and the local channel "ext-svc"
// is synced to the bus channel "ping-pong-service" from the other Plank. set the service to ready and return the channel.
ready <- true
return ready
}

func (js *ExternalBrokerExampleService) GetRESTBridgeConfig() []*service.RESTBridgeConfig {
return nil
}

func (s *ExternalBrokerExampleService) OnServerShutdown() {
s.mu.Lock()
defer s.mu.Unlock()

// disconnect from bridge connection
if s.conn != nil {
utils.Log.Infoln("[external-broker-example-service] disconnecting from external broker")
if err := s.conn.Disconnect(); err != nil {
utils.Log.Errorln(err)
}
}
}