Skip to content

Commit

Permalink
cdc with ts
Browse files Browse the repository at this point in the history
  • Loading branch information
SimFG committed Sep 26, 2023
1 parent 832468c commit 5435f02
Show file tree
Hide file tree
Showing 81 changed files with 6,634 additions and 5,147 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
**/__pycache__/*
**/.pytest_cache/*
server/cdc
**/milvus-cdc-build/*
**/milvus-cdc-build/*
*.log
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ test-go:
static-check:
@echo "Running go-lint check:"
@(env bash $(PWD)/scripts/run_go_lint.sh)

# TODO use the array to generate the name list
generate-mockery:
@echo "Generating mockery server mocks..."
@cd "$(PWD)/server"; mockery -r --name "CDCService|CDCFactory|MetaStore|MetaStoreFactory" --output ./mocks --case snake --with-expecter
@cd "$(PWD)/core"; mockery -r --name "CDCReader|CDCWriter|FactoryCreator|Monitor|WriteCallback|MilvusClientFactory|MilvusClientAPI|ChannelManager|TargetAPI|MetaOp" --output ./mocks --case snake --with-expecter
161 changes: 161 additions & 0 deletions core/api/data_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package api

import (
"context"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus-sdk-go/v2/entity"
"github.com/milvus-io/milvus/pkg/log"
)

type DataHandler interface {
CreateCollection(ctx context.Context, param *CreateCollectionParam) error
DropCollection(ctx context.Context, param *DropCollectionParam) error
Insert(ctx context.Context, param *InsertParam) error
Delete(ctx context.Context, param *DeleteParam) error
CreatePartition(ctx context.Context, param *CreatePartitionParam) error
DropPartition(ctx context.Context, param *DropPartitionParam) error

CreateIndex(ctx context.Context, param *CreateIndexParam) error
DropIndex(ctx context.Context, param *DropIndexParam) error
LoadCollection(ctx context.Context, param *LoadCollectionParam) error
ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error
CreateDatabase(ctx context.Context, param *CreateDataBaseParam) error
DropDatabase(ctx context.Context, param *DropDataBaseParam) error

ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error
// NOTE: please add the implements for the DataHandlerWrapper class when adding new interfaces
}

type DefaultDataHandler struct{}

func (d *DefaultDataHandler) CreateCollection(ctx context.Context, param *CreateCollectionParam) error {
log.Warn("CreateCollection is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) DropCollection(ctx context.Context, param *DropCollectionParam) error {
log.Warn("DropCollection is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) Insert(ctx context.Context, param *InsertParam) error {
log.Warn("Insert is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) Delete(ctx context.Context, param *DeleteParam) error {
log.Warn("Delete is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) CreatePartition(ctx context.Context, param *CreatePartitionParam) error {
log.Warn("CreatePartition is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) DropPartition(ctx context.Context, param *DropPartitionParam) error {
log.Warn("DropPartition is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) CreateIndex(ctx context.Context, param *CreateIndexParam) error {
log.Warn("CreateIndex is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) DropIndex(ctx context.Context, param *DropIndexParam) error {
log.Warn("DropIndex is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) LoadCollection(ctx context.Context, param *LoadCollectionParam) error {
log.Warn("LoadCollection is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) ReleaseCollection(ctx context.Context, param *ReleaseCollectionParam) error {
log.Warn("ReleaseCollection is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) CreateDatabase(ctx context.Context, param *CreateDataBaseParam) error {
log.Warn("CreateDatabase is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) DropDatabase(ctx context.Context, param *DropDataBaseParam) error {
log.Warn("DropDatabase is not implemented, please check it")
return nil
}

func (d *DefaultDataHandler) ReplicateMessage(ctx context.Context, param *ReplicateMessageParam) error {
log.Warn("Replicate is not implemented, please check it")
return nil
}

type CreateCollectionParam struct {
Schema *entity.Schema
ShardsNum int32
ConsistencyLevel commonpb.ConsistencyLevel
Properties []*commonpb.KeyValuePair
}

type DropCollectionParam struct {
CollectionName string
}

type InsertParam struct {
CollectionName string
PartitionName string
Columns []entity.Column
}

type DeleteParam struct {
CollectionName string
PartitionName string
Column entity.Column
}

type CreatePartitionParam struct {
CollectionName string
PartitionName string
}

type DropPartitionParam struct {
CollectionName string
PartitionName string
}

type CreateIndexParam struct {
milvuspb.CreateIndexRequest
}

type DropIndexParam struct {
milvuspb.DropIndexRequest
}

type LoadCollectionParam struct {
milvuspb.LoadCollectionRequest
}

type ReleaseCollectionParam struct {
milvuspb.ReleaseCollectionRequest
}

type CreateDataBaseParam struct {
milvuspb.CreateDatabaseRequest
}

type DropDataBaseParam struct {
milvuspb.DropDatabaseRequest
}

type ReplicateMessageParam struct {
ChannelName string
BeginTs, EndTs uint64
MsgsBytes [][]byte
StartPositions, EndPositions []*msgpb.MsgPosition
}
12 changes: 12 additions & 0 deletions core/api/message_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package api

type MessageManager interface {
ReplicateMessage(message *ReplicateMessage)
Close(channelName string)
}

type ReplicateMessage struct {
Param *ReplicateMessageParam
SuccessFunc func(param *ReplicateMessageParam)
FailFunc func(param *ReplicateMessageParam, err error)
}
39 changes: 39 additions & 0 deletions core/api/meta_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package api

import (
"context"

"github.com/zilliztech/milvus-cdc/core/pb"
)

// MetaOp meta operation
type MetaOp interface {
// WatchCollection its implementation should make sure it's only called once. The WatchPartition is same
WatchCollection(ctx context.Context, filter CollectionFilter)
WatchPartition(ctx context.Context, filter PartitionFilter)

// SubscribeCollectionEvent an event only is consumed once. The SubscribePartitionEvent is same
// TODO need to consider the many target, maybe try the method a meta op corresponds to a target
SubscribeCollectionEvent(taskID string, consumer CollectionEventConsumer)
SubscribePartitionEvent(taskID string, consumer PartitionEventConsumer)
UnsubscribeEvent(taskID string, eventType WatchEventType)

GetAllCollection(ctx context.Context, filter CollectionFilter) ([]*pb.CollectionInfo, error)
GetCollectionNameByID(ctx context.Context, id int64) string
}

// CollectionFilter the filter will be used before the collection is filled the schema info
type CollectionFilter func(*pb.CollectionInfo) bool

type PartitionFilter func(info *pb.PartitionInfo) bool

type CollectionEventConsumer CollectionFilter

type PartitionEventConsumer PartitionFilter

type WatchEventType int

const (
CollectionEventType WatchEventType = iota + 1
PartitionEventType
)
26 changes: 26 additions & 0 deletions core/api/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package api

import (
"context"

"github.com/milvus-io/milvus/pkg/log"
)

type Reader interface {
StartRead(ctx context.Context)
QuitRead(ctx context.Context)
GetChannelChan() <-chan string
}

// DefaultReader All CDCReader implements should combine it
type DefaultReader struct{}

// StartRead the return value is nil,
// and if you receive the data from the nil chan, will block forever, not panic
func (d *DefaultReader) StartRead(ctx context.Context) {
log.Warn("StartRead is not implemented, please check it")
}

func (d *DefaultReader) QuitRead(ctx context.Context) {
log.Warn("QuitRead is not implemented, please check it")
}
41 changes: 41 additions & 0 deletions core/api/replicate_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package api

import (
"context"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"

"github.com/zilliztech/milvus-cdc/core/model"
"github.com/zilliztech/milvus-cdc/core/pb"
)

// ChannelManager a target must promise a manager
type ChannelManager interface {
StartReadCollection(ctx context.Context, info *pb.CollectionInfo, seekPositions []*msgpb.MsgPosition) error

StopReadCollection(ctx context.Context, info *pb.CollectionInfo) error

GetChannelChan() <-chan string
GetMsgChan(pChannel string) <-chan *msgstream.MsgPack
GetEventChan() <-chan *ReplicateAPIEvent
}

type TargetAPI interface {
GetCollectionInfo(ctx context.Context, collectionName string) (*model.CollectionInfo, error)
}

type ReplicateAPIEvent struct {
EventType ReplicateAPIEventType
CollectionInfo *pb.CollectionInfo
PartitionInfo *pb.PartitionInfo
}

type ReplicateAPIEventType int

const (
ReplicateCreateCollection = iota + 1
ReplicateDropCollection
ReplicateCreatePartition
ReplicateDropPartition
)
13 changes: 13 additions & 0 deletions core/api/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package api

import (
"context"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
)

type Writer interface {
HandleReplicateAPIEvent(ctx context.Context, apiEvent *ReplicateAPIEvent) error
HandleReplicateMessage(ctx context.Context, channelName string, msgPack *msgstream.MsgPack) (*commonpb.KeyDataPair, error)
}
5 changes: 5 additions & 0 deletions core/config/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func NewParamGroup() paramtable.ParamGroup {
return group
}

type MQConfig struct {
Pulsar PulsarConfig
Kafka KafkaConfig
}

type KafkaConfig struct {
Address string
}
Expand Down
11 changes: 5 additions & 6 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ go 1.18

require (
github.com/cockroachdb/errors v1.9.1
github.com/goccy/go-json v0.10.2
github.com/golang/protobuf v1.5.3
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.1-0.20230911111453-720fcfb1a048
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.2-0.20230919092633-6ef446ad2aab
github.com/milvus-io/milvus-sdk-go/v2 v2.2.1-0.20230814034926-dd5a31f64225
github.com/milvus-io/milvus/pkg v0.0.2-0.20230823021022-7af0f7d90cee
github.com/samber/lo v1.27.0
github.com/stretchr/testify v1.8.3
go.etcd.io/etcd/api/v3 v3.5.5
go.etcd.io/etcd/client/v3 v3.5.5
go.uber.org/zap v1.20.0
sigs.k8s.io/yaml v1.2.0
)

require (
Expand Down Expand Up @@ -104,6 +103,7 @@ require (
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.etcd.io/bbolt v1.3.6 // indirect
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
go.etcd.io/etcd/client/v2 v2.305.5 // indirect
go.etcd.io/etcd/pkg/v3 v3.5.5 // indirect
Expand Down Expand Up @@ -137,13 +137,12 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
)

replace (
github.com/apache/pulsar-client-go => github.com/milvus-io/pulsar-client-go v0.6.10
github.com/milvus-io/milvus-sdk-go/v2 => github.com/SimFG/milvus-sdk-go/v2 v2.0.0-20230918025012-e4fb30541113
github.com/milvus-io/milvus/pkg => github.com/SimFG/milvus/pkg v0.0.0-20230915085959-b1bd79e12920
github.com/milvus-io/milvus-sdk-go/v2 => github.com/SimFG/milvus-sdk-go/v2 v2.0.0-20230919094145-06acf1ab753c
github.com/milvus-io/milvus/pkg => github.com/SimFG/milvus/pkg v0.0.0-20230925083123-8c1cd0c4b615
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => ./../rocksdb
)
Expand Down
Loading

0 comments on commit 5435f02

Please sign in to comment.