Skip to content

Commit

Permalink
update modbus mapper
Browse files Browse the repository at this point in the history
Signed-off-by: wbc6080 <[email protected]>
  • Loading branch information
wbc6080 committed Oct 30, 2023
1 parent 60ca411 commit 8eaef1a
Show file tree
Hide file tree
Showing 49 changed files with 1,082 additions and 707 deletions.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (

"k8s.io/klog/v2"

"github.com/kubeedge/virtualdevice/device"
"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/config"
"github.com/kubeedge/virtualdevice/pkg/grpcserver"
"github.com/kubeedge/virtualdevice/pkg/httpserver"
"github.com/kubeedge/virtualdevice/pkg/util/grpcclient"
"github.com/kubeedge/virtualdevice/pkg/util/parse"
"github.com/kubeedge/modbus/device"
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/config"
"github.com/kubeedge/modbus/pkg/grpcserver"
"github.com/kubeedge/modbus/pkg/httpserver"
"github.com/kubeedge/modbus/pkg/util/grpcclient"
"github.com/kubeedge/modbus/pkg/util/parse"
)

func main() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
grpc_server:
socket_path: /etc/kubeedge/virtualdevice.sock
socket_path: /etc/kubeedge/modbus.sock
common:
name: Virtualdevice-mapper
name: Modbus-mapper
version: v1.13.0
api_version: v1.0.0
protocol: virtualProtocol # replace by your protocol name
protocol: modbus # TODO add your protocol name
address: 127.0.0.1
edgecore_sock: /etc/kubeedge/dmi.sock
dev_init:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"k8s.io/klog/v2"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/modbus/pkg/common"
)

