Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit 96ea6b1

Browse files
committed
Add a cross-plank example (WIP)
Signed-off-by: Josh Kim <[email protected]>
1 parent 9ad7c33 commit 96ea6b1

File tree

5 files changed

+308
-0
lines changed

5 files changed

+308
-0
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{
2+
"debug": false,
3+
"no_banner": false,
4+
"root_dir": "./",
5+
"spa_config": {
6+
"root_folder": "public/",
7+
"base_uri": "/",
8+
"static_assets": [
9+
"public/assets:/assets"
10+
],
11+
"cache_control_rules": {
12+
"*.{js,css}": "public, max-age=86400",
13+
"*.{ico,jpg,jpeg,svg,png,gif,tiff}": "public, max-age=604800"
14+
}
15+
},
16+
"host": "localhost",
17+
"port": 30081,
18+
"log_config": {
19+
"root": ".",
20+
"access_log": "access.log",
21+
"error_log": "errors.log",
22+
"output_log": "stdout",
23+
"format_options": {
24+
"force_colors": false,
25+
"disable_colors": true,
26+
"force_quote": false,
27+
"disable_quote": false,
28+
"environment_override_colors": false,
29+
"disable_timestamp": false,
30+
"full_timestamp": true,
31+
"timestamp_format": "",
32+
"disable_sorting": false,
33+
"disable_level_truncation": false,
34+
"pad_level_text": false,
35+
"quote_empty_fields": false,
36+
"is_terminal": true
37+
}
38+
},
39+
"shutdown_timeout_in_minutes": 1,
40+
"rest_bridge_timeout_in_minutes": 1,
41+
"fabric_config": {
42+
"fabric_endpoint": "/ws",
43+
"use_tcp": false,
44+
"tcp_port": 61613,
45+
"endpoint_config": {
46+
"TopicPrefix": "/topic",
47+
"UserQueuePrefix": "/queue",
48+
"AppRequestPrefix": "/pub",
49+
"AppRequestQueuePrefix": "/pub/queue",
50+
"Heartbeat": 60000
51+
}
52+
},
53+
"enable_prometheus": true
54+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Copyright 2022 VMware, Inc.
2+
// SPDX-License-Identifier: BSD-2-Clause
3+
4+
package main
5+
6+
import (
7+
"github.com/vmware/transport-go/plank/pkg/server"
8+
"github.com/vmware/transport-go/plank/services"
9+
"github.com/vmware/transport-go/plank/utils"
10+
"os"
11+
)
12+
13+
// configure flags
14+
func main() {
15+
serverConfig, err := server.CreateServerConfig()
16+
if err != nil {
17+
utils.Log.Fatalln(err)
18+
}
19+
platformServer := server.NewPlatformServer(serverConfig)
20+
if err = platformServer.RegisterService(services.NewPingPongService(), services.PingPongServiceChan); err != nil {
21+
utils.Log.Fatalln(err)
22+
}
23+
24+
syschan := make(chan os.Signal, 1)
25+
platformServer.StartServer(syschan)
26+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
{
2+
"debug": false,
3+
"no_banner": false,
4+
"root_dir": "./",
5+
"spa_config": {
6+
"root_folder": "public/",
7+
"base_uri": "/",
8+
"static_assets": [
9+
"public/assets:/assets"
10+
],
11+
"cache_control_rules": {
12+
"*.{js,css}": "public, max-age=86400",
13+
"*.{ico,jpg,jpeg,svg,png,gif,tiff}": "public, max-age=604800"
14+
}
15+
},
16+
"host": "localhost",
17+
"port": 30080,
18+
"log_config": {
19+
"root": ".",
20+
"access_log": "access.log",
21+
"error_log": "errors.log",
22+
"output_log": "stdout",
23+
"format_options": {
24+
"force_colors": false,
25+
"disable_colors": true,
26+
"force_quote": false,
27+
"disable_quote": false,
28+
"environment_override_colors": false,
29+
"disable_timestamp": false,
30+
"full_timestamp": true,
31+
"timestamp_format": "",
32+
"disable_sorting": false,
33+
"disable_level_truncation": false,
34+
"pad_level_text": false,
35+
"quote_empty_fields": false,
36+
"is_terminal": true
37+
}
38+
},
39+
"shutdown_timeout_in_minutes": 1,
40+
"rest_bridge_timeout_in_minutes": 1,
41+
"fabric_config": {
42+
"fabric_endpoint": "/ws",
43+
"use_tcp": false,
44+
"tcp_port": 61613,
45+
"endpoint_config": {
46+
"TopicPrefix": "/topic",
47+
"UserQueuePrefix": "/queue",
48+
"AppRequestPrefix": "/pub",
49+
"AppRequestQueuePrefix": "/pub/queue",
50+
"Heartbeat": 60000
51+
}
52+
},
53+
"enable_prometheus": true
54+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// Copyright 2022 VMware, Inc.
2+
// SPDX-License-Identifier: BSD-2-Clause
3+
4+
package main
5+
6+
import (
7+
"github.com/vmware/transport-go/bridge"
8+
"github.com/vmware/transport-go/bus"
9+
"github.com/vmware/transport-go/plank/pkg/server"
10+
"github.com/vmware/transport-go/plank/services"
11+
"github.com/vmware/transport-go/plank/utils"
12+
"os"
13+
"time"
14+
)
15+
16+
// configure flags
17+
func main() {
18+
serverConfig, err := server.CreateServerConfig()
19+
if err != nil {
20+
utils.Log.Fatalln(err)
21+
}
22+
platformServer := server.NewPlatformServer(serverConfig)
23+
brokerConfigForAnotherWs := &bridge.BrokerConnectorConfig{
24+
Username: "guest",
25+
Password: "guest",
26+
ServerAddr: "localhost:30081",
27+
UseWS: true,
28+
WebSocketConfig: &bridge.WebSocketConfig{WSPath: "/ws"},
29+
HeartBeatOut: 30 * time.Second,
30+
STOMPHeader: map[string]string{},
31+
}
32+
if err = platformServer.RegisterService(services.NewExternalBrokerExampleService(bus.GetBus(), brokerConfigForAnotherWs), services.ExternalBrokerExampleServiceChannel); err != nil {
33+
utils.Log.Fatalln(err)
34+
}
35+
36+
syschan := make(chan os.Signal, 1)
37+
platformServer.StartServer(syschan)
38+
}

plank/services/cross_plank_example.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
package services
2+
3+
import (
4+
"encoding/json"
5+
"github.com/google/uuid"
6+
"github.com/vmware/transport-go/bridge"
7+
"github.com/vmware/transport-go/bus"
8+
"github.com/vmware/transport-go/model"
9+
"github.com/vmware/transport-go/plank/utils"
10+
"github.com/vmware/transport-go/service"
11+
"sync"
12+
"time"
13+
)
14+
15+
const ExternalBrokerExampleServiceChannel = "external-broker-example-service"
16+
const localSyncBusChannel string = "ext-svc"
17+
18+
type ExternalBrokerExampleService struct {
19+
targetBroker *bridge.BrokerConnectorConfig
20+
conn bridge.Connection
21+
bus bus.EventBus
22+
mu sync.Mutex
23+
}
24+
25+
func NewExternalBrokerExampleService(bus bus.EventBus, config *bridge.BrokerConnectorConfig) *ExternalBrokerExampleService {
26+
service := &ExternalBrokerExampleService{
27+
bus: bus,
28+
targetBroker: config,
29+
}
30+
return service
31+
}
32+
33+
func (s *ExternalBrokerExampleService) HandleServiceRequest(request *model.Request, core service.FabricServiceCore) {
34+
// in this example, I wrote it so that "ping" request assumes the broker session is established already, but in real
35+
// scenarios you may wish to ensure the connection is alive and healthy. maybe you will want to create a few more
36+
// service requests to handle the lifecycle of the WS connection to the other broker. for example, you could create
37+
// a request "connect" for connecting to the external broker and "disconnect" for closing the connection explicitly.
38+
switch request.Request {
39+
case "ping":
40+
// we create a single-fire listener for the channel that's linked to the other broker
41+
handler, err := s.bus.ListenOnce(localSyncBusChannel)
42+
if err != nil {
43+
// failed to set up the listener. this rarely happens though but still it helps to make sure all errors are tracked.
44+
core.SendErrorResponse(request, 500, err.Error())
45+
return
46+
}
47+
48+
// we will take the payload from the local request and use it to send a remote request in L67
49+
utils.Log.Infoln("RECEIVED LOCAL REQUEST PAYLOAD", request.Payload)
50+
51+
// set up the handler for responses returned back from the external broker
52+
handler.Handle(func(message *model.Message) {
53+
var payload model.Response
54+
if err := message.CastPayloadToType(&payload); err != nil {
55+
utils.Log.Errorln("RECEIVED RESPONSE BUT FAILED TO CAST", err)
56+
core.SendErrorResponse(request, 500, err.Error())
57+
}
58+
// pass back the success response from the external broker back to the requester.
59+
core.SendResponse(request, payload.Payload)
60+
utils.Log.Infof("RECEIVED RESPONSE: %v\n", payload.Payload)
61+
}, func(err error) {
62+
// external broker sent an error. pass it back to the requester.
63+
core.SendErrorResponse(request, 400, err.Error())
64+
utils.Log.Errorln("RECEIVED ERROR RESPONSE FROM EXTERNAL BROKER", err)
65+
return
66+
})
67+
68+
// create a request object for the remote service "ping-pong-service" with request named "ping-get" and pass it
69+
// through the external broker using the connection we established when the service started. see ping-pong-service
70+
// L60 for the data structure for the request payload that the service accepts
71+
req := &model.Request{Request: "ping-get", Payload: request.Payload.(string)}
72+
reqMarshalled, _ := json.Marshal(req)
73+
if err = s.conn.SendJSONMessage("/pub/queue/ping-pong-service", reqMarshalled); err != nil {
74+
core.SendErrorResponse(request, 400, err.Error())
75+
return
76+
}
77+
78+
default:
79+
core.SendErrorResponse(request, 400, "unknown service")
80+
}
81+
}
82+
83+
func (s *ExternalBrokerExampleService) OnServiceReady() chan bool {
84+
ready := make(chan bool, 1)
85+
s.mu.Lock()
86+
defer s.mu.Unlock()
87+
88+
// connect to the external broker
89+
var err error
90+
if s.conn, err = s.bus.ConnectBroker(s.targetBroker); err != nil {
91+
utils.Log.Errorln("[external-broker-example-service] could not connect to the broker. Service failed to start")
92+
ready <- false
93+
return ready
94+
} else {
95+
utils.Log.Infoln("[external-broker-example-service] connected to external broker!")
96+
97+
// create a local channel named "ext-svc" and bridge it to the external broker
98+
busChannelManager := s.bus.GetChannelManager()
99+
busChannelManager.CreateChannel(localSyncBusChannel)
100+
if err = s.bus.GetChannelManager().MarkChannelAsGalactic(localSyncBusChannel, "/queue/ping-pong-service", s.conn); err != nil {
101+
utils.Log.Errorln(err)
102+
ready <- false
103+
return ready
104+
}
105+
}
106+
107+
// this is just to demonstrate that a message travels from this Plank process to another.
108+
// you can use any WebSocket client instead to send a request message to this service and get a response back.
109+
go func() {
110+
t := time.NewTicker(5 * time.Second)
111+
<-t.C
112+
_ = s.bus.SendRequestMessage(ExternalBrokerExampleServiceChannel, model.Request{Id: &uuid.UUID{}, Request: "ping", Payload: "HI!"}, nil)
113+
}()
114+
115+
// at this point, a connection between this Plank and the other broker is established and the local channel "ext-svc"
116+
// is synced to the bus channel "ping-pong-service" from the other Plank. set the service to ready and return the channel.
117+
ready <- true
118+
return ready
119+
}
120+
121+
func (js *ExternalBrokerExampleService) GetRESTBridgeConfig() []*service.RESTBridgeConfig {
122+
return nil
123+
}
124+
125+
func (s *ExternalBrokerExampleService) OnServerShutdown() {
126+
s.mu.Lock()
127+
defer s.mu.Unlock()
128+
129+
// disconnect from bridge connection
130+
if s.conn != nil {
131+
utils.Log.Infoln("[external-broker-example-service] disconnecting from external broker")
132+
if err := s.conn.Disconnect(); err != nil {
133+
utils.Log.Errorln(err)
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)