
feat: shmipc prometheus #1

Open · wants to merge 4 commits into `main`
2 changes: 2 additions & 0 deletions OWNERS
@@ -0,0 +1,2 @@
approvers:
- liu-song
49 changes: 48 additions & 1 deletion README.md
@@ -1 +1,48 @@
# .github
## shmipc-prometheus provides Prometheus monitoring for [shmipc-go](https://github.com/cloudwego/shmipc-go) (*a community-driven project*)

## How to use with shmipc-go server?

**[example/shmipc_server/main.go](example/shmipc_server/main.go)**

```go
package main

import (
	shmipcprometheus "github.com/cloudwego-contrib/shmipc-prometheus"
	"github.com/cloudwego/shmipc-go"
)

func main() {
	...
	conf := shmipc.DefaultConfig()
	conf.Monitor = shmipcprometheus.NewPrometheusMonitor(":9095", "/metrics")
	server, err := shmipc.Server(conn, conf)
	...
}
```

## How to use with shmipc-go client?

**[example/shmipc_client/main.go](example/shmipc_client/main.go)**

```go
package main

import (
	shmipcprometheus "github.com/cloudwego-contrib/shmipc-prometheus"
	"github.com/cloudwego/shmipc-go"
)

func main() {
	...
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Monitor = shmipcprometheus.NewPrometheusMonitor(":9094", "/metrics")
	smgr, _ := shmipc.NewSessionManager(conf)
	...
}
```
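Once the examples are running, the metrics endpoints can be spot-checked from the command line. A minimal sketch, assuming the addresses configured above and printing a message when nothing is listening:

```shell
# Spot-check the metrics endpoints exposed by NewPrometheusMonitor.
# Ports :9094 and :9095 match the snippets above.
for port in 9094 9095; do
  curl -sf "http://localhost:${port}/metrics" || echo "no monitor listening on :${port}"
done
```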

## Example

**[example for shmipc-prometheus](example)**

37 changes: 37 additions & 0 deletions example/README.md
@@ -0,0 +1,37 @@
# Prometheus monitoring for shmipc-go

## Usage Example

### Server

See [server](./shmipc_server)

### Client

See [client](./shmipc_client)

## HOW-TO-RUN

1. Install Docker and start the Docker daemon.
2. Replace `{{ INET_IP }}` with your local IP address in `prometheus.yml`.
3. Run Prometheus and Grafana:
   `docker-compose up`
4. Run shmipc_client and shmipc_server:
   `sh run_shmipc_client_server.sh`
5. Visit `http://localhost:3000`; the default account and password are both `admin`.
6. Configure the Prometheus data source:
   1. Open `Configuration`
   2. Choose `Data Source`
   3. Click `Add data source`
   4. Select `Prometheus` and fill the URL with `http://prometheus:9090`
   5. Click `Save & Test` to verify that the configuration works
7. Add a dashboard via `Create` -> `dashboard`, then add monitoring metrics according to your needs, for example:

- shmipc-client in-use shared memory in bytes

  `all_in_used_share_memory_in_bytes{job="shmipc-client"}`

- shmipc-server active stream count

  `active_stream_count{job="shmipc-server"}`
25 changes: 25 additions & 0 deletions example/docker-compose.yaml
@@ -0,0 +1,25 @@
version: "2"
services:
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      # - ./alert.rules:/etc/prometheus/alert.rules
      - prometheus_data:/prometheus
    command:
      - "--config.file=/etc/prometheus/prometheus.yml"
    ports:
      - "9090:9090"
  grafana:
    image: grafana/grafana:latest
    volumes:
      - grafana_data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - prometheus
    ports:
      - "3000:3000"
volumes:
  grafana_data: {}
  prometheus_data: {}
37 changes: 37 additions & 0 deletions example/prometheus.yml
@@ -0,0 +1,37 @@
# my global config
global:
  scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# Scrape configurations for the shmipc server and client examples.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "shmipc-server"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    scrape_interval: 1s
    static_configs:
      - targets: ["{{ INET_IP }}:9095"]

  - job_name: "shmipc-client"

    scrape_interval: 1s
    static_configs:
      - targets: ["{{ INET_IP }}:9094"]

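The `{{ INET_IP }}` placeholder above must be filled in before Prometheus can scrape the examples (step 2 of HOW-TO-RUN). A minimal sketch, assuming a Linux host where `hostname -I` prints the machine's addresses:

```shell
# Determine this machine's first IP address for the {{ INET_IP }} placeholder.
# 'hostname -I' is Linux-specific; on macOS use e.g. 'ipconfig getifaddr en0'.
INET_IP=$(hostname -I 2>/dev/null | awk '{print $1}')
: "${INET_IP:=127.0.0.1}" # fallback so the sketch still runs anywhere
# In the example directory: sed -i.bak "s/{{ INET_IP }}/${INET_IP}/g" prometheus.yml
echo 'targets: [ "{{ INET_IP }}:9095" ]' | sed "s/{{ INET_IP }}/${INET_IP}/g"
```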
22 changes: 22 additions & 0 deletions example/run_shmipc_client_server.sh
@@ -0,0 +1,22 @@
#!/bin/bash

cd shmipc_server || exit 1
go build
cd ../shmipc_client || exit 1
go build

cd ../shmipc_server
./shmipc_server &
SERVER_PID=$!
echo "server pid is $SERVER_PID"
sleep 1

cd ../shmipc_client
./shmipc_client &
CLIENT_PID=$!
echo "client pid is $CLIENT_PID"

trap 'echo "exiting, now kill client and server"; kill $CLIENT_PID; kill $SERVER_PID' SIGHUP SIGINT SIGQUIT SIGALRM SIGTERM
cd ../

# keep the script alive so the trap can clean up the client and server on exit
sleep 1000
131 changes: 131 additions & 0 deletions example/shmipc_client/main.go
@@ -0,0 +1,131 @@
/*
 * Copyright 2023 CloudWeGo Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package main

import (
	"flag"
	"fmt"
	"math/rand"
	"net/http"
	_ "net/http/pprof"
	"os"
	"path/filepath"
	"runtime"
	"sync/atomic"
	"time"

	shmipcprometheus "github.com/cloudwego-contrib/shmipc-prometheus"
	"github.com/cloudwego/shmipc-go"
	"github.com/cloudwego/shmipc-go/example/best_practice/idl"
)

var count uint64

func init() {
	go func() {
		lastCount := atomic.LoadUint64(&count)
		for range time.Tick(time.Second) {
			curCount := atomic.LoadUint64(&count)
			fmt.Println("shmipc_client qps:", curCount-lastCount)
			lastCount = curCount
		}
	}()

	go func() {
		http.ListenAndServe(":20001", nil) //nolint:errcheck
	}()

	runtime.GOMAXPROCS(1)
}

func main() {
	packageSize := flag.Int("p", 1024, "-p 1024 means that request and response sizes are both near 1KB")
	flag.Parse()

	randContent := make([]byte, *packageSize)
	rand.Read(randContent)

	// 1. get current directory
	dir, err := os.Getwd()
	if err != nil {
		panic(err)
	}

	// 2. init session manager
	conf := shmipc.DefaultSessionManagerConfig()
	conf.Address = filepath.Join(dir, "../ipc_test.sock")
	conf.Network = "unix"
	conf.MemMapType = shmipc.MemMapTypeMemFd
	conf.SessionNum = 1
	conf.InitializeTimeout = 100 * time.Second
	conf.Monitor = shmipcprometheus.NewPrometheusMonitor(":9094", "/metrics")
	smgr, err := shmipc.NewSessionManager(conf)
	if err != nil {
		panic(err)
	}

	concurrency := 500
	qps := 50000000

	for i := 0; i < concurrency; i++ {
		go func() {
			req := &idl.Request{}
			resp := &idl.Response{}
			n := qps / concurrency

			for range time.Tick(time.Second) {
				// 3. get stream from SessionManager
				stream, err := smgr.GetStream()
				if err != nil {
					fmt.Println("get stream error:" + err.Error())
					continue
				}

				for k := 0; k < n; k++ {
					// 4. set request object
					req.Reset()
					req.ID = uint64(time.Now().UnixNano())
					req.Name = "xxx"
					req.Key = randContent

					// 5. write req to the stream's buffer
					if err := req.WriteToShm(stream.BufferWriter()); err != nil {
						fmt.Println("write request to share memory failed, err=" + err.Error())
						return
					}

					// 6. flush the stream's buffered data to the peer
					if err := stream.Flush(false); err != nil {
						fmt.Println("flush request to peer failed, err=" + err.Error())
						return
					}

					// 7. wait and read the response
					resp.Reset()
					if err := resp.ReadFromShm(stream.BufferReader()); err != nil {
						fmt.Println("read response from share memory failed, err=" + err.Error())
						continue
					}

					atomic.AddUint64(&count, 1)
				}
			}
		}()
	}

	time.Sleep(1200 * time.Second)
}