Skip to content

新增基于状态机模型的规则模块 #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -403,5 +403,10 @@
<scope>provided</scope>
<classifier>linux</classifier>
</dependency>
<dependency>
<groupId>com.alibaba.cola</groupId>
<artifactId>cola-component-statemachine</artifactId>
<version>5.0.0</version>
</dependency>
</dependencies>
</project>
109 changes: 83 additions & 26 deletions src/main/java/com/ghostchu/peerbanhelper/PeerBanHelperServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class PeerBanHelperServer {
@Getter
private final long banDuration;
@Getter
private final long disconnectTimeout;
@Getter
private final int httpdPort;
@Getter
private final boolean hideFinishLogs;
Expand All @@ -96,21 +98,23 @@ public class PeerBanHelperServer {
@Getter
private DatabaseHelper databaseHelper;
@Getter
private ModuleManager moduleManager;
private final ModuleManager moduleManager;
@Getter
@Nullable
private IPDB ipdb = null;
private WatchDog banWaveWatchDog;
@Getter
private JavalinWebContainer webContainer;
@Getter
private final Map<PeerAddress, PeerMatchRecord> matchRecords = new ConcurrentHashMap<>();
@Getter
private AlertManager alertManager;


public PeerBanHelperServer(String pbhServerAddress, YamlConfiguration profile, YamlConfiguration mainConfig) throws SQLException {
this.pbhServerAddress = pbhServerAddress;
this.profile = profile;
this.banDuration = profile.getLong("ban-duration");
this.disconnectTimeout = profile.getLong("disconnect-timeout");
this.mainConfig = mainConfig;
this.httpdPort = mainConfig.getInt("server.http");
this.hideFinishLogs = mainConfig.getBoolean("logger.hide-finish-log");
Expand Down Expand Up @@ -377,7 +381,7 @@ public void banWave() {
banWaveWatchDog.setLastOperation("Reset last status");
// 声明基本集合
// 需要重启的种子列表
Map<Downloader, Collection<Torrent>> needRelaunched = new ConcurrentHashMap<>();
Map<Downloader, List<Torrent>> needRelaunched = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor the banWave method to improve readability and reduce complexity.

The method is very long and handles multiple responsibilities. Consider breaking it down into smaller, more manageable methods, each handling a specific part of the process.

Also applies to: 393-393, 395-395, 397-397, 399-399, 401-401, 403-403, 405-405, 407-407, 409-409, 427-427, 429-429, 431-431, 433-433, 435-435, 437-437

// 被解除封禁的对等体列表
banWaveWatchDog.setLastOperation("Remove expired bans");
Collection<BanMetadata> unbannedPeers = removeExpiredBans();
Expand All @@ -389,6 +393,43 @@ public void banWave() {
// 更新 LIVE_PEERS 用于数据展示
banWaveWatchDog.setLastOperation("Update live peers");
executor.submit(() -> updateLivePeers(peers));
// ===============基于 状态机 的封禁逻辑
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

考虑拆分成多个函数以降低复杂度

banWaveWatchDog.setLastOperation("Check Bans New");
// long t1 = System.currentTimeMillis();
// 断开连接超时剔除
matchRecords.values().stream().filter(ele -> ele.getResult().state() == PeerState.ACTIVE && ele.getResult().expireTime() < System.currentTimeMillis())
.map(ele -> ele.getPeer().getAddress()).forEach(matchRecords::remove);
// 封禁超时解禁后剔除
matchRecords.values().stream().filter(ele -> ele.getResult().state() == PeerState.BAN && ele.getResult().expireTime() < System.currentTimeMillis())
.map(ele -> ele.getPeer().getAddress()).forEach(ele -> {
unbanPeer(ele);
matchRecords.remove(ele);
});
// 按模块并行检查标记Peer
List<FeatureModule> ruleBlockers = moduleManager.getModules().stream().filter(ele -> ele instanceof RuleBlocker).toList();
CountDownLatch fsmLatch = new CountDownLatch(ruleBlockers.size());
ruleBlockers.forEach(ele -> {
RuleBlocker blocker = (RuleBlocker) ele;
try (TimeoutProtect protect = new TimeoutProtect(ExceptedTime.CHECK_BANS.getTimeout(), (t) -> {
log.warn(Lang.TIMING_CHECK_BANS);
fsmLatch.countDown();
})) {
protect.getService().submit(() -> {
blocker.runCheck();
fsmLatch.countDown();
});
}
});
fsmLatch.await();
// 标记完成后将结果为Ban的Peer封禁
banWaveWatchDog.setLastOperation("Add banned peers into banlist New");
matchRecords.values().stream().filter(ele -> ele.getResult().state() == PeerState.BAN && !BAN_LIST.containsKey(ele.getPeer().getAddress()))
.forEach(record -> Optional.ofNullable(needRelaunched.get(record.getDownloader())).ifPresentOrElse(torrents ->
banPeer(record.getDownloader(), record.getTorrent(), record.getPeer(), record.getResult().moduleContext().getClass().getName(), record.getResult().rule(), record.getResult().reason(), bannedPeers, torrents), () -> {
List<Torrent> torrents = new ArrayList<>();
banPeer(record.getDownloader(), record.getTorrent(), record.getPeer(), record.getResult().moduleContext().getClass().getName(), record.getResult().rule(), record.getResult().reason(), bannedPeers, torrents);
needRelaunched.put(record.getDownloader(), torrents);
}));
// ========== 处理封禁逻辑 ==========
Map<Downloader, List<BanDetail>> downloaderBanDetailMap = new ConcurrentHashMap<>();
banWaveWatchDog.setLastOperation("Check Bans");
Expand All @@ -402,25 +443,20 @@ public void banWave() {
try (TimeoutProtect protect = new TimeoutProtect(ExceptedTime.ADD_BAN_ENTRY.getTimeout(), (t) -> {
log.warn(Lang.TIMING_ADD_BANS);
})) {
downloaderBanDetailMap.forEach((downloader, details) -> {
List<Torrent> relaunch = Collections.synchronizedList(new ArrayList<>());
details.forEach(detail -> {
protect.getService().submit(() -> {
downloaderBanDetailMap.forEach((downloader, details) -> Optional.ofNullable(needRelaunched.get(downloader)).ifPresentOrElse(torrents ->
details.forEach(detail -> protect.getService().submit(() -> {
if (detail.result().action() == PeerAction.BAN) {
IPDBResponse ipdbResponse = queryIPDB(detail.peer().getAddress());
BanMetadata banMetadata = new BanMetadata(detail.result().moduleContext().getClass().getName(), downloader.getName(),
System.currentTimeMillis(), System.currentTimeMillis() + banDuration,
detail.torrent(), detail.peer(), detail.result().rule(), detail.result().reason(),
ipdbResponse.cityResponse(), ipdbResponse.asnResponse());
bannedPeers.add(banMetadata);
relaunch.add(detail.torrent());
banPeer(banMetadata, detail.torrent(), detail.peer());
log.warn(Lang.BAN_PEER, detail.peer().getAddress(), detail.peer().getPeerId(), detail.peer().getClientName(), detail.peer().getProgress(), detail.peer().getUploaded(), detail.peer().getDownloaded(), detail.torrent().getName(), detail.result().reason());
banPeer(downloader, detail.torrent(), detail.peer(), detail.result().moduleContext().getClass().getName(), detail.result().rule(), detail.result().reason(), bannedPeers, torrents);
}
});
});
needRelaunched.put(downloader, relaunch);
});
})), () -> {
List<Torrent> torrents = new ArrayList<>();
details.forEach(detail -> protect.getService().submit(() -> {
if (detail.result().action() == PeerAction.BAN) {
banPeer(downloader, detail.torrent(), detail.peer(), detail.result().moduleContext().getClass().getName(), detail.result().rule(), detail.result().reason(), bannedPeers, torrents);
}
}));
needRelaunched.put(downloader, torrents);
}));
}
banWaveWatchDog.setLastOperation("Apply banlist");
// 如果需要,则应用更改封禁列表到下载器
Expand All @@ -438,6 +474,8 @@ public void banWave() {
log.info(Lang.BAN_WAVE_CHECK_COMPLETED, downloadersCount, torrentsCount, peersCount, bannedPeers.size(), unbannedPeers.size(), System.currentTimeMillis() - startTimer);
}
banWaveWatchDog.setLastOperation("Completed");
} catch (InterruptedException e) {
throw new RuntimeException(e);
Copy link
Contributor

@coderabbitai coderabbitai bot Jun 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a RuntimeException to handle InterruptedException is generally not recommended. It's better to either re-interrupt the thread or handle the interruption more gracefully, depending on the desired application behavior.

- throw new RuntimeException(e);
+ Thread.currentThread().interrupt(); // Restore the interrupted status
// or handle the interruption more appropriately depending on the context
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status
// or handle the interruption more appropriately depending on the context

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

需要应用建议的更改

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ghost-chu, I've noted your decision to apply the suggested changes to handle InterruptedException more appropriately. Please ensure to update the exception handling in your development environment as discussed. If you need any more assistance or further modifications, feel free to ask!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handle InterruptedException properly by restoring the interrupt status.

- throw new RuntimeException(e);
+ Thread.currentThread().interrupt(); // Restore the interrupted status
// or handle the interruption more appropriately depending on the context

Interrupt handling is crucial in a multi-threaded environment to ensure that the thread's interrupt status is respected and that the system can respond appropriately to interrupts.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Restore the interrupted status

} finally {
banWaveWatchDog.feed();
metrics.recordCheck();
Expand Down Expand Up @@ -477,6 +515,15 @@ private void updateLivePeers(Map<Downloader, Map<Torrent, List<Peer>>> peers) {
torrent, p, ipdbResponse.cityResponse(), ipdbResponse.asnResponse()
);
livePeers.put(address, metadata);
// 更新匹配记录
if (matchRecords.containsKey(address)) {
PeerMatchRecord peerMatchRecord = matchRecords.get(address);
peerMatchRecord.setDownloader(downloader);
peerMatchRecord.setTorrent(torrent);
peerMatchRecord.setPeer(p);
} else {
matchRecords.put(address, new PeerMatchRecord(downloader, torrent, p, new MatchResultDetail(null, PeerState.INIT, "N/A", "no matches", 0)));
}
});
})));
}
Expand Down Expand Up @@ -537,15 +584,16 @@ public Collection<BanMetadata> removeExpiredBans() {
private void registerModules() {
log.info(Lang.WAIT_FOR_MODULES_STARTUP);
moduleManager.register(new IPBlackList(this, profile));
moduleManager.register(new PeerIdBlacklist(this, profile));
//moduleManager.register(new PeerIdBlacklist(this, profile));
moduleManager.register(new PeerIdBlocker(this, profile));
moduleManager.register(new ClientNameBlacklist(this, profile));
moduleManager.register(new ProgressCheatBlocker(this, profile));
moduleManager.register(new MultiDialingBlocker(this, profile));
//moduleManager.register(new ActiveProbing(this, profile));
moduleManager.register(new AutoRangeBan(this, profile));
moduleManager.register(new BtnNetworkOnline(this, profile));
moduleManager.register(new DownloaderCIDRBlockList(this, profile));
moduleManager.register(new IPBlackRuleList(this, profile, databaseHelper));
moduleManager.register(new RuleSubBlocker(this, profile, databaseHelper));
moduleManager.register(new PBHMetricsController(this, profile));
moduleManager.register(new PBHBanController(this, profile, databaseHelper));
moduleManager.register(new PBHMetadataController(this, profile));
Expand Down Expand Up @@ -677,10 +725,18 @@ public Map<PeerAddress, BanMetadata> getBannedPeers() {
/**
* 以指定元数据封禁一个特定的对等体
*
* @param peer 对等体 IP 地址
* @param banMetadata 封禁元数据
* @param peer 对等体 IP 地址
*/
public void banPeer(@NotNull BanMetadata banMetadata, @NotNull Torrent torrentObj, @NotNull Peer peer) {
public void banPeer(@NotNull Downloader downloader, @NotNull Torrent torrent, @NotNull Peer peer, @NotNull String module, @NotNull String ruleName, @NotNull String reason, @NotNull Collection<BanMetadata> bannedPeers, @NotNull List<Torrent> relaunch) {
if (BAN_LIST.containsKey(peer.getAddress())) {
return;
}
relaunch.add(torrent);
IPDBResponse ipdbResponse = queryIPDB(peer.getAddress());
BanMetadata banMetadata = new BanMetadata(module, downloader.getName(),
System.currentTimeMillis(), System.currentTimeMillis() + banDuration,
torrent, peer, ruleName, reason, ipdbResponse.cityResponse(), ipdbResponse.asnResponse());
bannedPeers.add(banMetadata);
BAN_LIST.put(peer.getAddress(), banMetadata);
metrics.recordPeerBan(peer.getAddress(), banMetadata);
banListInvoker.forEach(i -> i.add(peer.getAddress(), banMetadata));
Expand All @@ -700,7 +756,8 @@ public void banPeer(@NotNull BanMetadata banMetadata, @NotNull Torrent torrentOb
} else {
banMetadata.setReverseLookup("N/A");
}
Main.getEventBus().post(new PeerBanEvent(peer.getAddress(), banMetadata, torrentObj, peer));
Main.getEventBus().post(new PeerBanEvent(peer.getAddress(), banMetadata, torrent, peer));
log.warn(Lang.BAN_PEER, peer.getAddress(), peer.getPeerId(), peer.getClientName(), peer.getProgress(), peer.getUploaded(), peer.getDownloaded(), torrent.getName(), reason);
}

public List<Downloader> getDownloaders() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ghostchu.peerbanhelper.text.Lang;
import com.google.gson.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.bspfsystems.yamlconfiguration.configuration.ConfigurationSection;
import org.bspfsystems.yamlconfiguration.file.YamlConfiguration;

import java.util.ArrayList;
Expand All @@ -18,6 +19,15 @@ public ProfileUpdateScript(YamlConfiguration conf) {
this.conf = conf;
}

@UpdateScript(version = 8)
public void subModuleUpdate() {
conf.set("disconnect-timeout", 60000);
conf.set("module.rule-sub-blockers", conf.getConfigurationSection("module.ip-address-blocker-rules"));
conf.set("module.ip-address-blocker-rules", null);
ConfigurationSection rules = conf.getConfigurationSection("module.rule-sub-blockers.rules");
rules.getKeys(false).forEach(ruleId -> rules.getConfigurationSection(ruleId).set("type", "IP"));
}

@UpdateScript(version = 7)
public void progressCheckerIPPrefixLength() {
conf.set("module.progress-cheat-blocker.ipv4-prefix-length", 32);
Expand All @@ -33,7 +43,6 @@ public void subModule() {
conf.set("module.ip-address-blocker-rules.rules.example-rule.url", "https://example.com/example.txt");
}


@UpdateScript(version = 4)
public void ipDatabase() {
conf.set("module.ip-address-blocker.asns", new ArrayList<>());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ghostchu.peerbanhelper.database;

import com.ghostchu.peerbanhelper.module.IPBanRuleUpdateType;
import com.ghostchu.peerbanhelper.module.RuleUpdateType;
import com.ghostchu.peerbanhelper.text.Lang;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -366,7 +366,7 @@ public List<RuleSubLog> queryRuleSubLogs(String ruleId, int pageIndex, int pageS
set.getString("rule_id"),
set.getLong("update_time"),
set.getInt("ent_count"),
IPBanRuleUpdateType.valueOf(set.getString("update_type"))
RuleUpdateType.valueOf(set.getString("update_type"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

The current implementation of RuleUpdateType.valueOf(set.getString("update_type")) in DatabaseHelper.java does not include any handling for invalid enum values, which could lead to exceptions if the database contains unexpected values.

  • Location: src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java line 369

To prevent potential IllegalArgumentException, it's recommended to add a validation or a try-catch block around this code.

Analysis chain

Ensure that RuleUpdateType.valueOf properly handles invalid enum values to prevent exceptions.

Scripts executed

The following scripts were executed for the analysis:


Script:

#!/bin/bash
# Description: Verify that the RuleUpdateType enum contains all necessary values.

# Test: List all values of RuleUpdateType.
rg --type java 'public enum RuleUpdateType' -A 10

Length of output: 375



Script:

#!/bin/bash
# Description: Verify how RuleUpdateType.valueOf handles invalid enum values.

# Search for RuleUpdateType.valueOf usage in the codebase.
rg 'RuleUpdateType.valueOf' -A 5

Length of output: 653



Script:

#!/bin/bash
# Description: Extract more surrounding lines of the usage of RuleUpdateType.valueOf in DatabaseHelper.java to check for handling of invalid values.

# Extract 10 lines before and after the usage of RuleUpdateType.valueOf in DatabaseHelper.java.
rg 'RuleUpdateType.valueOf' -C 10 src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java

Length of output: 813

));
}
return infos;
Expand All @@ -383,7 +383,7 @@ public List<RuleSubLog> queryRuleSubLogs(String ruleId, int pageIndex, int pageS
* @param updateType 更新类型
* @throws SQLException SQL异常
*/
public void insertRuleSubLog(String ruleId, int count, IPBanRuleUpdateType updateType) throws SQLException {
public void insertRuleSubLog(String ruleId, int count, RuleUpdateType updateType) throws SQLException {
try (Connection connection = manager.getConnection()) {
PreparedStatement ps;
ps = connection.prepareStatement("INSERT INTO rule_sub_logs (rule_id, update_time, ent_count, update_type) VALUES (?,?,?,?)");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.ghostchu.peerbanhelper.database;

import com.ghostchu.peerbanhelper.module.IPBanRuleUpdateType;
import com.ghostchu.peerbanhelper.module.RuleUpdateType;

public record RuleSubLog(
String ruleId,
long updateTime,
int count,
IPBanRuleUpdateType updateType
RuleUpdateType updateType
) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.ghostchu.peerbanhelper.module;

import com.alibaba.cola.statemachine.StateMachine;
import com.ghostchu.peerbanhelper.PeerBanHelperServer;
import com.ghostchu.peerbanhelper.util.rule.Rule;
import lombok.Getter;
import org.bspfsystems.yamlconfiguration.file.YamlConfiguration;

import java.util.List;

public abstract class AbstractRuleBlocker extends AbstractFeatureModule implements RuleBlocker {

@Getter
public List<Rule> rules;

public StateMachine<PeerState, MatchEvents, PeerMatchRecord> stateMachine;

public AbstractRuleBlocker(PeerBanHelperServer server, YamlConfiguration profile) {
super(server, profile);
}

@Override
public boolean isConfigurable() {
return true;
}

@Override
public void onEnable() {
stateMachine = ruleSmBuilder().build(getConfigName());
init();
}

@Override
public void onDisable() {
}

@Override
public StateMachine<PeerState, MatchEvents, PeerMatchRecord> getStateMachine() {
return stateMachine;
}

public abstract void init();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.ghostchu.peerbanhelper.module;

/**
*
*/
public enum MatchEvents {
/**
* 匹配规则
*/
MATCH,
/**
* 命中规则
*/
HIT,
/**
* 通过规则
*/
PASS,
/**
* 断开
*/
DISCONNECT,
/**
* 超时
*/
TIMEOUT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.ghostchu.peerbanhelper.module;

public record MatchResultDetail(FeatureModule moduleContext, PeerState state, String rule, String reason,
long expireTime) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ghostchu.peerbanhelper.module;

import com.ghostchu.peerbanhelper.downloader.Downloader;
import com.ghostchu.peerbanhelper.peer.Peer;
import com.ghostchu.peerbanhelper.torrent.Torrent;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Peer匹配记录
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PeerMatchRecord {
Downloader downloader;
Torrent torrent;
Peer peer;
MatchResultDetail result;
}
Loading
Loading