Skip to content

Commit e54e8fa

Browse files
committed
Merge pull request moby#1290 from dotcloud/parallel_pull
* Runtime: Parallel pull
2 parents 2f1c05d + 946bbee commit e54e8fa

File tree

8 files changed

+130
-78
lines changed

8 files changed

+130
-78
lines changed

api.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ func postImagesCreate(srv *Server, version float64, w http.ResponseWriter, r *ht
386386
}
387387
sf := utils.NewStreamFormatter(version > 1.0)
388388
if image != "" { //pull
389-
if err := srv.ImagePull(image, tag, w, sf, &auth.AuthConfig{}); err != nil {
389+
if err := srv.ImagePull(image, tag, w, sf, &auth.AuthConfig{}, version > 1.3); err != nil {
390390
if sf.Used() {
391391
w.Write(sf.FormatError(err))
392392
return nil

buildfile.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (b *buildFile) CmdFrom(name string) error {
5656
if err != nil {
5757
if b.runtime.graph.IsNotExist(err) {
5858
remote, tag := utils.ParseRepositoryTag(name)
59-
if err := b.srv.ImagePull(remote, tag, b.out, utils.NewStreamFormatter(false), nil); err != nil {
59+
if err := b.srv.ImagePull(remote, tag, b.out, utils.NewStreamFormatter(false), nil, true); err != nil {
6060
return err
6161
}
6262
image, err = b.runtime.repositories.LookupImage(name)

commands.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ import (
3030
const VERSION = "0.5.0-dev"
3131

3232
var (
33-
GITCOMMIT string
34-
AuthRequiredError = fmt.Errorf("Authentication is required.")
33+
GITCOMMIT string
3534
)
3635

3736
func (cli *DockerCli) getMethod(name string) (reflect.Method, bool) {
@@ -197,7 +196,7 @@ func (cli *DockerCli) CmdBuild(args ...string) error {
197196
// FIXME: ProgressReader shouldn't be this annoyning to use
198197
if context != nil {
199198
sf := utils.NewStreamFormatter(false)
200-
body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("Uploading context", "%v bytes%0.0s%0.0s"), sf)
199+
body = utils.ProgressReader(ioutil.NopCloser(context), 0, cli.err, sf.FormatProgress("", "Uploading context", "%v bytes%0.0s%0.0s"), sf)
201200
}
202201
// Upload the build context
203202
v := &url.Values{}
@@ -846,7 +845,7 @@ func (cli *DockerCli) CmdPush(args ...string) error {
846845
}
847846

848847
if err := push(); err != nil {
849-
if err == AuthRequiredError {
848+
if err == fmt.Errorf("Authentication is required.") {
850849
if err = cli.checkIfLogged("push"); err == nil {
851850
return push()
852851
}
@@ -1569,19 +1568,7 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) e
15691568
}
15701569

15711570
if resp.Header.Get("Content-Type") == "application/json" {
1572-
dec := json.NewDecoder(resp.Body)
1573-
for {
1574-
var jm utils.JSONMessage
1575-
if err := dec.Decode(&jm); err == io.EOF {
1576-
break
1577-
} else if err != nil {
1578-
return err
1579-
}
1580-
if jm.Error != nil && jm.Error.Code == 401 {
1581-
return AuthRequiredError
1582-
}
1583-
jm.Display(out)
1584-
}
1571+
return utils.DisplayJSONMessagesStream(resp.Body, out)
15851572
} else {
15861573
if _, err := io.Copy(out, resp.Body); err != nil {
15871574
return err

docs/sources/api/docker_remote_api.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ You can still call an old version of the api using
4040
What's new
4141
----------
4242

43+
.. http:post:: /images/create
44+
45+
**New!** When pull a repo, all images are now downloaded in parallel.
46+
4347
.. http:get:: /containers/(id)/top
4448
4549
**New!** You can now use ps args with docker top, like `docker top <container_id> aux`

graph.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (graph *Graph) TempLayerArchive(id string, compression Compression, sf *uti
159159
if err != nil {
160160
return nil, err
161161
}
162-
return NewTempArchive(utils.ProgressReader(ioutil.NopCloser(archive), 0, output, sf.FormatProgress("Buffering to disk", "%v/%v (%v)"), sf), tmp.Root)
162+
return NewTempArchive(utils.ProgressReader(ioutil.NopCloser(archive), 0, output, sf.FormatProgress("", "Buffering to disk", "%v/%v (%v)"), sf), tmp.Root)
163163
}
164164

165165
// Mktemp creates a temporary sub-directory inside the graph's filesystem.

runtime_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func init() {
101101
// If the unit test is not found, try to download it.
102102
if img, err := globalRuntime.repositories.LookupImage(unitTestImageName); err != nil || img.ID != unitTestImageID {
103103
// Retrieve the Image
104-
if err := srv.ImagePull(unitTestImageName, "", os.Stdout, utils.NewStreamFormatter(false), nil); err != nil {
104+
if err := srv.ImagePull(unitTestImageName, "", os.Stdout, utils.NewStreamFormatter(false), nil, true); err != nil {
105105
panic(err)
106106
}
107107
}

server.go

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,15 @@ func (srv *Server) ImageInsert(name, url, path string, out io.Writer, sf *utils.
145145
return "", err
146146
}
147147

148-
if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("Downloading", "%8v/%v (%v)"), sf), path); err != nil {
148+
if err := c.Inject(utils.ProgressReader(file.Body, int(file.ContentLength), out, sf.FormatProgress("", "Downloading", "%8v/%v (%v)"), sf), path); err != nil {
149149
return "", err
150150
}
151151
// FIXME: Handle custom repo, tag comment, author
152152
img, err = b.Commit(c, "", "", img.Comment, img.Author, nil)
153153
if err != nil {
154154
return "", err
155155
}
156-
out.Write(sf.FormatStatus(img.ID))
156+
out.Write(sf.FormatStatus("", img.ID))
157157
return img.ShortID(), nil
158158
}
159159

@@ -420,7 +420,7 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
420420
// FIXME: Launch the getRemoteImage() in goroutines
421421
for _, id := range history {
422422
if !srv.runtime.graph.Exists(id) {
423-
out.Write(sf.FormatStatus("Pulling %s metadata", id))
423+
out.Write(sf.FormatStatus(utils.TruncateID(id), "Pulling metadata"))
424424
imgJSON, imgSize, err := r.GetRemoteImageJSON(id, endpoint, token)
425425
if err != nil {
426426
// FIXME: Keep goging in case of error?
@@ -432,22 +432,22 @@ func (srv *Server) pullImage(r *registry.Registry, out io.Writer, imgID, endpoin
432432
}
433433

434434
// Get the layer
435-
out.Write(sf.FormatStatus("Pulling %s fs layer", id))
435+
out.Write(sf.FormatStatus(utils.TruncateID(id), "Pulling fs layer"))
436436
layer, err := r.GetRemoteImageLayer(img.ID, endpoint, token)
437437
if err != nil {
438438
return err
439439
}
440440
defer layer.Close()
441-
if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress("Downloading", "%8v/%v (%v)"), sf), img); err != nil {
441+
if err := srv.runtime.graph.Register(imgJSON, utils.ProgressReader(layer, imgSize, out, sf.FormatProgress(utils.TruncateID(id), "Downloading", "%8v/%v (%v)"), sf), img); err != nil {
442442
return err
443443
}
444444
}
445445
}
446446
return nil
447447
}
448448

449-
func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag, indexEp string, sf *utils.StreamFormatter) error {
450-
out.Write(sf.FormatStatus("Pulling repository %s", localName))
449+
func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName, remoteName, askedTag, indexEp string, sf *utils.StreamFormatter, parallel bool) error {
450+
out.Write(sf.FormatStatus("", "Pulling repository %s", localName))
451451

452452
repoData, err := r.GetRepositoryData(indexEp, remoteName)
453453
if err != nil {
@@ -484,30 +484,51 @@ func (srv *Server) pullRepository(r *registry.Registry, out io.Writer, localName
484484
repoData.ImgList[id].Tag = askedTag
485485
}
486486

487-
for _, img := range repoData.ImgList {
488-
if askedTag != "" && img.Tag != askedTag {
489-
utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
490-
continue
487+
errors := make(chan error)
488+
for _, image := range repoData.ImgList {
489+
downloadImage := func(img *registry.ImgData) {
490+
if askedTag != "" && img.Tag != askedTag {
491+
utils.Debugf("(%s) does not match %s (id: %s), skipping", img.Tag, askedTag, img.ID)
492+
errors <- nil
493+
return
494+
}
495+
496+
if img.Tag == "" {
497+
utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
498+
errors <- nil
499+
return
500+
}
501+
out.Write(sf.FormatStatus(utils.TruncateID(img.ID), "Pulling image (%s) from %s", img.Tag, localName))
502+
success := false
503+
for _, ep := range repoData.Endpoints {
504+
if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
505+
out.Write(sf.FormatStatus(utils.TruncateID(img.ID), "Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err))
506+
continue
507+
}
508+
success = true
509+
break
510+
}
511+
if !success {
512+
errors <- fmt.Errorf("Could not find repository on any of the indexed registries.")
513+
}
514+
errors <- nil
491515
}
492516

493-
if img.Tag == "" {
494-
utils.Debugf("Image (id: %s) present in this repository but untagged, skipping", img.ID)
495-
continue
517+
if parallel {
518+
go downloadImage(image)
519+
} else {
520+
downloadImage(image)
496521
}
497-
out.Write(sf.FormatStatus("Pulling image %s (%s) from %s", img.ID, img.Tag, localName))
498-
success := false
499-
for _, ep := range repoData.Endpoints {
500-
if err := srv.pullImage(r, out, img.ID, ep, repoData.Tokens, sf); err != nil {
501-
out.Write(sf.FormatStatus("Error while retrieving image for tag: %s (%s); checking next endpoint", askedTag, err))
502-
continue
522+
}
523+
524+
if parallel {
525+
for i := 0; i < len(repoData.ImgList); i++ {
526+
if err := <-errors; err != nil {
527+
return err
503528
}
504-
success = true
505-
break
506-
}
507-
if !success {
508-
return fmt.Errorf("Could not find repository on any of the indexed registries.")
509529
}
510530
}
531+
511532
for tag, id := range tagsList {
512533
if askedTag != "" && tag != askedTag {
513534
continue
@@ -558,7 +579,7 @@ func (srv *Server) poolRemove(kind, key string) error {
558579
return nil
559580
}
560581

561-
func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig) error {
582+
func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *utils.StreamFormatter, authConfig *auth.AuthConfig, parallel bool) error {
562583
r, err := registry.NewRegistry(srv.runtime.root, authConfig, srv.HTTPRequestFactory())
563584
if err != nil {
564585
return err
@@ -580,7 +601,7 @@ func (srv *Server) ImagePull(localName string, tag string, out io.Writer, sf *ut
580601
}
581602

582603
out = utils.NewWriteFlusher(out)
583-
err = srv.pullRepository(r, out, localName, remoteName, tag, endpoint, sf)
604+
err = srv.pullRepository(r, out, localName, remoteName, tag, endpoint, sf, parallel)
584605
if err != nil {
585606
if err := srv.pullImage(r, out, remoteName, endpoint, nil, sf); err != nil {
586607
return err
@@ -620,12 +641,11 @@ func (srv *Server) getImageList(localRepo map[string]string) ([]*registry.ImgDat
620641

621642
func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName, remoteName string, localRepo map[string]string, indexEp string, sf *utils.StreamFormatter) error {
622643
out = utils.NewWriteFlusher(out)
623-
624644
imgList, err := srv.getImageList(localRepo)
625645
if err != nil {
626646
return err
627647
}
628-
out.Write(sf.FormatStatus("Sending image list"))
648+
out.Write(sf.FormatStatus("", "Sending image list"))
629649

630650
var repoData *registry.RepositoryData
631651
repoData, err = r.PushImageJSONIndex(indexEp, remoteName, imgList, false, nil)
@@ -634,14 +654,14 @@ func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName
634654
}
635655

636656
for _, ep := range repoData.Endpoints {
637-
out.Write(sf.FormatStatus("Pushing repository %s (%d tags)", localName, len(localRepo)))
657+
out.Write(sf.FormatStatus("", "Pushing repository %s (%d tags)", localName, len(localRepo)))
638658
// For each image within the repo, push them
639659
for _, elem := range imgList {
640660
if _, exists := repoData.ImgList[elem.ID]; exists {
641-
out.Write(sf.FormatStatus("Image %s already pushed, skipping", elem.ID))
661+
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", elem.ID))
642662
continue
643663
} else if r.LookupRemoteImage(elem.ID, ep, repoData.Tokens) {
644-
out.Write(sf.FormatStatus("Image %s already pushed, skipping", elem.ID))
664+
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", elem.ID))
645665
continue
646666
}
647667
if checksum, err := srv.pushImage(r, out, remoteName, elem.ID, ep, repoData.Tokens, sf); err != nil {
@@ -650,7 +670,7 @@ func (srv *Server) pushRepository(r *registry.Registry, out io.Writer, localName
650670
} else {
651671
elem.Checksum = checksum
652672
}
653-
out.Write(sf.FormatStatus("Pushing tags for rev [%s] on {%s}", elem.ID, ep+"repositories/"+remoteName+"/tags/"+elem.Tag))
673+
out.Write(sf.FormatStatus("", "Pushing tags for rev [%s] on {%s}", elem.ID, ep+"repositories/"+remoteName+"/tags/"+elem.Tag))
654674
if err := r.PushRegistryTag(remoteName, elem.ID, elem.Tag, ep, repoData.Tokens); err != nil {
655675
return err
656676
}
@@ -670,7 +690,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
670690
if err != nil {
671691
return "", fmt.Errorf("Error while retreiving the path for {%s}: %s", imgID, err)
672692
}
673-
out.Write(sf.FormatStatus("Pushing %s", imgID))
693+
out.Write(sf.FormatStatus("", "Pushing %s", imgID))
674694

675695
imgData := &registry.ImgData{
676696
ID: imgID,
@@ -679,7 +699,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
679699
// Send the json
680700
if err := r.PushImageJSONRegistry(imgData, jsonRaw, ep, token); err != nil {
681701
if err == registry.ErrAlreadyExists {
682-
out.Write(sf.FormatStatus("Image %s already pushed, skipping", imgData.ID))
702+
out.Write(sf.FormatStatus("", "Image %s already pushed, skipping", imgData.ID))
683703
return "", nil
684704
}
685705
return "", err
@@ -691,7 +711,7 @@ func (srv *Server) pushImage(r *registry.Registry, out io.Writer, remote, imgID,
691711
}
692712

693713
// Send the layer
694-
if checksum, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("Pushing", "%8v/%v (%v)"), sf), ep, token, jsonRaw); err != nil {
714+
if checksum, err := r.PushImageLayerRegistry(imgData.ID, utils.ProgressReader(layerData, int(layerData.Size), out, sf.FormatProgress("", "Pushing", "%8v/%v (%v)"), sf), ep, token, jsonRaw); err != nil {
695715
return "", err
696716
} else {
697717
imgData.Checksum = checksum
@@ -727,7 +747,7 @@ func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFo
727747

728748
if err != nil {
729749
reposLen := len(srv.runtime.repositories.Repositories[localName])
730-
out.Write(sf.FormatStatus("The push refers to a repository [%s] (len: %d)", localName, reposLen))
750+
out.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", localName, reposLen))
731751
// If it fails, try to get the repository
732752
if localRepo, exists := srv.runtime.repositories.Repositories[localName]; exists {
733753
if err := srv.pushRepository(r, out, localName, remoteName, localRepo, endpoint, sf); err != nil {
@@ -739,7 +759,7 @@ func (srv *Server) ImagePush(localName string, out io.Writer, sf *utils.StreamFo
739759
}
740760

741761
var token []string
742-
out.Write(sf.FormatStatus("The push refers to an image: [%s]", localName))
762+
out.Write(sf.FormatStatus("", "The push refers to an image: [%s]", localName))
743763
if _, err := srv.pushImage(r, out, remoteName, img.ID, endpoint, token, sf); err != nil {
744764
return err
745765
}
@@ -762,14 +782,14 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
762782
u.Host = src
763783
u.Path = ""
764784
}
765-
out.Write(sf.FormatStatus("Downloading from %s", u))
785+
out.Write(sf.FormatStatus("", "Downloading from %s", u))
766786
// Download with curl (pretty progress bar)
767787
// If curl is not available, fallback to http.Get()
768788
resp, err = utils.Download(u.String(), out)
769789
if err != nil {
770790
return err
771791
}
772-
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("Importing", "%8v/%v (%v)"), sf)
792+
archive = utils.ProgressReader(resp.Body, int(resp.ContentLength), out, sf.FormatProgress("", "Importing", "%8v/%v (%v)"), sf)
773793
}
774794
img, err := srv.runtime.graph.Create(archive, nil, "Imported from "+src, "", nil)
775795
if err != nil {
@@ -781,7 +801,7 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
781801
return err
782802
}
783803
}
784-
out.Write(sf.FormatStatus(img.ShortID()))
804+
out.Write(sf.FormatStatus("", img.ShortID()))
785805
return nil
786806
}
787807

0 commit comments

Comments
 (0)