Skip to content

Commit

Permalink
fix: Addressed race condition with multiple streams (#130)
Browse files Browse the repository at this point in the history
Closes #129

---------

Co-authored-by: Sebastian Smiley <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Edgar Ramírez-Mondragón <[email protected]>
  • Loading branch information
4 people authored Jul 24, 2024
1 parent 186b356 commit 5b1c4d9
Show file tree
Hide file tree
Showing 6 changed files with 498 additions and 358 deletions.
791 changes: 444 additions & 347 deletions poetry.lock

Large diffs are not rendered by default.

9 changes: 3 additions & 6 deletions target_csv/serialization.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
import csv # noqa: D100
from pathlib import Path
from typing import Any, List, Callable
import os


def create_folder_if_not_exists(func: Any) -> Callable[..., int]:
"""Decorator to create folder if it does not exist."""

def wrapper(*args: Any, **kwargs: Any) -> int:
try:
filepath = kwargs["filepath"]
filepath = Path(kwargs["filepath"])
except KeyError:
filepath = args[0]
folder = os.path.dirname(filepath)
if not os.path.exists(folder) and folder != "":
os.makedirs(folder)
filepath = Path(args[0])
filepath.parent.mkdir(parents=True, exist_ok=True)
return func(*args, **kwargs)

return wrapper
Expand Down
Empty file added tests/data_files/__init__.py
Empty file.
21 changes: 21 additions & 0 deletions tests/data_files/users_and_employees.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{"type": "STATE", "value": {}}
{"type": "SCHEMA", "stream": "employees", "schema": {"properties": {"employee_id": {"type": ["integer"]}, "first_name": {"type": ["string"]}, "last_name": {"type": ["string"]}}, "type": "object", "required": ["employee_id", "first_name", "last_name"]}, "key_properties": ["employee_id"]}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 1, "first_name": "Alice", "last_name": "Smith"}, "time_extracted": "2024-07-19T20:34:52.872497+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 2, "first_name": "Bob", "last_name": "Johnson"}, "time_extracted": "2024-07-19T20:34:52.873094+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 3, "first_name": "Carol", "last_name": "McMann"}, "time_extracted": "2024-07-19T20:34:52.873231+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 4, "first_name": "Duke", "last_name": "DeLaney"}, "time_extracted": "2024-07-19T20:34:52.873309+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 5, "first_name": "Edward", "last_name": "Smith"}, "time_extracted": "2024-07-19T20:34:52.873379+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 6, "first_name": "Frank", "last_name": "Aggarwal"}, "time_extracted": "2024-07-19T20:34:52.873462+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 7, "first_name": "Gina", "last_name": "Griffith"}, "time_extracted": "2024-07-19T20:34:52.873703+00:00"}
{"type": "RECORD", "stream": "employees", "record": {"employee_id": 8, "first_name": "Hiram", "last_name": "Green"}, "time_extracted": "2024-07-19T20:34:52.873801+00:00"}
{"type": "STATE", "value": {"bookmarks": {"employees": {}}}}
{"type": "SCHEMA", "stream": "users", "schema": {"properties": {"user_id": {"type": ["integer"]}, "first_name": {"type": ["string"]}, "last_name": {"type": ["string"]}}, "type": "object", "required": ["user_id", "first_name", "last_name"]}, "key_properties": ["user_id"]}
{"type": "RECORD", "stream": "users", "record": {"user_id": 1, "first_name": "Abraham", "last_name": "Qamar"}, "time_extracted": "2024-07-19T20:34:52.879965+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 2, "first_name": "Betty", "last_name": "Pattinson"}, "time_extracted": "2024-07-19T20:34:52.880376+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 3, "first_name": "Charlie", "last_name": "Orlando"}, "time_extracted": "2024-07-19T20:34:52.880493+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 4, "first_name": "Deborah", "last_name": "Nooning"}, "time_extracted": "2024-07-19T20:34:52.880562+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 5, "first_name": "Edward", "last_name": "Michaelson"}, "time_extracted": "2024-07-19T20:34:52.880624+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 6, "first_name": "Francesca", "last_name": "Little"}, "time_extracted": "2024-07-19T20:34:52.880686+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 7, "first_name": "Graham", "last_name": "Kingston"}, "time_extracted": "2024-07-19T20:34:52.880901+00:00"}
{"type": "RECORD", "stream": "users", "record": {"user_id": 8, "first_name": "Holly", "last_name": "Johnson"}, "time_extracted": "2024-07-19T20:34:52.881002+00:00"}
{"type": "STATE", "value": {"bookmarks": {"employees": {}, "users": {}}}}
30 changes: 29 additions & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,41 @@
"""Tests standard target features using the built-in SDK tests library."""

from pathlib import Path
import sys
from typing import Any, Dict

from singer_sdk.testing import get_target_test_class
from singer_sdk.testing.suites import TestSuite
from singer_sdk.testing.templates import TargetFileTestTemplate

from . import data_files
from target_csv.target import TargetCSV

if sys.version_info >= (3, 9):
from importlib.resources import files
else:
from importlib_resources import files

SAMPLE_CONFIG: Dict[str, Any] = {
"escape_character": '"',
}

TestTargetCSV = get_target_test_class(TargetCSV, config=SAMPLE_CONFIG)

class MultipleStreamsTest(TargetFileTestTemplate):
name = "users_and_employees"

@property
def singer_filepath(self) -> Path:
return files(data_files) / f"{self.name}.singer"


TestTargetCSV = get_target_test_class(
TargetCSV,
config=SAMPLE_CONFIG,
custom_suites=[
TestSuite(
kind="target",
tests=[MultipleStreamsTest],
)
],
)
5 changes: 1 addition & 4 deletions tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,8 @@ def test_file_paths(output_dir) -> List[Path]:
paths = []
for dir in range(4):
path = Path(output_dir / f"test-dir-{dir}/csv-test-output-{dir}.csv")
if path.exists():
path.unlink()

path.unlink(missing_ok=True)
paths.append(path)

return paths


Expand Down

0 comments on commit 5b1c4d9

Please sign in to comment.