Skip to content

Commit 55a1b4e

Browse files
committed
cmd/dist: implement fetch prototype
With the rename of fetch to fetch-object, we now introduce the `fetch` command. It will fetch all of the resources required for an image into the content store. We'll still need to follow this up with metadata registration but this is a good start. Signed-off-by: Stephen J Day <[email protected]>
1 parent 971b9ca commit 55a1b4e

File tree

7 files changed

+509
-0
lines changed

7 files changed

+509
-0
lines changed

cmd/dist/fetch.go

+348
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io/ioutil"
9+
"os"
10+
"sync"
11+
"text/tabwriter"
12+
"time"
13+
14+
"github.com/Sirupsen/logrus"
15+
contentapi "github.com/docker/containerd/api/services/content"
16+
"github.com/docker/containerd/content"
17+
"github.com/docker/containerd/log"
18+
"github.com/docker/containerd/progress"
19+
"github.com/docker/containerd/remotes"
20+
contentservice "github.com/docker/containerd/services/content"
21+
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
22+
"github.com/urfave/cli"
23+
"golang.org/x/sync/errgroup"
24+
)
25+
26+
var fetchCommand = cli.Command{
27+
Name: "fetch",
28+
Usage: "fetch all content for an image into containerd",
29+
ArgsUsage: "[flags] <remote> <object>",
30+
Description: `Fetch an image into containerd.
31+
32+
This command ensures that containerd has all the necessary resources to build
33+
an image's rootfs and convert the configuration to a runtime format supported
34+
by containerd.
35+
36+
This command uses the same syntax, of remote and object, as 'dist
37+
fetch-object'. We may want to make this nicer, but agnostism is preferred for
38+
the moment.
39+
40+
Right now, the responsibility of the daemon and the cli aren't quite clear. Do
41+
not use this implementation as a guide. The end goal should be having metadata,
42+
content and snapshots ready for a direct use via the 'ctr run'.
43+
44+
Most of this is experimental and there are few leaps to make this work.`,
45+
Flags: []cli.Flag{},
46+
Action: func(clicontext *cli.Context) error {
47+
var (
48+
ctx = background
49+
locator = clicontext.Args().First()
50+
object = clicontext.Args().Get(1)
51+
)
52+
53+
conn, err := connectGRPC(clicontext)
54+
if err != nil {
55+
return err
56+
}
57+
58+
resolver, err := getResolver(ctx)
59+
if err != nil {
60+
return err
61+
}
62+
63+
fetcher, err := resolver.Resolve(ctx, locator)
64+
if err != nil {
65+
return err
66+
}
67+
68+
ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn))
69+
cs, err := resolveContentStore(clicontext)
70+
if err != nil {
71+
return err
72+
}
73+
74+
eg, ctx := errgroup.WithContext(ctx)
75+
76+
ctx = withJobsContext(ctx)
77+
78+
eg.Go(func() error {
79+
return fetchManifest(ctx, ingester, fetcher, object)
80+
})
81+
82+
errs := make(chan error)
83+
go func() {
84+
defer close(errs)
85+
errs <- eg.Wait()
86+
}()
87+
88+
ticker := time.NewTicker(100 * time.Millisecond)
89+
fw := progress.NewWriter(os.Stdout)
90+
start := time.Now()
91+
defer ticker.Stop()
92+
var done bool
93+
94+
for {
95+
select {
96+
case <-ticker.C:
97+
fw.Flush()
98+
99+
tw := tabwriter.NewWriter(fw, 1, 8, 1, '\t', 0)
100+
// fmt.Fprintln(tw, "REF\tSIZE\tAGE")
101+
var total int64
102+
103+
js := getJobs(ctx)
104+
type statusInfo struct {
105+
Ref string
106+
Status string
107+
Offset int64
108+
Total int64
109+
StartedAt time.Time
110+
UpdatedAt time.Time
111+
}
112+
statuses := map[string]statusInfo{}
113+
114+
activeSeen := map[string]struct{}{}
115+
if !done {
116+
active, err := cs.Active()
117+
if err != nil {
118+
log.G(ctx).WithError(err).Error("active check failed")
119+
continue
120+
}
121+
// update status of active entries!
122+
for _, active := range active {
123+
statuses[active.Ref] = statusInfo{
124+
Ref: active.Ref,
125+
Status: "downloading",
126+
Offset: active.Offset,
127+
Total: active.Total,
128+
StartedAt: active.StartedAt,
129+
UpdatedAt: active.UpdatedAt,
130+
}
131+
activeSeen[active.Ref] = struct{}{}
132+
}
133+
}
134+
135+
// now, update the items in jobs that are not in active
136+
for _, j := range js {
137+
if _, ok := activeSeen[j]; ok {
138+
continue
139+
}
140+
141+
statuses[j] = statusInfo{
142+
Ref: j,
143+
Status: "done", // for now!
144+
}
145+
}
146+
147+
for _, j := range js {
148+
status := statuses[j]
149+
150+
total += status.Offset
151+
switch status.Status {
152+
case "downloading":
153+
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
154+
fmt.Fprintf(tw, "%s:\t%s\t%40r\t%8.8s/%s\n",
155+
status.Ref,
156+
status.Status,
157+
bar,
158+
progress.Bytes(status.Offset), progress.Bytes(status.Total))
159+
case "done":
160+
bar := progress.Bar(1.0)
161+
fmt.Fprintf(tw, "%s:\t%s\t%40r\n",
162+
status.Ref,
163+
status.Status,
164+
bar)
165+
}
166+
}
167+
168+
fmt.Fprintf(tw, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\n",
169+
time.Since(start).Seconds(),
170+
// TODO(stevvooe): These calculations are actually way off.
171+
// Need to account for previously downloaded data. These
172+
// will basically be right for a download the first time
173+
// but will be skewed if restarting, as it includes the
174+
// data into the start time before.
175+
progress.Bytes(total),
176+
progress.NewBytesPerSecond(total, time.Since(start)))
177+
tw.Flush()
178+
179+
if done {
180+
fw.Flush()
181+
return nil
182+
}
183+
case err := <-errs:
184+
if err != nil {
185+
return err
186+
}
187+
done = true
188+
case <-ctx.Done():
189+
done = true // allow ui to update once more
190+
}
191+
}
192+
193+
return nil
194+
},
195+
}
196+
197+
// jobs provides a way of identifying the download keys for a particular task
198+
// encountering during the pull walk.
199+
//
200+
// This is very minimal and will probably be replaced with something more
201+
// featured.
202+
type jobs struct {
203+
added map[string]struct{}
204+
refs []string
205+
mu sync.Mutex
206+
}
207+
208+
// jobsKeys let's us store the jobs instance in the context.
209+
//
210+
// This is a very cute way to do things but not ideal.
211+
type jobsKey struct{}
212+
213+
func getJobs(ctx context.Context) []string {
214+
return ctx.Value(jobsKey{}).(*jobs).jobs()
215+
}
216+
217+
func addJob(ctx context.Context, job string) {
218+
ctx.Value(jobsKey{}).(*jobs).add(job)
219+
}
220+
221+
func withJobsContext(ctx context.Context) context.Context {
222+
jobs := newJobs()
223+
return context.WithValue(ctx, jobsKey{}, jobs)
224+
}
225+
226+
func newJobs() *jobs {
227+
return &jobs{added: make(map[string]struct{})}
228+
}
229+
230+
func (j *jobs) add(ref string) {
231+
j.mu.Lock()
232+
defer j.mu.Unlock()
233+
234+
if _, ok := j.added[ref]; ok {
235+
return
236+
}
237+
j.refs = append(j.refs, ref)
238+
j.added[ref] = struct{}{}
239+
}
240+
241+
func (j *jobs) jobs() []string {
242+
j.mu.Lock()
243+
defer j.mu.Unlock()
244+
245+
var jobs []string
246+
for _, j := range j.refs {
247+
jobs = append(jobs, j)
248+
}
249+
250+
return jobs
251+
}
252+
253+
func fetchManifest(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, object string, hints ...string) error {
254+
const manifestMediaType = "application/vnd.docker.distribution.manifest.v2+json"
255+
hints = append(hints, "mediatype:"+manifestMediaType)
256+
257+
ref := "manifest-" + object
258+
addJob(ctx, ref)
259+
260+
rc, err := fetcher.Fetch(ctx, object, hints...)
261+
if err != nil {
262+
return err
263+
}
264+
defer rc.Close()
265+
266+
// it would be better to read the content back from the content store in this case.
267+
p, err := ioutil.ReadAll(rc)
268+
if err != nil {
269+
return err
270+
}
271+
272+
if err := content.WriteBlob(ctx, ingester, ref, bytes.NewReader(p), 0, ""); err != nil {
273+
return err
274+
}
275+
276+
// TODO(stevvooe): This assumption that we get a manifest is unfortunate.
277+
// Need to provide way to resolve what the type of the target is.
278+
var manifest ocispec.Manifest
279+
if err := json.Unmarshal(p, &manifest); err != nil {
280+
return err
281+
}
282+
283+
var descs []ocispec.Descriptor
284+
285+
descs = append(descs, manifest.Config)
286+
for _, desc := range manifest.Layers {
287+
descs = append(descs, desc)
288+
}
289+
290+
return dispatch(ctx, ingester, fetcher, descs...)
291+
}
292+
293+
func fetch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, desc ocispec.Descriptor) error {
294+
var (
295+
hints []string
296+
object = desc.Digest.String()
297+
)
298+
if desc.MediaType != "" {
299+
hints = append(hints, "mediatype:"+desc.MediaType)
300+
}
301+
302+
ref := "fetch-" + object
303+
addJob(ctx, ref)
304+
log.G(ctx).Debug("fetch")
305+
rc, err := fetcher.Fetch(ctx, object, hints...)
306+
if err != nil {
307+
log.G(ctx).WithError(err).Error("fetch error")
308+
return err
309+
}
310+
defer rc.Close()
311+
312+
// TODO(stevvooe): Need better remote key selection here. Should be a
313+
// product of the fetcher. We may need more narrow infomation on fetcher or
314+
// just pull from env/context.
315+
return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
316+
}
317+
318+
// dispatch blocks until all content in `descs` is retrieved.
319+
func dispatch(ctx context.Context, ingester content.Ingester, fetcher remotes.Fetcher, descs ...ocispec.Descriptor) error {
320+
eg, ctx := errgroup.WithContext(ctx)
321+
for _, desc := range descs {
322+
if err := func(desc ocispec.Descriptor) error {
323+
ctx := log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
324+
"digest": desc.Digest,
325+
"mediatype": desc.MediaType,
326+
"size": desc.Size,
327+
}))
328+
switch desc.MediaType {
329+
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
330+
eg.Go(func() error {
331+
return fetchManifest(ctx, ingester, fetcher, desc.Digest.String(), "mediatype:"+desc.MediaType)
332+
})
333+
case MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
334+
return fmt.Errorf("%v not yet supported", desc.MediaType)
335+
default:
336+
eg.Go(func() error {
337+
return fetch(ctx, ingester, fetcher, desc)
338+
})
339+
}
340+
341+
return nil
342+
}(desc); err != nil {
343+
return err
344+
}
345+
}
346+
347+
return eg.Wait()
348+
}

cmd/dist/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ distribution tool
6161
},
6262
}
6363
app.Commands = []cli.Command{
64+
fetchCommand,
6465
fetchObjectCommand,
6566
ingestCommand,
6667
activeCommand,

0 commit comments

Comments
 (0)