Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(processor): replace sync.Map with xsync.MapOf #135

Merged
merged 2 commits into from
Sep 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/moistari/rls v0.5.12
github.com/mrobinsn/go-tvmaze v1.2.1
github.com/pkg/errors v0.9.1
github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (c *AppConfig) load(configPath string) {

// read config
if err := viper.ReadInConfig(); err != nil {
log.Printf("config read error: %q", err)
log.Fatalf("config read error: %q", err)
}

if err := viper.Unmarshal(c.Config); err != nil {
Expand Down
65 changes: 30 additions & 35 deletions internal/http/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/autobrr/go-qbittorrent"
"github.com/moistari/rls"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -54,9 +55,9 @@ type matchPaths struct {
}

var (
clientMap sync.Map
matchesMap sync.Map
torrentMap sync.Map
clientMap = xsync.NewMapOf[string, *qbittorrent.Client]()
matchesMap = xsync.NewMapOf[string, []matchPaths]()
torrentMap = xsync.NewMapOf[string, *torrentRlsEntries]()
)

func newProcessor(log logger.Logger, config *config.AppConfig, notification domain.Sender) *processor {
Expand All @@ -67,43 +68,37 @@ func newProcessor(log logger.Logger, config *config.AppConfig, notification doma
}
}

func (p *processor) getClient(client *domain.Client) error {
clientCfg := qbittorrent.Config{
Host: fmt.Sprintf("http://%s:%d", client.Host, client.Port),
Username: client.Username,
Password: client.Password,
}

c, ok := clientMap.Load(clientCfg)
func (p *processor) getClient(client *domain.Client, clientName string) error {
c, ok := clientMap.Load(clientName)
if !ok {
clientCfg := qbittorrent.Config{
Host: fmt.Sprintf("http://%s:%d", client.Host, client.Port),
Username: client.Username,
Password: client.Password,
}

c = qbittorrent.NewClient(clientCfg)

if err := c.(*qbittorrent.Client).Login(); err != nil {
if err := c.Login(); err != nil {
return errors.Wrap(err, "failed to login to qbittorrent")
}

clientMap.Store(clientCfg, c)
clientMap.Store(clientName, c)
}

p.req.Client = c.(*qbittorrent.Client)
p.req.Client = c
return nil
}

func (p *processor) getAllTorrents(client *domain.Client) torrentRlsEntries {
clientCfg := qbittorrent.Config{
Host: fmt.Sprintf("http://%s:%d", client.Host, client.Port),
Username: client.Username,
Password: client.Password,
}

func (p *processor) getAllTorrents(clientName string) torrentRlsEntries {
f := func() *torrentRlsEntries {
tre, ok := torrentMap.Load(clientCfg)
tre, ok := torrentMap.Load(clientName)
if ok {
return tre.(*torrentRlsEntries)
return tre
}

entries := &torrentRlsEntries{rlsMap: make(map[string]rls.Release)}
torrentMap.Store(clientCfg, entries)
torrentMap.Store(clientName, entries)
return entries
}

Expand Down Expand Up @@ -140,7 +135,7 @@ func (p *processor) getAllTorrents(client *domain.Client) torrentRlsEntries {
entries.entriesMap[fmtTitle] = append(entries.entriesMap[fmtTitle], domain.Entry{T: t, R: r})
}

torrentMap.Store(clientCfg, entries)
torrentMap.Store(clientName, entries)
return *entries
}

Expand Down Expand Up @@ -203,21 +198,21 @@ func (p *processor) processSeasonPack() (int, error) {
return c.Str("release", p.req.Name).Str("clientname", clientName)
})

client, ok := p.cfg.Config.Clients[clientName]
clientCfg, ok := p.cfg.Config.Clients[clientName]
if !ok {
return domain.StatusClientNotFound, fmt.Errorf("client not found in config")
}
p.log.Info().Msgf("using %s client serving at %s:%d", clientName, client.Host, client.Port)
p.log.Info().Msgf("using %s client serving at %s:%d", clientName, clientCfg.Host, clientCfg.Port)

if len(p.req.Name) == 0 {
return domain.StatusAnnounceNameError, fmt.Errorf("couldn't get announce name")
}

if err := p.getClient(client); err != nil {
if err := p.getClient(clientCfg, clientName); err != nil {
return domain.StatusGetClientError, err
}

tre := p.getAllTorrents(client)
tre := p.getAllTorrents(clientName)
if tre.err != nil {
return domain.StatusGetTorrentsError, tre.err
}
Expand Down Expand Up @@ -321,7 +316,7 @@ func (p *processor) processSeasonPack() (int, error) {

epRls := rls.ParseString(entry.T.Name)
epPathClient := filepath.Join(entry.T.SavePath, fileName)
announcedEpPath := filepath.Join(client.PreImportPath, announcedPackName, filepath.Base(fileName))
announcedEpPath := filepath.Join(clientCfg.PreImportPath, announcedPackName, filepath.Base(fileName))

matchedEps = append(matchedEps, epRls.Episode)

Expand All @@ -338,7 +333,7 @@ func (p *processor) processSeasonPack() (int, error) {
oldMatches = currentMatch
}

newMatches := append(oldMatches.([]matchPaths), currentMatch...)
newMatches := append(oldMatches, currentMatch...)
matchesMap.Store(p.req.Name, newMatches)
p.log.Debug().Msgf("matched torrent from client: name(%s), size(%d), hash(%s)",
entry.T.Name, size, entry.T.Hash)
Expand Down Expand Up @@ -376,7 +371,7 @@ func (p *processor) processSeasonPack() (int, error) {
return domain.StatusSuccessfulMatch, nil
}

matches := utils.DedupeSlice(matchesSlice.([]matchPaths))
matches := utils.DedupeSlice(matchesSlice)
var hardlinkRespCodes []int

for _, match := range matches {
Expand Down Expand Up @@ -440,7 +435,7 @@ func (p *processor) parseTorrent() (int, error) {
return c.Str("release", p.req.Name).Str("clientname", clientName)
})

client, ok := p.cfg.Config.Clients[clientName]
clientCfg, ok := p.cfg.Config.Clients[clientName]
if !ok {
return domain.StatusClientNotFound, fmt.Errorf("client not found in config")
}
Expand Down Expand Up @@ -479,13 +474,13 @@ func (p *processor) parseTorrent() (int, error) {
return domain.StatusNoMatches, fmt.Errorf("no matching releases in client")
}

matches := utils.DedupeSlice(matchesSlice.([]matchPaths))
matches := utils.DedupeSlice(matchesSlice)
var hardlinkRespCodes []int
var matchedEpPath string
var matchErr error
var targetEpPath string

targetPackDir := filepath.Join(client.PreImportPath, parsedPackName)
targetPackDir := filepath.Join(clientCfg.PreImportPath, parsedPackName)

for _, match := range matches {
for _, torrentEp := range torrentEps {
Expand Down
Loading