Skip to content

Commit

Permalink
refactor(processor): replace sync.Map with xsync.MapOf
Browse files Browse the repository at this point in the history
  • Loading branch information
nuxencs committed Sep 7, 2024
1 parent 040f390 commit dfd405a
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 34 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/moistari/rls v0.5.12
github.com/mrobinsn/go-tvmaze v1.2.1
github.com/pkg/errors v0.9.1
github.com/puzpuzpuz/xsync/v3 v3.4.0
github.com/rs/zerolog v1.33.0
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (c *AppConfig) load(configPath string) {

// read config
if err := viper.ReadInConfig(); err != nil {
log.Printf("config read error: %q", err)
log.Fatalf("config read error: %q", err)
}

if err := viper.Unmarshal(c.Config); err != nil {
Expand Down
61 changes: 28 additions & 33 deletions internal/http/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/autobrr/go-qbittorrent"
"github.com/moistari/rls"
"github.com/puzpuzpuz/xsync/v3"
"github.com/rs/zerolog"
)

Expand Down Expand Up @@ -54,9 +55,9 @@ type matchPaths struct {
}

var (
clientMap sync.Map
matchesMap sync.Map
torrentMap sync.Map
clientMap = xsync.NewMapOf[string, *qbittorrent.Client]()
matchesMap = xsync.NewMapOf[string, []matchPaths]()
torrentMap = xsync.NewMapOf[string, *torrentRlsEntries]()
)

func newProcessor(log logger.Logger, config *config.AppConfig, notification domain.Sender) *processor {
Expand All @@ -67,43 +68,37 @@ func newProcessor(log logger.Logger, config *config.AppConfig, notification doma
}
}

func (p *processor) getClient(client *domain.Client) error {
clientCfg := qbittorrent.Config{
Host: fmt.Sprintf("http://%s:%d", client.Host, client.Port),
Username: client.Username,
Password: client.Password,
}

c, ok := clientMap.Load(clientCfg)
func (p *processor) getClient(client *domain.Client, clientName string) error {
c, ok := clientMap.Load(clientName)
if !ok {
clientCfg := qbittorrent.Config{
Host: fmt.Sprintf("http://%s:%d", client.Host, client.Port),
Username: client.Username,
Password: client.Password,
}

c = qbittorrent.NewClient(clientCfg)

if err := c.(*qbittorrent.Client).Login(); err != nil {
if err := c.Login(); err != nil {
return errors.Wrap(err, "failed to login to qbittorrent")
}

clientMap.Store(clientCfg, c)
clientMap.Store(clientName, c)
}

p.req.Client = c.(*qbittorrent.Client)
p.req.Client = c
return nil
}

func (p *processor) getAllTorrents(client *domain.Client) torrentRlsEntries {
clientCfg := qbittorrent.Config{
Host: fmt.Sprintf("http://%s:%d", client.Host, client.Port),
Username: client.Username,
Password: client.Password,
}

func (p *processor) getAllTorrents(clientName string) torrentRlsEntries {
f := func() *torrentRlsEntries {
tre, ok := torrentMap.Load(clientCfg)
tre, ok := torrentMap.Load(clientName)
if ok {
return tre.(*torrentRlsEntries)
return tre
}

entries := &torrentRlsEntries{rlsMap: make(map[string]rls.Release)}
torrentMap.Store(clientCfg, entries)
torrentMap.Store(clientName, entries)
return entries
}

Expand Down Expand Up @@ -140,7 +135,7 @@ func (p *processor) getAllTorrents(client *domain.Client) torrentRlsEntries {
entries.entriesMap[fmtTitle] = append(entries.entriesMap[fmtTitle], domain.Entry{T: t, R: r})
}

torrentMap.Store(clientCfg, entries)
torrentMap.Store(clientName, entries)
return *entries
}

Expand Down Expand Up @@ -203,21 +198,21 @@ func (p *processor) processSeasonPack() (int, error) {
return c.Str("release", p.req.Name).Str("clientname", clientName)
})

client, ok := p.cfg.Config.Clients[clientName]
clientCfg, ok := p.cfg.Config.Clients[clientName]
if !ok {
return domain.StatusClientNotFound, fmt.Errorf("client not found in config")
}
p.log.Info().Msgf("using %s client serving at %s:%d", clientName, client.Host, client.Port)
p.log.Info().Msgf("using %s client serving at %s:%d", clientName, clientCfg.Host, clientCfg.Port)

if len(p.req.Name) == 0 {
return domain.StatusAnnounceNameError, fmt.Errorf("couldn't get announce name")
}

if err := p.getClient(client); err != nil {
if err := p.getClient(clientCfg, clientName); err != nil {
return domain.StatusGetClientError, err
}

tre := p.getAllTorrents(client)
tre := p.getAllTorrents(clientName)
if tre.err != nil {
return domain.StatusGetTorrentsError, tre.err
}
Expand Down Expand Up @@ -321,7 +316,7 @@ func (p *processor) processSeasonPack() (int, error) {

epRls := rls.ParseString(entry.T.Name)
epPathClient := filepath.Join(entry.T.SavePath, fileName)
announcedEpPath := filepath.Join(client.PreImportPath, announcedPackName, filepath.Base(fileName))
announcedEpPath := filepath.Join(clientCfg.PreImportPath, announcedPackName, filepath.Base(fileName))

matchedEps = append(matchedEps, epRls.Episode)

Expand All @@ -338,7 +333,7 @@ func (p *processor) processSeasonPack() (int, error) {
oldMatches = currentMatch
}

newMatches := append(oldMatches.([]matchPaths), currentMatch...)
newMatches := append(oldMatches, currentMatch...)
matchesMap.Store(p.req.Name, newMatches)
p.log.Debug().Msgf("matched torrent from client: name(%s), size(%d), hash(%s)",
entry.T.Name, size, entry.T.Hash)
Expand Down Expand Up @@ -376,7 +371,7 @@ func (p *processor) processSeasonPack() (int, error) {
return domain.StatusSuccessfulMatch, nil
}

matches := utils.DedupeSlice(matchesSlice.([]matchPaths))
matches := utils.DedupeSlice(matchesSlice)
var hardlinkRespCodes []int

for _, match := range matches {
Expand Down Expand Up @@ -479,7 +474,7 @@ func (p *processor) parseTorrent() (int, error) {
return domain.StatusNoMatches, fmt.Errorf("no matching releases in client")
}

matches := utils.DedupeSlice(matchesSlice.([]matchPaths))
matches := utils.DedupeSlice(matchesSlice)
var hardlinkRespCodes []int
var matchedEpPath string
var matchErr error
Expand Down

0 comments on commit dfd405a

Please sign in to comment.