Skip to content

API Reference

All public objects are importable from the top-level extracontext package.

ContextLocal

The main entry point. A factory that returns either a NativeContextLocal or PyContextLocal instance depending on the backend argument.

extracontext.ContextLocal

Source code in extracontext/base.py
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class ContextLocal:
    _backend_registry: dict[str, type["ContextLocal"]] = {}

    def __new__(cls, *args, backend=None, **kwargs):
        if backend is None:
            backend = getattr(cls, "_backend_key", "native")
        cls = cls._backend_registry[backend]
        ## Do not forward arguments to object.__new__
        if len(__class__.__mro__) == 2:
            args, kwargs = (), {}
        return super().__new__(cls, *args, **kwargs)

    def __init__(self, *, backend=None):
        pass

    def __init_subclass__(cls, *args, **kw):
        if hasattr(cls, "_backend_key"):
            cls._backend_registry[cls._backend_key] = cls
        super().__init_subclass__(*args, **kw)

ContextMap

A ContextLocal subclass implementing collections.abc.MutableMapping, providing dictionary-style access alongside attribute access.

extracontext.ContextMap

Bases: MutableMapping, ContextLocal

Works the same as PyContextLocal, but uses the mapping interface instead of dealing with instance attributes.

Ideal, as for most map uses, when the keys depend on data rather than hardcoded state variables

Source code in extracontext/mapping.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
class ContextMap(MutableMapping, ContextLocal):
    """Works the same as PyContextLocal,
    but uses the mapping interface instead of dealing with instance attributes.

    Ideal, as for most map uses, when the keys depend on data rather than
    hardcoded state variables
    """

    _backend_registry = {}

    # def __init__(self, initial: None | Mapping = None, *, backend=None):
    # super().__init__()
    # if not initial:
    # return
    # for key, value in initial.items():
    # self[key] = value

    def __getitem__(self, name):
        try:
            return self.__getattr__(name)
        except AttributeError:
            raise KeyError(name)

    def __setitem__(self, name, value):
        setattr(self, name, value)

    def __delitem__(self, name):
        try:
            delattr(self, name)
        except AttributeError:
            raise KeyError(name)

    def __iter__(self):
        return iter(dir(self))

    def __len__(self):
        return len(dir(self))

ContextPreservingExecutor

A concurrent.futures.ThreadPoolExecutor subclass that propagates the current context into worker threads.

extracontext.ContextPreservingExecutor

Bases: ThreadPoolExecutor

Drop in context preserving replacement to concurrent.futures.ThreadPoolExecutor

This class adds a missing functionality to asynchorous coding in Python, in which separate tasks will naturally each have their own context with distinct contextvars (either native stdlib scalar contextvars or extracontext.ContextLocal namespaces): when calling a blocking function with "run_in_executor", the context is zeroed-out in the function executed in any of the worker threads.

By simply instantiating this class for an Executor, the called functions will run in a copy of the same context that was in use by the callee - which means one is free to use await loop.run_in_executor calls and have the target function still see the current context.

Source code in extracontext/executor.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class ContextPreservingExecutor(ThreadPoolExecutor):
    """Drop in context preserving replacement to concurrent.futures.ThreadPoolExecutor

    This class adds a missing functionality to asynchorous coding in Python,
    in which separate tasks will naturally each have their own context
    with distinct contextvars (either native stdlib scalar contextvars
    or extracontext.ContextLocal namespaces): when calling a blocking
    function with "run_in_executor", the context is zeroed-out in the
    function executed in any of the worker threads.

    By simply instantiating this class for an Executor, the called
    functions will run in a copy of the same context that was in use
    by the callee - which means one is free to use `await loop.run_in_executor`
    calls and have the target function still see the current context.

    """

    submit = new_submit

ContextError

Exception raised when a context variable is accessed outside a valid scope.

extracontext.ContextError

Bases: AttributeError

Source code in extracontext/contextlocal.py
34
35
class ContextError(AttributeError):
    pass

