enhance: support single pchannel level transaction (milvus-io#35289)
issue: milvus-io#33285

- support transactions on a single wal.
- the last confirmed message id can still be used when transactions are enabled.
- add a fence operation for the segment allocation interceptor.

---------

Signed-off-by: chyezh <[email protected]>
chyezh authored Aug 19, 2024
1 parent 853d2f3 commit 4d69898
Showing 98 changed files with 5,283 additions and 803 deletions.
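
Before the per-file diffs, here is a minimal, hedged sketch of how the new single-pchannel transaction API is meant to be driven, based on the WALAccesser, TxnOption, and Txn interfaces added in internal/distributed/streaming/streaming.go below. The vchannel name and keepalive value are hypothetical, and message construction is out of scope for this commit:

package example

import (
    "context"
    "time"

    "github.com/milvus-io/milvus/internal/distributed/streaming"
    "github.com/milvus-io/milvus/pkg/streaming/util/message"
)

// writeInTxn opens a txn bound to one vchannel (hence one pchannel and one
// wal), appends DML messages, and commits exactly once.
func writeInTxn(ctx context.Context, msgs []message.MutableMessage) error {
    txn, err := streaming.WAL().Txn(ctx, streaming.TxnOption{
        VChannel:  "by-dev-rootcoord-dml_0_v0", // hypothetical vchannel name
        Keepalive: 10 * time.Second,            // txn expires if idle longer than this
    })
    if err != nil {
        return err
    }
    for _, msg := range msgs {
        if err := txn.Append(ctx, msg); err != nil {
            // Commit or Rollback must be called once per txn; roll back on
            // failure so the wal can release the txn's resources.
            _ = txn.Rollback(ctx)
            return err
        }
    }
    _, err = txn.Commit(ctx) // Commit returns the txn's AppendResult
    return err
}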
4 changes: 2 additions & 2 deletions cmd/components/streaming_node.go
@@ -29,8 +29,8 @@ type StreamingNode struct {
 }
 
 // NewStreamingNode creates a new StreamingNode
-func NewStreamingNode(_ context.Context, _ dependency.Factory) (*StreamingNode, error) {
-    svr, err := streamingnode.NewServer()
+func NewStreamingNode(_ context.Context, factory dependency.Factory) (*StreamingNode, error) {
+    svr, err := streamingnode.NewServer(factory)
     if err != nil {
         return nil, err
     }
15 changes: 11 additions & 4 deletions internal/.mockery.yaml
@@ -8,12 +8,19 @@ packages:
   github.com/milvus-io/milvus/internal/distributed/streaming:
     interfaces:
       WALAccesser:
+      Utility:
   github.com/milvus-io/milvus/internal/streamingcoord/server/balancer:
     interfaces:
       Balancer:
   github.com/milvus-io/milvus/internal/streamingnode/client/manager:
     interfaces:
       ManagerClient:
+  github.com/milvus-io/milvus/internal/streamingcoord/client:
+    interfaces:
+      Client:
+  github.com/milvus-io/milvus/internal/streamingnode/client/handler:
+    interfaces:
+      HandlerClient:
   github.com/milvus-io/milvus/internal/streamingnode/client/handler/assignment:
     interfaces:
       Watcher:
@@ -37,11 +44,11 @@ packages:
       Interceptor:
       InterceptorWithReady:
       InterceptorBuilder:
-  github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector:
-    interfaces:
+  ? github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/segment/inspector
+  : interfaces:
       SealOperator:
-  github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector:
-    interfaces:
+  ? github.com/milvus-io/milvus/internal/streamingnode/server/wal/interceptors/timetick/inspector
+  : interfaces:
       TimeTickSyncOperator:
   google.golang.org/grpc:
     interfaces:
176 changes: 22 additions & 154 deletions internal/distributed/streaming/append.go
@@ -2,169 +2,19 @@ package streaming
 
 import (
     "context"
-    "sync"
 
     "github.com/milvus-io/milvus/internal/distributed/streaming/internal/producer"
     "github.com/milvus-io/milvus/pkg/streaming/util/message"
     "github.com/milvus-io/milvus/pkg/streaming/util/types"
     "github.com/milvus-io/milvus/pkg/util/funcutil"
 )
 
-// newAppendResponseN creates a new append response.
-func newAppendResponseN(n int) AppendResponses {
-    return AppendResponses{
-        Responses: make([]AppendResponse, n),
-    }
-}
-
-// AppendResponse is the response of one append operation.
-type AppendResponse struct {
-    AppendResult *types.AppendResult
-    Error        error
-}
-
-// AppendResponses is the response of append operation.
-type AppendResponses struct {
-    Responses []AppendResponse
-}
-
-// UnwrapFirstError returns the first error in the responses.
-func (a AppendResponses) UnwrapFirstError() error {
-    for _, r := range a.Responses {
-        if r.Error != nil {
-            return r.Error
-        }
-    }
-    return nil
-}
-
-// MaxTimeTick returns the max time tick in the responses.
-func (a AppendResponses) MaxTimeTick() uint64 {
-    maxTimeTick := uint64(0)
-    for _, r := range a.Responses {
-        if r.AppendResult.TimeTick > maxTimeTick {
-            maxTimeTick = r.AppendResult.TimeTick
-        }
-    }
-    return maxTimeTick
-}
-
-// fillAllError fills all the responses with the same error.
-func (a *AppendResponses) fillAllError(err error) {
-    for i := range a.Responses {
-        a.Responses[i].Error = err
-    }
-}
-
-// fillResponseAtIdx fill the response at idx
-func (a *AppendResponses) fillResponseAtIdx(resp AppendResponse, idx int) {
-    a.Responses[idx] = resp
-}
-
-// dispatchByPChannel dispatches the message into different pchannel.
-func (w *walAccesserImpl) dispatchByPChannel(ctx context.Context, msgs ...message.MutableMessage) AppendResponses {
-    if len(msgs) == 0 {
-        return newAppendResponseN(0)
-    }
-
-    // dispatch the messages into different pchannel.
-    dispatchedMessages, indexes := w.dispatchMessages(msgs...)
-
-    // only one pchannel, append it directly, no more goroutine needed.
-    if len(dispatchedMessages) == 1 {
-        for pchannel, msgs := range dispatchedMessages {
-            return w.appendToPChannel(ctx, pchannel, msgs...)
-        }
-    }
-
-    // otherwise, start multiple goroutine to append to different pchannel.
-    resp := newAppendResponseN(len(msgs))
-    wg := sync.WaitGroup{}
-    wg.Add(len(dispatchedMessages))
-
-    mu := sync.Mutex{}
-    for pchannel, msgs := range dispatchedMessages {
-        pchannel := pchannel
-        msgs := msgs
-        idxes := indexes[pchannel]
-        w.appendExecutionPool.Submit(func() (struct{}, error) {
-            defer wg.Done()
-            singleResp := w.appendToPChannel(ctx, pchannel, msgs...)
-            mu.Lock()
-            for i, idx := range idxes {
-                resp.fillResponseAtIdx(singleResp.Responses[i], idx)
-            }
-            mu.Unlock()
-            return struct{}{}, nil
-        })
-    }
-    wg.Wait()
-    return resp
-}
-
-// dispatchMessages dispatches the messages into different pchannel.
-func (w *walAccesserImpl) dispatchMessages(msgs ...message.MutableMessage) (map[string][]message.MutableMessage, map[string][]int) {
-    dispatchedMessages := make(map[string][]message.MutableMessage, 0)
-    // record the index of the message in the msgs, used to fill back response.
-    indexes := make(map[string][]int, 0)
-    for idx, msg := range msgs {
-        pchannel := funcutil.ToPhysicalChannel(msg.VChannel())
-        if _, ok := dispatchedMessages[pchannel]; !ok {
-            dispatchedMessages[pchannel] = make([]message.MutableMessage, 0)
-            indexes[pchannel] = make([]int, 0)
-        }
-        dispatchedMessages[pchannel] = append(dispatchedMessages[pchannel], msg)
-        indexes[pchannel] = append(indexes[pchannel], idx)
-    }
-    return dispatchedMessages, indexes
-}
-
-// appendToPChannel appends the messages to the specified pchannel.
-func (w *walAccesserImpl) appendToPChannel(ctx context.Context, pchannel string, msgs ...message.MutableMessage) AppendResponses {
-    if len(msgs) == 0 {
-        return newAppendResponseN(0)
-    }
-    resp := newAppendResponseN(len(msgs))
-
+// appendToWAL appends the message to the wal.
+func (w *walAccesserImpl) appendToWAL(ctx context.Context, msg message.MutableMessage) (*types.AppendResult, error) {
+    pchannel := funcutil.ToPhysicalChannel(msg.VChannel())
     // get producer of pchannel.
     p := w.getProducer(pchannel)
-
-    // if only one message here, append it directly, no more goroutine needed.
-    // at most time, there's only one message here.
-    // TODO: only the partition-key with high partition will generate many message in one time on the same pchannel,
-    // we should optimize the message-format, make it into one; but not the goroutine count.
-    if len(msgs) == 1 {
-        produceResult, err := p.Produce(ctx, msgs[0])
-        resp.fillResponseAtIdx(AppendResponse{
-            AppendResult: produceResult,
-            Error:        err,
-        }, 0)
-        return resp
-    }
-
-    // concurrent produce here.
-    wg := sync.WaitGroup{}
-    wg.Add(len(msgs))
-
-    mu := sync.Mutex{}
-    for i, msg := range msgs {
-        i := i
-        msg := msg
-        w.appendExecutionPool.Submit(func() (struct{}, error) {
-            defer wg.Done()
-            msgID, err := p.Produce(ctx, msg)
-
-            mu.Lock()
-            resp.fillResponseAtIdx(AppendResponse{
-                AppendResult: msgID,
-                Error:        err,
-            }, i)
-            mu.Unlock()
-            return struct{}{}, nil
-        })
-    }
-    wg.Wait()
-    return resp
+    return p.Produce(ctx, msg)
 }
 
 // createOrGetProducer creates or get a producer.
@@ -183,3 +33,21 @@ func (w *walAccesserImpl) getProducer(pchannel string) *producer.ResumableProducer {
     w.producers[pchannel] = p
     return p
 }
+
+// assertNoSystemMessage asserts the message is not system message.
+func assertNoSystemMessage(msgs ...message.MutableMessage) {
+    for _, msg := range msgs {
+        if msg.MessageType().IsSystem() {
+            panic("system message is not allowed to append from client")
+        }
+    }
+}
+
+// We only support delete and insert message for txn now.
+func assertIsDmlMessage(msgs ...message.MutableMessage) {
+    for _, msg := range msgs {
+        if msg.MessageType() != message.MessageTypeInsert && msg.MessageType() != message.MessageTypeDelete {
+            panic("only insert and delete message is allowed in txn")
+        }
+    }
+}
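
For illustration only, a hedged sketch of how these two guards compose on a client-side txn write path; the wrapper below is hypothetical and not part of this diff:

// validateTxnMessages is a hypothetical helper combining the guards above:
// client appends must never carry system messages, and txn appends are
// further restricted to DML (insert/delete) messages.
func validateTxnMessages(msgs ...message.MutableMessage) {
    assertNoSystemMessage(msgs...) // panics if any message is a system message
    assertIsDmlMessage(msgs...)    // panics unless every message is insert or delete
}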
5 changes: 3 additions & 2 deletions internal/distributed/streaming/internal/errs/error.go
@@ -6,6 +6,7 @@ import (
 
 // All error in streamingservice package should be marked by streamingservice/errs package.
 var (
-    ErrClosed   = errors.New("closed")
-    ErrCanceled = errors.New("canceled")
+    ErrClosed         = errors.New("closed")
+    ErrCanceled       = errors.New("canceled")
+    ErrTxnUnavailable = errors.New("transaction unavailable")
 )
7 changes: 7 additions & 0 deletions internal/distributed/streaming/internal/producer/producer.go
@@ -97,6 +97,13 @@ func (p *ResumableProducer) Produce(ctx context.Context, msg message.MutableMessage)
         if status.IsCanceled(err) {
             return nil, errors.Mark(err, errs.ErrCanceled)
         }
+        if sErr := status.AsStreamingError(err); sErr != nil {
+            // if the error is txn unavailable, it cannot be retried forever.
+            // we should mark it and return.
+            if sErr.IsTxnUnavilable() {
+                return nil, errors.Mark(err, errs.ErrTxnUnavailable)
+            }
+        }
     }
 }
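
The hunk above makes a "txn unavailable" response terminal for the resumable producer's retry loop by marking the error. Milvus wraps errors with github.com/cockroachdb/errors, where errors.Is matches errors tagged via errors.Mark, so a caller can stop resubmitting instead of retrying forever. A minimal caller-side sketch; the helper name is an assumption:

package example

import (
    "github.com/cockroachdb/errors"

    "github.com/milvus-io/milvus/internal/distributed/streaming/internal/errs"
)

// isTerminalProduceErr is a hypothetical caller-side check: a produce error
// marked ErrTxnUnavailable (or ErrClosed) will never succeed on retry.
func isTerminalProduceErr(err error) bool {
    return errors.Is(err, errs.ErrTxnUnavailable) || errors.Is(err, errs.ErrClosed)
}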
67 changes: 61 additions & 6 deletions internal/distributed/streaming/streaming.go
@@ -2,10 +2,12 @@ package streaming
 
 import (
     "context"
+    "time"
 
     kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv"
     "github.com/milvus-io/milvus/pkg/streaming/util/message"
     "github.com/milvus-io/milvus/pkg/streaming/util/options"
+    "github.com/milvus-io/milvus/pkg/streaming/util/types"
 )
 
 var singleton WALAccesser = nil
@@ -19,14 +21,33 @@ func Init() {
 
 // Release releases the resources of the wal accesser.
 func Release() {
-    singleton.Close()
+    if w, ok := singleton.(*walAccesserImpl); ok && w != nil {
+        w.Close()
+    }
 }
 
 // WAL is the entrance to interact with the milvus write ahead log.
 func WAL() WALAccesser {
     return singleton
 }
 
+// AppendOption is the option for append operation.
+type AppendOption struct {
+    BarrierTimeTick uint64 // BarrierTimeTick is the barrier time tick of the message.
+    // Must be allocated from tso, otherwise undetermined behavior.
+}
+
+type TxnOption struct {
+    // VChannel is the target vchannel to write.
+    // TODO: support cross-wal txn in future.
+    VChannel string
+
+    // Keepalive is the time to keepalive of the transaction.
+    // If the txn don't append message in the keepalive time, the txn will be expired.
+    // Only make sense when ttl is greater than 1ms.
+    Keepalive time.Duration
+}
+
 type ReadOption struct {
     // VChannel is the target vchannel to read.
     VChannel string
@@ -55,13 +76,47 @@ type Scanner interface {
 
 // WALAccesser is the interfaces to interact with the milvus write ahead log.
 type WALAccesser interface {
-    // Append writes a record to the log.
-    // !!! Append didn't promise the order of the message and atomic write.
-    Append(ctx context.Context, msgs ...message.MutableMessage) AppendResponses
+    // Txn returns a transaction for writing records to the log.
+    // Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal.
+    Txn(ctx context.Context, opts TxnOption) (Txn, error)
+
+    // Append writes a records to the log.
+    Append(ctx context.Context, msgs message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error)
 
     // Read returns a scanner for reading records from the wal.
     Read(ctx context.Context, opts ReadOption) Scanner
 
-    // Close closes the wal accesser
-    Close()
+    // Utility returns the utility for writing records to the log.
+    Utility() Utility
 }
+
+// Txn is the interface for writing transaction into the wal.
+type Txn interface {
+    // Append writes a record to the log.
+    Append(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) error
+
+    // Commit commits the transaction.
+    // Commit and Rollback can be only call once, and not concurrent safe with append operation.
+    Commit(ctx context.Context) (*types.AppendResult, error)
+
+    // Rollback rollbacks the transaction.
+    // Commit and Rollback can be only call once, and not concurrent safe with append operation.
+    // TODO: Manually rollback is make no sense for current single wal txn.
+    // It is preserved for future cross-wal txn.
+    Rollback(ctx context.Context) error
+}
+
+type Utility interface {
+    // AppendMessages appends messages to the wal.
+    // It it a helper utility function to append messages to the wal.
+    // If the messages is belong to one vchannel, it will be sent as a transaction.
+    // Otherwise, it will be sent as individual messages.
+    // !!! This function do not promise the atomicity and deliver order of the messages appending.
+    // TODO: Remove after we support cross-wal txn.
+    AppendMessages(ctx context.Context, msgs ...message.MutableMessage) AppendResponses
+
+    // AppendMessagesWithOption appends messages to the wal with the given option.
+    // Same with AppendMessages, but with the given option.
+    // TODO: Remove after we support cross-wal txn.
+    AppendMessagesWithOption(ctx context.Context, opts AppendOption, msgs ...message.MutableMessage) AppendResponses
+}
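
To make the intended division of labor concrete: Txn is the atomic single-vchannel path, while the Utility methods are a best-effort helper that upgrades to a txn only when all messages share a vchannel. A minimal sketch, assuming AppendResponses retains the UnwrapFirstError helper that the old append.go defined (the type is still the return value of the Utility methods in this diff):

// writeBestEffort fans messages out through the Utility helper; there is no
// atomicity or ordering promise across vchannels, per the comments above.
func writeBestEffort(ctx context.Context, w streaming.WALAccesser, tsoTick uint64, msgs ...message.MutableMessage) error {
    resp := w.Utility().AppendMessagesWithOption(ctx, streaming.AppendOption{
        BarrierTimeTick: tsoTick, // must be allocated from TSO, per AppendOption's comment
    }, msgs...)
    return resp.UnwrapFirstError() // first per-message error, if any
}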