Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transfer files - Fix repo snapshot manager error #881

Merged
merged 6 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 11 additions & 29 deletions artifactory/commands/transferfiles/fulltransfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,25 @@ func (m *fullTransferPhase) transferFolder(params folderParams, logMsgPrefix str
log.Debug(logMsgPrefix+"Handling folder:", path.Join(m.repoKey, params.relativePath))

// Get the directory's node from the snapshot manager, and use information from previous transfer attempts if such exist.
node, done, previousChildren, err := m.getAndHandleDirectoryNode(params, logMsgPrefix)
node, done, err := m.getAndHandleDirectoryNode(params, logMsgPrefix)
if err != nil || done {
return err
}

curUploadChunk, err := m.searchAndHandleFolderContents(params, pcWrapper,
uploadChunkChan, delayHelper, errorsChannelMng,
node, previousChildren)
uploadChunkChan, delayHelper, errorsChannelMng, node)
if err != nil {
return
}

// Mark that no more results are expected for the current folder.
err = node.MarkDoneExploring()
if err != nil {
if err = node.MarkDoneExploring(); err != nil {
return err
}

// Chunk didn't reach full size. Upload the remaining files.
if len(curUploadChunk.UploadCandidates) > 0 {
_, err = pcWrapper.chunkUploaderProducerConsumer.AddTaskWithError(uploadChunkWhenPossibleHandler(&m.phaseBase, curUploadChunk, uploadChunkChan, errorsChannelMng), pcWrapper.errorsQueue.AddError)
if err != nil {
if _, err = pcWrapper.chunkUploaderProducerConsumer.AddTaskWithError(uploadChunkWhenPossibleHandler(&m.phaseBase, curUploadChunk, uploadChunkChan, errorsChannelMng), pcWrapper.errorsQueue.AddError); err != nil {
return
}
}
Expand All @@ -163,7 +160,7 @@ func (m *fullTransferPhase) transferFolder(params folderParams, logMsgPrefix str

func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, pcWrapper producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng,
node *reposnapshot.Node, previousChildren []*reposnapshot.Node) (curUploadChunk api.UploadChunk, err error) {
node *reposnapshot.Node) (curUploadChunk api.UploadChunk, err error) {
curUploadChunk = api.UploadChunk{
TargetAuth: createTargetAuth(m.targetRtDetails, m.proxyKey),
CheckExistenceInFilestore: m.checkExistenceInFilestore,
Expand Down Expand Up @@ -201,8 +198,7 @@ func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, p
switch item.Type {
case "folder":
err = m.handleFoundChildFolder(params, pcWrapper,
uploadChunkChan, delayHelper, errorsChannelMng,
node, previousChildren, item)
uploadChunkChan, delayHelper, errorsChannelMng, item)
case "file":
err = m.handleFoundFile(pcWrapper,
uploadChunkChan, delayHelper, errorsChannelMng,
Expand All @@ -220,13 +216,9 @@ func (m *fullTransferPhase) searchAndHandleFolderContents(params folderParams, p

func (m *fullTransferPhase) handleFoundChildFolder(params folderParams, pcWrapper producerConsumerWrapper,
uploadChunkChan chan UploadedChunk, delayHelper delayUploadHelper, errorsChannelMng *ErrorsChannelMng,
node *reposnapshot.Node, previousChildren []*reposnapshot.Node, item servicesUtils.ResultItem) (err error) {
item servicesUtils.ResultItem) (err error) {
newRelativePath := getFolderRelativePath(item.Name, params.relativePath)
// Add a node for the found folder, as a child for the current folder in the snapshot manager.
err = node.AddChildNode(item.Name, previousChildren)
if err != nil {
return
}

folderHandler := m.createFolderFullTransferHandlerFunc(pcWrapper, uploadChunkChan, delayHelper, errorsChannelMng)
_, err = pcWrapper.chunkBuilderProducerConsumer.AddTaskWithError(folderHandler(folderParams{relativePath: newRelativePath}), pcWrapper.errorsQueue.AddError)
return
Expand Down Expand Up @@ -289,11 +281,12 @@ func generateFolderContentAqlQuery(repoKey, relativePath string, paginationOffse
// node - A node in the repository snapshot tree, which represents the current directory.
// completed - Whether handling the node directory was completed. If it wasn't fully transferred, we start exploring and transferring it from scratch.
// previousChildren - If the directory requires exploring, previously known children will be added from this map in order to preserve their states and references.
func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMsgPrefix string) (node *reposnapshot.Node, completed bool, previousChildren []*reposnapshot.Node, err error) {
func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMsgPrefix string) (node *reposnapshot.Node, completed bool, err error) {
node, err = m.stateManager.LookUpNode(params.relativePath)
if err != nil {
return
}

// If data was not loaded from snapshot, we know that the node is visited for the first time and was not explored.
loadedFromSnapshot, err := m.stateManager.WasSnapshotLoaded()
if err != nil || !loadedFromSnapshot {
Expand All @@ -306,21 +299,10 @@ func (m *fullTransferPhase) getAndHandleDirectoryNode(params folderParams, logMs
}
if completed {
log.Debug(logMsgPrefix+"Skipping completed folder:", path.Join(m.repoKey, params.relativePath))
return nil, true, nil, nil
}
// If the node was not completed, we will start exploring it from the beginning.
previousChildren, err = m.handleNodeRequiresExploring(node)
return
}

func (m *fullTransferPhase) handleNodeRequiresExploring(node *reposnapshot.Node) (previousChildren []*reposnapshot.Node, err error) {
// Return old children map to add every found child with its previous data and references.
previousChildren, err = node.GetChildren()
if err != nil {
return
}
// If the node was not completed, we will start exploring it from the beginning.
// Remove all files names because we will begin exploring from the beginning.
// Clear children map to avoid handling directories that may have been deleted.
err = node.RestartExploring()
return
}
18 changes: 14 additions & 4 deletions utils/reposnapshot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package reposnapshot

import (
"encoding/json"
"github.com/jfrog/jfrog-client-go/utils/errorutils"
"os"
"path"
"sync"

"github.com/jfrog/jfrog-client-go/utils/errorutils"
)

// Represents a directory in the repo state snapshot.
Expand Down Expand Up @@ -220,31 +221,40 @@ func (node *Node) IsDoneExploring() (doneExploring bool, err error) {
func (node *Node) RestartExploring() error {
return node.action(func(node *Node) error {
node.NodeStatus = Exploring
node.children = nil
node.filesCount = 0
return nil
})
}

// Recursively find the node matching the path represented by the dirs array.
// The search is done by comparing the children of each node path, till reaching the final node in the array.
// If the node is not found, nil is returned.
// If the node is not found, it is added and then returned.
// For example:
// For a structure such as repo->dir1->dir2->dir3
// The initial call will be to the root, and for an input of ({"dir1","dir2"}), and the final output will be a pointer to dir2.
func (node *Node) findMatchingNode(childrenDirs []string) (matchingNode *Node, err error) {
err = node.action(func(node *Node) error {
// The node was found in the cache. Let's return it.
if len(childrenDirs) == 0 {
matchingNode = node
return nil
}

// Check if any of the current node's children are parents of the current node.
for i := range node.children {
if node.children[i].name == childrenDirs[0] {
matchingNode, err = node.children[i].findMatchingNode(childrenDirs[1:])
return err
}
}
return nil

// None of the current node's children are parents of the current node.
// This means we need to start creating the searched node parents.
newNode := CreateNewNode(childrenDirs[0], node)
newNode.parent = node
node.children = append(node.children, newNode)
matchingNode, err = newNode.findMatchingNode(childrenDirs[1:])
return err
})
return
}
Expand Down
19 changes: 0 additions & 19 deletions utils/reposnapshot/snapshotmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ func TestLookUpNodeAndActualPath(t *testing.T) {
{"dir on root", "2", false},
{"complex path with separator suffix", "1/a/", false},
{"complex path with no separator suffix", "1/a", false},
{"repository provided", path.Join("test-local", "2"), true},
{"relative path includes root", "./2", true},
{"dir doesn't exist", "no/where", true},
{"empty path", "", true},
}

Expand Down Expand Up @@ -187,22 +184,6 @@ func createNodeBase(t *testing.T, name string, filesCount int, parent *Node) *No
return node
}

func TestAddChildNode(t *testing.T) {
root := CreateNewNode(".", nil)
// Add child with no children pool.
addAndAssertChild(t, nil, root, CreateNewNode("no-pool", root))
// Add child with empty children pool.
addAndAssertChild(t, []*Node{}, root, CreateNewNode("empty-pool", root))
// Add child with pool.
exists := CreateNewNode("exists", root)
addAndAssertChild(t, []*Node{exists}, root, exists)
}

func addAndAssertChild(t *testing.T, childrenPool []*Node, root, expectedChild *Node) {
assert.NoError(t, root.AddChildNode(expectedChild.name, childrenPool))
assert.Equal(t, expectedChild, getChild(root, expectedChild.name))
}

func getChild(node *Node, childName string) *Node {
for _, child := range node.children {
if child.name == childName {
Expand Down
Loading