Description
一.应用场景
首先感谢社区各位大佬的指点,学习到很多。
知因智慧是一家toB金融公司,里面需要大量的ETL过程,原先用Shell脚本连接各种Hql,Spark等等,XXL- Job调度,可能一个模块就被一个大的脚本包含住了,耦合性特别强,调度这块也有问题,无法监控中间的报错,2019下半年时看到社区开源组件,一直研究怎么跟公司整合。
希望借助社区的力量,结合公司实际情况,打通公司级数据中台的流程,目前数据建设主要集中在元数据管理,数据仓库ETL流程,数据质量,任务调度这几个方面。
二. 解决的问题
基于LDAP服务
基于LDAP管理用户,代理服务模块修改,以组为单位共用账户,公司的整个数据开发人员不多,基于这种方式可以支撑下去。
object LDAPUtils extends Logging {
val url = CommonVars("wds.linkis.ldap.proxy.url", "").getValue
val baseDN = CommonVars("wds.linkis.ldap.proxy.baseDN", "").getValue
def login(userID: String, password: String): Unit = {
if(userID.isEmpty) throw new NamingException("userID is null")
val env = new Hashtable[String, String]()
val bindDN = "uid="+userID+","
val bindPassword = password
env.put(Context.SECURITY_AUTHENTICATION, "simple")
env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory")
env.put(Context.PROVIDER_URL, url)
env.put(Context.SECURITY_PRINCIPAL, bindDN+baseDN)
env.put(Context.SECURITY_CREDENTIALS, bindPassword)
new InitialLdapContext(env, null)
info(s"user $userID login success.")
}
}
放开权限
因为资源有限,有些权限管理很繁琐,放开一些权限:
filesystem.path 存储日志,脚本的权限,统一根据nfs挂载,把目录权限统一根据用户随意修改。
linkis-metadata元数据管理,把Hive相关权限放开(HiveMetaDao.xml):
<select id="getDbsByUser" resultType="java.lang.String" parameterType="java.lang.String" databaseId="mysql">
select NAME from DBS GROUP BY NAME order by NAME
</select>
<select id="getTablesByDbNameAndUser" resultType="map" parameterType="map" databaseId="mysql">
select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
from TBLS t2, DBS t3 where 1=1 and t2.DB_ID=t3.DB_ID and t3.NAME = #{dbName,jdbcType=VARCHAR}
order by NAME;
</select>
依赖兼容问题
因为环境是CDH-5.16.2 编译部署DSS和Linkis是根据原生的版本号,大部分服务都没有问题,但是有的服务有些问题,因为CDH会把组件重新编译,有的指令会改变。
原先发生过Hive 和 Spark应该支持的函数,到Scriptis上运行脚本,不支持,这是因为得把两个服务相关的Hive 和 Spark jar 都变成后缀带有CDH的。
tez的支持
Linkis Hive引擎对tez的支持:
- tez相关jar放到linkis-ujes-hive-enginemanager/lib
- linkis.properties配置Hive配置文件目录,hive-site.xml配置文件中
<property>
<name>tez.lib.uris</name>
<value>hdfs:///apps/tez/tez-0.8.5.tar.gz</value>
</property>
<property>
<name>hive.tez.container.size</name>
<value>10240</value>
</property>
Shell定义变量
自定义变量的支持:
CustomVariableUtils 工具类中,Shell关枚举都要添加上。
/**
* @Classname ShellScriptCompaction
* @Description TODO
* @Date 2020/8/19 18:22
* @Created by limeng
*/
class ShellScriptCompaction private extends CommonScriptCompaction{
override def prefixConf: String = "#conf@set"
override def prefix: String = "#@set"
override def belongTo(suffix: String): Boolean ={
suffix match {
case "sh"=>true
case _=>false
}
}
}
object ShellScriptCompaction{
val shellScriptCompaction:ShellScriptCompaction=new ShellScriptCompaction
def apply(): CommonScriptCompaction = shellScriptCompaction
}
ScriptFsWriter Shell相关 def listCompactions(): Array[Compaction] = Array(PYScriptCompaction(),QLScriptCompaction(),ScalaScriptCompaction(),ShellScriptCompaction())
WorkspaceUtil 工具类正则有问题,无法修改名称,中间有.符号去除
public static void fileAndDirNameSpecialCharCheck(String path) throws WorkSpaceException {
String name = new File(path).getName();
LOGGER.info(path);
String specialRegEx = "[ _`~!@#$%^&*()+=|{}':;',\\[\\]<>/?~!@#¥%……&*()——+|{}【】‘;:”“’。,、?]|\n|\r|\t";
Pattern specialPattern = Pattern.compile(specialRegEx);
if(specialPattern.matcher(name).find()){
throw new WorkSpaceException("the path exist special char");
}
}
使用 eventReceiver节点异常(eventchecker组件)#247
EventCheckerNodeExecution.scala
Utils.tryFinally {
resultSetWriter.addMetaData(null)
resultSetWriter.addRecord(new LineRecord(action.saveKeyAndValue))
}(Utils.tryQuietly(resultSetWriter.close()))
}
response.setIsSucceed(true)
}else{
.............................................
AppJointEntranceJob.scala
override def run(): Unit = {
if(!isScheduled) return
info(s"$getId starts to run")
getLogListener.foreach(_.onLogUpdate(this, LogUtils.generateInfo(s"$getId starts to execute.")))
startTime = System.currentTimeMillis
getExecutor match {
case appjointEntranceEngine:AppJointEntranceEngine => appjointEntranceEngine.setJob(this)
appjointEntranceEngine.setInstance(Sender.getThisInstance)
}
Utils.tryAndErrorMsg(transition(Running))(s"transition $getId from Scheduler to Running failed.")
加入spark streaming
- linkis-ujes-spark-enginemanager引入依赖,spark-streaming相关。
- SparkScalaExecutor.scala bindSparkSession方法引入相关依赖
调试服务
把有问题的服务,bin目录下启动脚本,远程debug打开
因为平台Cookie的原因,直接用接口发送请求,有的无法调试:
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
/**
* @Classname HttpUtil
* @Description TODO
* @Date 2020/10/30 14:49
* @Created by limeng
*/
public class HttpUtil {
public static RestTemplate getRestClient(){
CloseableHttpClient build = HttpClientBuilder.create().useSystemProperties().build();
return new RestTemplate(new HttpComponentsClientHttpRequestFactory(build));
}
}
import com.linkis.web.utils.HttpUtil;
import net.sf.json.JSONObject;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
/**
* @Classname LinkisMain
* @Description TODO
* @Date 2020/10/30 14:54
* @Created by limeng
* 测试类 EntranceRestfulTest
*/
public class LinkisMain {
public static void main(String[] args) {
RestTemplate restClient = HttpUtil.getRestClient();
JSONObject postData = new JSONObject();
postData.put("password","hdfs");
postData.put("userName","hdfs");
String loginUrl = "http://192.168.200.116:8088/api/rest_j/v1/user/login";
ResponseEntity<JSONObject> jsonResponseEntity = restClient.postForEntity(loginUrl, postData, JSONObject.class);
System.out.println("状态码:"+jsonResponseEntity.getStatusCodeValue());
JSONObject body = jsonResponseEntity.getBody();
System.out.println("body :" + body.toString());
}
}
三.最佳实践
我以公司标签库为例,讲述下操作流程。
对企业数据进行挖掘和分析,建立标签特征体系,创建个性化的多层级标签,并在此基础上进行细分和精准营销场景应用,有利于对企业/集团进行深入经营,充分挖掘企业/集团潜力,提升企业/集团价值。以此为目标的标签库的构建,将有利于了解和深耕企业/集团,可更好地助力于企业金融服务。
这是执行流程图,中间调度过程,Hql,Spark目前运行在DSS平台上。
其中一个企业经营特别的跑批流程。
软件版本
CDH-5.16.2
Hadoop-2.6
Hive-1.1
Spark-2.2
kafka_2.11-1.0.1