From f8addc22a22e5547d1fbb9730c73553bdc8aa86d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 00:48:57 +0000 Subject: [PATCH 1/7] Command class has language attribute MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Léopold Mebazaa --- sparkmagic/sparkmagic/livyclientlib/command.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/sparkmagic/sparkmagic/livyclientlib/command.py b/sparkmagic/sparkmagic/livyclientlib/command.py index ce36f2d30..a0d0a8495 100644 --- a/sparkmagic/sparkmagic/livyclientlib/command.py +++ b/sparkmagic/sparkmagic/livyclientlib/command.py @@ -12,13 +12,15 @@ from sparkmagic.utils.sparkevents import SparkEvents from sparkmagic.utils.constants import MAGICS_LOGGER_NAME, FINAL_STATEMENT_STATUS, \ MIMETYPE_IMAGE_PNG, MIMETYPE_TEXT_HTML, MIMETYPE_TEXT_PLAIN -from .exceptions import LivyUnexpectedStatusException +from .exceptions import LivyUnexpectedStatusException, \ + BadUserConfigurationException class Command(ObjectWithGuid): - def __init__(self, code, spark_events=None): + def __init__(self, code, spark_events=None, language=None): super(Command, self).__init__() self.code = textwrap.dedent(code) + self.language = language self.logger = SparkLog(u"Command") if spark_events is None: spark_events = SparkEvents() @@ -38,7 +40,11 @@ def execute(self, session): statement_id = -1 try: session.wait_for_idle() - data = {u"code": self.code} + try: + codekind = conf.get_livy_kind(self.language) + except BadUserConfigurationException as e: + codekind = session.kind + data = {u"code": self.code, u"kind": codekind} response = session.http_client.post_statement(session.id, data) statement_id = response[u'id'] output = self._get_statement_output(session, statement_id) @@ -97,4 +103,4 @@ def _get_statement_output(self, session, statement_id): MIMETYPE_TEXT_PLAIN) else: raise LivyUnexpectedStatusException(u"Unknown output status from Livy: '{}'" - .format(statement_output[u"status"])) \ No newline at end of file + .format(statement_output[u"status"])) From 979bdfe5f1531c48e7bcf6596733e18a67191cc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 00:58:03 +0000 Subject: [PATCH 2/7] Tests passing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Léopold Mebazaa --- sparkmagic/sparkmagic/tests/test_command.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sparkmagic/sparkmagic/tests/test_command.py b/sparkmagic/sparkmagic/tests/test_command.py index 6955cf701..6c3136347 100644 --- a/sparkmagic/sparkmagic/tests/test_command.py +++ b/sparkmagic/sparkmagic/tests/test_command.py @@ -42,7 +42,7 @@ def test_execute(): result = command.execute(session) - http_client.post_statement.assert_called_with(0, {"code": command.code}) + http_client.post_statement.assert_called_with(0, {"code": command.code, "kind": kind}) http_client.get_statement.assert_called_with(0, 0) assert result[0] assert_equals(tls.TestLivySession.pi_result, result[1]) @@ -84,7 +84,7 @@ def test_execute_waiting(): result = command.execute(session) - http_client.post_statement.assert_called_with(0, {"code": command.code}) + http_client.post_statement.assert_called_with(0, {"code": command.code, "kind": kind}) http_client.get_statement.assert_called_with(0, 0) assert result[0] assert_equals(tls.TestLivySession.pi_result, result[1]) @@ -111,7 +111,7 @@ def test_execute_null_ouput(): result = command.execute(session) - http_client.post_statement.assert_called_with(0, {"code": command.code}) + http_client.post_statement.assert_called_with(0, {"code": command.code, "kind": kind}) http_client.get_statement.assert_called_with(0, 0) assert result[0] assert_equals(u"", result[1]) From e3805e316d3ed822c387eb4c03e240a5b7069ef9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 01:03:43 +0000 Subject: [PATCH 3/7] Added language param to SparkMagicBase.execute_spark MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Léopold Mebazaa --- sparkmagic/sparkmagic/kernels/kernelmagics.py | 2 +- sparkmagic/sparkmagic/magics/remotesparkmagics.py | 2 +- sparkmagic/sparkmagic/magics/sparkmagicsbase.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/kernelmagics.py b/sparkmagic/sparkmagic/kernels/kernelmagics.py index 99d392bfe..4ef255d83 100644 --- a/sparkmagic/sparkmagic/kernels/kernelmagics.py +++ b/sparkmagic/sparkmagic/kernels/kernelmagics.py @@ -262,7 +262,7 @@ def spark(self, line, cell="", local_ns=None): coerce = get_coerce_value(args.coerce) - self.execute_spark(cell, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) + self.execute_spark(cell, None, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) else: return diff --git a/sparkmagic/sparkmagic/magics/remotesparkmagics.py b/sparkmagic/sparkmagic/magics/remotesparkmagics.py index 20b2bf915..c2d93321b 100644 --- a/sparkmagic/sparkmagic/magics/remotesparkmagics.py +++ b/sparkmagic/sparkmagic/magics/remotesparkmagics.py @@ -169,7 +169,7 @@ def spark(self, line, cell="", local_ns=None): elif len(subcommand) == 0: coerce = get_coerce_value(args.coerce) if args.context == CONTEXT_NAME_SPARK: - return self.execute_spark(cell, args.output, args.samplemethod, + return self.execute_spark(cell, None, args.output, args.samplemethod, args.maxrows, args.samplefraction, args.session, coerce) elif args.context == CONTEXT_NAME_SQL: return self.execute_sqlquery(cell, args.samplemethod, args.maxrows, args.samplefraction, diff --git a/sparkmagic/sparkmagic/magics/sparkmagicsbase.py b/sparkmagic/sparkmagic/magics/sparkmagicsbase.py index b9a1911ce..1db225cca 100644 --- a/sparkmagic/sparkmagic/magics/sparkmagicsbase.py +++ b/sparkmagic/sparkmagic/magics/sparkmagicsbase.py @@ -77,8 +77,8 @@ def do_send_to_spark(self, cell, input_variable_name, var_type, output_variable_ self.ipython_display.write(u'Successfully passed \'{}\' as \'{}\' to Spark' u' kernel'.format(input_variable_name, output_variable_name)) - def execute_spark(self, cell, output_var, samplemethod, maxrows, samplefraction, session_name, coerce): - (success, out, mimetype) = self.spark_controller.run_command(Command(cell), session_name) + def execute_spark(self, cell, language, output_var, samplemethod, maxrows, samplefraction, session_name, coerce): + (success, out, mimetype) = self.spark_controller.run_command(Command(cell, language), session_name) if not success: if conf.shutdown_session_on_spark_statement_errors(): self.spark_controller.cleanup() From 130693628df6bb2b9e2288224aa08076c21cf0e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 01:10:21 +0000 Subject: [PATCH 4/7] Tests passing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Léopold Mebazaa --- sparkmagic/sparkmagic/tests/test_kernel_magics.py | 4 ++-- .../sparkmagic/tests/test_remotesparkmagics.py | 8 ++++---- .../sparkmagic/tests/test_sparkmagicsbase.py | 14 +++++++------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/sparkmagic/sparkmagic/tests/test_kernel_magics.py b/sparkmagic/sparkmagic/tests/test_kernel_magics.py index c59ec2155..04848ddd6 100644 --- a/sparkmagic/sparkmagic/tests/test_kernel_magics.py +++ b/sparkmagic/sparkmagic/tests/test_kernel_magics.py @@ -595,7 +595,7 @@ def test_spark_sample_options(): magic.execute_spark = MagicMock() ret = magic.spark(line, cell) - magic.execute_spark.assert_called_once_with(cell, "var_name", "sample", 142, 0.3, None, True) + magic.execute_spark.assert_called_once_with(cell, None, "var_name", "sample", 142, 0.3, None, True) @with_setup(_setup, _teardown) @@ -605,7 +605,7 @@ def test_spark_false_coerce(): magic.execute_spark = MagicMock() ret = magic.spark(line, cell) - magic.execute_spark.assert_called_once_with(cell, "var_name", "sample", 142, 0.3, None, False) + magic.execute_spark.assert_called_once_with(cell, None, "var_name", "sample", 142, 0.3, None, False) @with_setup(_setup, _teardown) diff --git a/sparkmagic/sparkmagic/tests/test_remotesparkmagics.py b/sparkmagic/sparkmagic/tests/test_remotesparkmagics.py index 91e4733ba..57f142c48 100644 --- a/sparkmagic/sparkmagic/tests/test_remotesparkmagics.py +++ b/sparkmagic/sparkmagic/tests/test_remotesparkmagics.py @@ -284,7 +284,7 @@ def test_run_spark_command_parses(): result = magic.spark(line, cell) magic.execute_spark.assert_called_once_with("cell code", - None, "sample", None, None, "sessions_name", None) + None, None, "sample", None, None, "sessions_name", None) @with_setup(_setup, _teardown) @@ -305,7 +305,7 @@ def test_run_spark_command_parses_with_coerce(): result = magic.spark(line, cell) magic.execute_spark.assert_called_once_with("cell code", - None, "sample", None, None, "sessions_name", True) + None, None, "sample", None, None, "sessions_name", True) @with_setup(_setup, _teardown) @@ -326,7 +326,7 @@ def test_run_spark_command_parses_with_coerce_false(): result = magic.spark(line, cell) magic.execute_spark.assert_called_once_with("cell code", - None, "sample", None, None, "sessions_name", False) + None, None, "sample", None, None, "sessions_name", False) @with_setup(_setup, _teardown) @@ -367,7 +367,7 @@ def test_run_spark_with_store_command_parses(): result = magic.spark(line, cell) magic.execute_spark.assert_called_once_with("cell code", - "var_name", "sample", None, None, "sessions_name", None) + None, "var_name", "sample", None, None, "sessions_name", None) @with_setup(_setup, _teardown) diff --git a/sparkmagic/sparkmagic/tests/test_sparkmagicsbase.py b/sparkmagic/sparkmagic/tests/test_sparkmagicsbase.py index a188e8de6..609f0a530 100644 --- a/sparkmagic/sparkmagic/tests/test_sparkmagicsbase.py +++ b/sparkmagic/sparkmagic/tests/test_sparkmagicsbase.py @@ -229,12 +229,12 @@ def test_spark_execution_without_output_var(): output_var = None magic.spark_controller.run_command.return_value = (True,'out',MIMETYPE_TEXT_PLAIN) - magic.execute_spark("", output_var, None, None, None, session, None) + magic.execute_spark("", None, output_var, None, None, None, session, None) magic.ipython_display.write.assert_called_once_with('out') assert not magic.spark_controller._spark_store_command.called magic.spark_controller.run_command.return_value = (False,'out',MIMETYPE_TEXT_PLAIN) - assert_raises(SparkStatementException, magic.execute_spark,"", output_var, None, None, None, session, True) + assert_raises(SparkStatementException, magic.execute_spark,"", None, output_var, None, None, None, session, True) assert not magic.spark_controller._spark_store_command.called @with_setup(_setup, _teardown) @@ -245,14 +245,14 @@ def test_spark_execution_with_output_var(): df = 'df' magic.spark_controller.run_command.side_effect = [(True,'out',MIMETYPE_TEXT_PLAIN), df] - magic.execute_spark("", output_var, None, None, None, session, True) + magic.execute_spark("", None, output_var, None, None, None, session, True) magic.ipython_display.write.assert_called_once_with('out') magic._spark_store_command.assert_called_once_with(output_var, None, None, None, True) assert shell.user_ns[output_var] == df magic.spark_controller.run_command.side_effect = None magic.spark_controller.run_command.return_value = (False,'out',MIMETYPE_TEXT_PLAIN) - assert_raises(SparkStatementException, magic.execute_spark,"", output_var, None, None, None, session, True) + assert_raises(SparkStatementException, magic.execute_spark,"", None, output_var, None, None, None, session, True) @with_setup(_setup, _teardown) @@ -264,7 +264,7 @@ def test_spark_exception_with_output_var(): df = 'df' magic.spark_controller.run_command.side_effect = [(True,'out',MIMETYPE_TEXT_PLAIN), exception] - assert_raises(BadUserDataException, magic.execute_spark,"", output_var, None, None, None, session, True) + assert_raises(BadUserDataException, magic.execute_spark,"", None, output_var, None, None, None, session, True) magic.ipython_display.write.assert_called_once_with('out') magic._spark_store_command.assert_called_once_with(output_var, None, None, None, True) assert shell.user_ns == {} @@ -276,7 +276,7 @@ def test_spark_statement_exception(): exception = BadUserDataException("Ka-boom!") magic.spark_controller.run_command.side_effect = [(False, 'out', "text/plain"), exception] - assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, session, True) + assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, None, session, True) magic.spark_controller.cleanup.assert_not_called() @with_setup(_setup, _teardown) @@ -290,5 +290,5 @@ def test_spark_statement_exception_shutdowns_livy_session(): exception = BadUserDataException("Ka-boom!") magic.spark_controller.run_command.side_effect = [(False, 'out', "text/plain"), exception] - assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, session, True) + assert_raises(SparkStatementException, magic.execute_spark,"", None, None, None, None, None, session, True) magic.spark_controller.cleanup.assert_called_once() From c367dccab9ef993e3852409720f905f75445e70d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 01:28:33 +0000 Subject: [PATCH 5/7] Plugged the -l argument in MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Léopold Mebazaa --- sparkmagic/sparkmagic/kernels/kernelmagics.py | 6 ++++-- sparkmagic/sparkmagic/magics/remotesparkmagics.py | 2 +- sparkmagic/sparkmagic/magics/sparkmagicsbase.py | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/kernelmagics.py b/sparkmagic/sparkmagic/kernels/kernelmagics.py index 4ef255d83..0f4863828 100644 --- a/sparkmagic/sparkmagic/kernels/kernelmagics.py +++ b/sparkmagic/sparkmagic/kernels/kernelmagics.py @@ -16,7 +16,7 @@ from sparkmagic.utils import constants from sparkmagic.utils.utils import parse_argstring_or_throw, get_coerce_value from sparkmagic.utils.sparkevents import SparkEvents -from sparkmagic.utils.constants import LANGS_SUPPORTED +from sparkmagic.utils.constants import LANGS_SUPPORTED, LANG_PYTHON, LANG_R, LANG_SCALA from sparkmagic.livyclientlib.command import Command from sparkmagic.livyclientlib.endpoint import Endpoint from sparkmagic.magics.sparkmagicsbase import SparkMagicBase @@ -248,6 +248,8 @@ def configure(self, line, cell="", local_ns=None): @needs_local_scope @argument("-o", "--output", type=str, default=None, help="If present, indicated variable will be stored in variable" "of this name in user's local context.") + @argument("-l", "--language", type=str, default=None, + help="Language for command; one of {}".format(', '.join([LANG_PYTHON, LANG_SCALA, LANG_R]))) @argument("-m", "--samplemethod", type=str, default=None, help="Sample method for dataframe: either take or sample") @argument("-n", "--maxrows", type=int, default=None, help="Maximum number of rows that will be pulled back " "from the dataframe on the server for storing") @@ -262,7 +264,7 @@ def spark(self, line, cell="", local_ns=None): coerce = get_coerce_value(args.coerce) - self.execute_spark(cell, None, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) + self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) else: return diff --git a/sparkmagic/sparkmagic/magics/remotesparkmagics.py b/sparkmagic/sparkmagic/magics/remotesparkmagics.py index c2d93321b..3d5610e4e 100644 --- a/sparkmagic/sparkmagic/magics/remotesparkmagics.py +++ b/sparkmagic/sparkmagic/magics/remotesparkmagics.py @@ -169,7 +169,7 @@ def spark(self, line, cell="", local_ns=None): elif len(subcommand) == 0: coerce = get_coerce_value(args.coerce) if args.context == CONTEXT_NAME_SPARK: - return self.execute_spark(cell, None, args.output, args.samplemethod, + return self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, args.session, coerce) elif args.context == CONTEXT_NAME_SQL: return self.execute_sqlquery(cell, args.samplemethod, args.maxrows, args.samplefraction, diff --git a/sparkmagic/sparkmagic/magics/sparkmagicsbase.py b/sparkmagic/sparkmagic/magics/sparkmagicsbase.py index 1db225cca..17e67787f 100644 --- a/sparkmagic/sparkmagic/magics/sparkmagicsbase.py +++ b/sparkmagic/sparkmagic/magics/sparkmagicsbase.py @@ -78,7 +78,7 @@ def do_send_to_spark(self, cell, input_variable_name, var_type, output_variable_ u' kernel'.format(input_variable_name, output_variable_name)) def execute_spark(self, cell, language, output_var, samplemethod, maxrows, samplefraction, session_name, coerce): - (success, out, mimetype) = self.spark_controller.run_command(Command(cell, language), session_name) + (success, out, mimetype) = self.spark_controller.run_command(Command(cell, language=language), session_name) if not success: if conf.shutdown_session_on_spark_statement_errors(): self.spark_controller.cleanup() From 6a63f65cfb057a1cf90dd8da23d6664955722200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 01:35:07 +0000 Subject: [PATCH 6/7] Added option to send code to some session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Léopold Mebazaa --- sparkmagic/sparkmagic/kernels/kernelmagics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sparkmagic/sparkmagic/kernels/kernelmagics.py b/sparkmagic/sparkmagic/kernels/kernelmagics.py index 0f4863828..ed5292b80 100644 --- a/sparkmagic/sparkmagic/kernels/kernelmagics.py +++ b/sparkmagic/sparkmagic/kernels/kernelmagics.py @@ -248,6 +248,7 @@ def configure(self, line, cell="", local_ns=None): @needs_local_scope @argument("-o", "--output", type=str, default=None, help="If present, indicated variable will be stored in variable" "of this name in user's local context.") + @argument("-s", "--session", type=str, default=None, help="The name of the Livy session to use.") @argument("-l", "--language", type=str, default=None, help="Language for command; one of {}".format(', '.join([LANG_PYTHON, LANG_SCALA, LANG_R]))) @argument("-m", "--samplemethod", type=str, default=None, help="Sample method for dataframe: either take or sample") @@ -264,7 +265,7 @@ def spark(self, line, cell="", local_ns=None): coerce = get_coerce_value(args.coerce) - self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) + self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, args.session, coerce) else: return From f1505466f1ebf1714a96610e3bb37d9de38bd18f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9opold=20Mebazaa?= Date: Thu, 5 Dec 2019 03:22:17 +0000 Subject: [PATCH 7/7] Revert "Added option to send code to some session" This reverts commit 6a63f65cfb057a1cf90dd8da23d6664955722200. --- sparkmagic/sparkmagic/kernels/kernelmagics.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sparkmagic/sparkmagic/kernels/kernelmagics.py b/sparkmagic/sparkmagic/kernels/kernelmagics.py index ed5292b80..0f4863828 100644 --- a/sparkmagic/sparkmagic/kernels/kernelmagics.py +++ b/sparkmagic/sparkmagic/kernels/kernelmagics.py @@ -248,7 +248,6 @@ def configure(self, line, cell="", local_ns=None): @needs_local_scope @argument("-o", "--output", type=str, default=None, help="If present, indicated variable will be stored in variable" "of this name in user's local context.") - @argument("-s", "--session", type=str, default=None, help="The name of the Livy session to use.") @argument("-l", "--language", type=str, default=None, help="Language for command; one of {}".format(', '.join([LANG_PYTHON, LANG_SCALA, LANG_R]))) @argument("-m", "--samplemethod", type=str, default=None, help="Sample method for dataframe: either take or sample") @@ -265,7 +264,7 @@ def spark(self, line, cell="", local_ns=None): coerce = get_coerce_value(args.coerce) - self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, args.session, coerce) + self.execute_spark(cell, args.language, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce) else: return