type DataBaseConfig struct {
Expand Down Expand Up @@ -60,6 +60,7 @@ func (d *DataBaseConfig) CloseSession(client influxdb2.Client) {
}

func (d *DataBaseConfig) AddData(data *common.DataModel, client influxdb2.Client) error {
// write device data to influx database
writeAPI := client.WriteAPIBlocking(d.Influxdb2ClientConfig.Org, d.Influxdb2ClientConfig.Bucket)
p := influxdb2.NewPoint(d.Influxdb2DataConfig.Measurement,
d.Influxdb2DataConfig.Tag,
Expand Down
48 changes: 48 additions & 0 deletions mappers/v1beta1-mapper/modbus/data/dbmethod/openGemini/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package opengemini

import (
"encoding/json"
_ "github.com/influxdata/influxdb1-client" // this is important because of the bug in go mod
client "github.com/influxdata/influxdb1-client/v2"
"github.com/kubeedge/modbus/pkg/common"
)

type DataBaseConfig struct {
OpengeminiClientConfig *OpengeminiClientConfig `json:"opengeminiClientConfig,omitempty"`
OpengeminiDataConfig *OpengeminiDataConfig `json:"opengeminiDataConfig,omitempty"`
}

type OpengeminiClientConfig struct {
URL string `json:"url,omitempty"`
Database string `json:"database,omitempty"`
RetentionPolicy string `json:"retentionPolicy,omitempty"`
}

type OpengeminiDataConfig struct {
Measurement string `json:"measurement,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
FieldKey string `json:"fieldKey,omitempty"`
}

func NewDataBaseClient(clientConfig json.RawMessage, dataConfig json.RawMessage) (*DataBaseConfig, error) {
// TODO parse opengemini database config data

return &DataBaseConfig{}, nil
}

func (d *DataBaseConfig) InitDbClient() (client.Client, error) {
// TODO add opengemini database initialization code

conf := client.HTTPConfig{}
return client.NewHTTPClient(conf)
}

func (d *DataBaseConfig) CloseSession(cli client.Client) error {
// TODO add opengemini database close code
return nil
}

func (d *DataBaseConfig) AddData(data *common.DataModel, cli client.Client) error {
// TODO add opengemini database data push code
return nil
}
48 changes: 48 additions & 0 deletions mappers/v1beta1-mapper/modbus/data/dbmethod/redis/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package redis

import (
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/global"
)

type DataBaseConfig struct {
}

func NewDataBaseClient() (global.DataBaseClient, error) {
return &DataBaseConfig{}, nil
}

func (d *DataBaseConfig) InitDbClient() error {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) CloseSession() {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) AddData(data *common.DataModel) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetDataByDeviceName(deviceName string) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetPropertyDataByDeviceName(deviceName string, propertyData string) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) GetDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}

func (d *DataBaseConfig) DeleteDataByTimeRange(start int64, end int64) ([]*common.DataModel, error) {
//TODO implement me
panic("implement me")
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

"k8s.io/klog/v2"

"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/global"
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/global"
)

type PushMethod struct {
Expand All @@ -38,18 +38,21 @@ func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
}

func (pm *PushMethod) InitPushMethod() error {
fmt.Println("Init Http")
klog.V(1).Info("Init HTTP")
return nil
}

func (pm *PushMethod) Push(data *common.DataModel) {
klog.V(2).Info("Publish device data by HTTP")

targetUrl := pm.HTTP.HostName + ":" + strconv.Itoa(pm.HTTP.Port) + pm.HTTP.RequestPath
klog.V(1).Infof("targetUrl = %s", targetUrl)
payload := data.PropertyName + "=" + data.Value
formatTimeStr := time.Unix(data.TimeStamp/1e3, 0).Format("2006-01-02 15:04:05")
currentTime := "&time" + "=" + formatTimeStr
payload += currentTime

klog.V(3).Infof("Publish %v to %s", payload, targetUrl)

resp, err := http.Post(targetUrl,
"application/x-www-form-urlencoded",
strings.NewReader(payload))
Expand All @@ -61,7 +64,10 @@ func (pm *PushMethod) Push(data *common.DataModel) {
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
// handle error
klog.Errorf("Publish device data by HTTP failed, err = %v", err)
return
}
klog.V(1).Info(string(body))
klog.V(1).Info("############### Message published. ###############")
klog.V(3).Infof("HTTP reviced %s", string(body))

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"k8s.io/klog/v2"

"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/global"
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/global"
)

type PushMethod struct {
Expand All @@ -36,7 +36,7 @@ func NewDataPanel(config json.RawMessage) (global.DataPanel, error) {
}

func (pm *PushMethod) InitPushMethod() error {
fmt.Println("Init Mqtt")
klog.V(1).Info("Init MQTT")
return nil
}

Expand All @@ -59,5 +59,5 @@ func (pm *PushMethod) Push(data *common.DataModel) {
token.Wait()

client.Disconnect(250)
klog.V(1).Info("############### Message published. ###############")
klog.V(2).Info("############### Message published. ###############")
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"

"os"
"os/signal"
"strings"
Expand All @@ -13,14 +14,15 @@ import (

"k8s.io/klog/v2"

dbInflux "github.com/kubeedge/virtualdevice/data/dbmethod/influxdb2"
httpMethod "github.com/kubeedge/virtualdevice/data/publish/http"
mqttMethod "github.com/kubeedge/virtualdevice/data/publish/mqtt"
"github.com/kubeedge/virtualdevice/driver"
"github.com/kubeedge/virtualdevice/pkg/common"
"github.com/kubeedge/virtualdevice/pkg/config"
"github.com/kubeedge/virtualdevice/pkg/global"
"github.com/kubeedge/virtualdevice/pkg/util/parse"
dbInflux "github.com/kubeedge/modbus/data/dbmethod/influxdb2"
dbOpenGemini "github.com/kubeedge/modbus/data/dbmethod/openGemini"
httpMethod "github.com/kubeedge/modbus/data/publish/http"
mqttMethod "github.com/kubeedge/modbus/data/publish/mqtt"
"github.com/kubeedge/modbus/driver"
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/config"
"github.com/kubeedge/modbus/pkg/global"
"github.com/kubeedge/modbus/pkg/util/parse"
)

type DevPanel struct {
Expand Down Expand Up @@ -109,7 +111,6 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
var visitorConfig driver.VisitorConfig

err := json.Unmarshal(twin.Property.Visitors, &visitorConfig)
visitorConfig.VisitorConfigData.DataType = strings.ToLower(visitorConfig.VisitorConfigData.DataType)
if err != nil {
klog.Errorf("Unmarshal VisitorConfig error: %v", err)
continue
Expand All @@ -132,19 +133,12 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
}
go twinData.Run(ctx)
// handle push method
testconfig := make(map[string]interface{})
err = json.Unmarshal(twin.Property.PushMethod.MethodConfig, &testconfig)
if err == nil {
klog.V(1).Infof("twin.Property.PushMethod.MethodConfig = %v", testconfig)
} else {
klog.Error(err)
}
// todo need to consider if PushMethod == nil
if twin.Property.PushMethod.MethodConfig != nil && twin.Property.PushMethod.MethodName != "" {
dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, common.WithType(twin.ObservedDesired.Metadata.Type))
pushHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
}
// handle database

if twin.Property.PushMethod.DBMethod.DBMethodName != "" {
dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, common.WithType(twin.ObservedDesired.Metadata.Type))
dbHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
Expand All @@ -156,18 +150,20 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
func pushHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
var dataPanel global.DataPanel
var err error
// initialization dataPanel
switch twin.Property.PushMethod.MethodName {
case "http":
dataPanel, err = httpMethod.NewDataPanel(twin.Property.PushMethod.MethodConfig)
case "mqtt":
dataPanel, err = mqttMethod.NewDataPanel(twin.Property.PushMethod.MethodConfig)
default:
err = errors.New("Custom protocols are not currently supported")
err = errors.New("custom protocols are not currently supported when push data")
}
if err != nil {
klog.Errorf("new data panel error: %v", err)
return
}
// initialization PushMethod
err = dataPanel.InitPushMethod()
if err != nil {
klog.Errorf("init publish method err: %v", err)
Expand Down Expand Up @@ -205,6 +201,7 @@ func pushHandler(ctx context.Context, twin *common.Twin, client *driver.Customiz
// dbHandler start db client to save data
func dbHandler(ctx context.Context, twin *common.Twin, client *driver.CustomizedClient, visitorConfig *driver.VisitorConfig, dataModel *common.DataModel) {
switch twin.Property.PushMethod.DBMethod.DBMethodName {
// TODO add more database
case "influx":
dbConfig, err := dbInflux.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2ClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.Influxdb2DataConfig)
if err != nil {
Expand Down Expand Up @@ -249,13 +246,57 @@ func dbHandler(ctx context.Context, twin *common.Twin, client *driver.Customized
}
}
}()
case "openGemini":
dbConfig, err := dbOpenGemini.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.OpenGeminiClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.OpenGeminiDataConfig)
if err != nil {
klog.Errorf("new openGemini database client error: %v", err)
return
}
dbClient, err := dbConfig.InitDbClient()
if err != nil {
klog.Errorf("init openGemini database client err: %v", err)
return
}
reportCycle := time.Duration(twin.Property.ReportCycle)
if reportCycle == 0 {
reportCycle = 1 * time.Second
}
ticker := time.NewTicker(reportCycle)
go func() {
for {
select {
case <-ticker.C:
deviceData, err := client.GetDeviceData(visitorConfig)
if err != nil {
klog.Errorf("publish error: %v", err)
continue
}
sData, err := common.ConvertToString(deviceData)
if err != nil {
klog.Errorf("Failed to convert publish method data : %v", err)
continue
}
dataModel.SetValue(sData)
dataModel.SetTimeStamp()

err = dbConfig.AddData(dataModel, dbClient)
if err != nil {
klog.Errorf("openGemini database add data error: %v", err)
return
}
case <-ctx.Done():
dbConfig.CloseSession(dbClient)
return
}
}
}()
}
}

// setVisitor check if visitor property is readonly, if not then set it.
func setVisitor(visitorConfig *driver.VisitorConfig, twin *common.Twin, dev *driver.CustomizedDev) error {
if twin.Property.PProperty.AccessMode == "ReadOnly" {
klog.V(1).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName)
klog.V(3).Infof("%s twin readonly property: %s", dev.Instance.Name, twin.PropertyName)
return nil
}
klog.V(2).Infof("Convert type: %s, value: %s ", twin.Property.PProperty.DataType, twin.ObservedDesired.Value)
Expand All @@ -264,7 +305,7 @@ func setVisitor(visitorConfig *driver.VisitorConfig, twin *common.Twin, dev *dri
klog.Errorf("Failed to convert value as %s : %v", twin.Property.PProperty.DataType, err)
return err
}
err = dev.CustomizedClient.SetDeviceData(value, visitorConfig)
_, err = dev.CustomizedClient.SetDeviceData(value, visitorConfig)
if err != nil {
return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err)
}
Expand Down
Loading

0 comments on commit 8eaef1a

Please sign in to comment.