Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add private key authentication support to SFTP #2233

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions ftpreader/doc/ftpreader.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能,
"port": 22,
"username": "xx",
"password": "xxx",
"privateKey": "/path_to_private_key",
"keyPassword": "xxx",
"path": [
"/home/hanfa.shf/ftpReaderTest/data"
],
Expand Down Expand Up @@ -153,10 +155,26 @@ FtpReader实现了从远程FTP文件读取数据并转为DataX协议的功能,

* **password**

* 描述:ftp服务器访问密码。 <br />
* 描述:ftp服务器访问密码。密码和私钥必须配置一项。 <br />

* 必选:否 <br />

* 默认值:无 <br />

* 必选:是 <br />
* **privateKey**

* 描述:ftp服务器访问私钥。密码和私钥必须配置一项。 <br />

* 必选:否 <br />

* 默认值:无 <br />

* **keyPassword**

* 描述:私钥密码密码。 <br />

* 必选:否 <br />

* 默认值:无 <br />

* **path**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

public abstract class FtpHelper {
/**
Expand All @@ -18,7 +19,7 @@ public abstract class FtpHelper {
* @return void
* @throws
*/
public abstract void loginFtpServer(String host, String username, String password, int port, int timeout,String connectMode) ;
public abstract void loginFtpServer(String host, String username, String password, int port, int timeout, String connectMode, Map<String, Object> extendParams) ;
/**
*
* @Title: LogoutFtpServer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +33,8 @@ public static class Job extends Reader.Job {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private int timeout;
private String connectPattern;
private int maxTraversalLevel;
Expand All @@ -44,6 +49,9 @@ public void init() {
this.validateParameter();
UnstructuredStorageReaderUtil.validateParameter(this.originConfig);

Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
if ("sftp".equals(protocol)) {
//sftp协议
this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT);
Expand All @@ -53,7 +61,7 @@ public void init() {
this.port = originConfig.getInt(Key.PORT, Constant.DEFAULT_FTP_PORT);
this.ftpHelper = new StandardFtpHelper();
}
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern);
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams);

}

Expand All @@ -67,7 +75,12 @@ private void validateParameter() {
}
this.host = this.originConfig.getNecessaryValue(Key.HOST, FtpReaderErrorCode.REQUIRED_VALUE);
this.username = this.originConfig.getNecessaryValue(Key.USERNAME, FtpReaderErrorCode.REQUIRED_VALUE);
this.password = this.originConfig.getNecessaryValue(Key.PASSWORD, FtpReaderErrorCode.REQUIRED_VALUE);
this.password = this.originConfig.getString(Key.PASSWORD);
this.privateKey = this.originConfig.getString(Key.PRIVATEKEY);
this.keyPassword = this.originConfig.getString(Key.KEYPASSWORD);
if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) {
throw DataXException.asDataXException(FtpReaderErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项");
}
this.timeout = originConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT);
this.maxTraversalLevel = originConfig.getInt(Key.MAXTRAVERSALLEVEL, Constant.DEFAULT_MAX_TRAVERSAL_LEVEL);

Expand Down Expand Up @@ -175,6 +188,8 @@ public static class Task extends Reader.Task {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private String protocol;
private int timeout;
private String connectPattern;
Expand All @@ -192,10 +207,15 @@ public void init() {//连接重试
this.protocol = readerSliceConfig.getString(Key.PROTOCOL);
this.username = readerSliceConfig.getString(Key.USERNAME);
this.password = readerSliceConfig.getString(Key.PASSWORD);
this.privateKey = readerSliceConfig.getString(Key.PRIVATEKEY);
this.keyPassword = readerSliceConfig.getString(Key.KEYPASSWORD);
this.timeout = readerSliceConfig.getInt(Key.TIMEOUT, Constant.DEFAULT_TIMEOUT);

this.sourceFiles = this.readerSliceConfig.getList(Constant.SOURCE_FILES, String.class);

Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
if ("sftp".equals(protocol)) {
//sftp协议
this.port = readerSliceConfig.getInt(Key.PORT, Constant.DEFAULT_SFTP_PORT);
Expand All @@ -206,7 +226,7 @@ public void init() {//连接重试
this.connectPattern = readerSliceConfig.getString(Key.CONNECTPATTERN, Constant.DEFAULT_FTP_CONNECT_PATTERN);// 默认为被动模式
this.ftpHelper = new StandardFtpHelper();
}
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern);
ftpHelper.loginFtpServer(host, username, password, port, timeout, connectPattern, extendParams);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public class Key {
public static final String HOST = "host";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String PRIVATEKEY = "privateKey";
public static final String KEYPASSWORD = "keyPassword";
public static final String PORT = "port";
public static final String TIMEOUT = "timeout";
public static final String CONNECTPATTERN = "connectPattern";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import java.io.InputStream;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Vector;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,11 +26,21 @@ public class SftpHelper extends FtpHelper {

Session session = null;
ChannelSftp channelSftp = null;
@Override

@Override
public void loginFtpServer(String host, String username, String password, int port, int timeout,
String connectMode) {
String connectMode, Map<String, Object> extendParams) {
JSch jsch = new JSch(); // 创建JSch对象
try {
String privateKey = (String)extendParams.get(Key.PRIVATEKEY);
String keyPassword = (String)extendParams.get(Key.KEYPASSWORD);
if (!StringUtils.isBlank(privateKey)) {
if (!StringUtils.isBlank(keyPassword)) {
jsch.addIdentity(privateKey, keyPassword);
} else {
jsch.addIdentity(privateKey);
}
}
session = jsch.getSession(username, host, port);
// 根据用户名,主机ip,端口获取一个Session对象
// 如果服务器连接不上,则抛出异常
Expand All @@ -37,7 +49,9 @@ public void loginFtpServer(String host, String username, String password, int po
"session is null,无法通过sftp与服务器建立链接,请检查主机名和用户名是否正确.");
}

session.setPassword(password); // 设置密码
if (StringUtils.isBlank(privateKey)) {
session.setPassword(password); // 设置密码
}
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config); // 为Session对象设置properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.InputStream;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.Map;

import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTP;
Expand All @@ -23,7 +24,7 @@ public class StandardFtpHelper extends FtpHelper {

@Override
public void loginFtpServer(String host, String username, String password, int port, int timeout,
String connectMode) {
String connectMode, Map<String, Object> extendParams) {
ftpClient = new FTPClient();
try {
// 连接
Expand Down
22 changes: 20 additions & 2 deletions ftpwriter/doc/ftpwriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结
"port": 22,
"username": "xxx",
"password": "xxx",
"privateKey": "/path_to_private_key",
"keyPassword": "xxx",
"timeout": "60000",
"connectPattern": "PASV",
"path": "/tmp/data/",
Expand Down Expand Up @@ -117,9 +119,25 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结

* **password**

* 描述:ftp服务器访问密码。 <br />
* 描述:ftp服务器访问密码。密码和私钥必须配置一项。 <br />

* 必选:是 <br />
* 必选:否 <br />

* 默认值:无 <br />

* **privateKey**

* 描述:ftp服务器访问私钥。密码和私钥必须配置一项。 <br />

* 必选:否 <br />

* 默认值:无 <br />

* **keyPassword**

* 描述:私钥密码密码。 <br />

* 必选:否 <br />

* 默认值:无 <br />

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.io.OutputStream;
import java.util.HashSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

Expand All @@ -34,6 +36,8 @@ public static class Job extends Writer.Job {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private int timeout;

private IFtpHelper ftpHelper = null;
Expand All @@ -48,8 +52,11 @@ public void init() {
RetryUtil.executeWithRetry(new Callable<Void>() {
@Override
public Void call() throws Exception {
Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
ftpHelper.loginFtpServer(host, username, password,
port, timeout);
port, timeout, extendParams);
return null;
}
}, 3, 4000, true);
Expand Down Expand Up @@ -81,8 +88,13 @@ private void validateParameter() {
FtpWriterErrorCode.REQUIRED_VALUE);
this.username = this.writerSliceConfig.getNecessaryValue(
Key.USERNAME, FtpWriterErrorCode.REQUIRED_VALUE);
this.password = this.writerSliceConfig.getNecessaryValue(
Key.PASSWORD, FtpWriterErrorCode.REQUIRED_VALUE);
this.password = this.writerSliceConfig.getString(Key.PASSWORD);
this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY);
this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD);
if (StringUtils.isBlank(this.password) && StringUtils.isBlank(this.privateKey)) {
throw DataXException.asDataXException(
FtpWriterErrorCode.REQUIRED_VALUE, "密码和私钥路径至少需配置一项");
}
this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT,
Constant.DEFAULT_TIMEOUT);

Expand Down Expand Up @@ -208,6 +220,8 @@ public static class Task extends Writer.Task {
private int port;
private String username;
private String password;
private String privateKey;
private String keyPassword;
private int timeout;

private IFtpHelper ftpHelper = null;
Expand All @@ -225,6 +239,8 @@ public void init() {
this.port = this.writerSliceConfig.getInt(Key.PORT);
this.username = this.writerSliceConfig.getString(Key.USERNAME);
this.password = this.writerSliceConfig.getString(Key.PASSWORD);
this.privateKey = this.writerSliceConfig.getString(Key.PRIVATEKEY);
this.keyPassword = this.writerSliceConfig.getString(Key.KEYPASSWORD);
this.timeout = this.writerSliceConfig.getInt(Key.TIMEOUT,
Constant.DEFAULT_TIMEOUT);
this.protocol = this.writerSliceConfig.getString(Key.PROTOCOL);
Expand All @@ -238,8 +254,11 @@ public void init() {
RetryUtil.executeWithRetry(new Callable<Void>() {
@Override
public Void call() throws Exception {
Map<String, Object> extendParams = new HashMap<String, Object>();
extendParams.put(Key.PRIVATEKEY, privateKey);
extendParams.put(Key.KEYPASSWORD, keyPassword);
ftpHelper.loginFtpServer(host, username, password,
port, timeout);
port, timeout, extendParams);
return null;
}
}, 3, 4000, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ public class Key {
public static final String USERNAME = "username";

public static final String PASSWORD = "password";

public static final String PRIVATEKEY = "privateKey";

public static final String KEYPASSWORD = "keyPassword";

public static final String PORT = "port";

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.alibaba.datax.plugin.writer.ftpwriter.util;

import java.io.OutputStream;
import java.util.Map;
import java.util.Set;

public interface IFtpHelper {

//使用被动方式
public void loginFtpServer(String host, String username, String password, int port, int timeout);
public void loginFtpServer(String host, String username, String password, int port, int timeout, Map<String, Object> extendParams);

public void logoutFtpServer();

Expand Down
Loading