SNOW-2124398: add support for custom datasource #3893
base: main
    def _partitions(self) -> List[InputPartition]:
        if self._internal_partitions is None:
            self._internal_partitions = self.reader(self.schema()).partitions()
        return self._internal_partitions
This method will crash with AttributeError if the base class methods aren't overridden. The chain self.reader(self.schema()).partitions() fails because:
1. self.schema() returns None (the base implementation is only pass).
2. self.reader(None) returns None (the base implementation is only pass).
3. None.partitions() raises AttributeError: 'NoneType' object has no attribute 'partitions'.
The base class methods should either raise NotImplementedError, or the class should use ABC with @abstractmethod decorators:
    from abc import ABC, abstractmethod

    class DataSource(ABC):
        @abstractmethod
        def reader(self, schema: Union[StructType, str]) -> DataSourceReader:
            raise NotImplementedError()

Suggested change:

    - def _partitions(self) -> List[InputPartition]:
    -     if self._internal_partitions is None:
    -         self._internal_partitions = self.reader(self.schema()).partitions()
    -     return self._internal_partitions
    + def _partitions(self) -> List[InputPartition]:
    +     if self._internal_partitions is None:
    +         schema = self.schema()
    +         if schema is None:
    +             raise NotImplementedError("schema() method must be implemented by subclasses")
    +         reader = self.reader(schema)
    +         if reader is None:
    +             raise NotImplementedError("reader() method must be implemented by subclasses")
    +         self._internal_partitions = reader.partitions()
    +     return self._internal_partitions
Spotted by Graphite Agent
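To illustrate the ABC option from the comment above, here is a minimal, self-contained sketch. The class bodies are simplified stand-ins (plain `str` schemas, integer partitions) and the `IncompleteSource`, `RangeSource`, `RangeReader`, and `try_instantiate` names are hypothetical; this is not the PR's actual API. It suggests that, with `@abstractmethod`, a subclass that forgets an override fails at instantiation rather than later inside `_partitions()`.

```python
from abc import ABC, abstractmethod
from typing import List, Optional


class DataSourceReader(ABC):
    """Hypothetical stand-in for the reader base class."""

    @abstractmethod
    def partitions(self) -> List[int]:
        ...


class DataSource(ABC):
    """Hypothetical stand-in; schemas are plain strings for brevity."""

    def __init__(self) -> None:
        self._internal_partitions: Optional[List[int]] = None

    @abstractmethod
    def schema(self) -> str:
        ...

    @abstractmethod
    def reader(self, schema: str) -> DataSourceReader:
        ...

    def _partitions(self) -> List[int]:
        # Safe to chain: a concrete subclass must have overridden both methods.
        if self._internal_partitions is None:
            self._internal_partitions = self.reader(self.schema()).partitions()
        return self._internal_partitions


class IncompleteSource(DataSource):
    """Overrides schema() but not reader(), so it remains abstract."""

    def schema(self) -> str:
        return "id INT"


class RangeReader(DataSourceReader):
    def partitions(self) -> List[int]:
        return [0, 1, 2]


class RangeSource(DataSource):
    def schema(self) -> str:
        return "id INT"

    def reader(self, schema: str) -> DataSourceReader:
        return RangeReader()


def try_instantiate(cls) -> str:
    """Return "ok" if cls() succeeds, else the TypeError message."""
    try:
        cls()
        return "ok"
    except TypeError as exc:
        return f"TypeError: {exc}"
```

In this sketch, `try_instantiate(IncompleteSource)` reports a `TypeError` naming the missing method, while `RangeSource()._partitions()` works normally. The trade-off is that every subclass must then implement all abstract methods, which may or may not suit the intended API.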
    for result in reader.read(partition):
        if isinstance(result, list):
            yield from result
        else:
            yield from list(reader.read(partition))
            break
The handling of non-list results appears to have a logic error. When a result is not a list, the code calls reader.read(partition) a second time, which re-executes the entire read operation and can return duplicate data. The break after the first non-list result then exits the loop early, potentially causing data loss.
Consider handling non-list results directly:
    for result in reader.read(partition):
        if isinstance(result, list):
            yield from result
        else:
            yield result  # Simply yield the individual result

This approach would process each result exactly once without re-executing the read operation.

Suggested change:

    - for result in reader.read(partition):
    -     if isinstance(result, list):
    -         yield from result
    -     else:
    -         yield from list(reader.read(partition))
    -         break
    + for result in reader.read(partition):
    +     if isinstance(result, list):
    +         yield from result
    +     else:
    +         yield result
Spotted by Graphite Agent
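The difference between the two loops can be checked with a small sketch. `CountingReader` is a hypothetical test double (not part of the PR) whose `read()` counts how often it is invoked; `rows_original` and `rows_suggested` are hypothetical wrappers around the two loop bodies quoted above.

```python
from typing import Any, Iterator, List


class CountingReader:
    """Hypothetical reader that records how many times read() is started."""

    def __init__(self, rows: List[Any]) -> None:
        self.rows = rows
        self.read_calls = 0

    def read(self, partition: Any) -> Iterator[Any]:
        self.read_calls += 1
        yield from self.rows


def rows_original(reader: CountingReader, partition: Any) -> Iterator[Any]:
    # Loop as written in the PR: re-reads on the first non-list result.
    for result in reader.read(partition):
        if isinstance(result, list):
            yield from result
        else:
            yield from list(reader.read(partition))
            break


def rows_suggested(reader: CountingReader, partition: Any) -> Iterator[Any]:
    # Loop as suggested in the review: each result is handled once.
    for result in reader.read(partition):
        if isinstance(result, list):
            yield from result
        else:
            yield result


# A read that mixes a batch (list) with individual rows (tuples).
mixed = [[(1,), (2,)], (3,), (4,)]

r1 = CountingReader(mixed)
original_out = list(rows_original(r1, partition=None))

r2 = CountingReader(mixed)
suggested_out = list(rows_suggested(r2, partition=None))
```

With this mixed input, the original loop reads twice and emits the first batch again as an unflattened list (`original_out` is `[(1,), (2,), [(1,), (2,)], (3,), (4,)]`), whereas the suggested loop reads once and yields `[(1,), (2,), (3,), (4,)]`. When every result is a plain row, both loops produce the same rows, but the original still performs the read twice.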
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-2124398
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.