
[FSTORE-1588] Retry when job fails #1400

Closed
wants to merge 16 commits

Conversation


@bubriks (Contributor) commented Oct 31, 2024

This PR adds/fixes/changes...

  • please summarize your changes to the code
  • and make sure to include all changes to user-facing APIs

JIRA Issue: -

Priority for Review: -

Related PRs: -

How Has This Been Tested?

  • Unit Tests
  • Integration Tests
  • Manual Tests on VM

Checklist For The Assigned Reviewer:

- [ ] Checked if merge conflicts with master exist
- [ ] Checked if stylechecks for Java and Python pass
- [ ] Checked if all docstrings were added and/or updated appropriately
- [ ] Ran spellcheck on docstring
- [ ] Checked if guides & concepts need to be updated
- [ ] Checked if naming conventions for parameters and variables were followed
- [ ] Checked if private methods are properly declared and used
- [ ] Checked if hard-to-understand areas of code are commented
- [ ] Checked if tests are effective
- [ ] Built and deployed changes on dev VM and tested manually
- [x] (Checked if all type annotations were added and/or updated appropriately)

@bubriks added the WIP (work in progress) label Oct 31, 2024
@bubriks removed the WIP (work in progress) label Jan 6, 2025
@bubriks requested a review from SirOibaf January 10, 2025 10:08
@bubriks requested a review from SirOibaf January 10, 2025 12:06

Review comment on `python/hsfs/core/deltastreamer_jobconf.py` (the diff replaces the hard-coded key with `JobConfiguration.DTO_TYPE`, as confirmed by the traceback below):

```diff
 def to_dict(self):
     return {
         "writeOptions": self._options,
-        "sparkJobConfiguration": self._spark_options,
+        JobConfiguration.DTO_TYPE: self._spark_options,
     }
```

This code now fails if the user doesn't provide any spark options:

````
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
File ~/work/feature-store-api/python/hsfs/util.py:47, in FeatureStoreEncoder.default(self, o)
     46 try:
---> 47     return o.to_dict()
     48 except AttributeError:

File ~/work/feature-store-api/python/hsfs/core/deltastreamer_jobconf.py:42, in DeltaStreamerJobConf.to_dict(self)
     39 def to_dict(self):
     40     return {
     41         "writeOptions": self._options,
---> 42         JobConfiguration.DTO_TYPE: self._spark_options,
     43     }

AttributeError: 'DeltaStreamerJobConf' object has no attribute '_spark_options'

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
Cell In[8], line 1
----> 1 fg.insert(pdf)

File ~/work/feature-store-api/python/hsfs/feature_group.py:2687, in FeatureGroup.insert(self, features, overwrite, operation, storage, write_options, validation_options, save_code, wait)
   2684 if "wait_for_job" not in write_options:
   2685     write_options["wait_for_job"] = wait
-> 2687 job, ge_report = self._feature_group_engine.insert(
   2688     self,
   2689     feature_dataframe=feature_dataframe,
   2690     overwrite=overwrite,
   2691     operation=operation,
   2692     storage=storage.lower() if storage is not None else None,
   2693     write_options=write_options,
   2694     validation_options={"save_report": True, **validation_options},
   2695 )
   2696 if save_code and (
   2697     ge_report is None or ge_report.ingestion_result == "INGESTED"
   2698 ):
   2699     self._code_engine.save_code(self)

File ~/work/feature-store-api/python/hsfs/core/feature_group_engine.py:99, in FeatureGroupEngine.insert(self, feature_group, feature_dataframe, overwrite, operation, storage, write_options, validation_options)
     93 util.validate_embedding_feature_type(
     94     feature_group.embedding_index, dataframe_features
     95 )
     97 if not feature_group._id:
     98     # only save metadata if feature group does not exist
---> 99     self.save_feature_group_metadata(
    100         feature_group, dataframe_features, write_options
    101     )
    102 else:
    103     # else, just verify that feature group schema matches user-provided dataframe
    104     self._verify_schema_compatibility(
    105         feature_group.features, dataframe_features
    106     )

File ~/work/feature-store-api/python/hsfs/core/feature_group_engine.py:396, in FeatureGroupEngine.save_feature_group_metadata(self, feature_group, dataframe_features, write_options)
    383     _write_options = (
    384         [
    385             {"name": k, "value": v}
   (...)
    390         else None
    391     )
    392     feature_group._deltastreamer_jobconf = DeltaStreamerJobConf(
    393         _write_options, _spark_options
    394     )
--> 396 self._feature_group_api.save(feature_group)
    397 print(
    398     "Feature Group created successfully, explore it at \n"
    399     + util.get_feature_group_url(
   (...)
    402     )
    403 )

File ~/work/feature-store-api/python/hsfs/core/feature_group_api.py:60, in FeatureGroupApi.save(self, feature_group_instance)
     47 path_params = [
     48     "project",
     49     _client._project_id,
   (...)
     52     "featuregroups",
     53 ]
     54 headers = {"content-type": "application/json"}
     55 feature_group_object = feature_group_instance.update_from_response_json(
     56     _client._send_request(
     57         "POST",
     58         path_params,
     59         headers=headers,
---> 60         data=feature_group_instance.json(),
     61     ),
     62 )
     63 return feature_group_object

File ~/work/feature-store-api/python/hsfs/feature_group.py:3288, in FeatureGroup.json(self)
   3280 def json(self) -> str:
   3281     """Get specific Feature Group metadata in json format.
   3282 
   3283     !!! example
   (...)
   3286         ```
   3287     """
-> 3288     return json.dumps(self, cls=util.FeatureStoreEncoder)

File /opt/homebrew/Cellar/python@3.11/3.11.11/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/__init__.py:238, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
    234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
--> 238     **kw).encode(obj)

File /opt/homebrew/Cellar/python@3.11/3.11.11/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/encoder.py:200, in JSONEncoder.encode(self, o)
    196         return encode_basestring(o)
    197 # This doesn't pass the iterator directly to ''.join() because the
    198 # exceptions aren't as detailed.  The list call should be roughly
    199 # equivalent to the PySequence_Fast that ''.join() would do.
--> 200 chunks = self.iterencode(o, _one_shot=True)
    201 if not isinstance(chunks, (list, tuple)):
    202     chunks = list(chunks)

File /opt/homebrew/Cellar/python@3.11/3.11.11/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/encoder.py:258, in JSONEncoder.iterencode(self, o, _one_shot)
    253 else:
    254     _iterencode = _make_iterencode(
    255         markers, self.default, _encoder, self.indent, floatstr,
    256         self.key_separator, self.item_separator, self.sort_keys,
    257         self.skipkeys, _one_shot)
--> 258 return _iterencode(o, 0)

File ~/work/feature-store-api/python/hsfs/util.py:49, in FeatureStoreEncoder.default(self, o)
     47     return o.to_dict()
     48 except AttributeError:
---> 49     return super().default(o)

File /opt/homebrew/Cellar/python@3.11/3.11.11/Frameworks/Python.framework/Versions/3.11/lib/python3.11/json/encoder.py:180, in JSONEncoder.default(self, o)
    161 def default(self, o):
    162     """Implement this method in a subclass such that it returns
    163     a serializable object for ``o``, or calls the base implementation
    164     (to raise a ``TypeError``).
   (...)
    178 
    179     """
--> 180     raise TypeError(f'Object of type {o.__class__.__name__} '
    181                     f'is not JSON serializable')

TypeError: Object of type DeltaStreamerJobConf is not JSON serializable
````
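
The traceback suggests `DeltaStreamerJobConf.__init__` only binds `_spark_options` when Spark options are passed (the call site in the trace is `DeltaStreamerJobConf(_write_options, _spark_options)`). A minimal defensive sketch under that assumption, with the import path also assumed, which both binds the attribute unconditionally and serializes the Spark config only when present:

```python
from hsfs.core.job_configuration import JobConfiguration  # assumed import path


class DeltaStreamerJobConf:
    # Simplified sketch, not the real class: mirrors the call site
    # DeltaStreamerJobConf(_write_options, _spark_options) seen in the traceback.
    def __init__(self, options, spark_options=None):
        self._options = options
        self._spark_options = spark_options  # always bound, even when None

    def to_dict(self):
        conf = {"writeOptions": self._options}
        # Serialize the Spark config only when one was provided, so JSON
        # encoding no longer hits the AttributeError -> TypeError chain above.
        if self._spark_options is not None:
            conf[JobConfiguration.DTO_TYPE] = self._spark_options
        return conf
```

Either change alone (binding in `__init__` or guarding in `to_dict`) would avoid the encoder failure; the sketch does both for robustness.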

@SirOibaf (Contributor) commented:

This PR doesn't actually work: spark.yarn.maxAppAttempts only takes effect when the application is submitted via spark-submit, which we are not using in Hopsworks 3.x.

To increase the number of application attempts in YARN, you would instead need to modify the appContext here: https://github.com/logicalclocks/hopsworks-ee/blob/8c290beaa28e56d67dc216943b59dcf195dcec38/hopsworks-common/src/main/java/io/hops/hopsworks/common/jobs/yarn/YarnRunner.java#L239

However, even if you set that to 2, the ResourceManager would still ignore it, since the ceiling for max application attempts is set in yarn-site.xml here: https://github.com/logicalclocks/hops-hadoop-chef/blob/3f3cde383af508a6428b09ae949489195f941367/templates/default/yarn-site.xml.erb#L90
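
For intuition: YARN's ResourceManager clamps a per-application attempt count at the cluster-wide `yarn.resourcemanager.am.max-attempts` ceiling from yarn-site.xml, so the effective value is the minimum of the two. A toy sketch of that clamping (illustrative numbers, not Hopsworks code):

```python
def effective_max_attempts(app_requested: int, cluster_ceiling: int) -> int:
    # YARN's ResourceManager caps a per-application maxAppAttempts at the
    # cluster-wide yarn.resourcemanager.am.max-attempts from yarn-site.xml.
    return min(app_requested, cluster_ceiling)


# Even if YarnRunner's appContext asked for 2 attempts, a ceiling of 1 in
# yarn-site.xml keeps the effective value at 1 (hypothetical numbers).
assert effective_max_attempts(2, 1) == 1
```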

@SirOibaf closed this Jan 16, 2025