Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 37 additions & 37 deletions internal/downloader/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,10 @@ func (c *DingCache) Close() error {
}
c.fileLock.Lock()
defer c.fileLock.Unlock()
if err := c.flushHeader(); err != nil {
return err
}
// 20250619 fix when the file is read, the update date is modified
// if err := c.flushHeader(); err != nil {
// return err
// }
c.path = ""
c.header = nil
c.isOpen = false
Expand Down Expand Up @@ -251,14 +252,6 @@ func (c *DingCache) getHeaderSize() int64 {
return c.header.GetHeaderSize()
}

func (c *DingCache) resizeHeader(blockNum, fileSize int64) error {
c.headerLock.RLock()
defer c.headerLock.RUnlock()
c.header.BlockNumber = blockNum
c.header.FileSize = fileSize
return c.header.ValidHeader()
}

func (c *DingCache) setHeaderBlock(blockIndex int64) error {
// c.headerLock.Lock()
// defer c.headerLock.Unlock()
Expand All @@ -272,15 +265,6 @@ func (c *DingCache) testHeaderBlock(blockIndex int64) (bool, error) {
return c.header.BlockMask.Test(blockIndex)
}

// padBlock 填充块数据
func (c *DingCache) padBlock(rawBlock []byte) []byte {
blockSize := c.getBlockSize()
if int64(len(rawBlock)) < blockSize {
return append(rawBlock, bytes.Repeat([]byte{0}, int(blockSize)-len(rawBlock))...)
}
return rawBlock
}

// HasBlock 检查块是否存在
func (c *DingCache) HasBlock(blockIndex int64) (bool, error) {
return c.testHeaderBlock(blockIndex)
Expand Down Expand Up @@ -385,6 +369,15 @@ func (c *DingCache) readBlockAndCache(f *os.File, blockIndex int64) {
}
}

// padBlock 填充块数据
func (c *DingCache) padBlock(rawBlock []byte) []byte {
blockSize := c.getBlockSize()
if int64(len(rawBlock)) < blockSize {
return append(rawBlock, bytes.Repeat([]byte{0}, int(blockSize)-len(rawBlock))...)
}
return rawBlock
}

func (c *DingCache) WriteBlock(blockIndex int64, blockBytes []byte) error {
if !c.isOpen {
return errors.New("this file has been closed")
Expand Down Expand Up @@ -427,6 +420,24 @@ func (c *DingCache) WriteBlock(blockIndex int64, blockBytes []byte) error {
return nil
}

func (c *DingCache) Resize(fileSize int64) error {
if !c.isOpen {
return errors.New("this file has been closed")
}
bs := c.getBlockSize()
newBlockNum := (fileSize + bs - 1) / bs
c.fileLock.Lock()
defer c.fileLock.Unlock()
if err := c.resizeFileSize(fileSize); err != nil {
return err
}
// 设置块数量、文件大小参数
if err := c.resizeHeader(newBlockNum, fileSize); err != nil {
return err
}
return c.flushHeader()
}

// resizeFileSize 调整文件大小
func (c *DingCache) resizeFileSize(fileSize int64) error {
if !c.isOpen {
Expand Down Expand Up @@ -457,23 +468,12 @@ func (c *DingCache) resizeFileSize(fileSize int64) error {
return nil
}

// Resize 调整缓存大小
func (c *DingCache) Resize(fileSize int64) error {
if !c.isOpen {
return errors.New("this file has been closed")
}
bs := c.getBlockSize()
newBlockNum := (fileSize + bs - 1) / bs
c.fileLock.Lock()
defer c.fileLock.Unlock()
if err := c.resizeFileSize(fileSize); err != nil {
return err
}
// 设置块数量、文件大小参数
if err := c.resizeHeader(newBlockNum, fileSize); err != nil {
return err
}
return c.flushHeader()
func (c *DingCache) resizeHeader(blockNum, fileSize int64) error {
c.headerLock.RLock()
defer c.headerLock.RUnlock()
c.header.BlockNumber = blockNum
c.header.FileSize = fileSize
return c.header.ValidHeader()
}

func (c *DingCache) getBlockKey(blockIndex int64) string {
Expand Down
2 changes: 1 addition & 1 deletion internal/downloader/file_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (f *DingCacheManager) GetDingFile(savePath string, fileSize int64) (*DingCa
zap.S().Errorf("NewDingCache err.%v", err)
return nil, err
}
if dingFile.GetFileSize() == 0 {
if dingFile.GetFileSize() == 0 { // 表示首次获取当前文件句柄,需要Resize。
if err = dingFile.Resize(fileSize); err != nil {
zap.S().Errorf("Resize err.%v", err)
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion internal/downloader/remote_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r RemoteFileTask) DoTask() {
if config.SysConfig.EnableMetric() {
// 原子性地更新总下载字节数
source := util.Itoa(r.Context.Value(consts.PromSource))
prom.PromRequestByteCounter(prom.RequestRemoteByte, source, chunkLen)
prom.PromRequestByteCounter(prom.RequestRemoteByte, source, chunkLen, r.orgRepo)
}

if len(chunk) != 0 {
Expand Down
4 changes: 3 additions & 1 deletion internal/handler/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func (handler *FileHandler) GetFileHandler3(c echo.Context) error {

func (handler *FileHandler) fileGetCommon(c echo.Context, repoType, org, repo, commit, filePath string) error {
if config.SysConfig.EnableMetric() {
orgRepo := fmt.Sprintf("%s/%s", org, repo)
c.Set(consts.PromOrgRepo, orgRepo)
labels := prometheus.Labels{}
labels[repoType] = fmt.Sprintf("%s/%s", org, repo)
labels[repoType] = orgRepo
source := util.Itoa(c.Get(consts.PromSource))
if _, ok := consts.RepoTypesMapping[repoType]; ok {
labels["source"] = source
Expand Down
4 changes: 4 additions & 0 deletions internal/handler/meta_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package handler

import (
"fmt"
"strings"

"dingospeed/internal/service"
"dingospeed/pkg/consts"

"github.com/labstack/echo/v4"
)
Expand All @@ -38,6 +40,8 @@ func (handler *MetaHandler) MetaProxyCommonHandler(c echo.Context) error {
repo := c.Param("repo")
commit := c.Param("commit")
method := strings.ToLower(c.Request().Method)
orgRepo := fmt.Sprintf("%s/%s", org, repo)
c.Set(consts.PromOrgRepo, orgRepo)
return handler.metaService.MetaProxyCommon(c, repoType, org, repo, commit, method)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/service/file_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package service

import (
"fmt"

"dingospeed/internal/dao"
"dingospeed/pkg/config"
"dingospeed/pkg/consts"
Expand All @@ -35,6 +37,8 @@ func NewFileService(fileDao *dao.FileDao) *FileService {
}

func (d *FileService) FileHeadCommon(c echo.Context, repoType, org, repo, commit, filePath string) error {
orgRepo := fmt.Sprintf("%s/%s", org, repo)
c.Set(consts.PromOrgRepo, orgRepo)
commitSha, err := d.getFileCommitSha(c, repoType, org, repo, commit)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,4 @@ const (

const RespChanSize = 100
const PromSource = "source"
const PromOrgRepo = "orgRepo"
7 changes: 4 additions & 3 deletions pkg/prom/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ var (
RequestRemoteByte = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "request_remote_byte",
Help: "Total number of request remote byte",
}, []string{"source"})
}, []string{"source", "orgRepo"})

RequestResponseByte = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "request_response_byte",
Help: "Total number of request response byte",
}, []string{"source"})
}, []string{"source", "orgRepo"})
)

func PromSourceCounter(vec *prometheus.GaugeVec, source string) {
Expand All @@ -63,8 +63,9 @@ func PromSourceCounter(vec *prometheus.GaugeVec, source string) {
vec.With(labels).Inc()
}

func PromRequestByteCounter(vec *prometheus.CounterVec, source string, len int64) {
func PromRequestByteCounter(vec *prometheus.CounterVec, source string, len int64, orgRepo string) {
labels := prometheus.Labels{}
labels["source"] = source
labels["orgRepo"] = orgRepo
vec.With(labels).Add(float64(len))
}
3 changes: 2 additions & 1 deletion pkg/util/http_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ func ResponseStream(c echo.Context, fileName string, headers map[string]string,
if config.SysConfig.EnableMetric() {
// 原子性地更新响应总数
source := Itoa(c.Get(consts.PromSource))
prom.PromRequestByteCounter(prom.RequestResponseByte, source, int64(len(b)))
orgRepo := Itoa(c.Get(consts.PromOrgRepo))
prom.PromRequestByteCounter(prom.RequestResponseByte, source, int64(len(b)), orgRepo)
}
}
flusher.Flush()
Expand Down
48 changes: 38 additions & 10 deletions repair/data_repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,30 @@ func init() {
}

func main() {
fmt.Println("starting data repair....")
if repoPathParam == "" || repoTypeParam == "" {
log.Errorf("repoPath,repoType不能为空")
return
}
if exist := util.FileExists(repoPathParam); !exist {
log.Errorf("repoPath:%s目录不存在", repoPathParam)
return
}
if repoTypeParam != "models" && repoTypeParam != "datasets" {
log.Errorf("repoType的可选值为models或datasets。")
return
}
typePath := fmt.Sprintf("%s/api/%s", repoPathParam, repoTypeParam)
if exist := util.FileExists(typePath); !exist {
log.Errorf("不存在类型为%s的缓存数据", repoTypeParam)
return
}

if orgParam != "" && repoParam != "" {
repoRepair(repoPathParam, repoTypeParam, orgParam, repoParam)
} else {
if orgParam != "" && repoParam == "" {
orgRepair(repoPathParam, repoTypeParam, orgParam)
} else if orgParam == "" && repoParam == "" {
typePath := fmt.Sprintf("%s/api/%s", repoPathParam, repoTypeParam)
// 读取目录内容
orgEntries, err := os.ReadDir(typePath)
if err != nil {
Expand Down Expand Up @@ -97,17 +109,18 @@ func repoRepair(repoPath, repoType, org, repo string) {
}
filePath := fmt.Sprintf("%s/files/%s/%s/%s", repoPath, repoType, org, repo)
if exist := util.FileExists(filePath); !exist {
log.Errorf("不存在org:%s, repo:%s的缓存数据", org, repo)
return
}
fileBlobs := fmt.Sprintf("%s/blobs", filePath)
if exist := util.FileExists(fileBlobs); exist {
// log.Errorf(fmt.Sprintf("该仓库已完成修复:%s", fileBlobs))
// return
log.Infof(fmt.Sprintf("该仓库已完成修复:%s", fileBlobs))
return
}
metaGetPath := fmt.Sprintf("%s/api/%s/%s/%s/revision/main/meta_get.json", repoPath, repoType, org, repo)
if exist := util.FileExists(metaGetPath); !exist {
// log.Errorf(fmt.Sprintf("该%s/%s不存在meta_get文件,无法修复.", org, repo))
// return
log.Errorf(fmt.Sprintf("该%s/%s不存在meta_get文件,无法修复.", org, repo))
return
}
log.Infof("start repair:%s/%s/%s", repoType, org, repo)
cacheContent, err := ReadCacheRequest(metaGetPath)
Expand Down Expand Up @@ -141,7 +154,7 @@ func repoRepair(repoPath, repoType, org, repo string) {
}
filePath = fmt.Sprintf("%s/files/%s/%s/%s/resolve/%s/%s", repoPath, repoType, org, repo, sha.Sha, fileName)
if exist := util.FileExists(filePath); !exist {
log.Warnf(fmt.Sprintf("该文件%s不存在.", filePath))
// 文件不存在,则无需处理,直接跳过
continue
}
newBlobsFilePath := fmt.Sprintf("%s/files/%s/%s/%s/blobs/%s", repoPath, repoType, org, repo, etag)
Expand All @@ -154,6 +167,8 @@ func repoRepair(repoPath, repoType, org, repo string) {
}
// 删除其他版本
resolvePath := fmt.Sprintf("%s/files/%s/%s/%s/resolve", repoPath, repoType, org, repo)
revisionPath := fmt.Sprintf("%s/api/%s/%s/%s/revision", repoPath, repoType, org, repo)
pathInfoPath := fmt.Sprintf("%s/api/%s/%s/%s/paths-info", repoPath, repoType, org, repo)
entries, err := os.ReadDir(resolvePath)
if err != nil {
fmt.Printf("读取目录失败: %v\n", err)
Expand All @@ -162,10 +177,23 @@ func repoRepair(repoPath, repoType, org, repo string) {
for _, entry := range entries {
if entry.IsDir() {
if entry.Name() != sha.Sha {
err := os.RemoveAll(fmt.Sprintf("%s/%s", resolvePath, entry.Name()))
// 清除resolve目录
rpp := fmt.Sprintf("%s/%s", resolvePath, entry.Name())
err = os.RemoveAll(rpp)
if err != nil {
fmt.Printf("删除resolve目录%s失败: %v\n", rpp, err)
}
// 清除revision目录
revisionP := fmt.Sprintf("%s/%s", revisionPath, entry.Name())
err = os.RemoveAll(revisionP)
if err != nil {
fmt.Printf("删除revision目录%s失败: %v\n", revisionP, err)
}
// 清除paths-info目录
ppth := fmt.Sprintf("%s/%s", pathInfoPath, entry.Name())
err = os.RemoveAll(ppth)
if err != nil {
fmt.Printf("删除目录失败: %v\n", err)
continue
fmt.Printf("删除pathinfo目录%s失败: %v\n", ppth, err)
}
}
}
Expand Down