Skip to content

Commit

Permalink
Hook up MetricsQueryService to main funcs (#3079)
Browse files Browse the repository at this point in the history
  • Loading branch information
albertteoh authored Jun 12, 2021
1 parent e33977e commit 013efad
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 80 deletions.
34 changes: 30 additions & 4 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package main

import (
"fmt"
"io"
"log"
"os"
Expand Down Expand Up @@ -44,10 +45,12 @@ import (
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)
Expand All @@ -71,6 +74,12 @@ func main() {
log.Fatalf("Cannot initialize sampling strategy store factory: %v", err)
}

fc := metricsPlugin.FactoryConfigFromEnv()
metricsReaderFactory, err := metricsPlugin.NewFactory(fc)
if err != nil {
log.Fatalf("Cannot initialize metrics store factory: %v", err)
}

v := viper.New()
command := &cobra.Command{
Use: "jaeger-all-in-one",
Expand Down Expand Up @@ -107,6 +116,11 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsReader, err := createMetricsReader(metricsReaderFactory, v, logger)
if err != nil {
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}

strategyStoreFactory.InitFromViper(v)
if err := strategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
Expand Down Expand Up @@ -157,8 +171,8 @@ by default uses only in-memory database.`,
// query
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader,
rootMetricsFactory, metricsFactory,
spanReader, dependencyReader, metricsReader,
metricsFactory,
)

svc.RunAndThen(func() {
Expand Down Expand Up @@ -196,6 +210,7 @@ by default uses only in-memory database.`,
collectorApp.AddFlags,
queryApp.AddFlags,
strategyStoreFactory.AddFlags,
metricsReaderFactory.AddFlags,
)

if err := command.Execute(); err != nil {
Expand Down Expand Up @@ -229,12 +244,13 @@ func startQuery(
queryOpts *querysvc.QueryServiceOptions,
spanReader spanstore.Reader,
depReader dependencystore.Reader,
rootFactory metrics.Factory,
metricsReader metricsstore.Reader,
baseFactory metrics.Factory,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
server, err := queryApp.NewServer(svc.Logger, qs, qOpts, opentracing.GlobalTracer())
mqs := querysvc.NewMetricsQueryService(metricsReader)
server, err := queryApp.NewServer(svc.Logger, qs, mqs, qOpts, opentracing.GlobalTracer())
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
}
Expand Down Expand Up @@ -272,3 +288,13 @@ func initTracer(metricsFactory metrics.Factory, logger *zap.Logger) io.Closer {
opentracing.SetGlobalTracer(tracer)
return closer
}

func createMetricsReader(factory *metricsPlugin.Factory, v *viper.Viper, logger *zap.Logger) (metricsstore.Reader, error) {
if err := factory.Initialize(logger); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
factory.InitFromViper(v)
return factory.CreateMetricsReader()
}
18 changes: 1 addition & 17 deletions cmd/query/app/querysvc/metrics_query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@ package querysvc

import (
"context"
"errors"
"time"

"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)

// MetricsQueryService contains the underlying reader required for querying the metrics store.
// MetricsQueryService provides a means of querying R.E.D metrics from an underlying metrics store.
type MetricsQueryService struct {
metricsReader metricsstore.Reader
}

var errNilReader = errors.New("no reader defined for MetricsQueryService")

// NewMetricsQueryService returns a new MetricsQueryService.
// A nil reader will result in a nil MetricsQueryService being returned.
func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {
return &MetricsQueryService{
metricsReader: reader,
Expand All @@ -40,32 +36,20 @@ func NewMetricsQueryService(reader metricsstore.Reader) *MetricsQueryService {

// GetLatencies is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetLatencies(ctx context.Context, params *metricsstore.LatenciesQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
}
return mqs.metricsReader.GetLatencies(ctx, params)
}

// GetCallRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetCallRates(ctx context.Context, params *metricsstore.CallRateQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
}
return mqs.metricsReader.GetCallRates(ctx, params)
}

// GetErrorRates is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetErrorRates(ctx context.Context, params *metricsstore.ErrorRateQueryParameters) (*metrics.MetricFamily, error) {
if mqs.metricsReader == nil {
return nil, errNilReader
}
return mqs.metricsReader.GetErrorRates(ctx, params)
}

// GetMinStepDuration is the queryService implementation of metricsstore.Reader.
func (mqs MetricsQueryService) GetMinStepDuration(ctx context.Context, params *metricsstore.MinStepDurationQueryParameters) (time.Duration, error) {
if mqs.metricsReader == nil {
return 0, errNilReader
}
return mqs.metricsReader.GetMinStepDuration(ctx, params)
}
34 changes: 0 additions & 34 deletions cmd/query/app/querysvc/metrics_query_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ type testMetricsQueryService struct {

func initializeTestMetricsQueryService() *testMetricsQueryService {
metricsReader := &metricsmocks.Reader{}

tqs := testMetricsQueryService{
metricsReader: metricsReader,
}

tqs.queryService = NewMetricsQueryService(metricsReader)
return &tqs
}
Expand All @@ -58,14 +56,6 @@ func TestGetLatencies(t *testing.T) {
assert.Equal(t, expectedLatencies, actualLatencies)
}

func TestGetLatenciesNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.LatenciesQueryParameters{}
r, err := qs.GetLatencies(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}

// Test QueryService.GetCallRates()
func TestGetCallRates(t *testing.T) {
tqs := initializeTestMetricsQueryService()
Expand All @@ -81,14 +71,6 @@ func TestGetCallRates(t *testing.T) {
assert.Equal(t, expectedCallRates, actualCallRates)
}

func TestGetCallRatesNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.CallRateQueryParameters{}
r, err := qs.GetCallRates(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}

// Test QueryService.GetErrorRates()
func TestGetErrorRates(t *testing.T) {
tqs := initializeTestMetricsQueryService()
Expand All @@ -101,14 +83,6 @@ func TestGetErrorRates(t *testing.T) {
assert.Equal(t, expectedErrorRates, actualErrorRates)
}

func TestGetErrorRatesNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.ErrorRateQueryParameters{}
r, err := qs.GetErrorRates(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}

// Test QueryService.GetMinStepDurations()
func TestGetMinStepDurations(t *testing.T) {
tqs := initializeTestMetricsQueryService()
Expand All @@ -120,11 +94,3 @@ func TestGetMinStepDurations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expectedMinStep, actualMinStep)
}

func TestGetMinStepDurationsNilReader(t *testing.T) {
qs := NewMetricsQueryService(nil)
qParams := &metricsstore.MinStepDurationQueryParameters{}
r, err := qs.GetMinStepDuration(context.Background(), qParams)
assert.Zero(t, r)
assert.EqualError(t, err, errNilReader.Error())
}
14 changes: 9 additions & 5 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
Expand All @@ -67,12 +67,12 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, logger, tracer)
if err != nil {
return nil, err
}

httpServer, err := createHTTPServer(querySvc, options, tracer, logger)
httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger)
if err != nil {
return nil, err
}
Expand All @@ -94,7 +94,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.TLSGRPC.Enabled {
Expand All @@ -111,11 +111,15 @@ func createGRPCServer(querySvc *querysvc.QueryService, options *QueryOptions, lo
server := grpc.NewServer(grpcOpts...)

handler := NewGRPCHandler(querySvc, logger, tracer)

// TODO: Register MetricsQueryService
api_v2.RegisterQueryServiceServer(server, handler)

return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc *querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, error) {
// TODO: Add HandlerOptions.MetricsQueryService
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
HandlerOptions.Tracer(tracer),
Expand Down
30 changes: 18 additions & 12 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"testing"
"time"

opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) {
ClientCAPath: testCertKeyLocation + "/example-CA-cert.pem",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -77,7 +77,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand All @@ -90,7 +90,7 @@ func TestCreateTLSHttpServerError(t *testing.T) {
ClientCAPath: "invalid/path",
}

_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{})
assert.NotNil(t, err)
}
Expand Down Expand Up @@ -331,7 +331,8 @@ func TestServerHTTPTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -491,7 +492,8 @@ func TestServerGRPCTLS(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})
server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
serverOptions,
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -545,12 +547,12 @@ func TestServerGRPCTLS(t *testing.T) {

}
func TestServerBadHostPort(t *testing.T) {
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true},
opentracing.NoopTracer{})

