Edit on GitHub

hexdoc.utils

 1__all__ = [
 2    "TRACE",
 3    "ContextSource",
 4    "FieldOrProperty",
 5    "IProperty",
 6    "Inherit",
 7    "InheritType",
 8    "JSONDict",
 9    "JSONValue",
10    "NoValue",
11    "NoValueType",
12    "PydanticOrderedSet",
13    "PydanticURL",
14    "RelativePath",
15    "Sortable",
16    "TOMLDict",
17    "TOMLValue",
18    "TryGetEnum",
19    "ValidationContext",
20    "add_to_context",
21    "cast_context",
22    "cast_or_raise",
23    "clamping_validator",
24    "classproperty",
25    "decode_and_flatten_json_dict",
26    "decode_json_dict",
27    "git_root",
28    "isinstance_or_raise",
29    "listify",
30    "load_toml_with_placeholders",
31    "must_yield_something",
32    "relative_path_root",
33    "replace_suffixes",
34    "set_contextvar",
35    "setup_logging",
36    "sorted_dict",
37    "strip_suffixes",
38    "write_to_path",
39]
40
41from .cd import RelativePath, relative_path_root
42from .classproperties import classproperty
43from .context import ContextSource, ValidationContext, add_to_context, cast_context
44from .contextmanagers import set_contextvar
45from .deserialize import (
46    JSONDict,
47    JSONValue,
48    TOMLDict,
49    TOMLValue,
50    cast_or_raise,
51    decode_and_flatten_json_dict,
52    decode_json_dict,
53    isinstance_or_raise,
54    load_toml_with_placeholders,
55)
56from .git import git_root
57from .iterators import listify, must_yield_something
58from .logging import TRACE, setup_logging
59from .path import replace_suffixes, strip_suffixes, write_to_path
60from .singletons import Inherit, InheritType, NoValue, NoValueType
61from .types import (
62    FieldOrProperty,
63    IProperty,
64    PydanticOrderedSet,
65    PydanticURL,
66    Sortable,
67    TryGetEnum,
68    clamping_validator,
69    sorted_dict,
70)
TRACE = 5
ContextSource = dict[str, typing.Any] | pydantic_core.core_schema.ValidationInfo | jinja2.runtime.Context
FieldOrProperty = typing.Union[+_T_covariant, IProperty[+_T_covariant]]
class IProperty(Protocol[_T_covariant]):
    """Structural type for objects whose ``__get__`` produces a ``_T_covariant``.

    ``__get__`` takes the instance and an optional owner type, both
    positional-only.
    """

    def __get__(self, __instance: Any, __owner: type | None = None, /) -> _T_covariant:
        ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto(Protocol[T]):
    def meth(self) -> T:
        ...
IProperty(*args, **kwargs)
1953def _no_init_or_replace_init(self, *args, **kwargs):
1954    cls = type(self)
1955
1956    if cls._is_protocol:
1957        raise TypeError('Protocols cannot be instantiated')
1958
1959    # Already using a custom `__init__`. No need to calculate correct
1960    # `__init__` to call. This can lead to RecursionError. See bpo-45121.
1961    if cls.__init__ is not _no_init_or_replace_init:
1962        return
1963
1964    # Initially, `__init__` of a protocol subclass is set to `_no_init_or_replace_init`.
1965    # The first instantiation of the subclass will call `_no_init_or_replace_init` which
1966    # searches for a proper new `__init__` in the MRO. The new `__init__`
1967    # replaces the subclass' old `__init__` (ie `_no_init_or_replace_init`). Subsequent
1968    # instantiation of the protocol subclass will thus use the new
1969    # `__init__` and no longer call `_no_init_or_replace_init`.
1970    for base in cls.__mro__:
1971        init = base.__dict__.get('__init__', _no_init_or_replace_init)
1972        if init is not _no_init_or_replace_init:
1973            cls.__init__ = init
1974            break
1975    else:
1976        # should not happen
1977        cls.__init__ = object.__init__
1978
1979    cls.__init__(self, *args, **kwargs)
Inherit = <InheritType._token: 0>
class InheritType(Enum):
    """Type of Inherit, a singleton representing a value that should be inherited."""

    # The only member of the enum.
    _token = 0

    def __str__(self) -> str:
        # Render as the bare name instead of the default "InheritType._token".
        return "Inherit"

Type of Inherit, a singleton representing a value that should be inherited.

JSONDict = dict[str, JsonValue]
JSONValue = JsonValue
NoValue = <NoValueType._token: 0>
class NoValueType(Enum):
    """Type of NoValue, a singleton representing the value of a nonexistent dict key."""

    # The only member of the enum.
    _token = 0

    def __str__(self) -> str:
        # Render as the bare name instead of the default "NoValueType._token".
        return "NoValue"

    def __bool__(self) -> Literal[False]:
        # Always falsy, so truthiness checks treat the sentinel as "missing".
        return False

Type of NoValue, a singleton representing the value of a nonexistent dict key.

class PydanticOrderedSet(OrderedSet[_T]):
    """`OrderedSet` subclass that supplies its own pydantic core schema.

    Validation accepts either an existing instance of this class or an
    `OrderedSetInitializer`, which is converted by calling
    `PydanticOrderedSet` on it. Serialization goes through `_get_items`.
    """

    def __init__(self, initial: OrderedSetInitializer[_T] | None = None):
        # A missing (or otherwise falsy) initializer is replaced by an empty list.
        super().__init__(initial or [])

    @classmethod
    def collect(cls, *initial: _T):
        """Build an instance from the positional arguments."""
        return cls(initial)

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        source: type[Any],
        handler: GetCoreSchemaHandler,
    ) -> core_schema.CoreSchema:
        """Return a union schema: instance-of-`cls`, or convert from an initializer.

        Raises:
            ValueError: if `source` carries more than one type argument.
        """
        type_args = get_args(source)
        n = len(type_args)

        type_arg: Any
        if n == 0:
            # Unparameterized usage: element type is unconstrained.
            type_arg = Any
        elif n == 1:
            type_arg = type_args[0]
        else:
            raise ValueError(f"Expected 0 or 1 type args, got {n}: {type_args}")

        instance_schema = core_schema.is_instance_schema(cls)
        converting_schema = cls._get_non_instance_schema(type_arg, handler)
        return core_schema.union_schema(
            [instance_schema, converting_schema],
            serialization=cls._get_ser_schema(type_arg, handler),
        )

    @classmethod
    def _get_non_instance_schema(
        cls,
        type_arg: type[Any],
        handler: GetCoreSchemaHandler,
    ) -> core_schema.CoreSchema:
        # validate from OrderedSetInitializer
        initializer_schema = handler.generate_schema(OrderedSetInitializer[type_arg])
        return core_schema.no_info_after_validator_function(
            function=PydanticOrderedSet,
            schema=initializer_schema,
        )

    @classmethod
    def _get_ser_schema(
        cls,
        type_arg: type[Any],
        handler: GetCoreSchemaHandler,
    ) -> core_schema.SerSchema:
        # serialize to list
        serializer = cls._get_items
        return core_schema.plain_serializer_function_ser_schema(function=serializer)

    def _get_items(self):
        return self.items

An OrderedSet is a custom MutableSet that remembers its order, so that every entry has an index that can be looked up.

Example:

>>> OrderedSet([1, 1, 2, 3, 2])
OrderedSet([1, 2, 3])

PydanticOrderedSet( initial: Union[AbstractSet[~_T], Sequence[~_T], Iterable[~_T], NoneType] = None)
77    def __init__(self, initial: OrderedSetInitializer[_T] | None = None):
78        super().__init__(initial or [])
@classmethod
def collect(cls, *initial: ~_T):
80    @classmethod
81    def collect(cls, *initial: _T):
82        return cls(initial)
PydanticURL = typing.Annotated[yarl.URL, GetPydanticSchema(get_pydantic_core_schema=<function <lambda>>, get_pydantic_json_schema=None)]
RelativePath = typing.Annotated[pathlib.Path, AfterValidator(func=<function <lambda>>)]
class Sortable(ABC):
    """ABC for classes which can be sorted.

    Subclasses provide `_cmp_key`; ordering against another `Sortable`
    compares the two keys.
    """

    @property
    @abstractmethod
    def _cmp_key(self) -> Any:
        ...

    def __lt__(self, other: Any) -> bool:
        # Defer to the other operand for anything that is not a Sortable.
        if not isinstance(other, Sortable):
            return NotImplemented
        return self._cmp_key < other._cmp_key

ABC for classes which can be sorted.

TOMLDict = dict[str, 'TOMLValue']
TOMLValue = str | int | float | bool | datetime.datetime | datetime.date | datetime.time | list['TOMLValue'] | dict[str, 'TOMLValue'] | None
@unique
class TryGetEnum(Enum):
    """Enum base that adds a non-raising lookup by value."""

    @classmethod
    def get(cls, value: Any):
        """Return the member whose value is `value`, or None if there is none."""
        try:
            member = cls(value)
        except ValueError:
            member = None
        return member
@classmethod
def get(cls, value: Any):
67    @classmethod
68    def get(cls, value: Any):
69        try:
70            return cls(value)
71        except ValueError:
72            return None
class ValidationContext:
    """Base for objects stored in a context mapping under `context_key`."""

    @classproperty
    @classmethod
    def context_key(cls) -> str:
        # The key is the string form of the class itself.
        return str(cls)

    @overload
    @classmethod
    def of(cls, info: ValidationInfo, /) -> Self: ...

    @overload
    @classmethod
    def of(cls, context: dict[str, Any] | Context, /) -> Self: ...

    @overload
    @classmethod
    def of(cls, source: ContextSource, /) -> Self: ...

    @classmethod
    def of(cls, source: ContextSource, /) -> Self:
        """Look up the instance of `cls` stored in `source`.

        A dict or `Context` is indexed directly; for anything else the
        lookup uses `source.context`, which must be a dict.
        """
        if not isinstance(source, (dict, Context)):
            source = cast_or_raise(source.context, dict)
        return cast_or_raise(source[cls.context_key], cls)

    def add_to_context(self, context: dict[str, Any], overwrite: bool = False):
        """Store this object in `context` under `context_key`."""
        key = self.context_key
        return add_to_context(context, key, self, overwrite)
def context_key(unknown):

Equivalent of classmethod(property(...)).

Use @classproperty. Do not instantiate this class directly.

@classmethod
def of( cls, source: dict[str, typing.Any] | pydantic_core.core_schema.ValidationInfo | jinja2.runtime.Context, /) -> Self:
32    @classmethod
33    def of(cls, source: ContextSource, /) -> Self:
34        match source:
35            case dict() | Context():
36                pass
37            case _:
38                source = cast_or_raise(source.context, dict)
39        return cast_or_raise(source[cls.context_key], cls)
def add_to_context(self, context: dict[str, typing.Any], overwrite: bool = False):
41    def add_to_context(self, context: dict[str, Any], overwrite: bool = False):
42        return add_to_context(context, self.context_key, self, overwrite)
def add_to_context(
    context: dict[str, Any],
    key: str,
    value: Any,
    overwrite: bool = False,
):
    """Store `value` in `context` under `key`.

    Raises:
        KeyError: if `key` is already present and `overwrite` is false.
    """
    if overwrite or key not in context:
        context[key] = value
        return
    raise KeyError(f"Key {key} for {value} already exists in context")
def cast_context(source: ContextSource) -> dict[str, Any]:
    """Wrapper for `typing.cast` to simplify passing `ContextSource` to validation
    methods. This is a lie to keep the type checker happy.

    No conversion or check happens at runtime; `source` is returned as-is.
    """
    context = cast(dict[str, Any], source)
    return context

Wrapper for typing.cast to simplify passing ContextSource to validation methods. This is a lie to keep the type checker happy.

def cast_or_raise(
    val: Any,
    class_or_tuple: type[_T] | tuple[type[_T], ...],
    message: str | None = None,
) -> _T:
    """Return `val` unchanged after checking it with `isinstance_or_raise`.

    Args:
        val: the value to check.
        class_or_tuple: accepted class, or tuple of accepted classes.
        message: optional custom error message, forwarded to
            `isinstance_or_raise`.

    Raises:
        TypeError: raised by `isinstance_or_raise` when the check fails.
    """
    # Call the check unconditionally. Wrapping it in `assert` would remove the
    # whole validation when Python runs with -O, silently returning bad values.
    isinstance_or_raise(val, class_or_tuple, message)
    return val
def clamping_validator(lower: float | None, upper: float | None):
    """Build an `AfterValidator` that clamps a value into ``[lower, upper]``.

    Args:
        lower: minimum allowed value, or None for no lower bound.
        upper: maximum allowed value, or None for no upper bound.

    Returns:
        An `AfterValidator` wrapping the clamping function.
    """

    def validator(value: float):
        # Apply each bound on its own. Substituting `value` for a missing lower
        # bound inside `max(lower, min(upper, value))` undoes the upper clamp,
        # because `max(value, upper)` is `value` again whenever value > upper.
        if upper is not None:
            value = min(upper, value)
        if lower is not None:
            value = max(lower, value)
        return value

    return AfterValidator(validator)
def classproperty(
    func: Callable[[type[_T_cv]], _R_co],
) -> ClassPropertyDescriptor[_T_cv, _R_co]:
    """Wrap `func` in a `ClassPropertyDescriptor`.

    `func` may already be a `classmethod`; a plain function is wrapped in
    `classmethod` first.
    """
    as_classmethod = func if isinstance(func, classmethod) else classmethod(func)
    return ClassPropertyDescriptor(as_classmethod)
def decode_and_flatten_json_dict(data: str) -> dict[str, str]:
    """Decode `data` with `decode_json_dict` and flatten the result.

    Before decoding, whitespace that follows a backslash-newline pair is
    removed. Flattening is delegated to `_flatten_inner`, starting from an
    empty prefix.
    """
    # replace `\<LF>       foobar` with `\<LF>foobar`
    collapsed = re.sub(r"\\\n\s*", "\\\n", data)

    # decode and flatten
    return _flatten_inner(decode_json_dict(collapsed), "")
def decode_json_dict(data: str | bytes) -> dict[str, JsonValue]:
15def decode_json_dict(data: str | bytes) -> JSONDict:
16    match data:
17        case str():
18            decoded = pyjson5.decode(data)
19        case _:
20            decoded = pyjson5.decode_utf8(data)
21    assert isinstance_or_raise(decoded, dict)
22    return cast(JSONDict, decoded)
def git_root(cwd: str | Path) -> Path:
    """Run ``git rev-parse --show-toplevel`` via `commands.run` with `cwd` as
    the working directory, requesting the result as a `Path`."""
    rev_parse = ["git", "rev-parse", "--show-toplevel"]
    return commands.run(rev_parse, cwd=cwd, type=Path)
def isinstance_or_raise(
    val: Any,
    class_or_tuple: type[_T] | tuple[type[_T], ...],
    message: str | None = None,
) -> TypeGuard[_T]:
    """Usage: `assert isinstance_or_raise(val, str)`

    message placeholders: `{expected}`, `{actual}`, `{value}`

    Returns True when `val` is an instance of one of the given classes
    (generic aliases are checked against their origin type).

    Raises:
        TypeError: when the check fails. The text comes from `message` if it
            is given and formats cleanly, otherwise from a default message.
    """

    # convert generic types into the origin type
    if not isinstance(class_or_tuple, tuple):
        class_or_tuple = (class_or_tuple,)
    ungenericed_classes = tuple(get_origin(t) or t for t in class_or_tuple)

    if isinstance(val, ungenericed_classes):
        return True

    subs = {
        "expected": list(class_or_tuple),
        "actual": type(val),
        "value": val,
    }

    # The short default is used when the logger is at WARNING or quieter.
    if logger.getEffectiveLevel() >= logging.WARNING:
        default_message = _DEFAULT_MESSAGE_SHORT
    else:
        default_message = _DEFAULT_MESSAGE_LONG

    if message is None:
        raise TypeError(default_message.format(**subs))

    # Just in case the caller messed up the message formatting: unknown names
    # raise KeyError, positional placeholders such as `{}` raise IndexError,
    # and unbalanced braces raise ValueError. All of them fall back to the
    # default message so the caller still gets the intended TypeError.
    try:
        text = message.format(**subs)
    except (KeyError, IndexError, ValueError):
        text = default_message.format(**subs)
    raise TypeError(text)

Usage: assert isinstance_or_raise(val, str)

message placeholders: {expected}, {actual}, {value}

def listify(f: Callable[_P, Iterator[_T]]) -> Callable[_P, list[_T]]:
    """Decorator: make an iterator-returning function return a list instead."""

    @functools.wraps(f)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> list[_T]:
        # Exhaust the iterator eagerly and hand back the collected items.
        return [*f(*args, **kwargs)]

    return wrapper
def load_toml_with_placeholders(path: Path) -> TOMLDict:
    """Read `path` as UTF-8 TOML, run `fill_placeholders` on the parsed dict,
    and return that dict."""
    toml_text = path.read_text("utf-8")
    parsed = tomllib.loads(toml_text)
    # The return value of fill_placeholders is not used; presumably it
    # mutates `parsed` in place -- verify against its definition.
    fill_placeholders(parsed)
    return parsed
def must_yield_something(f: Callable[_P, Iterator[_T]]) -> Callable[_P, Iterator[_T]]:
    """Raises RuntimeError if the wrapped iterator doesn't yield anything.

    The wrapper is itself a generator, so the check happens when the first
    item is requested, not when the wrapped function is called.
    """

    @functools.wraps(f)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> Iterator[_T]:
        iterator = f(*args, **kwargs)
        try:
            first = next(iterator)
        except StopIteration as exc:
            # A StopIteration escaping a generator body is converted to
            # RuntimeError by the interpreter (PEP 479). Raise it explicitly
            # so the failure carries a meaningful message.
            raise RuntimeError("wrapped iterator did not yield anything") from exc
        yield first
        yield from iterator

    return wrapper

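Raises RuntimeError if the wrapped iterator doesn't yield anything (the StopIteration from the empty iterator is raised inside a generator, where Python converts it to RuntimeError).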

def relative_path_root(path: Path):
    """Return `set_contextvar(_relative_path_root, path)`, i.e. a context
    manager that sets `_relative_path_root` to `path` for its duration."""
    root_scope = set_contextvar(_relative_path_root, path)
    return root_scope
def replace_suffixes(path: Path, suffix: str) -> Path:
    """Swap every suffix of `path` for the single `suffix`.

    `path.with_suffix()` alone only replaces the last suffix, so all
    suffixes are stripped first.

    For example:
    ```py
    path = Path("lang/en_us.flatten.json5")
    replace_suffixes(path, ".json")  # lang/en_us.json
    path.with_suffix(".json")        # lang/en_us.flatten.json
    ```
    """
    bare = strip_suffixes(path)
    return bare.with_suffix(suffix)

Replaces all suffixes of a path. This is helpful because path.with_suffix() only replaces the last suffix.

For example:

path = Path("lang/en_us.flatten.json5")
replace_suffixes(path, ".json")  # lang/en_us.json
path.with_suffix(".json")        # lang/en_us.flatten.json
@contextmanager
def set_contextvar(contextvar: ContextVar[_T], value: _T):
    """Context manager that sets `contextvar` to `value` and restores the
    previous state on exit, including when the body raises."""
    previous = contextvar.set(value)
    try:
        yield
    finally:
        # Resetting with the token undoes exactly this `set` call.
        contextvar.reset(previous)
def setup_logging(
    verbosity: int,
    ci: bool,
    *,
    filters: Iterable[Filter] | None = None,
    quiet_langs: Iterable[str] | None = None,
):
    """Configure the root logger with a single hexdoc stream handler.

    Args:
        verbosity: converted to a log level by `verbosity_log_level`.
        ci: when true, WARNING and ERROR records use the `::warning ...` /
            `::error ...` annotation formats.
        filters: extra filters to attach to the new handler.
        quiet_langs: for each language, attach a `RegexFilter` built from
            `^No translation in {lang}`.

    Handlers previously installed by this function (recognized by their
    `HexdocLevelFormatter`) are removed first, so calling it again does not
    leave more than one hexdoc handler on the root logger.
    """
    logging.addLevelName(TRACE, "TRACE")

    root_logger = logging.getLogger()

    # Iterate over a snapshot: removeHandler() mutates `root_logger.handlers`,
    # and removing from the list being iterated would skip the next handler.
    for existing in list(root_logger.handlers):
        if isinstance(existing.formatter, HexdocLevelFormatter):
            logger.debug(f"Removing existing handler from root logger: {existing}")
            root_logger.removeHandler(existing)

    level = verbosity_log_level(verbosity)
    root_logger.setLevel(level)

    formats = {
        logging.DEBUG: log_format("relativeCreated", "levelname", "name"),
    }

    if level >= logging.INFO:
        # set this here so we don't clobber exceptions in verbose mode
        # but don't set it if Typer's pretty tracebacks are enabled
        # see also: https://typer.tiangolo.com/tutorial/exceptions/#disable-pretty-exceptions
        if os.getenv("_TYPER_STANDARD_TRACEBACK"):
            sys.excepthook = filtered_excepthook

        formats |= {
            logging.INFO: log_format("levelname"),
            logging.WARNING: log_format("levelname", "name"),
        }

    if ci:
        formats |= {
            logging.WARNING: "::warning file={name},line={lineno},title={levelname}::{message}",
            logging.ERROR: "::error file={name},line={lineno},title={levelname}::{message}",
        }

    handler = StreamHandler()

    handler.setLevel(level)
    handler.setFormatter(HexdocLevelFormatter(formats, style="{"))

    # `log_filter` rather than `filter`, to avoid shadowing the builtin.
    for log_filter in filters or ():
        handler.addFilter(log_filter)

    for lang in quiet_langs or ():
        handler.addFilter(RegexFilter(f"^No translation in {lang}"))

    root_logger.addHandler(handler)

    logging.getLogger("PIL").setLevel(logging.INFO)

    logger.debug("Initialized logger.")
def sorted_dict(d: Mapping[_T, _T_Sortable]) -> dict[_T, _T_Sortable]:
    """Return a new dict with the items of `d` ordered by ascending value."""
    by_value = sorted(d.items(), key=lambda pair: pair[1])
    return {key: value for key, value in by_value}
def strip_suffixes(path: Path) -> Path:
    """Drop every suffix from `path`.

    `path.with_suffix("")` alone only removes the last suffix, so it is
    applied repeatedly until none is left.

    For example:
    ```py
    path = Path("lang/en_us.flatten.json5")
    strip_suffixes(path)  # lang/en_us
    path.with_suffix("")  # lang/en_us.flatten
    ```
    """
    stripped = path
    while stripped.suffix:
        stripped = stripped.with_suffix("")
    return stripped

Removes all suffixes from a path. This is helpful because path.with_suffix("") only removes the last suffix.

For example:

path = Path("lang/en_us.flatten.json5")
strip_suffixes(path)  # lang/en_us
path.with_suffix("")  # lang/en_us.flatten
def write_to_path(path: Path, data: str | bytes, encoding: str = "utf-8"):
    """Write `data` to `path`, creating missing parent directories first.

    A `str` is written as text using `encoding`; anything else is written
    with `write_bytes`, and `encoding` is not used.
    """
    logger.log(TRACE, f"Writing to {path}")
    path.parent.mkdir(parents=True, exist_ok=True)
    if isinstance(data, str):
        path.write_text(data, encoding)
    else:
        path.write_bytes(data)