@@ -49,19 +49,11 @@ class KafkaTruncateTopic(private val admin: Admin) extends LazyLogging {
4949 }
5050
5151 logger.info(s " $topic truncated. " )
52-
5352 }
5453
55- private def deleteRecords (latestOffsets : Map [TopicPartition , ListOffsetsResultInfo ]): Unit =
56- try {
54+ private def deleteRecords (latestOffsets : Map [TopicPartition , ListOffsetsResultInfo ]): Unit = {
5755 val recordsToDelete = generateRecordsToDelete(latestOffsets)
5856 admin.deleteRecords(recordsToDelete.asJava).all().get()
59- } catch {
60- case _ : InterruptedException =>
61- Thread .currentThread().interrupt()
62- throw new RuntimeException (" the deleteRecords operation was interrupted, aborting; it may have still completed." )
63- case e : ExecutionException =>
64- throw convertExecutionException(e)
6557 }
6658
6759 /**
@@ -84,76 +76,48 @@ class KafkaTruncateTopic(private val admin: Admin) extends LazyLogging {
8476 /**
8577 * used by getLatestOffsets() and getEarliestOffsets() for obtaining the offsets
8678 */
87- private def getOffsets (offsetSpecs : Map [TopicPartition , OffsetSpec ]): Map [TopicPartition , ListOffsetsResultInfo ] =
88- try {
79+ private def getOffsets (offsetSpecs : Map [TopicPartition , OffsetSpec ]): Map [TopicPartition , ListOffsetsResultInfo ] = {
8980 admin.listOffsets(offsetSpecs.asJava).all().get().asScala.toMap
90- } catch {
91- case _ : InterruptedException =>
92- Thread .currentThread().interrupt()
93- throw new RuntimeException (" listOffsets operation interrupted, deleteRecords will not executed." )
94- case e : ExecutionException =>
95- throw convertExecutionException(e)
9681 }
9782
9883 /**
9984 * Get all TopicPartitions for the given topic; truncation is performed is performed at the partition level.
10085 */
101- private def getTopicPartitions (topic : String ): List [TopicPartition ] =
102- try {
86+ private def getTopicPartitions (topic : String ): List [TopicPartition ] = {
10387 val topicInfo = admin.describeTopics(Collections .singleton(topic)).allTopicNames().get().get(topic)
10488 topicInfo.partitions().asScala.map(info => new TopicPartition (topic, info.partition())).toList
105- } catch {
106- case _ : InterruptedException =>
107- Thread .currentThread().interrupt()
108- throw new RuntimeException (" describeTopics operation interrupted, deleteRecords will not executed." )
109- case e : ExecutionException =>
110- throw convertExecutionException(e)
11189 }
11290
11391 /**
11492 * Check if the topic has the 'delete' cleanup policy.
11593 */
116- private def hasDeleteCleanupPolicy (topicName : String ): Boolean =
117- try {
118- val configResource = new ConfigResource (ConfigResource .Type .TOPIC , topicName)
119- val configsResult = admin.describeConfigs(Collections .singleton(configResource))
120- val config = configsResult.all().get().get(configResource)
121-
122- config.get(TopicConfig .CLEANUP_POLICY_CONFIG ).value().contains(TopicConfig .CLEANUP_POLICY_DELETE )
123- } catch {
124- case _ : InterruptedException =>
125- Thread .currentThread().interrupt()
126- throw new RuntimeException (" describeTopics operation interrupted, deleteRecords will not executed." )
127- case e : ExecutionException =>
128- throw convertExecutionException(e)
129- }
94+ private def hasDeleteCleanupPolicy (topicName : String ): Boolean = {
95+ val configResource = new ConfigResource (ConfigResource .Type .TOPIC , topicName)
96+ val configsResult = admin.describeConfigs(Collections .singleton(configResource))
97+ val config = configsResult.all().get().get(configResource)
98+
99+ config.get(TopicConfig .CLEANUP_POLICY_CONFIG ).value().contains(TopicConfig .CLEANUP_POLICY_DELETE )
100+ }
130101
131102 /**
132103 * Add the 'delete' cleanup policy to the topic's 'cleanup.policy' config.
133104 */
134- private def addDeleteCleanupPolicy (topic : String ): Unit =
135- try {
136- val configResource = new ConfigResource (ConfigResource .Type .TOPIC , topic)
137- val alterConfigOp = new AlterConfigOp (new ConfigEntry (TopicConfig .CLEANUP_POLICY_CONFIG , TopicConfig .CLEANUP_POLICY_DELETE ), AlterConfigOp .OpType .APPEND )
138- val configs : util.Map [ConfigResource , util.Collection [AlterConfigOp ]] = Map (configResource -> alterConfigOpColl(alterConfigOp)).asJava
139- admin.incrementalAlterConfigs(configs).all().get()
140- } catch {
141- case _ : InterruptedException =>
142- Thread .currentThread().interrupt()
143- throw new RuntimeException (" incrementalAlterConfigs operation interrupted, ..." )
144- case e : ExecutionException =>
145- throw convertExecutionException(e)
146- }
105+ private def addDeleteCleanupPolicy (topic : String ): Unit = {
106+ val configResource = new ConfigResource (ConfigResource .Type .TOPIC , topic)
107+ val alterConfigOp = new AlterConfigOp (new ConfigEntry (TopicConfig .CLEANUP_POLICY_CONFIG , TopicConfig .CLEANUP_POLICY_DELETE ), AlterConfigOp .OpType .APPEND )
108+ val configs : util.Map [ConfigResource , util.Collection [AlterConfigOp ]] = Map (configResource -> alterConfigOpColl(alterConfigOp)).asJava
109+ admin.incrementalAlterConfigs(configs).all().get()
110+ }
147111
148112 /**
149113 * Remove the 'delete' cleanup policy to the topic's 'cleanup.policy' config.
150114 */
151115 private def removeDeleteCleanupPolicy (topic : String ): Unit = {
152- val configResource = new ConfigResource (ConfigResource .Type .TOPIC , topic)
153- val alterConfigOp = new AlterConfigOp (new ConfigEntry (TopicConfig .CLEANUP_POLICY_CONFIG , TopicConfig .CLEANUP_POLICY_DELETE ), AlterConfigOp .OpType .SUBTRACT )
154- val configs : util.Map [ConfigResource , util.Collection [AlterConfigOp ]] = Map (configResource -> alterConfigOpColl(alterConfigOp)).asJava
155- admin.incrementalAlterConfigs(configs).all().get()
156- }
116+ val configResource = new ConfigResource (ConfigResource .Type .TOPIC , topic)
117+ val alterConfigOp = new AlterConfigOp (new ConfigEntry (TopicConfig .CLEANUP_POLICY_CONFIG , TopicConfig .CLEANUP_POLICY_DELETE ), AlterConfigOp .OpType .SUBTRACT )
118+ val configs : util.Map [ConfigResource , util.Collection [AlterConfigOp ]] = Map (configResource -> alterConfigOpColl(alterConfigOp)).asJava
119+ admin.incrementalAlterConfigs(configs).all().get()
120+ }
157121
158122 /**
159123 * Singleton wrapper for AlertConfigOp.
0 commit comments