Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Xload prefetch initial code #1585

Open
wants to merge 55 commits into
base: feature/xload
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
7ed70c3
Creating component
vibhansa-msft Apr 12, 2024
88ec516
Cleanup
vibhansa-msft Apr 12, 2024
3cbfc73
cleanup'
vibhansa-msft Apr 12, 2024
5019e96
Updating copyright statement
souravgupta-msft Apr 12, 2024
1dc590e
Adding block
vibhansa-msft Apr 12, 2024
0f14e2a
Adding block and blockpool
vibhansa-msft Apr 12, 2024
1c7e914
Merge branch 'feature/xload' of https://github.com/Azure/azure-storag…
vibhansa-msft Apr 12, 2024
4a54134
Adding threadpool
vibhansa-msft Apr 12, 2024
c309894
Base code
vibhansa-msft Apr 13, 2024
c60bad0
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Apr 15, 2024
d0bfd1b
changing mode type to enum
souravgupta-msft Apr 15, 2024
cad5756
changing mode type to enum (#1386)
souravgupta-msft Apr 16, 2024
141a549
Adding splitter logic
vibhansa-msft Apr 17, 2024
741a0ca
local listing
souravgupta-msft Apr 22, 2024
2329c4b
Added splitter logic
vibhansa-msft Apr 22, 2024
dc31b1e
Correct error handling
vibhansa-msft Apr 22, 2024
3d9357f
Sync with lister
vibhansa-msft Apr 22, 2024
0e90768
Updated
vibhansa-msft Apr 22, 2024
84b562e
Error handling
vibhansa-msft Apr 22, 2024
60cb00e
Adding remote lister
souravgupta-msft Apr 22, 2024
68e4bd8
log correction
souravgupta-msft Apr 22, 2024
a7b545f
sample
souravgupta-msft May 6, 2024
3faa5f2
lister
souravgupta-msft May 13, 2024
360bbec
adding xlister
souravgupta-msft Jun 7, 2024
e153b3f
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Jun 11, 2024
73a444c
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Jun 18, 2024
a5eca7a
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Jul 2, 2024
1f43c23
check
souravgupta-msft Jul 2, 2024
a840414
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Sep 24, 2024
93f0224
temp
souravgupta-msft Sep 24, 2024
26d19c9
test
souravgupta-msft Oct 7, 2024
9fb087e
code
souravgupta-msft Oct 9, 2024
177a177
more code
souravgupta-msft Oct 10, 2024
3243675
refactor
souravgupta-msft Oct 16, 2024
c27ed4b
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Oct 29, 2024
2aaca33
changes
souravgupta-msft Nov 4, 2024
4c18f76
xload to azstorage
souravgupta-msft Nov 4, 2024
3483983
sync with main
souravgupta-msft Nov 19, 2024
20c4853
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Nov 21, 2024
96e0968
changes
souravgupta-msft Nov 25, 2024
96a8e28
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Nov 25, 2024
9c054d3
more logs
souravgupta-msft Nov 27, 2024
523b64b
changes
souravgupta-msft Nov 27, 2024
2f9e8ec
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Dec 2, 2024
0f462cd
config
souravgupta-msft Dec 3, 2024
6ed0cb3
add readonly check
souravgupta-msft Dec 3, 2024
34e567e
Merge branch 'main' of https://github.com/Azure/azure-storage-fuse in…
souravgupta-msft Dec 4, 2024
a7c8fe2
Removing upload code
souravgupta-msft Dec 4, 2024
dd4840b
go version update
souravgupta-msft Dec 4, 2024
6f5121b
copyright fix
souravgupta-msft Dec 4, 2024
17a9985
Merge branch 'feature/xload' of https://github.com/Azure/azure-storag…
souravgupta-msft Dec 30, 2024
ce69c5e
refactoring
souravgupta-msft Dec 30, 2024
16eb04c
refactor
souravgupta-msft Dec 30, 2024
db6ffcc
Merge branch 'feature/xload' of https://github.com/Azure/azure-storag…
souravgupta-msft Jan 3, 2025
151a2fa
update copyright statements
souravgupta-msft Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ import (
_ "github.com/Azure/azure-storage-fuse/v2/component/file_cache"
_ "github.com/Azure/azure-storage-fuse/v2/component/libfuse"
_ "github.com/Azure/azure-storage-fuse/v2/component/loopback"
_ "github.com/Azure/azure-storage-fuse/v2/component/xload"
)
5 changes: 5 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ func NewUUID() (u uuid) {
return
}

// GetBlockID returns a block ID for the given length: the value produced by
// NewUUIDWithLength(length), encoded with the standard base64 alphabet.
func GetBlockID(length int64) string {
	// The parameter is named length rather than len so that it does not
	// shadow the builtin len inside this function.
	return base64.StdEncoding.EncodeToString(NewUUIDWithLength(length))
}

func GetIdLength(id string) int64 {
existingBlockId, _ := base64.StdEncoding.DecodeString(id)
return int64(len(existingBlockId))
Expand Down
7 changes: 3 additions & 4 deletions component/azstorage/block_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ package azstorage
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -996,7 +995,7 @@ func (bb *BlockBlob) GetFileBlockOffsets(name string) (*common.BlockOffsetList,
}

func (bb *BlockBlob) createBlock(blockIdLength, startIndex, size int64) *common.Block {
newBlockId := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(blockIdLength))
newBlockId := common.GetBlockID(blockIdLength)
newBlock := &common.Block{
Id: newBlockId,
StartIndex: startIndex,
Expand Down Expand Up @@ -1080,14 +1079,14 @@ func (bb *BlockBlob) TruncateFile(name string, size int64) error {
blobClient := bb.Container.NewBlockBlobClient(blobName)

blkList := make([]string, 0)
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
id := common.GetBlockID(16)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider declaring 16 as a constant? This approach would make it easier to update the value in the future, as it would only need to be changed in one place. Additionally, assigning it to a well-named constant would enhance readability for developers.


for i := 0; size > 0; i++ {
if i == 0 || size < blkSize {
// Only first and last block we upload and rest all we replicate with the first block itself
if size < blkSize {
blkSize = size
id = base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
id = common.GetBlockID(16)
}
data := make([]byte, blkSize)

Expand Down
6 changes: 2 additions & 4 deletions component/block_cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ package block_cache
import (
"container/list"
"context"
"encoding/base64"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -1315,8 +1314,7 @@ func shouldCommitAndDownload(blockID int64, handle *handlemap.Handle) (bool, boo

// lineupUpload : Create a work item and schedule the upload
func (bc *BlockCache) lineupUpload(handle *handlemap.Handle, block *Block, listMap map[int64]*blockInfo) {

id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
id := common.GetBlockID(16)
listMap[block.id] = &blockInfo{
id: id,
committed: false,
Expand Down Expand Up @@ -1584,7 +1582,7 @@ func (bc *BlockCache) stageZeroBlock(handle *handlemap.Handle, tryCnt int) (stri
return "", fmt.Errorf("3 attempts to upload zero block have failed for %v=>%v", handle.ID, handle.Path)
}

id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
id := common.GetBlockID(16)

log.Debug("BlockCache::stageZeroBlock : Staging zero block for %v=>%v, try = %v", handle.ID, handle.Path, tryCnt)
err := bc.NextComponent().StageData(internal.StageDataOptions{
Expand Down
96 changes: 96 additions & 0 deletions component/xload/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____


Licensed under the MIT License <http://opensource.org/licenses/MIT>.

Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/

package xload

import (
"fmt"
"syscall"
)

// Block couples a data buffer with the bookkeeping that describes which part
// of a blob the buffer currently represents.
type Block struct {
	// index is the position of this block within the pool.
	index int
	// offset is the start offset of the data held by this block.
	offset int64
	// length is the number of bytes of data held by this block.
	length int64
	// id identifies this block in the blob.
	id string
	// data is the buffer that holds the block's bytes.
	data []byte
}

// AllocateBlock creates a new memory mapped buffer for the given size
func AllocateBlock(size uint64) (*Block, error) {
if size == 0 {
return nil, fmt.Errorf("invalid size")
}

prot, flags := syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE
addr, err := syscall.Mmap(-1, 0, int(size), prot, flags)

if err != nil {
return nil, fmt.Errorf("mmap error: %v", err)
}

block := &Block{
index: 0,
data: addr,
offset: 0,
length: 0,
id: "",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we omit explicit assignments for default values (index: 0, offset: 0, etc.) as they are implicit in Go. This could make the code cleaner unless clarity is the goal.

}

return block, nil
}

// Delete unmaps the block's buffer and clears the block's reference to it.
//
// It returns an error if the block holds no buffer (for example because it
// was already deleted) or if munmap fails. The munmap error is wrapped, so
// callers can inspect it with errors.Is / errors.As.
func (b *Block) Delete() error {
	if b.data == nil {
		return fmt.Errorf("invalid buffer")
	}

	// Drop the reference whether or not munmap succeeds, so a second Delete
	// reports "invalid buffer" instead of unmapping the same region again.
	err := syscall.Munmap(b.data)
	b.data = nil
	if err != nil {
		// if we get here, there is likely memory corruption.
		return fmt.Errorf("munmap error: %w", err)
	}

	return nil
}

// ReUse resets the block's bookkeeping (id, index, offset and length) so the
// block can describe new content. The data buffer itself is not modified.
func (b *Block) ReUse() {
	b.index, b.id = 0, ""
	b.offset, b.length = 0, 0
}
118 changes: 118 additions & 0 deletions component/xload/blockpool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
_____ _____ _____ ____ ______ _____ ------
| | | | | | | | | | | | |
| | | | | | | | | | | | |
| --- | | | | |-----| |---- | | |-----| |----- ------
| | | | | | | | | | | | |
| ____| |_____ | ____| | ____| | |_____| _____| |_____ |_____


Licensed under the MIT License <http://opensource.org/licenses/MIT>.

Copyright © 2020-2024 Microsoft Corporation. All rights reserved.
Author : <[email protected]>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
*/

package xload

import "github.com/Azure/azure-storage-fuse/v2/common/log"

// _1MB is the number of bytes in one mebibyte (2^20).
const _1MB uint64 = 1 << 20

// BlockPool hands out fixed-size Blocks from a buffered channel of free blocks.
type BlockPool struct {
	blocksCh  chan *Block // free blocks waiting to be handed out
	blockSize uint64      // size of every block held by this pool
	maxBlocks uint32      // maximum number of blocks this pool can hold
}

// NewBlockPool creates a pool and preallocates blockCount blocks of blockSize
// bytes each, so that no allocation is needed when blocks are requested later.
//
// It returns nil when blockSize is zero or when any block allocation fails.
// On an allocation failure the blocks allocated so far are released before
// returning, so that no mapped memory is left behind.
func NewBlockPool(blockSize uint64, blockCount uint32) *BlockPool {
	// Ignore if config is invalid
	if blockSize == 0 {
		log.Err("blockpool::NewBlockPool : blockSize : %v", blockSize)
		return nil
	}

	pool := &BlockPool{
		blocksCh:  make(chan *Block, blockCount),
		maxBlocks: blockCount,
		blockSize: blockSize,
	}

	// Preallocate all blocks so that during runtime we do not spend CPU cycles on this
	for i := uint32(0); i < blockCount; i++ {
		b, err := AllocateBlock(blockSize)
		if err != nil {
			log.Err("blockpool::NewBlockPool : failed to allocate block %v of %v [%v]", i, blockCount, err)

			// Terminate closes the channel and deletes every block queued so
			// far; without it those mappings would be leaked.
			pool.Terminate()
			return nil
		}

		// The channel capacity equals blockCount, so this send never blocks.
		pool.blocksCh <- b
	}

	return pool
}

// Terminate closes the pool's channel and deletes every block still queued in it.
func (pool *BlockPool) Terminate() {
	close(pool.blocksCh)

	// Drain whatever is left in the closed channel and give the memory back.
	for blk := range pool.blocksCh {
		if blk == nil {
			// A nil entry ends the cleanup.
			break
		}
		// Best-effort cleanup: a failed Delete is ignored.
		_ = blk.Delete()
	}
}

// Usage reports the percentage (0-100) of the pool's blocks that are currently
// handed out, i.e. not sitting in the free channel.
//
// A pool created with zero blocks reports 0 instead of panicking on a
// division by zero.
func (pool *BlockPool) Usage() uint32 {
	if pool.maxBlocks == 0 {
		return 0
	}

	inUse := pool.maxBlocks - uint32(len(pool.blocksCh))

	// Multiply in 64 bits so that very large pools cannot overflow uint32.
	return uint32((uint64(inUse) * 100) / uint64(pool.maxBlocks))
}

// Get receives a Block from the pool's channel, waiting if the channel is
// empty. A non-nil block is reset with ReUse before it is returned; a nil
// value received from the channel is returned as is.
func (pool *BlockPool) Get() *Block {
	// Receiving blocks the caller until a free block is available.
	blk := <-pool.blocksCh
	if blk == nil {
		return nil
	}

	// Reset the bookkeeping so the caller starts from a clean block.
	blk.ReUse()
	return blk
}

// Release offers the Block back to the pool without blocking. If the pool's
// channel is already full, the block is deleted instead.
func (pool *BlockPool) Release(b *Block) {
	select {
	case pool.blocksCh <- b:
		// Returned to the free list; nothing more to do.
	default:
		// No room in the pool: free the block. A failed Delete is ignored.
		_ = b.Delete()
	}
}
Loading