di: pythonic dependency injection




In this article, I would like to introduce di, a dependency injection framework that seeks to be:

  • Intuitive: simple things are easy, complex things are possible
  • Succinct: low boilerplate, can run your code directly via autowiring
  • Correct: tested with MyPy; exports valid type annotations
  • Powerful: with lifecycles, shared dependencies and a flexible API
  • Performant: supporting async dependencies and concurrent execution



Introduction to dependency injection

Dependency injection is a technique for organizing larger applications by handing over control of your dependencies to “something” (often a framework) that assembles them for you.
It’s a complicated topic, so maybe it’s better to just look at an example.
We’ll build our example around a pretty common scenario: you have a class that needs to make web requests, so it needs an HTTP client (we’ll use httpx).

from httpx import Client


class WeatherClient:
    def __init__(self) -> None:
        self.client = Client()
    def get_temp(self, lat: float, long: float) -> float:
        self.client.get(...)
        ...

This class does not use dependency injection.
The problem arises when we want to test this class: we probably don’t want to make real web requests in our unit test.

Luckily, httpx offers the ability to use mock transports:

import httpx

def handler(request: httpx.Request) -> httpx.Response:
    return httpx.Response(200, json={"temp": 43.21})

client = httpx.Client(transport=httpx.MockTransport(handler))

But we have a problem: how do we get our client using a mock transport into our WeatherClient?
One approach often used is monkey patching.
While monkey patching has its use cases, it can also lead to a lot of very confusing problems and messy code, especially in a larger project.

Dependency injection gives us an alternative: have WeatherClient accept an instance of httpx.Client:

from typing import Optional

from httpx import Client


class WeatherClient:
    def __init__(self, client: Optional[Client] = None) -> None:
        self.client = client or Client()

This form of dependency injection is called constructor injection because we inject the dependency in the constructor / __init__ method.

In this case, we chose to add a None default so that we can detect when the caller did not include a Client and so we can make a default one.
This is largely a matter of preference, and depends on the project you are working on.
For a library, this is probably a good idea.
For an application, maybe not so much.

Note: did you notice that httpx.Client itself uses dependency injection for the transport parameter? Neat huh!
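
To make the payoff concrete, here is a minimal sketch of a unit test that relies on constructor injection. It deliberately avoids httpx so that it runs with the standard library alone: StubClient, StubResponse, the "/temp" path and the body of get_temp are all assumptions made for illustration, since the real implementation is elided above.

```python
from typing import Any, Dict, List, Optional, Tuple


class StubResponse:
    """Hypothetical stand-in for an HTTP response object."""

    def __init__(self, payload: Dict[str, Any]) -> None:
        self._payload = payload

    def json(self) -> Dict[str, Any]:
        return self._payload


class StubClient:
    """Hypothetical stand-in for an HTTP client; records the calls it receives."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[Dict[str, Any]]]] = []

    def get(self, url: str, params: Optional[Dict[str, Any]] = None) -> StubResponse:
        self.calls.append((url, params))
        return StubResponse({"temp": 43.21})


class WeatherClient:
    # The client is injected through the constructor, so a test can pass a stub.
    def __init__(self, client: Any) -> None:
        self.client = client

    def get_temp(self, lat: float, long: float) -> float:
        # Assumed request shape; the real endpoint is not given in the article.
        response = self.client.get("/temp", params={"lat": lat, "long": long})
        return response.json()["temp"]


stub = StubClient()
weather = WeatherClient(client=stub)
temp = weather.get_temp(lat=1.0, long=2.0)
```

No patching is needed: the test simply hands WeatherClient a different collaborator and inspects what it was asked to do.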



The problems

As shown above, dependency injection is not as crazy as it sounds; it can be simple.
And you’re probably already using it in libraries like httpx.

But what are the downsides?

The main issue is that a larger project tends to accumulate many dependencies, often deeply nested. Constructor calls become really ugly, and when the same dependency is needed in multiple places you have to hold it in a temporary variable and pass it around.

Basically, the code to assemble and keep track of all of these dependencies can become a headache in and of itself.
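
As a rough illustration of that headache, here is a sketch of wiring a small object graph by hand. All of the class names (Config, Database, UserRepo, OrderRepo, Service) and the DSN string are hypothetical and only exist to show the shape of the problem.

```python
from dataclasses import dataclass


@dataclass
class Config:
    dsn: str


@dataclass
class Database:
    config: Config


@dataclass
class UserRepo:
    db: Database


@dataclass
class OrderRepo:
    db: Database


@dataclass
class Service:
    users: UserRepo
    orders: OrderRepo


# Both repositories must share one Database, so we need a temporary variable
# and have to build everything in the right order by hand.
db = Database(config=Config(dsn="sqlite://"))
service = Service(users=UserRepo(db=db), orders=OrderRepo(db=db))
```

With five classes this is tolerable; with fifty, the assembly code becomes its own maintenance burden.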

Enter: dependency injection frameworks.

Dependency injection frameworks seek to solve this problem by abstracting away the boilerplate of creating and connecting dependencies together.



Dependency injection frameworks

Although dependency injection frameworks come in many shapes and sizes, in principle they all provide similar functionality:

  • Register dependencies and providers for those dependencies (i.e. tell the framework how to build your dependencies)
  • Inject your dependencies for you

Some frameworks also have more advanced features such as:

  • Autowiring: transitive dependencies are discovered from parameter names or type annotations
  • Lifecycles: the ability to hook into creation, deletion and error events during the lifecycle of your dependency.
  • Scopes: singleton dependencies or dependencies that are re-used within each call if they are needed in multiple places.

di is one of the latter frameworks, offering all of these features and more.
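
To give a feel for what autowiring means, here is a toy resolver that discovers transitive dependencies from type annotations. This is not how di is implemented; the autowire function, the cache argument and the three example classes are assumptions made purely for illustration.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints


class Config:
    def __init__(self) -> None:
        self.url = "http://example.invalid"


class Client:
    def __init__(self, config: Config) -> None:
        self.config = config


class App:
    def __init__(self, client: Client, config: Config) -> None:
        self.client = client
        self.config = config


def autowire(target: Callable[..., Any], cache: Dict[Any, Any]) -> Any:
    """Recursively build `target`, resolving each parameter from its annotation."""
    if target in cache:
        return cache[target]  # reuse an already-built dependency
    func = target.__init__ if inspect.isclass(target) else target
    hints = get_type_hints(func)
    hints.pop("return", None)  # the return annotation is not a dependency
    kwargs = {name: autowire(dep, cache) for name, dep in hints.items()}
    cache[target] = target(**kwargs)
    return cache[target]


app = autowire(App, cache={})
```

We only asked for an App; the Client and the single shared Config were found by walking the annotations.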



Using di

Let’s start with a simple example.
We want to build a web app that uses our WeatherClient.

import os
from dataclasses import dataclass, field

from xyz import App


def url_from_env() -> str:
    return os.environ["WEATHER_URL"]


@dataclass
class AppConfig:
    weather_url: str = field(default_factory=url_from_env)


class MyApp:
    def __init__(self, weather_client: WeatherClient) -> None:
        self.weather_client = weather_client

Building this by hand might look something like:

app = App(
    weather_client=WeatherClient(
        client=httpx.Client(
            ...
        )
    ),
    config=AppConfig(
        ...
    )
)

Using di:

from di import Container, Dependant


container = Container()
app = container.execute_sync(
    container.solve(Dependant(App))
)

Notice that it doesn’t matter how deep the dependencies go or how many there are, we only ever said “we want an App instance”.

What about the injection of the Client?

client = Client(
    transport=MockTransport(handler=handler)
)
container.bind(Dependant(lambda: client), Client)
app = container.execute_sync(
    container.solve(Dependant(App))
)

Again, the important thing here is that Client could be nested 10 layers of dependencies deep and the above code would not change at all.
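
The idea that a bind works no matter how deep the dependency sits can be sketched with a toy container. ToyContainer, its bind and resolve methods, and the Transport classes are hypothetical names for this illustration; di's real Container is more involved.

```python
from typing import Any, Callable, Dict, get_type_hints


class Transport:
    def __init__(self) -> None:
        self.name = "real"


class FakeTransport(Transport):
    def __init__(self) -> None:
        self.name = "fake"


class Client:
    def __init__(self, transport: Transport) -> None:
        self.transport = transport


class WeatherClient:
    def __init__(self, client: Client) -> None:
        self.client = client


class App:
    def __init__(self, weather_client: WeatherClient) -> None:
        self.weather_client = weather_client


class ToyContainer:
    """Hypothetical container: resolves by annotation, honouring binds first."""

    def __init__(self) -> None:
        self._binds: Dict[type, Callable[[], Any]] = {}

    def bind(self, provider: Callable[[], Any], target: type) -> None:
        self._binds[target] = provider

    def resolve(self, target: type) -> Any:
        if target in self._binds:
            return self._binds[target]()  # a bind wins over autowiring
        hints = get_type_hints(target.__init__)
        hints.pop("return", None)
        return target(**{name: self.resolve(dep) for name, dep in hints.items()})


container = ToyContainer()
default_app = container.resolve(App)
container.bind(FakeTransport, Transport)  # swap a dependency three layers down
test_app = container.resolve(App)
```

The call that builds the App is identical before and after the bind; only the registration changed.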



Advanced functionality

Aside from the basic usage above, di supports a lot more functionality.

Here are some snippets demonstrating the functionality, but for full-fledged examples and more explanation, see our docs.



Dependency sharing

from di import Container, Depends, Dependant

def func() -> object:
    return object()

def dependant(
    one: object = Depends(func),
    two: object = Depends(func),
    three: object = Depends(func, share=False)
) -> None:
    assert one is two and two is not three

container = Container()
container.execute_sync(container.solve(Dependant(dependant)))

In this example, func is called twice (by default in two separate threads), once for one and two, which share the same value, and once for three.
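
Conceptually, sharing behaves like a cache that lives for a single execution. The sketch below mimics that with plain Python; the execute helper and its (name, provider, share) triples are assumptions for illustration and say nothing about how di stores values internally.

```python
from typing import Any, Callable, Dict, List, Tuple

calls: List[str] = []


def func() -> object:
    calls.append("func")
    return object()


def execute(params: List[Tuple[str, Callable[[], Any], bool]]) -> Dict[str, Any]:
    """Resolve (name, provider, share) triples with a per-execution cache."""
    cache: Dict[Callable[[], Any], Any] = {}
    values: Dict[str, Any] = {}
    for name, provider, share in params:
        if share:
            if provider not in cache:
                cache[provider] = provider()
            values[name] = cache[provider]
        else:
            values[name] = provider()  # share=False always gets a fresh value
    return values


values = execute([("one", func, True), ("two", func, True), ("three", func, False)])
```

Here func runs twice: once for the shared value and once for the parameter that opted out of sharing.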



Scopes and lifecycles

from typing import Generator

from di import Container, Depends, Dependant

def func() -> Generator[int, None, None]:
    print("func startup")
    yield 1
    print("func shutdown")


def dependant(v: int = Depends(func, scope=1234)) -> int:
    print("computing")
    return v + 1

container = Container()
with container.enter_local_scope(1234):
    print("enter scope")
    res = container.execute_sync(
        container.solve(Dependant(dependant))
    )
    print("exit scope")

assert res == 2

This example will print:

enter scope
func startup
computing
exit scope
func shutdown

And is equivalent to:

from contextlib import contextmanager


with contextmanager(func)() as value:
    dependant(v=value)

Scopes in di are arbitrary: any hashable value will do.
As long as you are consistent in declaring scopes, di will not care what you use. If “you” are a framework calling into user code, this gives you the freedom to define and manage your own scopes. For a web framework, these might be “app” and “request”. For a CLI, there might be a single “call” scope. For a TUI, there could be an “app” scope and an “event” scope.
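
One way to picture a scope is as an exit stack that keeps a dependency alive until the scope is closed. The sketch below uses only contextlib and records events in a list; it is an analogy under that assumption, not di's implementation, and the event names are made up.

```python
from contextlib import ExitStack, contextmanager
from typing import Generator, List

events: List[str] = []


def func() -> Generator[int, None, None]:
    events.append("func startup")
    yield 1
    events.append("func shutdown")


def dependant(v: int) -> int:
    events.append("computing")
    return v + 1


# The ExitStack plays the role of a scope: everything entered on it
# stays alive until the stack itself is closed.
with ExitStack() as scope:
    events.append("enter scope")
    value = scope.enter_context(contextmanager(func)())
    res = dependant(v=value)
    events.append("still inside scope")
# Leaving the `with` block closes the scope and runs func's teardown.
events.append("scope closed")
```

A framework with an “app” and a “request” scope would, in this picture, simply hold two such stacks with different lifetimes.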



Solved dependencies

Often, you have to run the same dependency multiple times but aren’t dynamically changing the dependency graph every execution (you would be dynamically changing the graph if you were doing container.bind between each call).

In these scenarios, di lets you save the entire computed dependency graph into a SavedDependency and execute the same graph multiple times.
In fact, this is what we’ve been doing already every time we called container.solve.

# solving locks in binds, but not cached values or scopes
solved = container.solve(Dependant(dependency))
res1 = container.execute_sync(solved)
res2 = container.execute_sync(solved)
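
The split between solving once and executing many times can be sketched with the standard library's graphlib (Python 3.9+). The GRAPH mapping, the solve and execute helpers and the three provider functions are hypothetical; they only illustrate why caching the computed order pays off.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, List, Tuple

Provider = Callable[..., Any]


def config() -> str:
    return "cfg"


def client(cfg: str) -> str:
    return f"client({cfg})"


def app(cl: str, cfg: str) -> str:
    return f"app({cl}, {cfg})"


# Each provider maps to the providers whose results it needs, in argument order.
GRAPH: Dict[Provider, Tuple[Provider, ...]] = {
    config: (),
    client: (config,),
    app: (client, config),
}


def solve(graph: Dict[Provider, Tuple[Provider, ...]]) -> List[Provider]:
    """Compute the execution order once (the comparatively expensive part)."""
    return list(TopologicalSorter(graph).static_order())


def execute(
    order: List[Provider],
    graph: Dict[Provider, Tuple[Provider, ...]],
    target: Provider,
) -> Any:
    """Run a previously solved order; values are recomputed on every call."""
    values: Dict[Provider, Any] = {}
    for provider in order:
        args = [values[dep] for dep in graph[provider]]
        values[provider] = provider(*args)
    return values[target]


solved = solve(GRAPH)
res1 = execute(solved, GRAPH, app)
res2 = execute(solved, GRAPH, app)
```

The ordering work happens in solve; each execute call just walks the precomputed list.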



Async execution

Simply swap out execute_sync for execute_async for async support:

from di import Container, Dependant


async def dep() -> int:
    return 1

async def main() -> None:
    container = Container()
    solved = container.solve(Dependant(dep))
    res = await container.execute_async(solved)
    assert res == 1

Like sync execution, async execution supports arbitrary callables as well as generator functions that get used as context managers.
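
For readers who have not seen an async generator used as a context manager, here is a standard-library sketch of the pattern. The connection and query functions are hypothetical, and the code uses contextlib directly rather than di.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from typing import AsyncGenerator, List

events: List[str] = []


async def connection() -> AsyncGenerator[str, None]:
    events.append("open")
    yield "conn"
    events.append("close")


async def query(conn: str) -> str:
    events.append("query")
    return f"result from {conn}"


async def main() -> str:
    async with AsyncExitStack() as stack:
        # Wrap the async generator so it behaves like an async context manager.
        conn = await stack.enter_async_context(asynccontextmanager(connection)())
        return await query(conn)


result = asyncio.run(main())
```

The code after the yield runs as teardown once the stack is closed, which is the same startup/shutdown shape as the sync generator example earlier.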



DependantProtocol

The DependantProtocol interface provides an abstract representation of a dependency specification.
The Container uses this abstract representation to build dependency graphs.

This means that you can easily customize a lot of the behavior in di by overriding this API.
If you want to use some of the built in functionality and just tweak it slightly, you can inherit from di.Dependant, but you don’t have to.

For example, to accept a default value instead of a callable:

from di import Dependant
from di.types.providers import DependencyType
from di.types.scopes import Scope

class DefaultDependant(Dependant[DependencyType]):
    def __init__(
        self,
        default: DependencyType,
        scope: Scope,
        share: bool,
    ) -> None:
        super().__init__(
            call=lambda: default,
            scope=scope,
            share=share,
        )

Now if there is no bind / override on the dependency, it will execute just as if it were being called directly.



Executors

One of the design principles of di is to isolate the work of the container into distinct steps:

  • Wiring, which can be controlled via the DependantProtocol
  • Solving, which is owned by the container
  • Execution, which is managed via the Executor API

To this end, di allows you to provide your own executor, which only needs to take a list of groups of tasks (callable functions that take no arguments) and execute them in the specified order.

The Executor API is as follows:

import typing

ResultType = typing.TypeVar("ResultType")

Task = typing.Union[
    typing.Callable[[], None],
    typing.Callable[[], typing.Awaitable[None]]
]


class SyncExecutor(typing.Protocol):
    def execute_sync(
        self,
        tasks: typing.List[typing.List[Task]],
        get_result: typing.Callable[[], ResultType],
    ) -> ResultType:
        ...


class AsyncExecutor(typing.Protocol):
    async def execute_async(
        self,
        tasks: typing.List[typing.List[Task]],
        get_result: typing.Callable[[], ResultType],
    ) -> ResultType:
        ...

The default executor (di.executors.DefaultExecutor) supports both sync and async execution.
Dependencies that can run concurrently are executed concurrently: via threads in sync execution and via tasks in async execution.

A simple SyncExecutor implementation might look like:

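import inspect


class SimpleExecutor(SyncExecutor):
    def execute_sync(
        self,
        tasks: typing.List[typing.List[Task]],
        get_result: typing.Callable[[], ResultType],
    ) -> ResultType:
        # Run every group in order, and every task within a group sequentially.
        for group in tasks:
            for task in group:
                res = task()
                if inspect.isawaitable(res):
                    # This executor chooses not to support async tasks.
                    raise TypeError("async tasks are not supported")
        return get_result()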

Note that SyncExecutor may be handed async functions as tasks.
The sync/async portion refers to the API of the executor itself, not the tasks it may be handed.
It is up to the implementation to raise an exception or execute the async task somehow.
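
As a further illustration of the same interface, here is a sketch of an executor that runs the tasks inside each group in a thread pool while keeping the groups in order. ThreadedExecutor is a hypothetical name, the Task alias is simplified, and this is not the code of di's default executor.

```python
import inspect
import typing
from concurrent.futures import ThreadPoolExecutor

ResultType = typing.TypeVar("ResultType")
Task = typing.Callable[[], typing.Any]


class ThreadedExecutor:
    """Hypothetical executor: groups run in order, tasks in a group run in threads."""

    def execute_sync(
        self,
        tasks: typing.List[typing.List[Task]],
        get_result: typing.Callable[[], ResultType],
    ) -> ResultType:
        with ThreadPoolExecutor() as pool:
            for group in tasks:
                # list() forces every task in the group to finish (and re-raises
                # any exception) before the next group starts.
                for res in list(pool.map(lambda task: task(), group)):
                    if inspect.isawaitable(res):
                        raise TypeError("async tasks are not supported")
        return get_result()


state: typing.Dict[str, int] = {}
tasks = [
    [lambda: state.update(a=1), lambda: state.update(b=2)],  # independent tasks
    [lambda: state.update(total=state["a"] + state["b"])],  # needs both results
]
result = ThreadedExecutor().execute_sync(tasks, get_result=lambda: state["total"])
```

Because a group only starts once the previous one has finished, the second group can safely read what the first one wrote.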



Integration into other libraries

While di should be simple enough that it can be used by end users, it shines when integrated into a framework or library that fully embraces inversion of control.

To this end, we provide sample integrations, currently only for Starlette and Textual, but more ideas are welcome!



Performance

We provide some basic benchmarks comparing to FastAPI.

The results strongly depend on the size of the DAG.

For a relatively large but not completely absurd DAG of 12 vertices with each vertex having 0-3 edges, di performs about 2x as well as FastAPI.

For larger DAGs, especially where di is able to execute dependencies concurrently, di can be 400x faster than FastAPI.
To be fair, this is not a real world scenario, and FastAPI does a lot more than the Starlette di integration is doing (keeping track of stuff for OpenAPI, etc.).
If these benchmarks are unfair for other reasons, please let me know.



Conclusion

Dependency injection and inversion of control are powerful techniques, but they can get complicated.
Python has shown time and time again that it can take complicated things and make them simple for users.
FastAPI is one of the best examples of this.

I hope that di can help other libraries, frameworks or projects use inversion of control and dependency injection without having to go through the pain of writing their own dependency injection system.


