import os import subprocess import warnings import pytest import audeer def test_deprecated(): @audeer.deprecated(removal_version="1.0.0", alternative="audeer.mkdir") def deprecated_function(): pass expected_message = ( "deprecated_function is deprecated " "and will be removed with version 1.0.0." " Use audeer.mkdir instead." ) with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning deprecated_function() assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) @audeer.deprecated(removal_version="2.0.0") def deprecated_function(): pass expected_message = ( "deprecated_function is deprecated " "and will be removed with version 2.0.0." ) with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning deprecated_function() assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) @audeer.deprecated(removal_version="2.0.0") class DeprecatedClass: pass expected_message = ( "DeprecatedClass is deprecated " "and will be removed with version 2.0.0." ) with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning DeprecatedClass() assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) def test_deprecated_default_value(): @audeer.deprecated_default_value( argument="foo", new_default_value="bar", change_in_version="1.0.0", ) def function_with_deprecated_default_value(*, foo="foo"): return foo expected_message = ( "The default of 'foo' will change from " "'foo' to 'bar' " "with version 1.0.0." ) default_value = "foo" with warnings.catch_warnings(): # no warning if we set value warnings.simplefilter("error") function_with_deprecated_default_value(foo="foo") function_with_deprecated_default_value(foo="bar") with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning assert function_with_deprecated_default_value() == default_value assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) def test_deprecated_keyword_argument(): @audeer.deprecated_keyword_argument( deprecated_argument="foo", new_argument="bar", removal_version="1.0.0", mapping=lambda x: 2 * x, ) def function_with_deprecated_keyword_argument(*, bar): return bar expected_message = ( "'foo' argument is deprecated " "and will be removed with version 1.0.0." " Use 'bar' instead." ) value = 1 assert function_with_deprecated_keyword_argument(bar=value) == value with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning r = function_with_deprecated_keyword_argument(foo=value) assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) assert r == 2 * value @audeer.deprecated_keyword_argument( deprecated_argument="foo", removal_version="1.0.0", ) def function_with_deprecated_keyword_argument(**kwargs): return 1 expected_message = ( "'foo' argument is deprecated " "and will be removed with version 1.0.0." ) assert function_with_deprecated_keyword_argument() == 1 with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning r = function_with_deprecated_keyword_argument(foo=2) assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) assert r == 1 @audeer.deprecated_keyword_argument( deprecated_argument="foo", removal_version="1.0.0", remove_from_kwargs=False, ) def function_with_deprecated_keyword_argument(**kwargs): if "foo" in kwargs: return kwargs["foo"] else: return 1 expected_message = ( "'foo' argument is deprecated " "and will be removed with version 1.0.0." ) assert function_with_deprecated_keyword_argument() == 1 with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning r = function_with_deprecated_keyword_argument(foo=2) assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) assert r == 2 @audeer.deprecated_keyword_argument( deprecated_argument="foo", new_argument="bar", removal_version="1.0.0", remove_from_kwargs=False, ) def function_with_deprecated_keyword_argument(**kwargs): if "foo" in kwargs: return kwargs["foo"] else: return 1 expected_message = ( "'foo' argument is deprecated " "and will be removed with version 1.0.0." " Use 'bar' instead." ) assert function_with_deprecated_keyword_argument() == 1 with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning r = function_with_deprecated_keyword_argument(foo=2) assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) assert r == 2 @audeer.deprecated_keyword_argument( deprecated_argument="foo", new_argument="bar", removal_version="1.0.0", ) class ClassWithDeprecatedKeywordArgument(object): def __init__(self, *, bar): self.bar = bar expected_message = ( "'foo' argument is deprecated " "and will be removed with version 1.0.0." " Use 'bar' instead." ) value = 1 assert ClassWithDeprecatedKeywordArgument(bar=value).bar == value with warnings.catch_warnings(record=True) as w: # Cause all warnings to always be triggered. warnings.simplefilter("always") # Raise warning r = ClassWithDeprecatedKeywordArgument(foo=value) assert issubclass(w[-1].category, UserWarning) assert expected_message == str(w[-1].message) assert r.bar == value @pytest.mark.parametrize( "nested_list,expected_list", [ ([1, 2, 3, [4], [], [[[[[[[[[5]]]]]]]]]], [1, 2, 3, 4, 5]), ([[1, 2], 3], [1, 2, 3]), ([1, 2, 3], [1, 2, 3]), ], ) def test_flatten_list(nested_list, expected_list): flattened_list = audeer.flatten_list(nested_list) assert flattened_list == expected_list def test_freeze_requirements(tmpdir): path = str(tmpdir.mkdir("tmp")) outfile = os.path.join(path, "requirements.txt.lock") audeer.freeze_requirements(outfile) with open(outfile) as f: requirements = f.readlines() # Remove whitespace and \n requirements = [r.strip() for r in requirements] assert any(["pytest" in r for r in requirements]) assert any(["audeer" in r for r in requirements]) with pytest.raises(RuntimeError, match=r"Freezing Python packages failed"): outfile = os.path.join(path, "not-existent/requirements.txt.lock") audeer.freeze_requirements(outfile) def test_git_repo_tags(): git = ["git", "tag"] expected_tags = subprocess.check_output(git) expected_tags = expected_tags.decode().strip().split("\n") tags = audeer.git_repo_tags() assert tags == expected_tags tags = audeer.git_repo_tags(v=True) expected_tags = [f"v{t}" if not t.startswith("v") else t for t in expected_tags] assert tags == expected_tags tags = audeer.git_repo_tags(v=False) expected_tags = [t[1:] if t.startswith("v") else t for t in expected_tags] assert tags == expected_tags def test_git_repo_version(): git = ["git", "describe", "--tags", "--always"] expected_version = subprocess.check_output(git) expected_version = expected_version.decode().strip() version = audeer.git_repo_version(v=True) if not expected_version.startswith("v"): expected_version = f"v{expected_version}" assert version == expected_version version = audeer.git_repo_version(v=False) assert version == expected_version[1:] @pytest.mark.parametrize( "version, is_semantic", [ ( "1.0.0", True, ), ( "v1.0.0", True, ), ( "4.0.0-20200206.095534-3", True, ), ( "v1.0.1-1-gdf29c4a", True, ), ( "1", False, ), ( "v1", False, ), ( "v1.3-r3", False, ), ( "v1.3.3-r3", True, ), ( "a.b.c", False, ), ( "va.b.c", False, ), ( "1.2.a", False, ), ( "v1.3.3.3-r3", False, ), ( "1.0.0+20130313144700", True, ), ( "1.0.0-alpha+001", True, ), ], ) def test_is_semantic_version(version, is_semantic): assert audeer.is_semantic_version(version) == is_semantic @pytest.mark.parametrize( "uid, expected", [ (audeer.uid(), True), (audeer.uid(short=True), True), (audeer.uid(from_string="from string"), True), (audeer.uid(from_string="from string", short=True), True), (audeer.uid(from_string="from string", short=True).upper(), True), (None, False), (1234, False), ("", False), ("some random string", False), (audeer.uid()[:-1], False), (audeer.uid(short=True)[:-1], False), ("00000000-0000-0000-0000-000000000000", True), ("000000000-0000-0000-0000-00000000000", False), ("?0000000-0000-0000-0000-000000000000", False), ], ) def test_is_uid(uid, expected): assert audeer.is_uid(uid) == expected def power(a: int = 0, *, b: int = 1): return a**b @pytest.mark.parametrize( "multiprocessing", [ True, False, ], ) @pytest.mark.parametrize( "num_workers", [ 1, 3, None, ], ) @pytest.mark.parametrize( "task_fun, params", [ (power, [([], {})]), (power, [([1], {})]), (power, [([1], {"b": 2})]), (power, [([], {"a": 1, "b": 2})]), (power, [([x], {"b": x}) for x in range(5)]), ], ) def test_run_tasks(multiprocessing, num_workers, task_fun, params): expected = [task_fun(*param[0], **param[1]) for param in params] results = audeer.run_tasks( task_fun, params, num_workers=num_workers, multiprocessing=multiprocessing, ) assert expected == results @pytest.mark.parametrize( "func,params,expected_output", [ ( lambda x, n: x**n, [(2, n) for n in range(7)], [1, 2, 4, 8, 16, 32, 64], ), (3, None, [None]), (lambda x: x, ["hello"], ["hello"]), (lambda x: x, ["hello", 1, print], ["hello", 1, print]), ], ) def test_run_worker_threads(func, params, expected_output): num_workers = [None, 1, 2, 4, 5, 100] for n in num_workers: with pytest.warns(UserWarning): output = audeer.run_worker_threads(func, params, num_workers=n) assert output == expected_output @pytest.mark.parametrize( "versions, expected_versions", [ ( ["1.0.0", "3.0.0", "1.10.1", "1.2.0", "1.0.1"], ["1.0.0", "1.0.1", "1.2.0", "1.10.1", "3.0.0"], ), ( [ "1.0.0-SNAPSHOT", "4.0.0-20200206.095424-2", "1.0.0", "2.0.0-20200131.102442-1", "3.0.0", "3.1.0", "4.0.0-20200206.095316-1", "3.2.0", "2.0.0-20200131.102728-2", "3.3.0", "3.4.0", "4.0.0-20200206.095534-3", "4.0.0", ], [ "1.0.0", "1.0.0-SNAPSHOT", "2.0.0-20200131.102442-1", "2.0.0-20200131.102728-2", "3.0.0", "3.1.0", "3.2.0", "3.3.0", "3.4.0", "4.0.0", "4.0.0-20200206.095316-1", "4.0.0-20200206.095424-2", "4.0.0-20200206.095534-3", ], ), ( ["v1.0.0", "v1.0.1", "v1.0.1-1-gdf29c4a"], ["v1.0.0", "v1.0.1", "v1.0.1-1-gdf29c4a"], ), # From https://github.com/postmarketOS/pmbootstrap/issues/342 ( ["22.7.3-r1.3", "22.7.3-r1"], ["22.7.3-r1", "22.7.3-r1.3"], ), ( ["1.0.0", "1.1.1+1", "1.2.1", "1.2.0"], ["1.0.0", "1.1.1+1", "1.2.0", "1.2.1"], ), ], ) def test_sort_versions(versions, expected_versions): sorted_versions = audeer.sort_versions(versions) assert sorted_versions == expected_versions @pytest.mark.parametrize( "versions, error_message", [ ( ["1"], ( "All version numbers have to be semantic versions, " "following 'X.Y.Z', " "where X, Y, Z are integers. " "But your version is: '1'." ), ), ], ) def test_sort_versions_errors(versions, error_message): with pytest.raises(ValueError, match=error_message): audeer.sort_versions(versions) @pytest.mark.parametrize( "input,expected_output", [ (1, [1]), ("abc", ["abc"]), (["abc", 1], ["abc", 1]), ((1, 2, 3), [1, 2, 3]), (len, [len]), ({1: "a", 2: "b"}, [1, 2]), ([], []), ("", [""]), (None, [None]), ], ) def test_to_list(input, expected_output): assert audeer.to_list(input) == expected_output @pytest.mark.parametrize( "short", [ False, True, ], ) @pytest.mark.parametrize( "from_string", [ None, "example_string", ], ) def test_uid(from_string, short): uid = audeer.uid(from_string=from_string, short=short) if short: assert len(uid) == 8 else: assert len(uid) == 36 for pos in [8, 13, 18, 23]: assert uid[pos] == "-" uid2 = audeer.uid(from_string=from_string, short=short) if from_string is not None: assert uid == uid2 else: assert uid != uid2 @pytest.mark.parametrize( "sequence, expected", [ ([], []), ([1], [1]), ([2, 1], [2, 1]), ([2, 1, 2], [2, 1]), (["a", 1, "a", 1], ["a", 1]), ((1, 1), [1]), ("dddnnfhg", ["d", "n", "f", "h", "g"]), ([None, None], [None]), ], ) def test_unique(sequence, expected): r"""Test audeer.unique(). Should return unique values in original order. Args: sequence: sequence if input values expected: expected unique list """ assert audeer.unique(sequence) == expected
Memory