Skip to content

Commit b89f3a7

Browse files
[Network Drive] Switch to usage of local connection cache (#3868)
## Closes #2418

Conceptual problem - `smbprotocol` has a global connection cache [here](https://github.com/jborean93/smbprotocol/blob/master/src/smbclient/_pool.py#L33) (see usages). The hash key is host/port:

```python
connection_key = f"{server.lower()}:{port}"
```

The library is not natively async, which means it can behave unexpectedly when used in an asynchronous/threaded environment. In our case, one connector configuration can spawn at least 2 connections to the same server: 1 for running the `get_docs` method while the sync is running and 1 for the `ping` method, which gets spawned every 30 seconds by default. Things get worse if advanced filtering rules are used and several connectors point to the same Network Drive.

What happens is:

1. `get_docs` is called and a connection is established
2. `ping` is triggered, and the connection is re-used from the cache
3. `ping` finishes and closes the connection. That causes `get_docs` to lose its connection
4. `get_docs` can sometimes re-establish it (see [this code](https://github.com/elastic/connectors/blob/main/app/connectors_service/connectors/sources/network_drive/datasource.py#L276)). Sometimes it just fails.

This change attempts to address the problem by providing a connection cache that is local to each connector instead of using the global one. This way, connector instances will not attempt to close each other's connections when they connect to the same resource.

#### Pre-Review Checklist

- [x] this PR does NOT contain credentials of any kind, such as API keys or username/passwords (double check `config.yml.example`)
- [x] this PR has a meaningful title
- [x] this PR links to all relevant github issues that it fixes or partially addresses
- [x] if there is no GH issue, please create it. Each PR should have a link to an issue
- [x] this PR has a thorough description
- [ ] Covered the changes with automated tests
- [x] Tested the changes locally
- [x] Added a label for each target release version (example: `v7.13.2`, `v7.14.0`, `v8.0.0`)
- [ ] For bugfixes: backport safely to all minor branches still receiving patch releases
- [ ] Considered corresponding documentation changes
- [ ] Contributed any configuration settings changes to the configuration reference
- [ ] if you added or changed Rich Configurable Fields for a Native Connector, you made a corresponding PR in [Kibana](https://github.com/elastic/kibana/blob/main/packages/kbn-search-connectors/types/native_connectors.ts)

## Release Note

Fix the bug with Network Drive connector closing connections to SMB servers prematurely.

---------

Co-authored-by: Elastic Machine <[email protected]>
1 parent b6849e6 commit b89f3a7

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

app/connectors_service/connectors/sources/network_drive/datasource.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,20 @@ def __init__(self, configuration):
7575
self.drive_type = self.configuration["drive_type"]
7676
self.identity_mappings = self.configuration["identity_mappings"]
7777
self.security_info = SecurityInfo(self.username, self.password, self.server_ip)
78+
self._connection_cache = {}
7879

7980
def advanced_rules_validators(self):
8081
return [NetworkDriveAdvancedRulesValidator(self)]
8182

8283
@cached_property
8384
def smb_connection(self):
84-
return SMBSession(self.server_ip, self.username, self.password, self.port)
85+
return SMBSession(
86+
self.server_ip,
87+
self.username,
88+
self.password,
89+
self.port,
90+
self._connection_cache,
91+
)
8592

8693
@classmethod
8794
def get_default_configuration(cls):
@@ -203,6 +210,7 @@ async def traverse_diretory(self, path):
203210
username=self.username,
204211
password=self.password,
205212
port=self.port,
213+
connection_cache=self._connection_cache,
206214
),
207215
)
208216
)
@@ -257,6 +265,9 @@ async def traverse_directory_for_syncrule(self, path, glob_pattern, indexed_rule
257265
smbclient.scandir,
258266
path=current_path,
259267
port=self.port,
268+
username=self.username,
269+
password=self.password,
270+
connection_cache=self._connection_cache,
260271
),
261272
)
262273
for file in directory_info:
@@ -337,9 +348,9 @@ async def validate_config(self):
337348

338349
async def ping(self):
339350
"""Verify the connection with Network Drive"""
351+
if self.smb_connection.session is None:
352+
await asyncio.to_thread(self.smb_connection.create_connection)
340353

341-
await asyncio.to_thread(self.smb_connection.create_connection)
342-
await self.close()
343354
self._logger.info("Successfully connected to the Network Drive")
344355

345356
async def close(self):
@@ -350,7 +361,10 @@ async def close(self):
350361
await loop.run_in_executor(
351362
executor=None,
352363
func=partial(
353-
smbclient.delete_session, server=self.server_ip, port=self.port
364+
smbclient.delete_session,
365+
server=self.server_ip,
366+
port=self.port,
367+
connection_cache=self._connection_cache,
354368
),
355369
)
356370

@@ -615,7 +629,9 @@ async def get_docs(self, filtering=None):
615629
Yields:
616630
dictionary: Dictionary containing the Network Drive files and folders as documents
617631
"""
618-
await asyncio.to_thread(self.smb_connection.create_connection)
632+
if self.smb_connection.session is None:
633+
await asyncio.to_thread(self.smb_connection.create_connection)
634+
619635
if filtering and filtering.has_advanced_rules():
620636
advanced_rules = filtering.get_advanced_rules()
621637
async for document in self.fetch_filtered_directory(advanced_rules):

app/connectors_service/connectors/sources/network_drive/netdrive.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,13 @@ def fetch_members(self, group_name):
222222
class SMBSession:
223223
_connection = None
224224

225-
def __init__(self, server_ip, username, password, port):
225+
def __init__(self, server_ip, username, password, port, connection_cache):
226226
self.server_ip = server_ip
227227
self.username = username
228228
self.password = password
229229
self.port = port
230230
self.session = None
231+
self._connection_cache = connection_cache
231232
self._logger = logger
232233

233234
def create_connection(self):
@@ -238,6 +239,7 @@ def create_connection(self):
238239
username=self.username,
239240
password=self.password,
240241
port=self.port,
242+
connection_cache=self._connection_cache,
241243
)
242244
except SMBResponseException as exception:
243245
self.handle_smb_response_errors(exception=exception)

0 commit comments

Comments
 (0)