1414
1515package org .apache .zeppelin .hbase ;
1616
17- import org .jruby .embed .LocalContextScope ;
18- import org .jruby .embed .ScriptingContainer ;
19- import org .slf4j .Logger ;
20- import org .slf4j .LoggerFactory ;
21-
17+ import java .io .ByteArrayOutputStream ;
2218import java .io .File ;
23- import java .io .FileInputStream ;
2419import java .io .IOException ;
25- import java .io .StringWriter ;
26- import java .nio .file .Path ;
20+ import java .nio .file .Files ;
2721import java .nio .file .Paths ;
28- import java .util .List ;
22+ import java .util .HashMap ;
23+ import java .util .Map ;
2924import java .util .Properties ;
3025
26+ import org .apache .commons .exec .CommandLine ;
27+ import org .apache .commons .exec .DefaultExecutor ;
28+ import org .apache .commons .exec .ExecuteException ;
29+ import org .apache .commons .exec .ExecuteWatchdog ;
30+ import org .apache .commons .exec .Executor ;
31+ import org .apache .commons .exec .PumpStreamHandler ;
32+ import org .apache .commons .io .FileUtils ;
33+ import org .apache .commons .lang3 .StringUtils ;
3134import org .apache .zeppelin .interpreter .Interpreter ;
3235import org .apache .zeppelin .interpreter .InterpreterContext ;
3336import org .apache .zeppelin .interpreter .InterpreterException ;
3437import org .apache .zeppelin .interpreter .InterpreterResult ;
35- import org .apache .zeppelin .interpreter .thrift .InterpreterCompletion ;
3638import org .apache .zeppelin .scheduler .Scheduler ;
3739import org .apache .zeppelin .scheduler .SchedulerFactory ;
40+ import org .slf4j .Logger ;
41+ import org .slf4j .LoggerFactory ;
3842
3943/**
40- * Support for HBase Shell. All the commands documented here
41- * http://hbase.apache.org/book.html#shell is supported.
42- *
43- * Requirements:
44- * HBase Shell should be installed on the same machine. To be more specific, the following dir.
45- * should be available: https://github.com/apache/hbase/tree/master/hbase-shell/src/main/ruby
46- * HBase Shell should be able to connect to the HBase cluster from terminal. This makes sure
47- * that the client is configured properly.
48- *
49- * The interpreter takes 3 config parameters:
50- * hbase.home: Root directory where HBase is installed. Default is /usr/lib/hbase/
51- * hbase.ruby.sources: Dir where shell ruby code is installed.
52- * Path is relative to hbase.home. Default: lib/ruby
53- * zeppelin.hbase.test.mode: (Testing only) Disable checks for unit and manual tests. Default: false
44+ * HBase interpreter. It uses the hbase shell to interpret the commands.
5445 */
5546public class HbaseInterpreter extends Interpreter {
47+ private static final Logger LOGGER = LoggerFactory .getLogger (HbaseInterpreter .class );
48+
5649 public static final String HBASE_HOME = "hbase.home" ;
57- public static final String HBASE_RUBY_SRC = "hbase.ruby.sources" ;
58- public static final String HBASE_TEST_MODE = "zeppelin.hbase.test.mode" ;
5950
60- private static final Logger LOGGER = LoggerFactory .getLogger (HbaseInterpreter .class );
61- private ScriptingContainer scriptingContainer ;
51+ private Map <String , Executor > runningProcesses = new HashMap <>();
6252
63- private StringWriter writer ;
53+ private Map < String , File > tempFiles = new HashMap <>() ;
6454
65- public HbaseInterpreter (Properties property ) {
66- super (property );
55+ private static final int SIGTERM_CODE = 143 ;
56+
57+ private long commandTimeout = 60000 ;
58+
59+ public HbaseInterpreter (Properties properties ) {
60+ super (properties );
6761 }
6862
6963 @ Override
7064 public void open () throws InterpreterException {
71- this .scriptingContainer = new ScriptingContainer (LocalContextScope .SINGLETON );
72- this .writer = new StringWriter ();
73- scriptingContainer .setOutput (this .writer );
74-
75- if (!Boolean .parseBoolean (getProperty (HBASE_TEST_MODE ))) {
76- String hbaseHome = getProperty (HBASE_HOME );
77- String rubySrc = getProperty (HBASE_RUBY_SRC );
78- Path absRubySrc = Paths .get (hbaseHome , rubySrc ).toAbsolutePath ();
79-
80- LOGGER .info ("Home:" + hbaseHome );
81- LOGGER .info ("Ruby Src:" + rubySrc );
82-
83- File f = absRubySrc .toFile ();
84- if (!f .exists () || !f .isDirectory ()) {
85- throw new InterpreterException ("HBase ruby sources is not available at '" + absRubySrc
86- + "'" );
87- }
88-
89- LOGGER .info ("Absolute Ruby Source:" + absRubySrc .toString ());
90- // hirb.rb:41 requires the following system properties to be set.
91- Properties sysProps = System .getProperties ();
92- sysProps .setProperty (HBASE_RUBY_SRC , absRubySrc .toString ());
93-
94- Path absHirbPath = Paths .get (hbaseHome , "bin/hirb.rb" );
95- try {
96- FileInputStream fis = new FileInputStream (absHirbPath .toFile ());
97- this .scriptingContainer .runScriptlet (fis , "hirb.rb" );
98- fis .close ();
99- } catch (IOException e ) {
100- throw new InterpreterException (e .getCause ());
101- }
102- }
65+ // Do nothing
10366 }
10467
10568 @ Override
10669 public void close () {
107- if (this .scriptingContainer != null ) {
108- this .scriptingContainer .terminate ();
109- }
70+ runningProcesses .clear ();
71+ runningProcesses = null ;
72+ tempFiles .clear ();
73+ tempFiles = null ;
11074 }
11175
11276 @ Override
113- public InterpreterResult interpret (String cmd , InterpreterContext interpreterContext ) {
77+ public InterpreterResult interpret (String st , InterpreterContext context ) {
78+ LOGGER .debug ("Run HBase shell script: {}" , st );
79+
80+ if (StringUtils .isEmpty (st )) {
81+ return new InterpreterResult (InterpreterResult .Code .SUCCESS );
82+ }
83+
84+ String paragraphId = context .getParagraphId ();
85+ final File scriptFile ;
11486 try {
115- LOGGER .info (cmd );
116- this .writer .getBuffer ().setLength (0 );
117- this .scriptingContainer .runScriptlet (cmd );
118- this .writer .flush ();
119- LOGGER .debug (writer .toString ());
120- return new InterpreterResult (InterpreterResult .Code .SUCCESS , writer .getBuffer ().toString ());
121- } catch (Throwable t ) {
122- LOGGER .error ("Can not run '" + cmd + "'" , t );
123- return new InterpreterResult (InterpreterResult .Code .ERROR , t .getMessage ());
87+ // Write script in a temporary file
88+ // The script is enriched with extensions
89+ scriptFile = createTempFile (paragraphId );
90+ FileUtils .write (scriptFile , st + "\n exit" );
91+ } catch (IOException e ) {
92+ LOGGER .error ("Can not write script in temp file" , e );
93+ return new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
94+ }
95+
96+ InterpreterResult result = new InterpreterResult (InterpreterResult .Code .SUCCESS );
97+
98+ final DefaultExecutor executor = new DefaultExecutor ();
99+ final ByteArrayOutputStream errorStream = new ByteArrayOutputStream ();
100+
101+ executor .setStreamHandler (new PumpStreamHandler (context .out , errorStream ));
102+ executor .setWatchdog (new ExecuteWatchdog (commandTimeout ));
103+
104+ String hbaseCmdPath = Paths .get (getProperty (HBASE_HOME ), "bin" , "hbase" ).toString ();
105+ final CommandLine cmdLine = CommandLine .parse (hbaseCmdPath );
106+ cmdLine .addArgument ("shell" , false );
107+ cmdLine .addArgument (scriptFile .getAbsolutePath (), false );
108+
109+ try {
110+ executor .execute (cmdLine );
111+ runningProcesses .put (paragraphId , executor );
112+ } catch (ExecuteException e ) {
113+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
114+
115+ final int exitValue = e .getExitValue ();
116+ InterpreterResult .Code code = InterpreterResult .Code .ERROR ;
117+ String msg = errorStream .toString ();
118+
119+ if (exitValue == SIGTERM_CODE ) {
120+ code = InterpreterResult .Code .INCOMPLETE ;
121+ msg = msg + "Paragraph received a SIGTERM.\n " ;
122+ LOGGER .info ("The paragraph {} stopped executing: {}" , paragraphId , msg );
123+ }
124+
125+ msg += "ExitValue: " + exitValue ;
126+ result = new InterpreterResult (code , msg );
127+ } catch (IOException e ) {
128+ LOGGER .error ("Can not run script in paragraph {}" , paragraphId , e );
129+ result = new InterpreterResult (InterpreterResult .Code .ERROR , e .getMessage ());
130+ } finally {
131+ deleteTempFile (paragraphId );
132+ stopProcess (paragraphId );
124133 }
134+ return result ;
125135 }
126136
127137 @ Override
128- public void cancel (InterpreterContext context ) {}
138+ public void cancel (InterpreterContext context ) {
139+ stopProcess (context .getParagraphId ());
140+ deleteTempFile (context .getParagraphId ());
141+ }
129142
130143 @ Override
131144 public FormType getFormType () {
@@ -143,30 +156,26 @@ public Scheduler getScheduler() {
143156 HbaseInterpreter .class .getName () + this .hashCode ());
144157 }
145158
146- @ Override
147- public List <InterpreterCompletion > completion (String buf , int cursor ,
148- InterpreterContext interpreterContext ) {
149- return null ;
159+ private void stopProcess (String paragraphId ) {
160+ if (runningProcesses .containsKey (paragraphId )) {
161+ final Executor executor = runningProcesses .get (paragraphId );
162+ final ExecuteWatchdog watchdog = executor .getWatchdog ();
163+ watchdog .destroyProcess ();
164+ runningProcesses .remove (paragraphId );
165+ }
150166 }
151167
152- private static String getSystemDefault (
153- String envName ,
154- String propertyName ,
155- String defaultValue ) {
156-
157- if (envName != null && !envName .isEmpty ()) {
158- String envValue = System .getenv ().get (envName );
159- if (envValue != null ) {
160- return envValue ;
161- }
162- }
168+ private File createTempFile (String paragraphId ) throws IOException {
169+ File temp = Files .createTempFile (
170+ Paths .get (System .getProperty ("java.io.tmpdir" ), "zeppelin-hbase-scripts" ), paragraphId , ".txt" ).toFile ();
171+ tempFiles .put (paragraphId , temp );
172+ return temp ;
173+ }
163174
164- if (propertyName != null && !propertyName .isEmpty ()) {
165- String propValue = System .getProperty (propertyName );
166- if (propValue != null ) {
167- return propValue ;
168- }
175+ private void deleteTempFile (String paragraphId ) {
176+ File tmpFile = tempFiles .remove (paragraphId );
177+ if (null != tmpFile ) {
178+ FileUtils .deleteQuietly (tmpFile );
169179 }
170- return defaultValue ;
171180 }
172181}
0 commit comments