Skip to content

Commit a1c4ba5

Browse files
N3koxmeguminnnnnnnnn
authored andcommitted
chore: modify stream copy
* chore: remove stream reader close protection * chore: genericTransformWithCallbacks stream copy See merge request: !219
1 parent a7214d9 commit a1c4ba5

File tree

2 files changed

+3
-6
lines changed

2 files changed

+3
-6
lines changed

compose/utils.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ func genericTransformWithCallbacks(t transform) transform {
206206
}
207207

208208
ctx, is = callbacks.OnStartWithStreamInput(ctx, is)
209+
is.Close() // goroutine free copy buffer release
209210

210211
output, err = t(ctx, inArr[0], opts...)
211212
if err != nil {
@@ -219,7 +220,8 @@ func genericTransformWithCallbacks(t transform) transform {
219220
return outArr[0], nil
220221
}
221222

222-
_, _ = callbacks.OnEndWithStreamOutput(ctx, os)
223+
_, os = callbacks.OnEndWithStreamOutput(ctx, os)
224+
os.Close()
223225

224226
return outArr[0], nil
225227
}

schema/stream.go

-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"reflect"
2424
"runtime/debug"
2525
"sync"
26-
"sync/atomic"
2726

2827
"github.com/cloudwego/eino/utils/safe"
2928
)
@@ -305,10 +304,6 @@ func (s *stream[T]) closeSend() {
305304
}
306305

307306
func (s *stream[T]) closeRecv() {
308-
if !atomic.CompareAndSwapUint32(&s.isClosed, 0, 1) {
309-
return
310-
}
311-
312307
close(s.closed)
313308
}
314309

0 commit comments

Comments
 (0)