NativeContextLocal

The default backend implementation, built on stdlib contextvars.ContextVar.

extracontext.NativeContextLocal

Bases: ContextLocal

Uses th native contextvar module in the stdlib (PEP 567) to provide a context-local namespace in the way threading.local works for threads.

Assignements and reading from the namespace should work naturally with no need to call get and set methods.

A new contextvar variable is created in the current (contextvars) context for each attribute acessed on this namespace.

Also, attributes prefixed with a single "et" are intended for internal use and will not be namespaced contextvars.

In contrast to the pure-Python implementation there are

some limitations,such as the impossibility to work

in as a contextmanager (Python with block),.

[Work In Progress]

Source code in extracontext/contextlocal_native.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
class NativeContextLocal(ContextLocal):
    """Uses th native contextvar module in the stdlib (PEP 567)
    to provide a context-local namespace in the way
    threading.local  works for threads.

    Assignements and reading from the namespace
    should work naturally with no need to call `get` and `set` methods.

    A new contextvar variable is created in the current (contextvars) context
    for _each_ attribute acessed on this namespace.

    Also, attributes prefixed with a single "_et_" are intended for internal
    use and will not be namespaced contextvars.

    # In contrast to the pure-Python implementation there are
    # some limitations,such as the impossibility to work
    # in as a contextmanager (Python `with` block),.

    [Work In Progress]
    """

    _backend_key = "native"
    _ctypes_initialized = False

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._et_registry = {}
        self._et_stack = {}
        self._et_lock = threading.Lock()

    def __getattr__(self, name):
        var = self._et_registry.get(name, None)
        if var is None:
            raise AttributeError(f"Attribute not set: {name}")
        try:
            value = var.get()
        except LookupError as error:
            raise AttributeError from error
        if value is _sentinel:
            raise AttributeError(f"Attribute not set: {name}")
        return value

    def __setattr__(self, name, value):
        if name.startswith("_et_"):
            return super().__setattr__(name, value)
        var = self._et_registry.get(name, _sentinel)
        if var is _sentinel:
            var = self._et_registry[name] = ContextVar(name)
        var.set(value)

    def __delattr__(self, name):
        if getattr(self, name, _sentinel) is _sentinel:
            raise AttributeError(f"Attribute not set: {name}")
        setattr(self, name, _sentinel)

    def __call__(self, callable_):
        @wraps(callable_)
        def wrapper(*args, **kw):
            return self._run(callable_, *args, **kw)

        return wrapper

    def _ensure_api_ready(self):
        if not self._ctypes_initialized:
            ctypes.pythonapi.PyContext_Enter.argtypes = [ctypes.py_object]
            ctypes.pythonapi.PyContext_Exit.argtypes = [ctypes.py_object]
            ctypes.pythonapi.PyContext_Enter.restype = ctypes.c_int32
            ctypes.pythonapi.PyContext_Exit.restype = ctypes.c_int32
            self.__class__._ctypes_initialized = True

    def _get_ctx_key(self):
        key_thread = threading.current_thread()
        try:
            key_task = asyncio.current_task()
        except RuntimeError:
            key_task = None
        return (key_thread, key_task)

    def _enter_ctx(self, new_ctx):
        if pypy:
            prev_ctx = _get_contextvar_context()
            _set_contextvar_context(new_ctx)
            return prev_ctx
        self._ensure_api_ready()
        result = ctypes.pythonapi.PyContext_Enter(new_ctx)
        if result != 0:
            raise RuntimeError(f"Something went wrong entering context {new_ctx}")
        return None

    def _exit_ctx(self, current_ctx, prev_ctx):
        if pypy:
            _set_contextvar_context(prev_ctx)
            return
        result = ctypes.pythonapi.PyContext_Exit(current_ctx)
        if result != 0:
            raise RuntimeError(f"Something went wrong exiting context {current_ctx}")

    def __enter__(self):
        new_ctx = copy_context()
        prev_ctx = self._enter_ctx(new_ctx)
        with self._et_lock:
            self._et_stack.setdefault(self._get_ctx_key(), []).append(
                (new_ctx, prev_ctx)
            )
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        key = self._get_ctx_key()
        with self._et_lock:
            current_ctx, prev_ctx = self._et_stack[key].pop()
            if not self._et_stack[key]:
                self._et_stack.pop(key)
        self._exit_ctx(current_ctx, prev_ctx)

    def _run(self, callable_, *args, **kw):
        """Runs callable with an isolated context
        no need to decorate the target callable
        """
        new_context = copy_context()
        result = new_context.run(callable_, *args, **kw)
        if inspect.isawaitable(result):
            result = self._awaitable_wrapper(result, new_context)
        elif inspect.isgenerator(result):
            result = self._generator_wrapper(result, new_context)
        elif inspect.isasyncgen(result):
            result = self._async_generator_wrapper(result, new_context)
            # raise NotImplementedError("NativeContextLocal doesn't yet work with async generators")
        return result

    @staticmethod
    def _generator_wrapper(generator, ctx_copy):
        value = None
        while True:
            try:
                if value is None:
                    value = yield ctx_copy.run(next, generator)
                else:
                    value = yield ctx_copy.run(generator.send, value)
            except StopIteration as stop:
                return stop.value
            except GeneratorExit:
                ctx_copy.run(generator.close)
                raise
            except Exception as exc:
                # for debugging times: this will be hard without a break here!
                # print(exc)
                try:
                    value = ctx_copy.run(generator.throw, exc)
                except StopIteration as stop:
                    return stop.value

    if sys.version_info >= (3, 11):

        async def _awaitable_wrapper(self, coro, ctx_copy):
            def trampoline():
                return asyncio.create_task(coro, context=ctx_copy)

            return await ctx_copy.run(trampoline)

    else:

        async def _awaitable_wrapper(self, coro, ctx_copy):
            from ._future_task import FutureTask

            loop = asyncio.get_running_loop()

            def trampoline():
                return FutureTask(coro, loop=loop, context=ctx_copy)

            return await ctx_copy.run(trampoline)

        ## this fails in spetacular and inovative ways!
        # async def _awaitable_wrapper(self, coro, ctx_copy, force_context=True):
        # if force_context:
        # try:
        # self._enter_ctx(ctx_copy)
        # result = await coro
        # finally:
        # self._exit_ctx(ctx_copy)
        # return result
        # else:
        # return await coro

        async def _awaitable_wrapper2(self, coro, ctx_copy):
            raise NotImplementedError(
                """This code will only work with Python versions > 3.11. Please use `ContextLocal(backend="python")` for Python version 3.8 - 3.10"""
            )

    async def _async_generator_wrapper(self, generator, ctx_copy):
        value = None
        while True:
            try:
                if value is None:
                    async_res = ctx_copy.run(anext, generator)
                else:
                    async_res = ctx_copy.run(generator.asend, value)
                value = yield await self._awaitable_wrapper(async_res, ctx_copy)
            except GeneratorExit:
                async_res = ctx_copy.run(generator.aclose)
                await self._awaitable_wrapper(async_res, ctx_copy)
                raise
            except StopAsyncIteration:
                break
            except Exception as exc:
                # for debugging times: this will be hard without a break here!
                # print("*" * 50 , exc)
                try:
                    async_res = ctx_copy.run(generator.athrow, exc)
                    value = yield await self._awaitable_wrapper(async_res, ctx_copy)
                except StopAsyncIteration:
                    break

    def __dir__(self):
        return list(
            key
            for key, value in self._et_registry.items()
            if value.get() is not _sentinel
        )

