Add strong consistency check for data on disk #1604

Merged: 7 commits, Jan 15, 2025
Changes from 3 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@
**Bug Fixes**
- Create block pool only in the child process.

**Features**
- To protect against accidental overwrites of data stored by block-cache on the temp path, MD5 sums are validated on read. This feature can be enabled with the `--block-cache-strong-consistency` CLI flag (a sketch of the mechanism follows this diff).

## 2.4.0 (Unreleased)
**Features**
- Added 'gen-config' command to auto generate the recommended blobfuse2 config file based on computing resources and memory available on the node. Command details can be found with `blobfuse2 gen-config --help`.
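
As a companion to the CHANGELOG entry above, here is a minimal, standalone sketch of the mechanism: when a block is written to the temp path, an MD5 digest is stored in a `user.md5sum` extended attribute, and on read the digest is recomputed and compared. The helper names and the file path are illustrative only (they are not part of blobfuse2), and the snippet assumes a Linux filesystem with user extended attributes enabled.

```go
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"os"
	"syscall"
)

// storeWithChecksum writes a block to disk and records its MD5 digest
// in the user.md5sum extended attribute (the write side of the check).
func storeWithChecksum(path string, data []byte) error {
	if err := os.WriteFile(path, data, 0o644); err != nil {
		return err
	}
	sum := md5.Sum(data)
	return syscall.Setxattr(path, "user.md5sum", []byte(hex.EncodeToString(sum[:])), 0)
}

// verifyChecksum re-reads the block, recomputes the digest, and compares it
// with the stored xattr value (the read side of the check).
func verifyChecksum(path string) (bool, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return false, err
	}
	sum := md5.Sum(data)
	stored := make([]byte, 32) // an MD5 digest is 32 hex characters
	n, err := syscall.Getxattr(path, "user.md5sum", stored)
	if err != nil {
		return false, err
	}
	return hex.EncodeToString(sum[:]) == string(stored[:n]), nil
}

func main() {
	path := "/tmp/block-demo.blk" // illustrative path, not a blobfuse2 default
	if err := storeWithChecksum(path, []byte("example block data")); err != nil {
		fmt.Println("store failed:", err)
		return
	}
	ok, err := verifyChecksum(path)
	fmt.Println("checksum valid:", ok, "err:", err)
}
```

If the digests differ, the cached copy is treated as corrupted and discarded; the download path in the diff below does exactly that by removing the local file and falling back to a network read.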
2 changes: 2 additions & 0 deletions README.md
@@ -166,6 +166,8 @@ To learn about a specific command, just include the name of the command (For exa
* `--block-cache-prefetch=<Number of blocks>`: Number of blocks to prefetch at max when sequential reads are in progress. Default - 2 times number of CPU cores.
* `--block-cache-parallelism=<count>`: Number of parallel threads doing upload/download operation. Default - 3 times number of CPU cores.
* `--block-cache-prefetch-on-open=true`: Start prefetching on open system call instead of waiting for first read. Enhances perf if file is read sequentially from offset 0.
* `--block-cache-strong-consistency=true`: Enable strong data consistency checks in block-cache. This increases CPU load and may introduce some latency.
This requires `xattr` (extended attribute) support on your system. Ensure extended attributes are available and enabled on the filesystem hosting the cache path before using this CLI parameter (see the probe sketch after this list).
- Fuse options
* `--attr-timeout=<TIMEOUT IN SECONDS>`: Time the kernel can cache inode attributes.
* `--entry-timeout=<TIMEOUT IN SECONDS>`: Time the kernel can cache directory listing.
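Because `--block-cache-strong-consistency` (described above) relies on extended attributes, a quick way to confirm that the filesystem backing the block-cache temp path accepts `user.*` xattrs is sketched below. The helper name and probe file are illustrative and not part of blobfuse2; the syscalls assume Linux.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"syscall"
)

// supportsUserXattr reports whether the filesystem backing dir accepts
// user.* extended attributes, which the strong-consistency check relies on.
func supportsUserXattr(dir string) (bool, error) {
	probe := filepath.Join(dir, ".xattr-probe")
	if err := os.WriteFile(probe, []byte("probe"), 0o600); err != nil {
		return false, err
	}
	defer os.Remove(probe)

	// Setting a user.* attribute fails (typically ENOTSUP/EOPNOTSUPP)
	// when the filesystem is mounted without xattr support.
	if err := syscall.Setxattr(probe, "user.md5sum", []byte("test"), 0); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	// Replace with the block-cache temp path from your configuration.
	ok, err := supportsUserXattr("/mnt/blockcachetmp")
	fmt.Println("user xattrs supported:", ok, "err:", err)
}
```

If the probe fails, enable extended attributes on that filesystem (or choose a different temp path) before turning on the flag.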
67 changes: 60 additions & 7 deletions component/block_cache/block_cache.go
@@ -36,7 +36,9 @@ package block_cache
import (
"container/list"
"context"
"crypto/md5"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"math"
@@ -84,6 +86,7 @@ type BlockCache struct {
maxDiskUsageHit bool // Flag to indicate if we have hit max disk usage
noPrefetch bool // Flag to indicate if prefetch is disabled
prefetchOnOpen bool // Start prefetching on file open call instead of waiting for first read
consistency bool // Flag to indicate if strong data consistency is enabled
stream *Stream
lazyWrite bool // Flag to indicate if lazy write is enabled
fileCloseOpt sync.WaitGroup // Wait group to wait for all async close operations to complete
@@ -99,6 +102,7 @@ type BlockCacheOptions struct {
PrefetchCount uint32 `config:"prefetch" yaml:"prefetch,omitempty"`
Workers uint32 `config:"parallelism" yaml:"parallelism,omitempty"`
PrefetchOnOpen bool `config:"prefetch-on-open" yaml:"prefetch-on-open,omitempty"`
Consistency bool `config:"consistency" yaml:"consistency,omitempty"`
}

const (
@@ -250,6 +254,8 @@ func (bc *BlockCache) Configure(_ bool) error {
bc.diskTimeout = conf.DiskTimeout
}

bc.consistency = conf.Consistency

bc.prefetchOnOpen = conf.PrefetchOnOpen
bc.prefetch = uint32(math.Max((MIN_PREFETCH*2)+1, (float64)(2*runtime.NumCPU())))
bc.noPrefetch = false
@@ -331,8 +337,8 @@ func (bc *BlockCache) Configure(_ bool) error {
}
}

log.Crit("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch)
log.Crit("BlockCache::Configure : block size %v, mem size %v, worker %v, prefetch %v, disk path %v, max size %v, disk timeout %v, prefetch-on-open %t, maxDiskUsageHit %v, noPrefetch %v, consistency %v",
bc.blockSize, bc.memSize, bc.workers, bc.prefetch, bc.tmpPath, bc.diskSize, bc.diskTimeout, bc.prefetchOnOpen, bc.maxDiskUsageHit, bc.noPrefetch, bc.consistency)

return nil
}
@@ -997,11 +1003,35 @@ func (bc *BlockCache) download(item *workItem) {
}

f.Close()
// We have read the data from disk so there is no need to go over network
// Just mark the block that download is complete

if successfulRead {
item.block.Ready(BlockStatusDownloaded)
return
// If user has enabled consistency check then compute the md5sum and match it in xattr
if bc.consistency {
// Calculate MD5 checksum of the read data
hash := md5.Sum(item.block.data[:n])
md5sum := hex.EncodeToString(hash[:])

// Retrieve MD5 checksum from xattr
xattrMd5sum := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sum)
if err != nil {
log.Err("BlockCache::download : Failed to get md5sum for file %s [%v]", fileName, err.Error())
} else {
// Compare checksums
if md5sum != string(xattrMd5sum) {
log.Err("BlockCache::download : MD5 checksum mismatch for file %s, expected %s, got %s", fileName, md5sum, string(xattrMd5sum))
successfulRead = false
_ = os.Remove(localPath)
}
}
}

// We have read the data from disk so there is no need to go over network
// Just mark the block that download is complete
if successfulRead {
item.block.Ready(BlockStatusDownloaded)
return
}
}
}
}
@@ -1022,7 +1052,7 @@ func (bc *BlockCache) download(item *workItem) {
return
}

if err != nil {
if err != nil && err != io.EOF {
// Fail to read the data so just reschedule this request
log.Err("BlockCache::download : Failed to read %v=>%s from offset %v [%s]", item.handle.ID, item.handle.Path, item.block.id, err.Error())
item.failCnt++
@@ -1054,6 +1084,16 @@

f.Close()
bc.diskPolicy.Refresh(diskNode.(*list.Element))

// If user has enabled consistency check then compute the md5sum and save it in xattr
if bc.consistency {
hash := md5.Sum(item.block.data[:n])
md5sum := hex.EncodeToString(hash[:])
err = syscall.Setxattr(localPath, "user.md5sum", []byte(md5sum), 0)
if err != nil {
log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
}
}
}
}

@@ -1451,6 +1491,16 @@ func (bc *BlockCache) upload(item *workItem) {
} else {
bc.diskPolicy.Refresh(diskNode.(*list.Element))
}

// If user has enabled consistency check then compute the md5sum and save it in xattr
if bc.consistency {
hash := md5.Sum(item.block.data[:blockSize])
md5sum := hex.EncodeToString(hash[:])
err = syscall.Setxattr(localPath, "user.md5sum", []byte(md5sum), 0)
if err != nil {
log.Err("BlockCache::download : Failed to set md5sum for file %s [%v]", localPath, err.Error())
}
}
}
}

@@ -1797,4 +1847,7 @@ func init() {

blockCachePrefetchOnOpen := config.AddBoolFlag("block-cache-prefetch-on-open", false, "Start prefetching on open or wait for first read.")
config.BindPFlag(compName+".prefetch-on-open", blockCachePrefetchOnOpen)

strongConsistency := config.AddBoolFlag("block-cache-strong-consistency", false, "Enable strong data consistency for block cache.")
config.BindPFlag(compName+".consistency", strongConsistency)
}
80 changes: 80 additions & 0 deletions component/block_cache/block_cache_test.go
@@ -47,6 +47,7 @@ import (
"path/filepath"
"strconv"
"strings"
"syscall"
"testing"
"time"

@@ -2625,6 +2626,85 @@ func (suite *blockCacheTestSuite) TestZZZZZStreamToBlockCacheConfig() {
}
}

func (suite *blockCacheTestSuite) TestStrongConsistency() {
tobj, err := setupPipeline("")
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)

tobj.blockCache.consistency = true

path := getTestFileName(suite.T().Name())
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())

storagePath := filepath.Join(tobj.fake_storage_path, path)
fs, err := os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(fs.Size(), int64(0))
// Generate a random file size in bytes, less than 2MB
size := rand.Intn(2097152)
data := make([]byte, size)

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: 0, Data: data}) // Write data to file
suite.assert.Nil(err)
suite.assert.Equal(n, size)
suite.assert.Equal(h.Size, int64(size))

err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
suite.assert.Nil(h.Buffers.Cooked)
suite.assert.Nil(h.Buffers.Cooking)

localPath := filepath.Join(tobj.disk_cache_path, path+"::0")

xattrMd5sumOrg := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sumOrg)
suite.assert.Nil(err)

h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
suite.assert.Nil(err)
suite.assert.NotNil(h)
_, _ = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
suite.assert.Nil(h.Buffers.Cooked)
suite.assert.Nil(h.Buffers.Cooking)

xattrMd5sumRead := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sumRead)
suite.assert.Nil(err)
suite.assert.EqualValues(xattrMd5sumOrg, xattrMd5sumRead)

err = syscall.Setxattr(localPath, "user.md5sum", []byte("000"), 0)
suite.assert.Nil(err)

xattrMd5sum1 := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sum1)
suite.assert.Nil(err)

h, err = tobj.blockCache.OpenFile(internal.OpenFileOptions{Name: path, Flags: os.O_RDWR})
suite.assert.Nil(err)
suite.assert.NotNil(h)
_, _ = tobj.blockCache.ReadInBuffer(internal.ReadInBufferOptions{Handle: h, Offset: 0, Data: data})
err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)
suite.assert.Nil(h.Buffers.Cooked)
suite.assert.Nil(h.Buffers.Cooking)

xattrMd5sum2 := make([]byte, 32)
_, err = syscall.Getxattr(localPath, "user.md5sum", xattrMd5sum2)
suite.assert.Nil(err)

suite.assert.NotEqualValues(xattrMd5sum1, xattrMd5sum2)
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestBlockCacheTestSuite(t *testing.T) {