Add strong consistency check for data on disk #1604

Merged 7 commits on Jan 15, 2025
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,9 @@
**Other Changes**
- Optimized listing operation on HNS accounts to support symlinks.

**Features**
- To protect against accidental modification of data stored by block-cache on the temp path, checksums are validated on every read from disk. This feature can be enabled with the `--block-cache-strong-consistency` CLI flag.

## 2.4.0 (2024-12-03)
**Features**
- Added 'gen-config' command to auto generate the recommended blobfuse2 config file based on computing resources and memory available on the node. Command details can be found with `blobfuse2 gen-config --help`.
2 changes: 2 additions & 0 deletions README.md
@@ -166,6 +166,8 @@ To learn about a specific command, just include the name of the command (For exa
* `--block-cache-prefetch=<Number of blocks>`: Number of blocks to prefetch at max when sequential reads are in progress. Default - 2 times number of CPU cores.
* `--block-cache-parallelism=<count>`: Number of parallel threads doing upload/download operation. Default - 3 times number of CPU cores.
* `--block-cache-prefetch-on-open=true`: Start prefetching on open system call instead of waiting for first read. Enhances perf if file is read sequentially from offset 0.
* `--block-cache-strong-consistency=true`: Enable strong data-consistency checks in block-cache. This increases CPU load and may add some latency on reads.
  Requires extended-attribute (`xattr`) support on the filesystem backing the temp path; make sure it is enabled before using this CLI parameter. See the example below.
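As a quick illustration, the same setting can also be supplied via the config file. A minimal sketch, assuming the `block_cache` component name used in this PR's code (all other keys omitted):

```yaml
# Config-file equivalent of passing --block-cache-strong-consistency=true
block_cache:
  consistency: true
```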
- Fuse options
* `--attr-timeout=<TIMEOUT IN SECONDS>`: Time the kernel can cache inode attributes.
* `--entry-timeout=<TIMEOUT IN SECONDS>`: Time the kernel can cache directory listing.
13 changes: 13 additions & 0 deletions common/util.go
@@ -39,7 +39,9 @@ import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/binary"
"fmt"
"hash/crc64"
"io"
"os"
"os/exec"
@@ -500,3 +502,14 @@ func WriteToFile(filename string, data string, options WriteToFileOptions) error

return nil
}

// GetCRC64 computes the CRC64 (ECMA) checksum of data[:length] and returns it as 8 big-endian bytes.
func GetCRC64(data []byte, length int) []byte {
// Create a CRC64 hash using the ECMA polynomial
crc64Table := crc64.MakeTable(crc64.ECMA)
checksum := crc64.Checksum(data[:length], crc64Table)

checksumBytes := make([]byte, 8)
binary.BigEndian.PutUint64(checksumBytes, checksum)

return checksumBytes
}
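For illustration, a standalone sketch of the same computation `GetCRC64` performs, using only the Go standard library (the sample input is arbitrary):

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc64"
)

func main() {
	data := []byte("Hello World")

	// CRC64 over the ECMA polynomial table, as in GetCRC64
	table := crc64.MakeTable(crc64.ECMA)
	sum := crc64.Checksum(data, table)

	// Encoded as 8 big-endian bytes, ready to be stored in an xattr
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, sum)
	fmt.Printf("checksum: %x\n", buf)
}
```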
10 changes: 10 additions & 0 deletions common/util_test.go
@@ -363,6 +363,16 @@ func (suite *utilTestSuite) TestWriteToFile() {

}

func (suite *utilTestSuite) TestCRC64() {
data := []byte("Hello World")
crc := GetCRC64(data, len(data))

data = []byte("Hello World!")
crc1 := GetCRC64(data, len(data))

suite.assert.NotEqual(crc, crc1)
}

func (suite *utilTestSuite) TestGetFuseMinorVersion() {
i := GetFuseMinorVersion()
suite.assert.GreaterOrEqual(i, 0)
102 changes: 79 additions & 23 deletions component/block_cache/block_cache.go
@@ -34,6 +34,7 @@
package block_cache

import (
"bytes"
"container/list"
"context"
"encoding/base64"
@@ -84,6 +85,7 @@ type BlockCache struct {
maxDiskUsageHit bool // Flag to indicate if we have hit max disk usage
noPrefetch bool // Flag to indicate if prefetch is disabled
prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read
consistency bool // Flag to indicate if strong data consistency is enabled
stream *Stream
lazyWrite bool // Flag to indicate if lazy write is enabled
fileCloseOpt sync.WaitGroup // Wait group to wait for all async close operations to complete
@@ -99,6 +101,7 @@ type BlockCacheOptions struct {
PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"`
Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"`
PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"`
Consistency bool `config:"consistency" yaml:"consistency,omitempty"`
}

const (
@@ -250,6 +253,8 @@ func (bc *BlockCache) Configure(_ bool) error {
bc.diskTimeout = conf.DiskTimeout
}

bc.consistency = conf.Consistency

bc.prefetchOnOpen = conf.PrefetchOnOpen
bc.prefetch = uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU())))
bc.noPrefetch = false
@@ -331,8 +336,8 @@
}
}

log.Crit("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch)
log.Crit("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v, consistency %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch, bc.consistency)

return nil
}
@@ -947,31 +952,31 @@ func (bc *BlockCache) lineupDownload(handle *handlemap.Handle, block *Block, pre
}

// download : Method to download the given amount of data
func (bc *BlockCache) download(item *workItem) {
func (blockCache *BlockCache) download(item *workItem) {
fileName := fmt.Sprintf("%s::%v", item.handle.Path, item.block.id)

// filename_blockindex is the key for the lock
// this ensures that at any given time a block from a file is downloaded only once across all open handles
flock := bc.fileLocks.Get(fileName)
flock := blockCache.fileLocks.Get(fileName)
flock.Lock()
defer flock.Unlock()

var diskNode any
found := false
localPath := ""

if bc.tmpPath != "" {
if blockCache.tmpPath != "" {
// Update diskpolicy to reflect the new file
diskNode, found = bc.fileNodeMap.Load(fileName)
diskNode, found = blockCache.fileNodeMap.Load(fileName)
if !found {
diskNode = bc.diskPolicy.Add(fileName)
bc.fileNodeMap.Store(fileName, diskNode)
diskNode = blockCache.diskPolicy.Add(fileName)
blockCache.fileNodeMap.Store(fileName, diskNode)
} else {
bc.diskPolicy.Refresh(diskNode.(*list.Element))
blockCache.diskPolicy.Refresh(diskNode.(*list.Element))
}

// Check local file exists for this offset and file combination or not
localPath = filepath.Join(bc.tmpPath, fileName)
localPath = filepath.Join(blockCache.tmpPath, fileName)
_, err := os.Stat(localPath)

if err == nil {
@@ -983,32 +988,38 @@ func (bc *BlockCache) download(item *workItem) {
_ = os.Remove(localPath)
} else {
var successfulRead bool = true
n, err := f.Read(item.block.data)
numberOfBytes, err := f.Read(item.block.data)
if err != nil {
log.Err("BlockCache::download : Failed to read data from disk cache %s [%s]", fileName, err.Error())
successfulRead = false
_ = os.Remove(localPath)
}

if n != int(bc.blockSize) && item.block.offset+uint64(n) != uint64(item.handle.Size) {
log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", bc.getBlockSize(uint64(item.handle.Size), item.block), n, item.handle.Size)
if numberOfBytes != int(blockCache.blockSize) && item.block.offset+uint64(numberOfBytes) != uint64(item.handle.Size) {
log.Err("BlockCache::download : Local data retrieved from disk size mismatch, Expected %v, OnDisk %v, fileSize %v", blockCache.getBlockSize(uint64(item.handle.Size), item.block), numberOfBytes, item.handle.Size)
successfulRead = false
_ = os.Remove(localPath)
}

f.Close()
// We have read the data from disk so there is no need to go over network
// Just mark the block that download is complete

if successfulRead {
item.block.Ready(BlockStatusDownloaded)
return
// If the user has enabled the consistency check, compute the checksum and match it against the xattr
successfulRead = checkBlockConsistency(blockCache, item, numberOfBytes, localPath, fileName)

// We have read the data from disk so there is no need to go over network
// Just mark the block that download is complete
if successfulRead {
item.block.Ready(BlockStatusDownloaded)
return
}
}
}
}
}

// If the file does not exist then download the block from the container
n, err := bc.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
n, err := blockCache.NextComponent().ReadInBuffer(internal.ReadInBufferOptions{
Handle: item.handle,
Offset: int64(item.block.offset),
Data: item.block.data,
@@ -1022,21 +1033,21 @@
return
}

if err != nil {
if err != nil && err != io.EOF {
// Fail to read the data so just reschedule this request
log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
item.failCnt++
bc.threadPool.Schedule(false, item)
blockCache.threadPool.Schedule(false, item)
return
} else if n == 0 {
// No data read so just reschedule this request
log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [0 bytes read]", item.handle.ID, item.handle.Path, item.block.id)
item.failCnt++
bc.threadPool.Schedule(false, item)
blockCache.threadPool.Schedule(false, item)
return
}

if bc.tmpPath != "" {
if blockCache.tmpPath != "" {
err := os.MkdirAll(filepath.Dir(localPath), 0777)
if err != nil {
log.Err("BlockCache::download : error creating directory structure for file %s [%s]", localPath, err.Error())
@@ -1053,14 +1064,47 @@
}

f.Close()
bc.diskPolicy.Refresh(diskNode.(*list.Element))
blockCache.diskPolicy.Refresh(diskNode.(*list.Element))

// If the user has enabled the consistency check, compute the checksum and save it in the xattr
if blockCache.consistency {
hash := common.GetCRC64(item.block.data, n)
err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
if err != nil {
log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
}
}
}
}

// Just mark the block that download is complete
item.block.Ready(BlockStatusDownloaded)
}

func checkBlockConsistency(blockCache *BlockCache, item *workItem, numberOfBytes int, localPath, fileName string) bool {
if !blockCache.consistency {
return true
}

// Calculate the CRC64 checksum of the data read from disk
actualHash := common.GetCRC64(item.block.data, numberOfBytes)

// Retrieve the checksum stored in the xattr when the block was cached
xattrHash := make([]byte, 8)
_, err := syscall.Getxattr(localPath, "user.md5sum", xattrHash)
if err != nil {
// If the xattr is missing or unreadable, log it and treat the block as valid
log.Err("BlockCache::download : Failed to get md5sum for file %s [%v]", fileName, err.Error())
} else {
// Compare checksums; on mismatch, drop the local copy so the block is re-downloaded
if !bytes.Equal(actualHash, xattrHash) {
log.Err("BlockCache::download : Checksum mismatch for file %s, expected %v, got %v", fileName, xattrHash, actualHash)
_ = os.Remove(localPath)
return false
}
}

return true
}

// WriteFile: Write to the local file
func (bc *BlockCache) WriteFile(options internal.WriteFileOptions) (int, error) {
// log.Debug("BlockCache::WriteFile : Writing %v bytes from %s", len(options.Data), options.Handle.Path)
@@ -1451,6 +1495,15 @@ func (bc *BlockCache) upload(item *workItem) {
} else {
bc.diskPolicy.Refresh(diskNode.(*list.Element))
}

// If the user has enabled the consistency check, compute the checksum and save it in the xattr
if bc.consistency {
hash := common.GetCRC64(item.block.data, int(blockSize))
err = syscall.Setxattr(localPath, "user.md5sum", hash, 0)
if err != nil {
log.Err("BlockCache::upload : Failed to set md5sum for file %s [%v]", localPath, err.Error())
}
}
}
}

@@ -1827,4 +1880,7 @@ func init() {

blockCachePrefetchOnOpen := config.AddBoolFlag("block-cache-prefetch-on-open", false, "Start prefetching on open or wait for first read.")
config.BindPFlag(compName+".prefetch-on-open", blockCachePrefetchOnOpen)

strongConsistency := config.AddBoolFlag("block-cache-strong-consistency", false, "Enable strong data consistency for block cache.")
config.BindPFlag(compName+".consistency", strongConsistency)
}
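Putting the pieces together: `upload` and `download` stamp each locally cached block with a CRC64 checksum in the `user.md5sum` extended attribute, and `checkBlockConsistency` recomputes and compares it the next time the block is served from disk. A minimal standalone sketch of that roundtrip, assuming a Linux filesystem with xattr enabled (the file path below is a placeholder):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc64"
	"os"
	"syscall"
)

// crc mirrors common.GetCRC64: CRC64 (ECMA) encoded as 8 big-endian bytes.
func crc(data []byte) []byte {
	sum := crc64.Checksum(data, crc64.MakeTable(crc64.ECMA))
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, sum)
	return buf
}

func main() {
	path := "/tmp/block0" // placeholder for <tmp-path>/<file>::<block-id>
	data := []byte("cached block contents")
	if err := os.WriteFile(path, data, 0o644); err != nil {
		panic(err)
	}

	// Store the checksum alongside the block, as upload/download do.
	if err := syscall.Setxattr(path, "user.md5sum", crc(data), 0); err != nil {
		panic(err) // e.g. xattr not supported on this filesystem
	}

	// On a later read, recompute and compare, as checkBlockConsistency does.
	stored := make([]byte, 8)
	if _, err := syscall.Getxattr(path, "user.md5sum", stored); err != nil {
		panic(err)
	}
	onDisk, err := os.ReadFile(path)
	if err != nil {
		panic(err)
	}
	fmt.Println("consistent:", bytes.Equal(crc(onDisk), stored))
}
```

If the stored and recomputed values diverge (for example, because the cached file was modified out of band), the real code removes the local copy and falls back to re-downloading the block.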
79 changes: 79 additions & 0 deletions component/block_cache/block_cache_test.go
@@ -2678,6 +2678,85 @@ func (suite *blockCacheTestSuite) TestZZZZZStreamToBlockCacheConfig() {
}
}

func (suite *blockCacheTestSuite) TestStrongConsistency() {
tobj, err := setupPipeline("")
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)

tobj.blockCache.consistency = true

path := getTestFileName(suite.T().Name())
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())

storagePath := filepath.Join(tobj.fake_storage_path, path)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(0))
// Generate a random file size of less than 2MB
size := rand.Intn(2097152)
data := make([]byte, size)

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // Write data to file
suite.assert.Nil(err)
suite.assert.Equal(n, size)
suite.assert.Equal(h.Size, int64(size))

err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
suite.assert.Nil(h.Buffers.Cooked)
suite.assert.Nil(h.Buffers.Cooking)

localPath := filepath.Join(tobj.disk_cache_path, path+"::0")

xattrMd5sumOrg := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sumOrg)
suite.assert.Nil(err)

h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
suite.assert.Nil(err)
suite.assert.NotNil(h)
_, _ = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
suite.assert.Nil(h.Buffers.Cooked)
suite.assert.Nil(h.Buffers.Cooking)

xattrMd5sumRead := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sumRead)
suite.assert.Nil(err)
suite.assert.EqualValues(xattrMd5sumOrg, xattrMd5sumRead)

err = syscall.Setxattr(localPath, "user.md5sum", []byte("000"), 0)
suite.assert.Nil(err)

xattrMd5sum1 := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sum1)
suite.assert.Nil(err)

h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
suite.assert.Nil(err)
suite.assert.NotNil(h)
_, _ = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
suite.assert.Nil(h.Buffers.Cooked)
suite.assert.Nil(h.Buffers.Cooking)

xattrMd5sum2 := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sum2)
suite.assert.Nil(err)

suite.assert.NotEqualValues(xattrMd5sum1, xattrMd5sum2)
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestBlockCacheTestSuite(t *testing.T) {