Skip to content

Commit

Permalink
update mapper
Browse files Browse the repository at this point in the history
Signed-off-by: wbc6080 <[email protected]>
  • Loading branch information
wbc6080 committed Jan 30, 2024
1 parent 5c7881c commit 4f15256
Show file tree
Hide file tree
Showing 42 changed files with 258 additions and 719 deletions.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"github.com/kubeedge/modbus/device"
"github.com/kubeedge/modbus/pkg/common"
"github.com/kubeedge/modbus/pkg/config"
"github.com/kubeedge/modbus/pkg/grpcclient"
"github.com/kubeedge/modbus/pkg/grpcserver"
"github.com/kubeedge/modbus/pkg/httpserver"
"github.com/kubeedge/modbus/pkg/util/grpcclient"
"github.com/kubeedge/modbus/pkg/util/parse"
)

Expand Down Expand Up @@ -50,7 +50,7 @@ func main() {
// if dev init mode is register, mapper's dev will init when registry to edgecore
if c.DevInit.Mode != common.DevInitModeRegister {
klog.Infoln("======dev init mode is not register, will register to edgecore")
if _, _, err = grpcclient.RegisterMapper(&c, false); err != nil {
if _, _, err = grpcclient.RegisterMapper(false); err != nil {
klog.Fatal(err)
}
klog.Infoln("registerMapper finished")
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"

"os"
"os/signal"
"strings"
Expand All @@ -15,7 +14,6 @@ import (
"k8s.io/klog/v2"

dbInflux "github.com/kubeedge/modbus/data/dbmethod/influxdb2"
dbOpenGemini "github.com/kubeedge/modbus/data/dbmethod/openGemini"
httpMethod "github.com/kubeedge/modbus/data/publish/http"
mqttMethod "github.com/kubeedge/modbus/data/publish/mqtt"
"github.com/kubeedge/modbus/driver"
Expand Down Expand Up @@ -111,6 +109,7 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
var visitorConfig driver.VisitorConfig

err := json.Unmarshal(twin.Property.Visitors, &visitorConfig)
visitorConfig.VisitorConfigData.DataType = strings.ToLower(visitorConfig.VisitorConfigData.DataType)
if err != nil {
klog.Errorf("Unmarshal VisitorConfig error: %v", err)
continue
Expand All @@ -130,10 +129,10 @@ func dataHandler(ctx context.Context, dev *driver.CustomizedDev) {
VisitorConfig: &visitorConfig,
Topic: fmt.Sprintf(common.TopicTwinUpdate, dev.Instance.ID),
CollectCycle: time.Duration(twin.Property.CollectCycle),
ReportToCloud: twin.Property.ReportToCloud,
}
go twinData.Run(ctx)
// handle push method
// todo need to consider if PushMethod == nil
if twin.Property.PushMethod.MethodConfig != nil && twin.Property.PushMethod.MethodName != "" {
dataModel := common.NewDataModel(dev.Instance.Name, twin.Property.PropertyName, common.WithType(twin.ObservedDesired.Metadata.Type))
pushHandler(ctx, &twin, dev.CustomizedClient, &visitorConfig, dataModel)
Expand Down Expand Up @@ -246,50 +245,6 @@ func dbHandler(ctx context.Context, twin *common.Twin, client *driver.Customized
}
}
}()
case "openGemini":
dbConfig, err := dbOpenGemini.NewDataBaseClient(twin.Property.PushMethod.DBMethod.DBConfig.OpenGeminiClientConfig, twin.Property.PushMethod.DBMethod.DBConfig.OpenGeminiDataConfig)
if err != nil {
klog.Errorf("new openGemini database client error: %v", err)
return
}
dbClient, err := dbConfig.InitDbClient()
if err != nil {
klog.Errorf("init openGemini database client err: %v", err)
return
}
reportCycle := time.Duration(twin.Property.ReportCycle)
if reportCycle == 0 {
reportCycle = 1 * time.Second
}
ticker := time.NewTicker(reportCycle)
go func() {
for {
select {
case <-ticker.C:
deviceData, err := client.GetDeviceData(visitorConfig)
if err != nil {
klog.Errorf("publish error: %v", err)
continue
}
sData, err := common.ConvertToString(deviceData)
if err != nil {
klog.Errorf("Failed to convert publish method data : %v", err)
continue
}
dataModel.SetValue(sData)
dataModel.SetTimeStamp()

err = dbConfig.AddData(dataModel, dbClient)
if err != nil {
klog.Errorf("openGemini database add data error: %v", err)
return
}
case <-ctx.Done():
dbConfig.CloseSession(dbClient)
return
}
}
}()
}
}

Expand All @@ -305,7 +260,7 @@ func setVisitor(visitorConfig *driver.VisitorConfig, twin *common.Twin, dev *dri
klog.Errorf("Failed to convert value as %s : %v", twin.Property.PProperty.DataType, err)
return err
}
_, err = dev.CustomizedClient.SetDeviceData(value, visitorConfig)
err = dev.CustomizedClient.SetDeviceData(value, visitorConfig)
if err != nil {
return fmt.Errorf("%s set device data error: %v", twin.PropertyName, err)
}
Expand All @@ -322,7 +277,7 @@ func (d *DevPanel) DevInit(cfg *config.Config) error {
// return err
// }
case common.DevInitModeRegister:
if err := parse.ParseByUsingRegister(cfg, devs, d.models, d.protocols); err != nil {
if err := parse.ParseByUsingRegister(devs, d.models, d.protocols); err != nil {
return err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/kubeedge/modbus/driver"
"github.com/kubeedge/modbus/pkg/common"
dmiapi "github.com/kubeedge/modbus/pkg/dmi-api"
"github.com/kubeedge/modbus/pkg/util/grpcclient"
"github.com/kubeedge/modbus/pkg/grpcclient"
"github.com/kubeedge/modbus/pkg/util/parse"
)

Expand All @@ -26,11 +26,12 @@ type TwinData struct {
Topic string
Results interface{}
CollectCycle time.Duration
ReportToCloud bool
}

func (td *TwinData) GetPayLoad() ([]byte, error) {
var err error
//td.VisitorConfig.VisitorConfigData.DataType = strings.ToLower(td.VisitorConfig.VisitorConfigData.DataType)
td.VisitorConfig.VisitorConfigData.DataType = strings.ToLower(td.VisitorConfig.VisitorConfigData.DataType)
td.Results, err = td.Client.GetDeviceData(td.VisitorConfig)
if err != nil {
return nil, fmt.Errorf("get device data failed: %v", err)
Expand Down Expand Up @@ -87,6 +88,9 @@ func (td *TwinData) PushToEdgeCore() {
}

func (td *TwinData) Run(ctx context.Context) {
if !td.ReportToCloud {
return
}
if td.CollectCycle == 0 {
td.CollectCycle = 1 * time.Second
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package driver

import (
"github.com/kubeedge/modbus/pkg/common"
"github.com/sailorvii/modbus"
"sync"
"time"

"github.com/sailorvii/modbus"

"github.com/kubeedge/modbus/pkg/common"
)

// CustomizedDev is the customized device configuration and client information.
Expand Down Expand Up @@ -54,6 +56,7 @@ type VisitorConfig struct {

type VisitorConfigData struct {
// TODO: add your visitor config data
DataType string `json:"dataType"`
Register string `json:"register"`
Offset uint16 `json:"offset"`
Limit int `json:"limit"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package driver

import (
"errors"
"k8s.io/klog/v2"
"sync"
"time"

"github.com/sailorvii/modbus"
"k8s.io/klog/v2"
)

var clients *sync.Map
Expand Down Expand Up @@ -89,7 +89,7 @@ func (c *CustomizedClient) GetDeviceData(visitor *VisitorConfig) (interface{}, e
return results, err
}

func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *VisitorConfig) (interface{}, error) {
func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *VisitorConfig) error {
// TODO: set device's data
// you can use c.ProtocolConfig and visitor
var results []byte
Expand All @@ -109,16 +109,16 @@ func (c *CustomizedClient) SetDeviceData(data interface{}, visitor *VisitorConfi
case 1:
valueSet = 0xFF00
default:
return nil, errors.New("Wrong value")
return errors.New("Wrong value")
}
results, err = c.ModbusClient.WriteSingleCoil(visitor.Offset, valueSet)
case "HoldingRegister":
results, err = c.ModbusClient.WriteSingleRegister(visitor.Offset, uint16(visitor.Limit))
default:
return nil, errors.New("Bad register type")
return errors.New("Bad register type")
}
klog.V(1).Info("Set result:", err, results)
return results, err
return nil
}

func (c *CustomizedClient) StopDevice() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
require (
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/influxdata/influxdb-client-go/v2 v2.12.3
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c
)

require (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/influxdata/influxdb-client-go/v2 v2.12.3 h1:28nRlNMRIV4QbtIUvxhWqaxn0IpXeMSkY/uJa/O/vC4=
github.com/influxdata/influxdb-client-go/v2 v2.12.3/go.mod h1:IrrLUbCjjfkmRuaCiGQg4m2GbkaeJDcuWoxiWdQEbA0=
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs=
github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ type DBMethodConfig struct {
}

type DBConfig struct {
Influxdb2ClientConfig json.RawMessage `json:"influxdb2ClientConfig"`
Influxdb2DataConfig json.RawMessage `json:"influxdb2DataConfig"`
RedisConfigData json.RawMessage `json:"redisConfigData"`
OpenGeminiClientConfig json.RawMessage `json:"openGeminiClientConfig"`
OpenGeminiDataConfig json.RawMessage `json:"openGeminiDataConfig"`
Influxdb2ClientConfig json.RawMessage `json:"influxdb2ClientConfig"`
Influxdb2DataConfig json.RawMessage `json:"influxdb2DataConfig"`
RedisConfigData json.RawMessage `json:"redisConfigData"`
}

// Metadata is the metadata for data.
Expand Down
Loading

0 comments on commit 4f15256

Please sign in to comment.