Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
DrmagicE committed Jan 22, 2021
1 parent 6b9850c commit bc88939
Show file tree
Hide file tree
Showing 24 changed files with 1,071 additions and 141 deletions.
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ generate-swagger:

# generate mock code
generate-mocks:
go generate ./...

@./mock_gen.sh
run:
go run ./cmd/gmqttd start -c ./cmd/gmqttd/default_config.yml

Expand Down
2 changes: 2 additions & 0 deletions cmd/gmqttd/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,14 @@ func NewStartCmd() *cobra.Command {
if useDefault {
l.Warn("config file not exist, use default configration")
}

s := server.New(
server.WithConfig(c),
server.WithTCPListener(tcpListeners...),
server.WithWebsocketServer(websockets...),
server.WithLogger(l),
)

err = s.Init()
if err != nil {
fmt.Println(err)
Expand Down
26 changes: 17 additions & 9 deletions cmd/gmqttd/default_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,18 @@ listeners:
# cert_file: "path_to_cert_file"
# key_file: "path_to_key_file"

- address: ":8883"
# websocket setting
websocket:
path: "/"
# - address: ":8883"
# # websocket setting
# websocket:
# path: "/"

gRPC:
endpoint: "unix://./gmqttd.sock"
# /var/run

api:


mqtt:
session_expiry: 2h
session_expiry_check_timer: 20s
Expand All @@ -23,8 +31,8 @@ mqtt:
shared_subscription_available: true
maximum_qos: 2
retain_available: true
max_queued_messages: 1000
max_inflight: 100
max_queued_messages: 10000
max_inflight: 1000
queue_qos0_messages: true
delivery_mode: onlyonce # overlap or onlyonce
allow_zero_length_clientid: true
Expand Down Expand Up @@ -61,7 +69,7 @@ plugins:
enable: true
addr: :8083
grpc:
addr: 8084
#addr: 'unix://./gmqttd.sock'
auth:
# Password hash type. (plain | md5 | sha256 | bcrypt)
# Default to MD5.
Expand All @@ -80,8 +88,8 @@ plugin_order:
# Uncomment auth to enable authentication.
#- auth
#- prometheus
#- admin
- federation
- admin
#- federation
log:
level: info # debug | info | warn | error
format: text # json | text
Expand Down
94 changes: 94 additions & 0 deletions cmd/gmqttd/default_config1.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
pid_file: ./gmqttd1.pid
listeners:
# bind address
- address: ":1884"
# tls setting
# tls:
# cert_file: "path_to_cert_file"
# key_file: "path_to_key_file"

# - address: ":8883"
# # websocket setting
# websocket:
# path: "/"
mqtt:
session_expiry: 2h
session_expiry_check_timer: 20s
message_expiry: 2h
max_packet_size: 268435456
server_receive_maximum: 100
max_keepalive: 60
topic_alias_maximum: 10
subscription_identifier_available: true
wildcard_subscription_available: true
shared_subscription_available: true
maximum_qos: 2
retain_available: true
max_queued_messages: 10000
max_inflight: 100
queue_qos0_messages: true
delivery_mode: onlyonce # overlap or onlyonce
allow_zero_length_clientid: true

persistence:
type: memory # memory | redis
# The redis configuration only take effect when type == redis.
redis:
# redis server address
addr: "127.0.0.1:6379"
# the maximum number of idle connections in the redis connection pool.
max_idle: 1000
# the maximum number of connections allocated by the redis connection pool at a given time.
# If zero, there is no limit on the number of connections in the pool.
max_active: 0
# the connection idle timeout, connection will be closed after remaining idle for this duration. If the value is zero, then idle connections are not closed.
idle_timeout: 240s
password: ""
# the number of the redis database.
database: 0

# The topic alias manager setting. The topic alias feature is introduced by MQTT V5.
# This setting is used to control how the broker manage topic alias.
topic_alias_manager:
# Currently, only FIFO strategy is supported.
type: fifo

plugins:
prometheus:
path: "/metrics"
listen_address: ":8082"
admin:
http:
enable: true
addr: :8083
grpc:
addr: 8084
auth:
# Password hash type. (plain | md5 | sha256 | bcrypt)
# Default to MD5.
hash: md5
# The file to store password. Default to $HOME/gmqtt_password.yml
# password_file:
federation:
node_name: node1
fed_addr: 127.0.0.1:8080
gossip_addr: 127.0.0.1:7070
join:
- 127.0.0.1:7071

# plugin loading orders
plugin_order:
# Uncomment auth to enable authentication.
#- auth
#- prometheus
#- admin
- federation
log:
level: warn # debug | info | warn | error
format: text # json | text
# whether to dump MQTT packet in debug level
dump_packet: false




1 change: 1 addition & 0 deletions cmd/gmqttd/gmqttd1.pid
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
27046
16 changes: 9 additions & 7 deletions cmd/gmqttd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
//"runtime/pprof"
_ "runtime/pprof"

"os"

"github.com/mitchellh/go-homedir"
Expand Down Expand Up @@ -41,12 +43,12 @@ func init() {
}

func main() {
// f, err := os.Create("cpu.profile")
// if err != nil {
// panic(err)
// }
// pprof.StartCPUProfile(f)
// defer pprof.StopCPUProfile()
//f, err := os.Create("cpu.profile")
//if err != nil {
// panic(err)
//}
//pprof.StartCPUProfile(f)
//defer pprof.StopCPUProfile()
go func() {
http.ListenAndServe(":6060", nil)
}()
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (p pluginConfig) UnmarshalYAML(unmarshal func(interface{}) error) error {
type Config struct {
Listeners []*ListenerConfig `yaml:"listeners"`
MQTT MQTT `yaml:"mqtt,omitempty"`
GRPC GRPC `yaml:"gRPC"`
Log LogConfig `yaml:"log"`
PidFile string `yaml:"pid_file"`
Plugins pluginConfig `yaml:"plugins"`
Expand All @@ -118,6 +119,10 @@ type Config struct {
TopicAliasManager TopicAliasManager `yaml:"topic_alias_manager"`
}

type GRPC struct {
Endpoint string `yaml:"endpoint"`
}

type TLSOptions struct {
CertFile string `yaml:"cert_file"`
KeyFile string `yaml:"key_file"`
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/DrmagicE/gmqtt
go 1.14

require (
github.com/golang/mock v1.2.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2
github.com/gomodule/redigo v1.8.2
github.com/google/uuid v1.1.2
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs=
Expand Down Expand Up @@ -322,6 +324,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
Expand Down
22 changes: 22 additions & 0 deletions mock_gen.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
mockgen -source=config/config.go -destination=./config/config_mock.go -package=config -self_package=github.com/DrmagicE/gmqtt/config
mockgen -source=persistence/queue/elem.go -destination=./persistence/queue/elem_mock.go -package=queue -self_package=github.com/DrmagicE/gmqtt/queue
mockgen -source=persistence/queue/queue.go -destination=./persistence/queue/queue_mock.go -package=queue -self_package=github.com/DrmagicE/gmqtt/queue
mockgen -source=persistence/session/session.go -destination=./persistence/session/session_mock.go -package=session -self_package=github.com/DrmagicE/gmqtt/session
mockgen -source=persistence/subscription/interface.go -destination=./persistence/subscription/interface_mock.go -package=subscription -self_package=github.com/DrmagicE/gmqtt/subscription
mockgen -source=persistence/unack/unack.go -destination=./persistence/unack/unack_mock.go -package=unack -self_package=github.com/DrmagicE/gmqtt/unack
mockgen -source=pkg/packets/packets.go -destination=./pkg/packets/packets_mock.go -package=packets -self_package=github.com/DrmagicE/gmqtt/packets
mockgen -source=plugin/auth/account_grpc.pb.go -destination=./plugin/auth/account_grpc.pb_mock.go -package=auth -self_package=github.com/DrmagicE/gmqtt/auth
mockgen -source=plugin/federation/federation.pb.go -destination=./plugin/federation/federation.pb_mock.go -package=federation -self_package=github.com/DrmagicE/gmqtt/federation
mockgen -source=plugin/federation/peer.go -destination=./plugin/federation/peer_mock.go -package=federation -self_package=github.com/DrmagicE/gmqtt/federation
mockgen -source=retained/interface.go -destination=./retained/interface_mock.go -package=retained -self_package=github.com/DrmagicE/gmqtt/retained
mockgen -source=server/client.go -destination=./server/client_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server
mockgen -source=server/persistence.go -destination=./server/persistence_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server
mockgen -source=server/plugin.go -destination=./server/plugin_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server
mockgen -source=server/server.go -destination=./server/server_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server
mockgen -source=server/service.go -destination=./server/service_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server
mockgen -source=server/stats.go -destination=./server/stats_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server
mockgen -source=server/topic_alias.go -destination=./server/topic_alias_mock.go -package=server -self_package=github.com/DrmagicE/gmqtt/server

# reflection mode.
# gRPC streaming mock issue: https://github.com/golang/mock/pull/163
mockgen -package=federation -destination=/usr/local/gopath/src/github.com/DrmagicE/gmqtt/plugin/federation/federation_grpc.pb_mock.go github.com/DrmagicE/gmqtt/plugin/federation FederationClient,Federation_EventStreamClient
2 changes: 2 additions & 0 deletions persistence/subscription/mem/trie_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ func (db *TrieDB) UnsubscribeLocked(clientID string, topics ...string) {
shareName, topic := subscription.SplitTopic(topic)
if shareName != "" {
topicTrie = db.sharedTrie
// TODO
index = db.sharedIndex
} else if isSystemTopic(topic) {
index = db.systemIndex
topicTrie = db.systemTrie
Expand Down
Loading

0 comments on commit bc88939

Please sign in to comment.