diff --git a/ftpreader/doc/ftpreader.md b/ftpreader/doc/ftpreader.md index 770c6a9c96..5c49407db1 100644 --- a/ftpreader/doc/ftpreader.md +++ b/ftpreader/doc/ftpreader.md @@ -57,6 +57,8 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能, "port": 22, "username": "xx", "password": "xxx", + "privateKey": "/path_to_private_key", + "keyPassword": "xxx", "path": [ "/home/hanfa.shf/ftpReaderTest/data" ], @@ -153,10 +155,26 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能, * **password** - * 描述:ftp服务器访问密码。
+ * 描述:ftp服务器访问密码。密码和私钥必须配置一项。
+ + * 必选:否
+ + * 默认值:无
- * 必选:是
+* **privateKey** + + * 描述:ftp服务器访问私钥。密码和私钥必须配置一项。
+ + * 必选:否
+ + * 默认值:无
+ +* **keyPassword** + * 描述:私钥密码密码。
+ + * 必选:否
+ * 默认值:无
* **path** diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java index f8b3f56f21..a75a3189d4 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpHelper.java @@ -3,6 +3,7 @@ import java.io.InputStream; import java.util.HashSet; import java.util.List; +import java.util.Map; public abstract class FtpHelper { /** @@ -18,7 +19,7 @@ public abstract class FtpHelper { * @return void * @throws */ - public abstract void loginFtpServer(String host, String username, String password, int port, int timeout,String connectMode) ; + public abstract void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode, Map extendParams) ; /** * * @Title: LogoutFtpServer diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java index c1f20dfd7f..94f4d99db3 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/FtpReader.java @@ -2,9 +2,12 @@ import java.io.InputStream; import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +33,8 @@ public static class Job extends Reader.Job { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private int timeout; private String connectPattern; private int maxTraversalLevel; @@ -44,6 +49,9 @@ public void init() { this.validateParameter(); UnstructuredStorageReaderUtil.validateParameter(this.originConfig); + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); if ("sftp".equals(protocol)) { //sftp协议 this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT); @@ -53,7 +61,7 @@ public void init() { this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_FTP_PORT); this.ftpHelper = new StandardFtpHelper(); } - ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern); + ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams); } @@ -67,7 +75,12 @@ private void validateParameter() { } this.host = this.originConfig.getNecessaryValue(Key.HOST, FtpReaderErrorCode.REQUIRED_VALUE); this.username = this.originConfig.getNecessaryValue(Key.USERNAME, FtpReaderErrorCode.REQUIRED_VALUE); - this.password = this.originConfig.getNecessaryValue(Key.PASSWORD, FtpReaderErrorCode.REQUIRED_VALUE); + this.password = this.originConfig.getString(Key.PASSWORD); + this.privateKey = this.originConfig.getString(Key.PRIVATEKEY); + this.keyPassword = this.originConfig.getString(Key.KEYPASSWORD); + if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) { + throw DataXException.asDataXException(FtpReaderErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项"); + } this.timeout = originConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); this.maxTraversalLevel = originConfig.getInt(Key.MAXTRAVERSALLEVEL, Constant.DEFAULT_MAX_TRAVERSAL_LEVEL); @@ -175,6 +188,8 @@ public static class Task extends Reader.Task { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private String protocol; private int timeout; private String connectPattern; @@ -192,10 +207,15 @@ public void init() {//连接重试 this.protocol = readerSliceConfig.getString(Key.PROTOCOL); this.username = readerSliceConfig.getString(Key.USERNAME); this.password = readerSliceConfig.getString(Key.PASSWORD); + this.privateKey = readerSliceConfig.getString(Key.PRIVATEKEY); + this.keyPassword = readerSliceConfig.getString(Key.KEYPASSWORD); this.timeout = readerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); this.sourceFiles = this.readerSliceConfig.getList(Constant.SOURCE_FILES, String.class); + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); if ("sftp".equals(protocol)) { //sftp协议 this.port = readerSliceConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT); @@ -206,7 +226,7 @@ public void init() {//连接重试 this.connectPattern = readerSliceConfig.getString(Key.CONNECTPATTERN, Constant.DEFAULT_FTP_CONNECT_PATTERN);// 默认为被动模式 this.ftpHelper = new StandardFtpHelper(); } - ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern); + ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams); } diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java index cdbd043cd6..5b7a36aa5b 100755 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/Key.java @@ -5,6 +5,8 @@ public class Key { public static final String HOST = "host"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; + public static final String PRIVATEKEY = "privateKey"; + public static final String KEYPASSWORD = "keyPassword"; public static final String PORT = "port"; public static final String TIMEOUT = "timeout"; public static final String CONNECTPATTERN = "connectPattern"; diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java index 6e42e10c02..4c9b6c16a2 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/SftpHelper.java @@ -2,10 +2,12 @@ import java.io.InputStream; import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.Vector; import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,11 +26,21 @@ public class SftpHelper extends FtpHelper { Session session = null; ChannelSftp channelSftp = null; - @Override + + @Override public void loginFtpServer(String host, String username, String password, int port, int timeout, - String connectMode) { + String connectMode, Map extendParams) { JSch jsch = new JSch(); // 创建JSch对象 try { + String privateKey = (String)extendParams.get(Key.PRIVATEKEY); + String keyPassword = (String)extendParams.get(Key.KEYPASSWORD); + if (!StringUtils.isBlank(privateKey)) { + if (!StringUtils.isBlank(keyPassword)) { + jsch.addIdentity(privateKey, keyPassword); + } else { + jsch.addIdentity(privateKey); + } + } session = jsch.getSession(username, host, port); // 根据用户名,主机ip,端口获取一个Session对象 // 如果服务器连接不上,则抛出异常 @@ -37,7 +49,9 @@ public void loginFtpServer(String host, String username, String password, int po "session is null,无法通过sftp与服务器建立链接,请检查主机名和用户名是否正确."); } - session.setPassword(password); // 设置密码 + if (StringUtils.isBlank(privateKey)) { + session.setPassword(password); // 设置密码 + } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); session.setConfig(config); // 为Session对象设置properties diff --git a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java index 79b23f8bff..d2683801b0 100644 --- a/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java +++ b/ftpreader/src/main/java/com/alibaba/datax/plugin/reader/ftpreader/StandardFtpHelper.java @@ -4,6 +4,7 @@ import java.io.InputStream; import java.net.UnknownHostException; import java.util.HashSet; +import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.commons.net.ftp.FTP; @@ -23,7 +24,7 @@ public class StandardFtpHelper extends FtpHelper { @Override public void loginFtpServer(String host, String username, String password, int port, int timeout, - String connectMode) { + String connectMode, Map extendParams) { ftpClient = new FTPClient(); try { // 连接 diff --git a/ftpwriter/doc/ftpwriter.md b/ftpwriter/doc/ftpwriter.md index a38a1052e2..e4ecde2363 100644 --- a/ftpwriter/doc/ftpwriter.md +++ b/ftpwriter/doc/ftpwriter.md @@ -53,6 +53,8 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结 "port": 22, "username": "xxx", "password": "xxx", + "privateKey": "/path_to_private_key", + "keyPassword": "xxx", "timeout": "60000", "connectPattern": "PASV", "path": "/tmp/data/", @@ -117,9 +119,25 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结 * **password** - * 描述:ftp服务器访问密码。
+ * 描述:ftp服务器访问密码。密码和私钥必须配置一项。
- * 必选:是
+ * 必选:否
+ + * 默认值:无
+ +* **privateKey** + + * 描述:ftp服务器访问私钥。密码和私钥必须配置一项。
+ + * 必选:否
+ + * 默认值:无
+ +* **keyPassword** + + * 描述:私钥密码密码。
+ + * 必选:否
* 默认值:无
diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java index eda603fc73..94cd3163b1 100755 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/FtpWriter.java @@ -18,7 +18,9 @@ import java.io.OutputStream; import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; @@ -34,6 +36,8 @@ public static class Job extends Writer.Job { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private int timeout; private IFtpHelper ftpHelper = null; @@ -48,8 +52,11 @@ public void init() { RetryUtil.executeWithRetry(new Callable() { @Override public Void call() throws Exception { + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); ftpHelper.loginFtpServer(host, username, password, - port, timeout); + port, timeout, extendParams); return null; } }, 3, 4000, true); @@ -81,8 +88,13 @@ private void validateParameter() { FtpWriterErrorCode.REQUIRED_VALUE); this.username = this.writerSliceConfig.getNecessaryValue( Key.USERNAME, FtpWriterErrorCode.REQUIRED_VALUE); - this.password = this.writerSliceConfig.getNecessaryValue( - Key.PASSWORD, FtpWriterErrorCode.REQUIRED_VALUE); + this.password = this.writerSliceConfig.getString(Key.PASSWORD); + this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY); + this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD); + if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) { + throw DataXException.asDataXException( + FtpWriterErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项"); + } this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); @@ -208,6 +220,8 @@ public static class Task extends Writer.Task { private int port; private String username; private String password; + private String privateKey; + private String keyPassword; private int timeout; private IFtpHelper ftpHelper = null; @@ -225,6 +239,8 @@ public void init() { this.port = this.writerSliceConfig.getInt(Key.PORT); this.username = this.writerSliceConfig.getString(Key.USERNAME); this.password = this.writerSliceConfig.getString(Key.PASSWORD); + this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY); + this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD); this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT); this.protocol = this.writerSliceConfig.getString(Key.PROTOCOL); @@ -238,8 +254,11 @@ public void init() { RetryUtil.executeWithRetry(new Callable() { @Override public Void call() throws Exception { + Map extendParams = new HashMap(); + extendParams.put(Key.PRIVATEKEY, privateKey); + extendParams.put(Key.KEYPASSWORD, keyPassword); ftpHelper.loginFtpServer(host, username, password, - port, timeout); + port, timeout, extendParams); return null; } }, 3, 4000, true); diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java index 1cf4812ab9..b0becd7629 100755 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/Key.java @@ -8,6 +8,10 @@ public class Key { public static final String USERNAME = "username"; public static final String PASSWORD = "password"; + + public static final String PRIVATEKEY = "privateKey"; + + public static final String KEYPASSWORD = "keyPassword"; public static final String PORT = "port"; diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java index 2e503f7f7b..fa94871b83 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/IFtpHelper.java @@ -1,12 +1,13 @@ package com.alibaba.datax.plugin.writer.ftpwriter.util; import java.io.OutputStream; +import java.util.Map; import java.util.Set; public interface IFtpHelper { //使用被动方式 - public void loginFtpServer(String host, String username, String password, int port, int timeout); + public void loginFtpServer(String host, String username, String password, int port, int timeout, Map extendParams); public void logoutFtpServer(); diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java index e748f12c97..e806cb6497 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/SftpHelperImpl.java @@ -3,6 +3,7 @@ import java.io.ByteArrayOutputStream; import java.io.OutputStream; import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.Vector; @@ -13,6 +14,7 @@ import org.slf4j.LoggerFactory; import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.plugin.writer.ftpwriter.Key; import com.alibaba.datax.plugin.writer.ftpwriter.FtpWriterErrorCode; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONWriter; @@ -33,9 +35,18 @@ public class SftpHelperImpl implements IFtpHelper { @Override public void loginFtpServer(String host, String username, String password, - int port, int timeout) { + int port, int timeout, Map extendParams) { JSch jsch = new JSch(); try { + String privateKey = (String)extendParams.get(Key.PRIVATEKEY); + String keyPassword = (String)extendParams.get(Key.KEYPASSWORD); + if (!StringUtils.isBlank(privateKey)) { + if (!StringUtils.isBlank(keyPassword)) { + jsch.addIdentity(privateKey, keyPassword); + } else { + jsch.addIdentity(privateKey); + } + } this.session = jsch.getSession(username, host, port); if (this.session == null) { throw DataXException @@ -43,7 +54,9 @@ public void loginFtpServer(String host, String username, String password, "创建ftp连接this.session失败,无法通过sftp与服务器建立链接,请检查主机名和用户名是否正确."); } - this.session.setPassword(password); + if (StringUtils.isBlank(privateKey)) { + this.session.setPassword(password); + } Properties config = new Properties(); config.put("StrictHostKeyChecking", "no"); // config.put("PreferredAuthentications", "password"); diff --git a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java index d5b9a74671..7bd0ded864 100644 --- a/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java +++ b/ftpwriter/src/main/java/com/alibaba/datax/plugin/writer/ftpwriter/util/StandardFtpHelperImpl.java @@ -5,6 +5,7 @@ import java.io.OutputStream; import java.net.UnknownHostException; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.commons.io.IOUtils; @@ -28,7 +29,7 @@ public class StandardFtpHelperImpl implements IFtpHelper { @Override public void loginFtpServer(String host, String username, String password, - int port, int timeout) { + int port, int timeout, Map extendParams) { this.ftpClient = new FTPClient(); try { this.ftpClient.setControlEncoding("UTF-8");