Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions Lib/test/test_free_threading/test_uuid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import os
import unittest

from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently
from uuid import SafeUUID

c_uuid = import_helper.import_module("_uuid")

NTHREADS = 10
UUID_PER_THREAD = 1000


@threading_helper.requires_working_threading()
class UUIDTests(unittest.TestCase):
@unittest.skipUnless(os.name == "posix", "POSIX only")
def test_generate_time_safe(self):
uuids = []

def worker():
local_uuids = []
for _ in range(UUID_PER_THREAD):
uuid, is_safe = c_uuid.generate_time_safe()
self.assertIs(type(uuid), bytes)
self.assertEqual(len(uuid), 16)
# Collect the UUID only if it is safe. If not, we cannot ensure
# UUID uniqueness. According to uuid_generate_time_safe() man
# page, it is theoretically possible for two concurrently
# running processes to generate the same UUID(s) if the return
# value is not 0.
if is_safe == SafeUUID.safe:
local_uuids.append(uuid)

# Merge all safe uuids
uuids.extend(local_uuids)

run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertEqual(len(uuids), len(set(uuids)))

@unittest.skipUnless(os.name == "nt", "Windows only")
def test_UuidCreate(self):
uuids = []

def worker():
local_uuids = []
for _ in range(UUID_PER_THREAD):
uuid = c_uuid.UuidCreate()
self.assertIs(type(uuid), bytes)
self.assertEqual(len(uuid), 16)
local_uuids.append(uuid)

# Merge all uuids
uuids.extend(local_uuids)

run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertEqual(len(uuids), len(set(uuids)))


if __name__ == "__main__":
unittest.main()
Loading