assert.NotNil(t, err)
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{},
_, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, querysvc.NewMetricsQueryService(nil),
&QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true},
opentracing.NoopTracer{})

Expand All @@ -576,6 +578,7 @@ func TestServerInUseHostPort(t *testing.T) {
server, err := NewServer(
zap.NewNop(),
&querysvc.QueryService{},
querysvc.NewMetricsQueryService(nil),
&QueryOptions{
HTTPHostPort: tc.httpHostPort,
GRPCHostPort: tc.grpcHostPort,
Expand Down Expand Up @@ -608,8 +611,8 @@ func TestServerSinglePort(t *testing.T) {
spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil)

querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{})

server, err := NewServer(flagsSvc.Logger, querySvc,
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc,
&QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true},
opentracing.NoopTracer{})
assert.Nil(t, err)
Expand Down Expand Up @@ -658,8 +661,10 @@ func TestServerGracefulExit(t *testing.T) {
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)

server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
go func() {
Expand All @@ -685,8 +690,9 @@ func TestServerHandlesPortZero(t *testing.T) {
flagsSvc.Logger = zap.New(zapCore)

querySvc := &querysvc.QueryService{}
metricsQuerySvc := querysvc.NewMetricsQueryService(nil)
tracer := opentracing.NoopTracer{}
server, err := NewServer(flagsSvc.Logger, querySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
server, err := NewServer(flagsSvc.Logger, querySvc, metricsQuerySvc, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, tracer)
assert.Nil(t, err)
assert.NoError(t, server.Start())
server.Close()
Expand Down
Loading

0 comments on commit 013efad

Please sign in to comment.