Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,7 @@ dmypy.json

# Pyre type checker
.pyre/

# IDE
.idea
.vscode
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pipx install git+https://github.com/MeltanoLabs/tap-universal-file.git
| file_path | True | None | The path to obtain files from. Example: `/foo/bar`. Or, for `protocol==s3`, use `s3-bucket-name` instead. |
| file_regex | False | None | A regex pattern to only include certain files. Example: `.*\.csv`. |
| file_type | False | delimited | Must be one of `delimited`, `jsonl`, or `avro`. Indicates the type of file to sync, where `delimited` is for CSV/TSV files and similar. Note that *all* files will be read as that type, regardless of file extension. To only read from files with a matching file extension, appropriately configure `file_regex`. |
| schema | False | None | The declarative schema to use for the stream. It can be the schema itself or a path to a json file containing the schema, see [example](https://github.com/meltano/hub/blob/c4b541bbdc36b1b6efffa1eb6022367d8de43e3a/schemas/plugin_definitions/hub_metadata.schema.json). If not provided, the schema will be inferred. |
| compression | False | detect | The encoding used to decompress data. Must be one of `none`, `zip`, `bz2`, `gzip`, `lzma`, `xz`, or `detect`. If set to `none` or any encoding, that setting will be applied to *all* files, regardless of file extension. If set to `detect`, encodings will be applied based on file extension. |
| additional_info | False | 1 | If `True`, each row in tap's output will have three additional columns: `_sdc_file_name`, `_sdc_line_number`, and `_sdc_last_modified`. If `False`, these columns will not be present. Incremental replication requires `additional_info==True`. |
| start_date | False | None | Used in place of state. Files that were last modified before the `start_date` will not be synced. |
Expand Down Expand Up @@ -61,6 +62,7 @@ tap is available by running:
tap-universal-file --about
```


### Regular Expressions

To allow configuration for which files are synced, this tap supports the use of regular expressions to match file paths. First, the tap will find the directory specified by the `file_path` config option. Then it will compare the provided regular expression to the full file path of each file in that directory.
Expand Down
7 changes: 6 additions & 1 deletion tap_universal_file/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ def schema(self) -> dict:
A schema constructed using the get_properties() method of whichever stream
is currently in use.
"""
if hasattr(self, "_schema"):
self.logger.debug(f"Using schema from config: {self._schema}")
return self._schema

self.logger.debug(f"No schema found in config, creating schema...")
properties = self.get_properties()
additional_info = self.config["additional_info"]
if additional_info:
Expand Down Expand Up @@ -146,7 +151,7 @@ def get_properties(self) -> dict:
raise NotImplementedError(msg)

def get_compression(self, file: str) -> str | None:
"""Determines what compression encoding is appropraite for a given file.
"""Determines what compression encoding is appropriate for a given file.

Args:
file: The file to determine the encoding of.
Expand Down
6 changes: 3 additions & 3 deletions tap_universal_file/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def filesystem(self) -> fsspec.AbstractFileSystem:
return fsspec.filesystem("file")

if caching_strategy == "once":
# Creaing a filecache without specifying cache_storage location will cause
# Creating a filecache without specifying cache_storage location will cause
# the cache to be discarded after the filesystem is closed.
# Docs: https://filesystem-spec.readthedocs.io/en/latest/features.html#caching-files-locally
return fsspec.filesystem(
Expand All @@ -65,7 +65,7 @@ def filesystem(self) -> fsspec.AbstractFileSystem:
cache_storage=tempfile.gettempdir(),
)
if caching_strategy == "none":
# When caching is not used, the protocol's arguemnts have to be
# When caching is not used, the protocol's arguments have to be
# star-unpacked because fsspec.filesystem accepts them directly instead of
# as a dictionary.
return fsspec.filesystem(
Expand Down Expand Up @@ -181,7 +181,7 @@ def _get_args(self) -> dict[str, Any]:
implementation and when unpacked (**) for a direct implementation.

Returns:
A dictionary containing fsspec arguements.
A dictionary containing fsspec arguments.
"""
if self.protocol == "s3":
if self.config["s3_anonymous_connection"]:
Expand Down
6 changes: 3 additions & 3 deletions tap_universal_file/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_properties(self) -> dict:
def _get_readers(
self,
) -> Generator[tuple[ModifiedDictReader, str, str], None, None]:
"""Gets reader objects and associated meta data.
"""Gets reader objects and associated metadata.

Raises:
RuntimeError: If improper configuration is supplied.
Expand Down Expand Up @@ -339,7 +339,7 @@ def get_rows(self) -> Generator[dict[str, Any], None, None]:
"""Retrive all rows from all Avro files.

Yields:
A dictionary containing information about a row in a Avro file.
A dictionary containing information about a row in an Avro file.
"""
for reader, file_name, last_modified in self._get_readers():
self.logger.info("Starting sync of %s.", file_name)
Expand Down Expand Up @@ -461,7 +461,7 @@ def _pre_process(self, row: dict[str, Any]) -> dict[str, Any]:
def _get_readers(
self,
) -> Generator[tuple[avro.datafile.DataFileReader, str, str], None, None]:
"""Gets reader objects and associated meta data.
"""Gets reader objects and associated metadata.

Yields:
A tuple of (avro.datafile.DataFileReader, file_name, last_modified).
Expand Down
17 changes: 14 additions & 3 deletions tap_universal_file/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ class TapUniversalFile(Tap):
"matching file extension, appropriately configure `file_regex`."
),
),
th.Property(
"schema",
th.StringType,
description=(
"The declarative schema to use for the stream. It can be the schema "
"itself or a path to a json file containing the schema. If not "
"provided, the schema will be inferred from the data following "
"the coercion strategy."
),
),
th.Property(
"compression",
th.StringType,
Expand Down Expand Up @@ -301,12 +311,13 @@ def discover_streams(self) -> list[streams.FileStream]:
"""
name = self.config["stream_name"]
file_type = self.config["file_type"]
schema = self.config.get("schema", None)
if file_type == "delimited":
return [streams.DelimitedStream(self, name=name)]
return [streams.DelimitedStream(self, name=name, schema=schema)]
if file_type == "jsonl":
return [streams.JSONLStream(self, name=name)]
return [streams.JSONLStream(self, name=name, schema=schema)]
if file_type == "avro":
return [streams.AvroStream(self, name=name)]
return [streams.AvroStream(self, name=name, schema=schema)]
if file_type in {"csv", "tsv", "txt"}:
msg = f"'{file_type}' is not a valid file_type. Did you mean 'delimited'?"
raise ValueError(msg)
Expand Down