Skip to content

Commit

Permalink
Merge pull request #128 from andrewshan/main
Browse files Browse the repository at this point in the history
#104: fix health status not changed in some cases
  • Loading branch information
andrewshan authored Oct 29, 2021
2 parents 0ed386d + 4ed3cf6 commit 08f88c4
Show file tree
Hide file tree
Showing 68 changed files with 2,373 additions and 1,657 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
pushd ./naming/cache
go test -v
popd
pushd ./plugin/ratelimit/tokenbucket
pushd ./plugin/ratelimit/token
go test -v
popd
pushd ./store/sqldb
Expand Down
2 changes: 1 addition & 1 deletion apiserver/grpcserver/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (g *GRPCServer) Discover(server api.PolarisGRPC_DiscoverServer) error {
* @brief 上报心跳
*/
func (g *GRPCServer) Heartbeat(ctx context.Context, in *api.Instance) (*api.Response, error) {
out := g.namingServer.Heartbeat(convertContext(ctx), in)
out := g.healthCheckServer.Report(convertContext(ctx), in)
return out, nil
}

Expand Down
28 changes: 15 additions & 13 deletions apiserver/grpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package grpcserver
import (
"context"
"fmt"
"github.com/polarismesh/polaris-server/healthcheck"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -49,32 +50,27 @@ type GRPCServer struct {
restart bool
exitCh chan struct{}

server *grpc.Server
namingServer *naming.Server
statis plugin.Statis
ratelimit plugin.Ratelimit
server *grpc.Server
namingServer *naming.Server
healthCheckServer *healthcheck.Server
statis plugin.Statis
ratelimit plugin.Ratelimit

openAPI map[string]apiserver.APIConfig
openMethod map[string]bool
}

/**
* @brief 获取端口
*/
// GetPort 获取端口
func (g *GRPCServer) GetPort() uint32 {
return g.listenPort
}

/**
* @brief 获取Server的协议
*/
// GetProtocol 获取Server的协议
func (g *GRPCServer) GetProtocol() string {
return "grpc"
}

/**
* Initialize 初始化GRPC API服务器
*/
// Initialize 初始化GRPC API服务器
func (g *GRPCServer) Initialize(_ context.Context, option map[string]interface{},
api map[string]apiserver.APIConfig) error {
g.listenIP = option["listenIP"].(string)
Expand Down Expand Up @@ -110,6 +106,12 @@ func (g *GRPCServer) Run(errCh chan error) {

var err error
// 引入功能模块和插件
g.healthCheckServer, err = healthcheck.GetServer()
if err != nil {
log.Errorf("%v", err)
errCh <- err
return
}
g.namingServer, err = naming.GetServer()
if err != nil {
log.Errorf("%v", err)
Expand Down
2 changes: 1 addition & 1 deletion apiserver/httpserver/client_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,5 +184,5 @@ func (h *HTTPServer) Heartbeat(req *restful.Request, rsp *restful.Response) {
return
}

handler.WriteHeaderAndProto(h.namingServer.Heartbeat(ctx, instance))
handler.WriteHeaderAndProto(h.healthCheckServer.Report(ctx, instance))
}
16 changes: 5 additions & 11 deletions apiserver/httpserver/console_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package httpserver
import (
"context"
"fmt"
proto "github.com/golang/protobuf/proto"
"github.com/polarismesh/polaris-server/common/log"
"net/http"

"github.com/emicklei/go-restful"
"github.com/golang/protobuf/proto"
api "github.com/polarismesh/polaris-server/common/api/v1"
"github.com/polarismesh/polaris-server/common/log"
"github.com/polarismesh/polaris-server/common/utils"
)

Expand All @@ -34,9 +34,7 @@ const (
defaultAccess string = "default"
)

/**
* GetConsoleAccessServer 注册管理端接口
*/
// GetConsoleAccessServer 注册管理端接口
func (h *HTTPServer) GetConsoleAccessServer(include []string) (*restful.WebService, error) {
consoleAccess := []string{defaultAccess}

Expand Down Expand Up @@ -72,9 +70,7 @@ func (h *HTTPServer) GetConsoleAccessServer(include []string) (*restful.WebServi
return ws, nil
}

/**
* addDefaultReadAccess 增加默认读接口
*/
// addDefaultReadAccess 增加默认读接口
func (h *HTTPServer) addDefaultReadAccess(ws *restful.WebService) {
// 管理端接口:只包含读接口
ws.Route(ws.GET("/namespaces").To(h.GetNamespaces))
Expand Down Expand Up @@ -107,9 +103,7 @@ func (h *HTTPServer) addDefaultReadAccess(ws *restful.WebService) {
ws.Route(ws.GET("/platform/token").To(h.GetPlatformToken))
}

/**
* addDefaultAccess 增加默认接口
*/
// addDefaultAccess 增加默认接口
func (h *HTTPServer) addDefaultAccess(ws *restful.WebService) {
// 管理端接口:增删改查请求全部操作存储层
ws.Route(ws.POST("/namespaces").To(h.CreateNamespaces))
Expand Down
35 changes: 14 additions & 21 deletions apiserver/httpserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,13 @@ import (
"go.uber.org/zap"
)

/**
* Handler HTTP请求/回复处理器
*/
// Handler HTTP请求/回复处理器
type Handler struct {
*restful.Request
*restful.Response
}

/**
* Parse 解析请求
*/
func (h *Handler) Parse(message proto.Message) (context.Context, error) {
requestID := h.Request.HeaderParameter("Request-Id")
if err := jsonpb.Unmarshal(h.Request.Request.Body, message); err != nil {
log.Error(err.Error(), zap.String("request-id", requestID))
return nil, err
}
return h.postParseMessage(requestID)
}

// ParseArray 解析PB数组对象
func (h *Handler) ParseArray(createMessage func() proto.Message) (context.Context, error) {
requestID := h.Request.HeaderParameter("Request-Id")

Expand Down Expand Up @@ -102,9 +89,17 @@ func (h *Handler) postParseMessage(requestID string) (context.Context, error) {
return ctx, nil
}

/**
* WriteHeader 仅返回Code
*/
// Parse 解析请求
func (h *Handler) Parse(message proto.Message) (context.Context, error) {
requestID := h.Request.HeaderParameter("Request-Id")
if err := jsonpb.Unmarshal(h.Request.Request.Body, message); err != nil {
log.Error(err.Error(), zap.String("request-id", requestID))
return nil, err
}
return h.postParseMessage(requestID)
}

// WriteHeader 仅返回Code
func (h *Handler) WriteHeader(polarisCode uint32, httpStatus int) {
requestID := h.Request.HeaderParameter(utils.PolarisRequestID)
h.Request.SetAttribute(utils.PolarisCode, polarisCode) // api统计的时候,用该code
Expand All @@ -118,9 +113,7 @@ func (h *Handler) WriteHeader(polarisCode uint32, httpStatus int) {
h.Response.WriteHeader(httpStatus)
}

/**
* WriteHeaderAndProto 返回Code和Proto
*/
// WriteHeaderAndProto 返回Code和Proto
func (h *Handler) WriteHeaderAndProto(obj api.ResponseMessage) {
requestID := h.Request.HeaderParameter(utils.PolarisRequestID)
h.Request.SetAttribute(utils.PolarisCode, obj.GetCode().GetValue())
Expand Down
4 changes: 2 additions & 2 deletions apiserver/httpserver/maintain_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (h *HTTPServer) GetLastHeartbeat(req *restful.Request, rsp *restful.Respons
instance := &api.Instance{}
if id, ok := params["id"]; ok && id != "" {
instance.Id = utils.NewStringValue(id)
ret := h.namingServer.GetLastHeartbeat(instance)
ret := h.healthCheckServer.GetLastHeartbeat(instance)
handler.WriteHeaderAndProto(ret)
return
}
Expand All @@ -227,6 +227,6 @@ func (h *HTTPServer) GetLastHeartbeat(req *restful.Request, rsp *restful.Respons
port, _ := strconv.Atoi(params["port"])
instance.Port = utils.NewUInt32Value(uint32(port))

ret := h.namingServer.GetLastHeartbeat(instance)
ret := h.healthCheckServer.GetLastHeartbeat(instance)
handler.WriteHeaderAndProto(ret)
}
19 changes: 13 additions & 6 deletions apiserver/httpserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package httpserver
import (
"context"
"fmt"
"github.com/polarismesh/polaris-server/healthcheck"
"net"
"net/http"
"net/http/pprof"
Expand Down Expand Up @@ -58,11 +59,12 @@ type HTTPServer struct {

freeMemMu *sync.Mutex

server *http.Server
namingServer *naming.Server
rateLimit plugin.Ratelimit
statis plugin.Statis
auth plugin.Auth
server *http.Server
namingServer *naming.Server
healthCheckServer *healthcheck.Server
rateLimit plugin.Ratelimit
statis plugin.Statis
auth plugin.Auth
}

const (
Expand Down Expand Up @@ -137,6 +139,12 @@ func (h *HTTPServer) Run(errCh chan error) {
errCh <- err
return
}
h.healthCheckServer, err = healthcheck.GetServer()
if err != nil {
log.Errorf("%v", err)
errCh <- err
return
}
h.statis = plugin.GetStatis()

// 初始化http server
Expand All @@ -159,7 +167,6 @@ func (h *HTTPServer) Run(errCh chan error) {
}

ln = &tcpKeepAliveListener{ln.(*net.TCPListener)}

// 开启最大连接数限制
if h.connLimitConfig != nil && h.connLimitConfig.OpenConnLimit {
log.Infof("http server use max connection limit per ip: %d, http max limit: %d",
Expand Down
38 changes: 31 additions & 7 deletions bootstrap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"errors"
"fmt"
"github.com/polarismesh/polaris-server/common/model"
"github.com/polarismesh/polaris-server/healthcheck"
"net"
"strings"
"time"
Expand Down Expand Up @@ -105,15 +107,11 @@ func Start(configFilePath string) {
fmt.Printf("[ERROR] %v\n", err)
return
}

cfg.Naming.HealthCheck.LocalHost = LocalHost // 补充healthCheck的配置
naming.SetHealthCheckConfig(&cfg.Naming.HealthCheck)
err = naming.Initialize(ctx, &cfg.Naming, &cfg.Cache)
err = StartComponents(ctx, cfg)
if err != nil {
fmt.Printf("[ERROR] %v\n", err)
return
}

errCh := make(chan error, len(cfg.APIServers))
servers, err := StartServers(ctx, cfg, errCh)
if err != nil {
Expand All @@ -130,7 +128,32 @@ func Start(configFilePath string) {
RunMainLoop(servers, errCh)
}

// 启动server
// StartComponents start healthcheck and naming components
func StartComponents(ctx context.Context, cfg *config.Config) error {
var err error
if len(cfg.HealthChecks.LocalHost) == 0 {
cfg.HealthChecks.LocalHost = LocalHost // 补充healthCheck的配置
}
err = healthcheck.Initialize(ctx, &cfg.HealthChecks, cfg.Cache.Open)
if err != nil {
return err
}
healthCheckServer, err := healthcheck.GetServer()
if err != nil {
return err
}
cacheProvider, err := healthCheckServer.CacheProvider()
if err != nil {
return err
}
err = naming.Initialize(ctx, &cfg.Naming, &cfg.Cache, cacheProvider)
if err != nil {
return err
}
return nil
}

// StartServers 启动server
func StartServers(ctx context.Context, cfg *config.Config, errCh chan error) (
[]apiserver.Apiserver, error) {
// 启动API服务器
Expand Down Expand Up @@ -362,7 +385,8 @@ func selfRegister(host string, port uint32, protocol string, isolated bool, pola
Version: utils.NewStringValue(version.Get()),
Isolate: utils.NewBoolValue(isolated), // 自注册,默认是隔离的
Metadata: map[string]string{
"build-revision": version.GetRevision(),
model.MetaKeyBuildRevision: version.GetRevision(),
model.MetaKeyPolarisService: name,
},
}

Expand Down
1 change: 1 addition & 0 deletions common/api/v1/codeinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
HealthCheckNotOpen = 400140
HeartbeatOnDisabledIns = 400141
HeartbeatExceedLimit = 400142
HeartbeatTypeNotFound = 400143
InvalidMetadata = 400150
InvalidRateLimitID = 400151
InvalidRateLimitLabels = 400152
Expand Down
26 changes: 26 additions & 0 deletions common/model/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Tencent is pleased to support the open source community by making Polaris available.
*
* Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package model

const (
// MetaKeyPolarisService service identifier by self registration
MetaKeyPolarisService = "polaris_service"

// MetaKeyBuildRevision build revision for server
MetaKeyBuildRevision = "build-revision"
)
Loading

0 comments on commit 08f88c4

Please sign in to comment.