Skip to content

Commit 0f0c7c8

Browse files
committed
fix: stamped putter support for streamed chunk endpoint
1 parent bf26745 commit 0f0c7c8

File tree

1 file changed

+53
-3
lines changed

1 file changed

+53
-3
lines changed

pkg/api/chunk_stream.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,14 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
9595
}
9696

9797
s.wsWg.Add(1)
98-
go s.handleUploadStream(logger, wsConn, putter)
98+
go s.handleUploadStream(logger, wsConn, putter, tag)
9999
}
100100

101101
func (s *Service) handleUploadStream(
102102
logger log.Logger,
103103
conn *websocket.Conn,
104104
putter storer.PutterSession,
105+
tag uint64,
105106
) {
106107
defer s.wsWg.Done()
107108

@@ -190,14 +191,56 @@ func (s *Service) handleUploadStream(
190191
return
191192
}
192193

193-
chunk, err := cac.NewWithDataSpan(msg)
194+
// Check if this message contains a per-chunk stamp prepended to the chunk data
195+
// Format: stamp[113 bytes] + chunk[data]
196+
var (
197+
chunk swarm.Chunk
198+
chunkPutter = putter // default to connection-level putter
199+
chunkData = msg
200+
)
201+
202+
// If message is large enough to contain a stamp + chunk data, try to extract the stamp
203+
if len(msg) >= postage.StampSize+swarm.SpanSize {
204+
potentialStamp := msg[:postage.StampSize]
205+
potentialChunkData := msg[postage.StampSize:]
206+
207+
// Try to unmarshal as a stamp
208+
stamp := postage.Stamp{}
209+
if err := stamp.UnmarshalBinary(potentialStamp); err == nil {
210+
// Valid stamp found - create a per-chunk putter
211+
chunkPutter, err = s.newStampedPutter(ctx, putterOptions{
212+
BatchID: stamp.BatchID(),
213+
TagID: tag,
214+
Deferred: tag != 0,
215+
}, &stamp)
216+
if err != nil {
217+
logger.Debug("chunk upload stream: failed to create stamped putter", "error", err)
218+
logger.Error(nil, "chunk upload stream: failed to create stamped putter")
219+
switch {
220+
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
221+
sendErrorClose(websocket.CloseInternalServerErr, "batch not usable")
222+
case errors.Is(err, postage.ErrNotFound):
223+
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
224+
default:
225+
sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed")
226+
}
227+
return
228+
}
229+
// Use the chunk data without the stamp
230+
chunkData = potentialChunkData
231+
// Note: we need to call Done on the per-chunk putter after Put
232+
}
233+
// If unmarshal failed, fall through to use the whole message as chunk data
234+
}
235+
236+
chunk, err = cac.NewWithDataSpan(chunkData)
194237
if err != nil {
195238
logger.Debug("chunk upload stream: create chunk failed", "error", err)
196239
logger.Error(nil, "chunk upload stream: create chunk failed")
197240
return
198241
}
199242

200-
err = putter.Put(ctx, chunk)
243+
err = chunkPutter.Put(ctx, chunk)
201244
if err != nil {
202245
logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err)
203246
logger.Error(nil, "chunk upload stream: write chunk failed")
@@ -210,6 +253,13 @@ func (s *Service) handleUploadStream(
210253
return
211254
}
212255

256+
// If we created a per-chunk putter, clean it up
257+
if chunkPutter != putter {
258+
if err := chunkPutter.Done(swarm.ZeroAddress); err != nil {
259+
logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter")
260+
}
261+
}
262+
213263
err = sendMsg(websocket.BinaryMessage, successWsMsg)
214264
if err != nil {
215265
s.logger.Debug("chunk upload stream: sending success message failed", "error", err)

0 commit comments

Comments
 (0)