PyContextLocal

The pure-Python backend implementation, using frame introspection.

extracontext.PyContextLocal

Bases: ContextLocal

Creates a namespace object whose attributes can keep individual and distinct values for the same key for code running in parallel - either in asyncio tasks, or threads.

The bennefits are the same one gets by using contextvars.ContextVar from the stdlib as specified on PEP 567. However extracontext.ContextLocal is designed to be easier and more convenient to use - as a single instance can hold values for several keys, just as happens with threading.local objects. And no special getter and setter methods are needed to retrieve the unique value stored in the current context: normal attribute access and assignment works transparently.

Internally, the current implementation uses a completly different way to keep distinct states where needed: the "locals" mapping for each execution frame is used as storage for the unique values in an async task context, or in a thread. Although not recomended up to now, read/write access to non-local-variables in the "locals" mapping is specified on PEP 558. While that PEP is not final, it is clear in its texts that the capability of using "locals" as a mapping to convey data will be kept and made official.

References to the frames containing context data is kept using weakreferences, so when a Frame ends up execution, its contents are deleted normally, with no risks of frame data hanging around due to PyContextLocal data.

Source code in extracontext/contextlocal.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
class PyContextLocal(ContextLocal):
    """Creates a namespace object whose attributes can keep individual and distinct values for
    the same key for code running in parallel - either in asyncio tasks, or threads.

    The bennefits are the same one gets by using contextvars.ContextVar from the stdlib as
    specified on PEP 567. However extracontext.ContextLocal is designed to be easier
    and more convenient to use - as a single instance can hold values for several
    keys, just as happens with threading.local objects. And no special getter and
    setter methods are needed to retrieve the unique value stored in the current
    context: normal attribute access and assignment works transparently.

    Internally, the current implementation uses a completly different way to
    keep distinct states where needed: the "locals" mapping for each execution
    frame is used as storage for the unique values in an async task context, or in
    a thread. Although not recomended up to now, read/write access to non-local-variables
    in the "locals" mapping is specified on PEP 558. While that PEP is not
    final, it is clear in its texts that the capability of using "locals" as
    a mapping to convey data will be kept and made official.

    References to the frames containing context data is kept using
    weakreferences, so when a Frame ends up execution, its contents
    are deleted normally, with no risks of frame data
    hanging around due to PyContextLocal data.


    """

    # TODO: change _BASEDIST to a property counting the intermediate
    # methods between subclasses and the methods here.
    _BASEDIST = 0

    _backend_key = "python"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        super().__setattr__("_et_registry", WeakKeyDictionary())

    def _introspect_registry(
        self, name: T.Optional[str] = None, starting_frame: int = 2
    ) -> T.Tuple[dict, T.Tuple[int, int]]:
        """
        returns the first namespace found for this context, if name is None
        else, the first namespace where the name exists. The second return
        value is a tuple inticatind the frame distance to the topmost namespace
        and the frame distance to the returned namespace.
        This way callers can tell if the searched name is on the topmost
        namespace and act accordingly. ("del" needs this information,
        as it can't remove information on an outter namespace)
        """
        starting_frame += self._BASEDIST
        f: T.Optional[FrameType] = sys._getframe(starting_frame)
        count = 0
        first_ns = None
        while f:
            hf = self._frameid(f)
            if hf in self._et_registry:
                if first_ns is None:
                    first_ns = count
                registered_namespaces = f.f_locals["$contexts"]
                for namespace_index in reversed(self._et_registry[hf]):
                    namespace = registered_namespaces[namespace_index]
                    if name is None or name in namespace:
                        return namespace, (first_ns, count)
                    count += 1
            f = f.f_back

        if name:
            raise ContextError(f"{name !r} not defined in any previous context")
        raise ContextError("No previous context set")

    def _frameid(self, frame: FrameType) -> _WeakableId:
        if not "$contexts_salt" in frame.f_locals:
            frame.f_locals["$contexts_salt"] = _WeakableId()
        return frame.f_locals["$contexts_salt"]

    def _register_context(self, f: FrameType) -> None:
        hf = self._frameid(f)
        contexts_list = f.f_locals.setdefault("$contexts", [])
        contexts_list.append({})
        self._et_registry.setdefault(hf, []).append(len(contexts_list) - 1)

    def _pop_context(self, f: FrameType) -> None:
        hf = self._frameid(f)
        context_being_popped = self._et_registry[hf].pop()
        contexts_list = f.f_locals["$contexts"]
        contexts_list[context_being_popped] = None

    def __getattr__(self, name: str) -> T.Any:
        try:
            namespace, _ = self._introspect_registry(name)
            result = namespace[name]
            if result is _sentinel:
                raise KeyError(name)
            return result
        except (ContextError, KeyError):
            raise AttributeError(f"Attribute not set: {name}")

    def __setattr__(self, name: str, value: T.Any) -> None:
        try:
            namespace, _ = self._introspect_registry()
        except ContextError:
            # Automatically creates a new namespace if not inside
            # any explicit denominated context:
            self._register_context(sys._getframe(1 + self._BASEDIST))
            namespace, _ = self._introspect_registry()

        namespace[name] = value

    def __delattr__(self, name: str) -> None:
        try:
            namespace, (topmost_ns, found_ns) = self._introspect_registry(name)
        except ContextError:
            raise AttributeError(name)
        if topmost_ns == found_ns:
            result = namespace[name]
            if result is not _sentinel:
                if "$deleted" in namespace and name in namespace["$deleted"]:
                    # attribute exists in target namespace, but the outter
                    # attribute had previously been shadowed by a delete -
                    # restore the shadowing:
                    setattr(self, name, _sentinel)

                else:
                    # Remove topmost name assignemnt, and outer value is exposed
                    # ("one_level" attribute stacking behavior as described in 'features.py'
                    # disbled as unecessaryly complex):
                    # del namespace[name]

                    # To preserve  "entry_only" behavior:
                    namespace.setdefault("$deleted", set()).add(name)
                    setattr(self, name, _sentinel)
                return
            # value is already shadowed:
            raise AttributeError(name)

        # Name is found, but it is not on the top-most level, so attribute is shadowed:
        setattr(self, name, _sentinel)
        # fossil: namespace, _ = self._introspect_registry(name)
        namespace.setdefault("$deleted", set()).add(name)

    def __call__(self, callable_: T.Callable) -> T.Callable:
        @wraps(callable_)
        def wrapper(*args, **kw):
            f = sys._getframe()
            self._register_context(f)
            f_id = self._frameid(f)
            result = _sentinel
            try:
                result = callable_(*args, **kw)
            finally:
                if f_id in self._et_registry:
                    del self._et_registry[f_id]
                # Setup context for generator, async generator or coroutine if one was returned:
                if result is not _sentinel:
                    frame = None
                    for frame_attr in ("gi_frame", "ag_frame", "cr_frame"):
                        frame = getattr(result, frame_attr, None)
                        if frame:
                            self._register_context(frame)
            return result

        return wrapper

    def __enter__(self):
        self._register_context(sys._getframe(1))
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._pop_context(sys._getframe(1))

    def _run(self, callable_, *args, **kw):
        """Runs callable with an isolated context
        no need to decorate the target callable
        """
        with self:
            return callable_(*args, **kw)

    def __dir__(self) -> T.List[str]:
        frame_count = 2
        all_attrs = set()
        seen_namespaces = set()
        while True:
            try:
                namespace, _ = self._introspect_registry(starting_frame=frame_count)
            except (
                ValueError,
                ContextError,
            ):  # ValueError can be raised sys._getframe inside _introspect_registry
                break
            frame_count += 1
            if id(namespace) in seen_namespaces:
                continue
            for key, value in namespace.items():
                if not key.startswith("$") and value is not _sentinel:
                    all_attrs.add(key)

            seen_namespaces.add(id(namespace))
        all_attrs = {
            attr
            for attr in all_attrs
            if getattr(self, attr, _sentinel) is not _sentinel
        }
        return sorted(all_attrs)