Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Fix doris spark sink return sucess when error occurs #5876

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.spark.doris.sink

import com.google.gson.Gson
import org.apache.commons.net.util.Base64
import org.apache.http.HttpHeaders
import org.apache.http.client.config.RequestConfig
Expand Down Expand Up @@ -48,10 +49,11 @@ object DorisUtil extends Serializable {
messages: String,
api: String,
user: String,
password: String): (Boolean, CloseableHttpClient, CloseableHttpResponse) = {
password: String): (Boolean, String) = {

var response: CloseableHttpResponse = null
var status = true
var msg = ""
try {
val httpPut = new HttpPut(api)
val requestConfig = RequestConfig.custom()
Expand Down Expand Up @@ -87,11 +89,20 @@ object DorisUtil extends Serializable {
|Batch Messages Response:
|${stringBuffer.toString}
|""".stripMargin)
val respJson = new Gson()
val map = respJson.fromJson(stringBuffer.toString, classOf[java.util.Map[String, AnyRef]])
val respStatus = map.get("Status")
LOG.info(s"reponse status: $respStatus")
if (respStatus == null || "Fail".equalsIgnoreCase(respStatus.toString)) {
status = false
msg = stringBuffer.toString
}
} catch {
case _: Exception => status = false
(status, httpclient, response)
case e: Exception =>
status = false
msg = e.toString
}
(status, httpclient, response)
(status, msg)
}


Expand All @@ -106,15 +117,13 @@ object DorisUtil extends Serializable {
class DorisUtil(httpHeader: Map[String, String], apiUrl: String, user: String, password: String) {
def saveMessages(messages: String): Unit = {
val httpClient = DorisUtil.createClient
val result = Try(DorisUtil.streamLoad(httpClient, httpHeader, messages, apiUrl, user, password))
result match {
case Success(_) =>
val (status, errMessage) = DorisUtil.streamLoad(httpClient, httpHeader, messages, apiUrl, user, password)
status match {
case true =>
httpClient.close()
result.get._2.close()
case Failure(var1: Exception) =>
case false =>
httpClient.close()
result.get._2.close()
throw new RuntimeException(var1.getMessage)
throw new RuntimeException(errMessage)
}
}
}
Loading