
Implement Async for native Spark Operators #119

Open
1 of 13 tasks
phanikumv opened this issue Mar 14, 2022 · 10 comments

@phanikumv
Collaborator

phanikumv commented Mar 14, 2022

Implement async versions for the following operators (listed in descending order of priority):


Acceptance Criteria:

phanikumv self-assigned this Mar 14, 2022
phanikumv added the area/async (Deferrable/async operators) and pri/medium (Medium priority) labels Mar 14, 2022
@kaxil
Collaborator

kaxil commented Mar 21, 2022

If we have too many issues with it, we can park it for now, or use the LivyOperator as an alternative way to submit Spark jobs.
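For reference, a rough sketch of that alternative: submitting a Spark job through Livy with the LivyOperator from the apache-airflow-providers-apache-livy package. The DAG id, application path, class name, and connection id below are illustrative placeholders, not values from this thread:

```python
# Hypothetical sketch: submitting a Spark job through a Livy server instead of
# spark-submit. The file path, class name, and connection id are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

with DAG(
    dag_id="example_livy_spark_submit",
    start_date=datetime(2022, 3, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    submit_via_livy = LivyOperator(
        task_id="submit_via_livy",
        file="/spark-examples.jar",  # placeholder application jar
        class_name="org.apache.spark.examples.SparkPi",
        livy_conn_id="livy_default",  # HTTP connection to the Livy server
        polling_interval=30,  # poll the Livy batch state every 30 seconds
    )
```

Because Livy exposes job submission and batch state over HTTP, polling it is also a natural fit for an async trigger later on.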

@phanikumv
Collaborator Author

phanikumv commented Mar 22, 2022

After further analysis, we got the local connection working for the SparkSubmitOperator yesterday. I think this story is unblocked now. @sunank200, @bharanidharan14, please comment.

@sunank200
Collaborator

Yes, spark-submit works for deploy mode local, so the SparkSubmitOperator example DAG works with a local Spark setup. For the SparkSqlOperator, I am setting up Hive alongside Spark so I can create a table to test the example DAG.
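For context, a minimal sketch of the kind of example DAG described here, assuming a Spark connection pointed at a local master; the application jar, class, and connection id are placeholders:

```python
# Hypothetical sketch of a SparkSubmitOperator example DAG running against a
# local Spark installation. The connection "spark_local" is assumed to have
# its host set to local[*] (i.e. deploy mode local).
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    dag_id="example_spark_submit_local",
    start_date=datetime(2022, 3, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    spark_pi = SparkSubmitOperator(
        task_id="spark_pi",
        application="/spark-examples.jar",  # placeholder application jar
        java_class="org.apache.spark.examples.SparkPi",
        conn_id="spark_local",  # placeholder connection id
    )
```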

@bharanidharan14
Contributor

On 07-03-2022 and 21-03-2022: successfully installed Spark in the Airflow worker to run the spark-submit job, created the Spark cluster container, and tried running the sample Spark job via the Airflow worker; I was facing some container resource allocation issues.
On 22-03-2022: tried creating a separate Docker image, created the Spark master and worker cluster containers, and tried submitting the job via the Airflow worker; it was able to pick up the job, but there is a Python version mismatch.

@bharanidharan14
Contributor

Working on implementing the async version of the SparkSubmitOperator.

@sunank200
Collaborator

sunank200 commented Mar 24, 2022

The SparkSqlOperator requires spark-sql to be installed alongside Spark in the Docker image. Right now, with deploy mode local, the SparkSqlOperator fails because spark-sql exits with the following error:

Caused by: ERROR XBM0H: Directory /opt/spark-3.2.1-bin-hadoop3.2/metastore_db cannot be created.
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.iapi.error.StandardException.newException(Unknown Source)
    at org.apache.derby.impl.services.monitor.StorageFactoryService$10.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.impl.services.monitor.StorageFactoryService.createServiceRoot(Unknown Source)
    at org.apache.derby.impl.services.monitor.BaseMonitor.bootService(Unknown Source)
    at org.apache.derby.impl.services.monitor.BaseMonitor.createPersistentService(Unknown Source)
    at org.apache.derby.impl.services.monitor.FileMonitor.createPersistentService(Unknown Source)
    at org.apache.derby.iapi.services.monitor.Monitor.createPersistentService(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection$5.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.impl.jdbc.EmbedConnection.createPersistentService(Unknown Source)
    ... 104 more
22/03/24 05:12:55 ERROR Utils: Uncaught exception in thread shutdown-hook-0
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277)
    at org.sparkproject.jetty.server.AbstractConnector.doStop(AbstractConnector.java:371)
    at org.sparkproject.jetty.server.AbstractNetworkConnector.doStop(AbstractNetworkConnector.java:88)
    at org.sparkproject.jetty.server.ServerConnector.doStop(ServerConnector.java:246)
    at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
    at org.sparkproject.jetty.server.Server.doStop(Server.java:459)
    at org.sparkproject.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:94)
    at org.apache.spark.ui.ServerInfo.stop(JettyUtils.scala:525)
    at org.apache.spark.ui.WebUI.$anonfun$stop$2(WebUI.scala:174)
    at org.apache.spark.ui.WebUI.$anonfun$stop$2$adapted(WebUI.scala:174)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.ui.WebUI.stop(WebUI.scala:174)
    at org.apache.spark.ui.SparkUI.stop(SparkUI.scala:101)
    at org.apache.spark.SparkContext.$anonfun$stop$6(SparkContext.scala:2071)
    at org.apache.spark.SparkContext.$anonfun$stop$6$adapted(SparkContext.scala:2071)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.SparkContext.$anonfun$stop$5(SparkContext.scala:2071)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1442)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:2071)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.stop(SparkSQLEnv.scala:77)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.$anonfun$main$2(SparkSQLCLIDriver.scala:143)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
    at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

I am trying to debug this issue right now and am blocked on it. spark-submit runs fine for me, though.
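ERROR XBM0H is Derby reporting that it cannot create the embedded metastore directory, which typically means the spark-sql process lacks write permission in its working directory (/opt/spark-3.2.1-bin-hadoop3.2 here). One possible workaround, an untested sketch rather than a confirmed fix from this thread, is to point Derby's system home at a writable path via a Spark conf:

```python
# Hypothetical sketch: steering Derby's metastore_db to a writable directory by
# passing a Spark configuration property through the SparkSqlOperator. The conf
# value, SQL, and connection id are illustrative assumptions, not a verified fix.
from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator

create_table = SparkSqlOperator(
    task_id="create_table",
    sql="CREATE TABLE IF NOT EXISTS demo (id INT)",
    # derby.system.home controls where Derby creates metastore_db
    conf="spark.driver.extraJavaOptions=-Dderby.system.home=/tmp/derby",
    conn_id="spark_sql_default",
)
```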

@phanikumv
Collaborator Author

@sunank200 let's stop further effort on the SparkSqlOperator; you can start on the LivyOperator.

@bharanidharan14
Contributor

bharanidharan14 commented Mar 25, 2022

SparkSubmitOperator

  • Completed the Spark submit async operator, trigger, and hook
  • Completed test cases, mypy checks, and docstrings

@bharanidharan14
Contributor

Added the changes to submit the spark-submit job in the execute method and get the status from the trigger.
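For readers unfamiliar with the pattern this describes: a deferrable operator submits the job in execute(), then defers to a trigger that polls asynchronously and fires an event carrying the final status. A minimal self-contained sketch of that shape, with illustrative names (SparkJobTrigger, _submit_job, _get_job_status) rather than the repo's actual implementation:

```python
# Hypothetical sketch of the deferrable pattern described above: execute()
# submits the job, then defers to a trigger that reports the final status.
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple

from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class SparkJobTrigger(BaseTrigger):
    def __init__(self, job_id: str, poll_interval: float = 10.0):
        super().__init__()
        self.job_id = job_id
        self.poll_interval = poll_interval

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # Tells the triggerer process how to recreate this trigger.
        return (
            "example_module.SparkJobTrigger",  # illustrative module path
            {"job_id": self.job_id, "poll_interval": self.poll_interval},
        )

    async def _get_job_status(self, job_id: str) -> str:
        # Stub for illustration; a real trigger would query Spark here.
        return "success"

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Poll without blocking the triggerer's event loop.
        while True:
            status = await self._get_job_status(self.job_id)
            if status in ("success", "failed"):
                yield TriggerEvent({"job_id": self.job_id, "status": status})
                return
            await asyncio.sleep(self.poll_interval)


class SparkSubmitOperatorAsync(BaseOperator):
    def _submit_job(self) -> str:
        # Stub for illustration; a real operator would run spark-submit here.
        return "job-123"

    def execute(self, context):
        job_id = self._submit_job()
        # Free the worker slot; the triggerer takes over until the event fires.
        self.defer(
            trigger=SparkJobTrigger(job_id=job_id),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # Invoked on a worker after the trigger yields its event.
        if event["status"] == "failed":
            raise RuntimeError(f"Spark job {event['job_id']} failed")
        return event["job_id"]
```

The payoff is that the worker slot is freed while the triggerer awaits the job, which is the point of making these Spark operators async.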

@phanikumv
Collaborator Author

The operators below are on hold due to a connectivity issue between the Airflow worker container and EMR Spark:

  • SparkSubmitOperator
  • SparkSqlOperator
  • SparkJDBCOperator
  • SparkKubernetesOperator
