Skip to content

新增基于状态机模型的规则模块 #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -403,5 +403,10 @@
<scope>provided</scope>
<classifier>linux</classifier>
</dependency>
<dependency>
<groupId>com.alibaba.cola</groupId>
<artifactId>cola-component-statemachine</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
</project>
98 changes: 70 additions & 28 deletions src/main/java/com/ghostchu/peerbanhelper/PeerBanHelperServer.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.ghostchu.peerbanhelper;

import com.ghostchu.peerbanhelper.alert.AlertManager;
import com.ghostchu.peerbanhelper.btn.BtnNetwork;
import com.ghostchu.peerbanhelper.database.DatabaseHelper;
import com.ghostchu.peerbanhelper.database.DatabaseManager;
Expand Down Expand Up @@ -72,6 +71,8 @@ public class PeerBanHelperServer {
@Getter
private final long banDuration;
@Getter
private final long disconnectTimeout;
@Getter
private final int httpdPort;
@Getter
private final boolean hideFinishLogs;
Expand All @@ -96,21 +97,21 @@ public class PeerBanHelperServer {
@Getter
private DatabaseHelper databaseHelper;
@Getter
private ModuleManager moduleManager;
private final ModuleManager moduleManager;
@Getter
@Nullable
private IPDB ipdb = null;
private WatchDog banWaveWatchDog;
@Getter
private JavalinWebContainer webContainer;
@Getter
private AlertManager alertManager;

private final Map<String, PeerMatchRecord> matchRecords = new ConcurrentHashMap<>();

public PeerBanHelperServer(String pbhServerAddress, YamlConfiguration profile, YamlConfiguration mainConfig) throws SQLException {
this.pbhServerAddress = pbhServerAddress;
this.profile = profile;
this.banDuration = profile.getLong("ban-duration");
this.disconnectTimeout = profile.getLong("disconnect-timeout");
this.mainConfig = mainConfig;
this.httpdPort = mainConfig.getInt("server.http");
this.hideFinishLogs = mainConfig.getBoolean("logger.hide-finish-log");
Expand Down Expand Up @@ -377,7 +378,7 @@ public void banWave() {
banWaveWatchDog.setLastOperation("Reset last status");
// 声明基本集合
// 需要重启的种子列表
Map<Downloader, Collection<Torrent>> needRelaunched = new ConcurrentHashMap<>();
Map<Downloader, List<Torrent>> needRelaunched = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor the banWave method to improve readability and reduce complexity.

The method is very long and handles multiple responsibilities. Consider breaking it down into smaller, more manageable methods, each handling a specific part of the process.

Also applies to: 393-393, 395-395, 397-397, 399-399, 401-401, 403-403, 405-405, 407-407, 409-409, 427-427, 429-429, 431-431, 433-433, 435-435, 437-437

// 被解除封禁的对等体列表
banWaveWatchDog.setLastOperation("Remove expired bans");
Collection<BanMetadata> unbannedPeers = removeExpiredBans();
Expand All @@ -389,6 +390,27 @@ public void banWave() {
// 更新 LIVE_PEERS 用于数据展示
banWaveWatchDog.setLastOperation("Update live peers");
executor.submit(() -> updateLivePeers(peers));
// ===============基于 状态机 的封禁逻辑
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

考虑拆分成多个函数以降低复杂度

banWaveWatchDog.setLastOperation("Check Bans New");
// 按模块并行检查Peer
List<FeatureModule> ruleBlockers = moduleManager.getModules().stream().filter(ele -> ele instanceof RuleBlocker).toList();
try (TimeoutProtect protect = new TimeoutProtect(ExceptedTime.RUN_BLOCKER.getTimeout(), (t) -> {
log.warn(Lang.TIMING_CHECK_BANS);
})) {
ruleBlockers.forEach(ele -> {
RuleBlocker blocker = (RuleBlocker) ele;
protect.getService().submit(() -> {
try (TimeoutProtect banProtect = new TimeoutProtect(ExceptedTime.BAN_PEER.getTimeout(), (t) -> log.warn(Lang.TIMING_ADD_BANS))) {
blocker.runCheck(
null, (record) ->
banProtect.getService().submit(() ->
banPeer(record.getDownloader(), record.getTorrent(), record.getPeer(), record.getResult().getModuleContext().getClass().getName(), record.getResult().getRule(), record.getResult().getReason(), bannedPeers, needRelaunched)
), null, null
);
}
});
});
}
// ========== 处理封禁逻辑 ==========
Map<Downloader, List<BanDetail>> downloaderBanDetailMap = new ConcurrentHashMap<>();
banWaveWatchDog.setLastOperation("Check Bans");
Expand All @@ -402,25 +424,19 @@ public void banWave() {
try (TimeoutProtect protect = new TimeoutProtect(ExceptedTime.ADD_BAN_ENTRY.getTimeout(), (t) -> {
log.warn(Lang.TIMING_ADD_BANS);
})) {
downloaderBanDetailMap.forEach((downloader, details) -> {
List<Torrent> relaunch = Collections.synchronizedList(new ArrayList<>());
details.forEach(detail -> {
protect.getService().submit(() -> {
downloaderBanDetailMap.forEach((downloader, details) -> Optional.ofNullable(needRelaunched.get(downloader)).ifPresentOrElse(torrents ->
details.forEach(detail -> protect.getService().submit(() -> {
if (detail.result().action() == PeerAction.BAN) {
IPDBResponse ipdbResponse = queryIPDB(detail.peer().getAddress());
BanMetadata banMetadata = new BanMetadata(detail.result().moduleContext().getClass().getName(), downloader.getName(),
System.currentTimeMillis(), System.currentTimeMillis() + banDuration,
detail.torrent(), detail.peer(), detail.result().rule(), detail.result().reason(),
ipdbResponse.cityResponse(), ipdbResponse.asnResponse());
bannedPeers.add(banMetadata);
relaunch.add(detail.torrent());
banPeer(banMetadata, detail.torrent(), detail.peer());
log.warn(Lang.BAN_PEER, detail.peer().getAddress(), detail.peer().getPeerId(), detail.peer().getClientName(), detail.peer().getProgress(), detail.peer().getUploaded(), detail.peer().getDownloaded(), detail.torrent().getName(), detail.result().reason());
banPeer(downloader, detail.torrent(), detail.peer(), detail.result().moduleContext().getClass().getName(), detail.result().rule(), detail.result().reason(), bannedPeers, needRelaunched);
}
});
});
needRelaunched.put(downloader, relaunch);
});
})), () -> {
details.forEach(detail -> protect.getService().submit(() -> {
if (detail.result().action() == PeerAction.BAN) {
banPeer(downloader, detail.torrent(), detail.peer(), detail.result().moduleContext().getClass().getName(), detail.result().rule(), detail.result().reason(), bannedPeers, needRelaunched);
}
}));
//needRelaunched.put(downloader, torrents);
}));
}
banWaveWatchDog.setLastOperation("Apply banlist");
// 如果需要,则应用更改封禁列表到下载器
Expand Down Expand Up @@ -477,6 +493,16 @@ private void updateLivePeers(Map<Downloader, Map<Torrent, List<Peer>>> peers) {
torrent, p, ipdbResponse.cityResponse(), ipdbResponse.asnResponse()
);
livePeers.put(address, metadata);
// 更新匹配记录
String recordKey = downloader.getName() + "@" + torrent.getHash() + "@" + address.getIp();
if (matchRecords.containsKey(recordKey)) {
PeerMatchRecord peerMatchRecord = matchRecords.get(recordKey);
peerMatchRecord.setDownloader(downloader);
peerMatchRecord.setTorrent(torrent);
peerMatchRecord.setPeer(p);
} else {
matchRecords.put(recordKey, new PeerMatchRecord(downloader, torrent, p, new MatchResultDetail(null, PeerState.INIT, "N/A", "no matches", System.currentTimeMillis() + disconnectTimeout)));
}
});
})));
}
Expand Down Expand Up @@ -537,15 +563,16 @@ public Collection<BanMetadata> removeExpiredBans() {
private void registerModules() {
log.info(Lang.WAIT_FOR_MODULES_STARTUP);
moduleManager.register(new IPBlackList(this, profile));
moduleManager.register(new PeerIdBlacklist(this, profile));
//moduleManager.register(new PeerIdBlacklist(this, profile));
moduleManager.register(new PeerIdBlocker(this, profile));
moduleManager.register(new ClientNameBlacklist(this, profile));
moduleManager.register(new ProgressCheatBlocker(this, profile));
moduleManager.register(new MultiDialingBlocker(this, profile));
//moduleManager.register(new ActiveProbing(this, profile));
moduleManager.register(new AutoRangeBan(this, profile));
moduleManager.register(new BtnNetworkOnline(this, profile));
moduleManager.register(new DownloaderCIDRBlockList(this, profile));
moduleManager.register(new IPBlackRuleList(this, profile, databaseHelper));
moduleManager.register(new RuleSubBlocker(this, profile, databaseHelper));
moduleManager.register(new PBHMetricsController(this, profile));
moduleManager.register(new PBHBanController(this, profile, databaseHelper));
moduleManager.register(new PBHMetadataController(this, profile));
Expand Down Expand Up @@ -677,10 +704,24 @@ public Map<PeerAddress, BanMetadata> getBannedPeers() {
/**
* 以指定元数据封禁一个特定的对等体
*
* @param peer 对等体 IP 地址
* @param banMetadata 封禁元数据
* @param peer 对等体 IP 地址
*/
public void banPeer(@NotNull BanMetadata banMetadata, @NotNull Torrent torrentObj, @NotNull Peer peer) {
public synchronized void banPeer(@NotNull Downloader downloader, @NotNull Torrent torrent, @NotNull Peer peer, @NotNull String module, @NotNull String ruleName, @NotNull String reason, @NotNull Collection<BanMetadata> bannedPeers, @NotNull Map<Downloader, List<Torrent>> needRelaunched) {
if (BAN_LIST.containsKey(peer.getAddress())) {
return;
}
Optional.ofNullable(needRelaunched.get(downloader)).ifPresentOrElse(torrents -> {
if (torrents.contains(torrent)) {
torrents.add(torrent);
} else {
needRelaunched.put(downloader, List.of(torrent));
}
}, () -> needRelaunched.put(downloader, List.of(torrent)));
IPDBResponse ipdbResponse = queryIPDB(peer.getAddress());
BanMetadata banMetadata = new BanMetadata(module, downloader.getName(),
System.currentTimeMillis(), System.currentTimeMillis() + banDuration,
torrent, peer, ruleName, reason, ipdbResponse.cityResponse(), ipdbResponse.asnResponse());
bannedPeers.add(banMetadata);
BAN_LIST.put(peer.getAddress(), banMetadata);
metrics.recordPeerBan(peer.getAddress(), banMetadata);
banListInvoker.forEach(i -> i.add(peer.getAddress(), banMetadata));
Expand All @@ -700,7 +741,8 @@ public void banPeer(@NotNull BanMetadata banMetadata, @NotNull Torrent torrentOb
} else {
banMetadata.setReverseLookup("N/A");
}
Main.getEventBus().post(new PeerBanEvent(peer.getAddress(), banMetadata, torrentObj, peer));
Main.getEventBus().post(new PeerBanEvent(peer.getAddress(), banMetadata, torrent, peer));
log.warn(Lang.BAN_PEER, peer.getAddress(), peer.getPeerId(), peer.getClientName(), peer.getProgress(), peer.getUploaded(), peer.getDownloaded(), torrent.getName(), reason);
}

public List<Downloader> getDownloaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ghostchu.peerbanhelper.text.Lang;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.bspfsystems.yamlconfiguration.configuration.ConfigurationSection;
import org.bspfsystems.yamlconfiguration.file.YamlConfiguration;

import java.util.ArrayList;
Expand All @@ -18,6 +19,15 @@ public ProfileUpdateScript(YamlConfiguration conf) {
this.conf = conf;
}

@UpdateScript(version = 8)
public void subModuleUpdate() {
conf.set("disconnect-timeout", 60000);
conf.set("module.rule-sub-blockers", conf.getConfigurationSection("module.ip-address-blocker-rules"));
conf.set("module.ip-address-blocker-rules", null);
ConfigurationSection rules = conf.getConfigurationSection("module.rule-sub-blockers.rules");
rules.getKeys(false).forEach(ruleId -> rules.getConfigurationSection(ruleId).set("type", "IP"));
}

@UpdateScript(version = 7)
public void progressCheckerIPPrefixLength() {
conf.set("module.progress-cheat-blocker.ipv4-prefix-length", 32);
Expand All @@ -33,7 +43,6 @@ public void subModule() {
conf.set("module.ip-address-blocker-rules.rules.example-rule.url", "https://example.com/example.txt");
}


@UpdateScript(version = 4)
public void ipDatabase() {
conf.set("module.ip-address-blocker.asns", new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ghostchu.peerbanhelper.database;

import com.ghostchu.peerbanhelper.module.IPBanRuleUpdateType;
import com.ghostchu.peerbanhelper.module.RuleUpdateType;
import com.ghostchu.peerbanhelper.text.Lang;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -366,7 +366,7 @@ public List<RuleSubLog> queryRuleSubLogs(String ruleId, int pageIndex, int pageS
set.getString("rule_id"),
set.getLong("update_time"),
set.getInt("ent_count"),
IPBanRuleUpdateType.valueOf(set.getString("update_type"))
RuleUpdateType.valueOf(set.getString("update_type"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The current implementation of RuleUpdateType.valueOf(set.getString("update_type")) in DatabaseHelper.java does not include any handling for invalid enum values, which could lead to exceptions if the database contains unexpected values.

  • Location: src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java line 369

To prevent potential IllegalArgumentException, it's recommended to add a validation or a try-catch block around this code.

Analysis chain

Ensure that RuleUpdateType.valueOf properly handles invalid enum values to prevent exceptions.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that the RuleUpdateType enum contains all necessary values.

# Test: List all values of RuleUpdateType.
rg --type java 'public enum RuleUpdateType' -A 10

Length of output: 375



Script:

#!/bin/bash
# Description: Verify how RuleUpdateType.valueOf handles invalid enum values.

# Search for RuleUpdateType.valueOf usage in the codebase.
rg 'RuleUpdateType.valueOf' -A 5

Length of output: 653



Script:

#!/bin/bash
# Description: Extract more surrounding lines of the usage of RuleUpdateType.valueOf in DatabaseHelper.java to check for handling of invalid values.

# Extract 10 lines before and after the usage of RuleUpdateType.valueOf in DatabaseHelper.java.
rg 'RuleUpdateType.valueOf' -C 10 src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java

Length of output: 813

));
}
return infos;
Expand All @@ -383,7 +383,7 @@ public List<RuleSubLog> queryRuleSubLogs(String ruleId, int pageIndex, int pageS
* @param updateType 更新类型
* @throws SQLException SQL异常
*/
public void insertRuleSubLog(String ruleId, int count, IPBanRuleUpdateType updateType) throws SQLException {
public void insertRuleSubLog(String ruleId, int count, RuleUpdateType updateType) throws SQLException {
try (Connection connection = manager.getConnection()) {
PreparedStatement ps;
ps = connection.prepareStatement("INSERT INTO rule_sub_logs (rule_id, update_time, ent_count, update_type) VALUES (?,?,?,?)");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.ghostchu.peerbanhelper.database;

import com.ghostchu.peerbanhelper.module.IPBanRuleUpdateType;
import com.ghostchu.peerbanhelper.module.RuleUpdateType;

public record RuleSubLog(
String ruleId,
long updateTime,
int count,
IPBanRuleUpdateType updateType
RuleUpdateType updateType
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.ghostchu.peerbanhelper.module;

import com.alibaba.cola.statemachine.StateMachine;
import com.ghostchu.peerbanhelper.PeerBanHelperServer;
import com.ghostchu.peerbanhelper.util.rule.RuleMatcher;
import lombok.Getter;
import org.bspfsystems.yamlconfiguration.file.YamlConfiguration;

import java.util.List;

public abstract class AbstractRuleBlocker extends AbstractFeatureModule implements RuleBlocker {

@Getter
public List<RuleMatcher> rules;

public StateMachine<PeerState, MatchEvents, PeerMatchContext> stateMachine;

public AbstractRuleBlocker(PeerBanHelperServer server, YamlConfiguration profile) {
super(server, profile);
}

@Override
public boolean isConfigurable() {
return true;
}

@Override
public void onEnable() {
stateMachine = ruleSmBuilder().build(getConfigName());
init();
}

@Override
public void onDisable() {
}

@Override
public StateMachine<PeerState, MatchEvents, PeerMatchContext> getStateMachine() {
return stateMachine;
}

public abstract void init();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.ghostchu.peerbanhelper.module;

/**
*
*/
public enum MatchEvents {
/**
* 命中规则
*/
HIT,
/**
* 通过规则
*/
PASS,
/**
* 断开
*/
DISCONNECT,
/**
* 超时
*/
TIMEOUT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.ghostchu.peerbanhelper.module;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class MatchResultDetail {
private FeatureModule moduleContext;
private PeerState state;
private String rule;
private String reason;
private long expireTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ghostchu.peerbanhelper.module;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.function.Consumer;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class PeerMatchContext {
private PeerMatchRecord record;
private Consumer<PeerMatchRecord> activeFunc;
private Consumer<PeerMatchRecord> banFunc;
private Consumer<PeerMatchRecord> disconnectFunc;
private Consumer<PeerMatchRecord> timeoutFunc;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ghostchu.peerbanhelper.module;

import com.ghostchu.peerbanhelper.downloader.Downloader;
import com.ghostchu.peerbanhelper.peer.Peer;
import com.ghostchu.peerbanhelper.torrent.Torrent;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Peer匹配记录
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PeerMatchRecord {
Downloader downloader;
Torrent torrent;
Peer peer;
MatchResultDetail result;
}
Loading
Loading