Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions estargz/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"

"github.com/containerd/stargz-snapshotter/estargz/errorutil"
"github.com/klauspost/compress/zstd"
Expand Down Expand Up @@ -145,8 +146,10 @@ func WithGzipHelperFunc(gzipHelperFunc GzipHelperFunc) Option {
// Blob is an eStargz blob.
type Blob struct {
io.ReadCloser
diffID digest.Digester
tocDigest digest.Digest
diffID digest.Digester
tocDigest digest.Digest
readCompleted *atomic.Bool
uncompressedSize *atomic.Int64
}

// DiffID returns the digest of uncompressed blob.
Expand All @@ -160,6 +163,19 @@ func (b *Blob) TOCDigest() digest.Digest {
return b.tocDigest
}

// UncompressedSize returns the size of uncompressed blob.
// UncompressedSize should only be called after the blob has been fully read.
func (b *Blob) UncompressedSize() (int64, error) {
switch {
case b.uncompressedSize == nil || b.readCompleted == nil:
return -1, fmt.Errorf("readCompleted or uncompressedSize is not initialized")
case !b.readCompleted.Load():
return -1, fmt.Errorf("called UncompressedSize before the blob has been fully read")
default:
return b.uncompressedSize.Load(), nil
}
}

// Build builds an eStargz blob which is an extended version of stargz, from a blob (gzip, zstd
// or plain tar) passed through the argument. If there are some prioritized files are listed in
// the option, these files are grouped as "prioritized" and can be used for runtime optimization
Expand Down Expand Up @@ -267,26 +283,39 @@ func Build(tarBlob *io.SectionReader, opt ...Option) (_ *Blob, rErr error) {
}
diffID := digest.Canonical.Digester()
pr, pw := io.Pipe()
readCompleted := new(atomic.Bool)
uncompressedSize := new(atomic.Int64)
go func() {
r, err := opts.compression.Reader(io.TeeReader(io.MultiReader(append(rs, tocAndFooter)...), pw))
var size int64
var decompressFunc func(io.Reader) (io.ReadCloser, error)
if _, ok := opts.compression.(*gzipCompression); ok && opts.gzipHelperFunc != nil {
decompressFunc = opts.gzipHelperFunc
} else {
decompressFunc = opts.compression.Reader
}
decompressR, err := decompressFunc(io.TeeReader(io.MultiReader(append(rs, tocAndFooter)...), pw))
if err != nil {
pw.CloseWithError(err)
return
}
defer r.Close()
if _, err := io.Copy(diffID.Hash(), r); err != nil {
defer decompressR.Close()
if size, err = io.Copy(diffID.Hash(), decompressR); err != nil {
pw.CloseWithError(err)
return
}
uncompressedSize.Store(size)
readCompleted.Store(true)
pw.Close()
}()
return &Blob{
ReadCloser: readCloser{
Reader: pr,
closeFunc: layerFiles.CleanupAll,
},
tocDigest: tocDgst,
diffID: diffID,
tocDigest: tocDgst,
diffID: diffID,
readCompleted: readCompleted,
uncompressedSize: uncompressedSize,
}, nil
}

Expand Down
32 changes: 6 additions & 26 deletions nativeconverter/estargz/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ import (
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/images/converter"
"github.com/containerd/containerd/v2/core/images/converter/uncompress"
"github.com/containerd/containerd/v2/pkg/archive/compression"
"github.com/containerd/containerd/v2/pkg/labels"
"github.com/containerd/errdefs"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/containerd/stargz-snapshotter/util/ioutils"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
Expand Down Expand Up @@ -101,35 +99,13 @@ func LayerConvertFunc(opts ...estargz.Option) converter.ConvertFunc {
return nil, err
}

// Copy and count the contents
pr, pw := io.Pipe()
c := new(ioutils.CountWriter)
doneCount := make(chan struct{})
go func() {
defer close(doneCount)
defer pr.Close()
decompressR, err := compression.DecompressStream(pr)
if err != nil {
pr.CloseWithError(err)
return
}
defer decompressR.Close()
if _, err := io.Copy(c, decompressR); err != nil {
pr.CloseWithError(err)
return
}
}()
n, err := io.Copy(w, io.TeeReader(blob, pw))
n, err := io.Copy(w, blob)
if err != nil {
return nil, err
}
if err := blob.Close(); err != nil {
return nil, err
}
if err := pw.Close(); err != nil {
return nil, err
}
<-doneCount

// update diffID label
labelz[labels.LabelUncompressed] = blob.DiffID().String()
Expand All @@ -153,7 +129,11 @@ func LayerConvertFunc(opts ...estargz.Option) converter.ConvertFunc {
newDesc.Annotations = make(map[string]string, 1)
}
newDesc.Annotations[estargz.TOCJSONDigestAnnotation] = blob.TOCDigest().String()
newDesc.Annotations[estargz.StoreUncompressedSizeAnnotation] = fmt.Sprintf("%d", c.Size())
uncompressedSize, err := blob.UncompressedSize()
if err != nil {
return nil, err
}
newDesc.Annotations[estargz.StoreUncompressedSizeAnnotation] = fmt.Sprintf("%d", uncompressedSize)
return &newDesc, nil
}
}
28 changes: 6 additions & 22 deletions nativeconverter/zstdchunked/zstdchunked.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ import (
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/images/converter"
"github.com/containerd/containerd/v2/core/images/converter/uncompress"
"github.com/containerd/containerd/v2/pkg/archive/compression"
"github.com/containerd/containerd/v2/pkg/labels"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"github.com/containerd/stargz-snapshotter/estargz"
"github.com/containerd/stargz-snapshotter/estargz/zstdchunked"
"github.com/containerd/stargz-snapshotter/util/ioutils"
"github.com/klauspost/compress/zstd"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -154,25 +152,7 @@ func LayerConvertFuncWithCompressionLevel(compressionLevel zstd.EncoderLevel, op
return nil, err
}

// Copy and count the contents
pr, pw := io.Pipe()
c := new(ioutils.CountWriter)
doneCount := make(chan struct{})
go func() {
defer close(doneCount)
defer pr.Close()
decompressR, err := compression.DecompressStream(pr)
if err != nil {
pr.CloseWithError(err)
return
}
defer decompressR.Close()
if _, err := io.Copy(c, decompressR); err != nil {
pr.CloseWithError(err)
return
}
}()
n, err := io.Copy(w, io.TeeReader(blob, pw))
n, err := io.Copy(w, blob)
if err != nil {
return nil, err
}
Expand All @@ -199,7 +179,11 @@ func LayerConvertFuncWithCompressionLevel(compressionLevel zstd.EncoderLevel, op
}
tocDgst := blob.TOCDigest().String()
newDesc.Annotations[estargz.TOCJSONDigestAnnotation] = tocDgst
newDesc.Annotations[estargz.StoreUncompressedSizeAnnotation] = fmt.Sprintf("%d", c.Size())
uncompressedSize, err := blob.UncompressedSize()
if err != nil {
return nil, err
}
newDesc.Annotations[estargz.StoreUncompressedSizeAnnotation] = fmt.Sprintf("%d", uncompressedSize)
if p, ok := metadata[zstdchunked.ManifestChecksumAnnotation]; ok {
newDesc.Annotations[zstdchunked.ManifestChecksumAnnotation] = p
}
Expand Down
Loading