issue_comments: 970655304
This data as json
html_url | issue_url | id | node_id | user | created_at | updated_at | author_association | body | reactions | issue | performed_via_github_app |
---|---|---|---|---|---|---|---|---|---|---|---|
https://github.com/simonw/datasette/issues/878#issuecomment-970655304 | https://api.github.com/repos/simonw/datasette/issues/878 | 970655304 | IC_kwDOBm6k_c452wZI | 9599 | 2021-11-16T20:32:16Z | 2021-11-16T20:32:16Z | OWNER | This code is really fiddly. I just got to this version: ```python import asyncio from functools import wraps import inspect try: import graphlib except ImportError: from . import vendored_graphlib as graphlib class AsyncMeta(type): def __new__(cls, name, bases, attrs): # Decorate any items that are 'async def' methods _registry = {} new_attrs = {"_registry": _registry} for key, value in attrs.items(): if inspect.iscoroutinefunction(value) and not value.__name__ == "resolve": new_attrs[key] = make_method(value) _registry[key] = new_attrs[key] else: new_attrs[key] = value # Gather graph for later dependency resolution graph = { key: { p for p in inspect.signature(method).parameters.keys() if p != "self" and not p.startswith("_") } for key, method in _registry.items() } new_attrs["_graph"] = graph return super().__new__(cls, name, bases, new_attrs) def make_method(method): @wraps(method) async def inner(self, _results=None, **kwargs): print("inner - _results=", _results) parameters = inspect.signature(method).parameters.keys() # Any parameters not provided by kwargs are resolved from registry to_resolve = [p for p in parameters if p not in kwargs and p != "self"] missing = [p for p in to_resolve if p not in self._registry] assert ( not missing ), "The following DI parameters could not be found in the registry: {}".format( missing ) results = {} results.update(kwargs) if to_resolve: resolved_parameters = await self.resolve(to_resolve, _results) results.update(resolved_parameters) return_value = await method(self, **results) if _results is not None: _results[method.__name__] = return_value return return_value return inner class AsyncBase(metaclass=AsyncMeta): async def resolve(self, names, results=None): print("\n resolve: ", names) if results is 
None: results = {} # Resolve them in the correct order ts = graphlib.TopologicalSorter() for name in names: ts.add(name, *self._graph[name]) ts.prepare() async def resolve_nodes(nodes): print(" resolve_nodes", nodes) print(" (current results = {})".format(repr(results))) awaitables = [ self._registry[name]( self, _results=results, **{k: v for k, v in results.items() if k in self._graph[name]}, ) for name in nodes if name not in results ] print(" awaitables: ", awaitables) awaitable_results = await asyncio.gather(*awaitables) results.update( {p[0].__name__: p[1] for p in zip(awaitables, awaitable_results)} ) if not ts.is_active(): # Nothing has dependencies - just resolve directly print(" no dependencies, resolve directly") await resolve_nodes(names) else: # Resolve in topological order while ts.is_active(): nodes = ts.get_ready() print(" ts.get_ready() returned nodes:", nodes) await resolve_nodes(nodes) for node in nodes: ts.done(node) print(" End of resolve(), returning", results) return {key: value for key, value in results.items() if key in names} ``` With this test: ```python class Complex(AsyncBase): def __init__(self): self.log = [] async def c(self): print("LOG: c") self.log.append("c") async def b(self, c): print("LOG: b") self.log.append("b") async def a(self, b, c): print("LOG: a") self.log.append("a") async def go(self, a): print("LOG: go") self.log.append("go") return self.log @pytest.mark.asyncio async def test_complex(): result = await Complex().go() # 'c' should only be called once assert result == ["c", "b", "a", "go"] ``` This test sometimes passes, and sometimes fails! 
Output for a pass: ``` tests/test_asyncdi.py inner - _results= None resolve: ['a'] ts.get_ready() returned nodes: ('c', 'b') resolve_nodes ('c', 'b') (current results = {}) awaitables: [<coroutine object Complex.c at 0x1074ac890>, <coroutine object Complex.b at 0x1074ac820>] inner - _results= {} LOG: c inner - _results= {'c': None} resolve: ['c'] ts.get_ready() returned nodes: ('c',) resolve_nodes ('c',) (current results = {'c': None}) awaitables: [] End of resolve(), returning {'c': None} LOG: b ts.get_ready() returned nodes: ('a',) resolve_nodes ('a',) (current results = {'c': None, 'b': None}) awaitables: [<coroutine object Complex.a at 0x1074ac7b0>] inner - _results= {'c': None, 'b': None} LOG: a End of resolve(), returning {'c': None, 'b': None, 'a': None} LOG: go ``` Output for a fail: ``` tests/test_asyncdi.py inner - _results= None resolve: ['a'] ts.get_ready() returned nodes: ('b', 'c') resolve_nodes ('b', 'c') (current results = {}) awaitables: [<coroutine object Complex.b at 0x10923c890>, <coroutine object Complex.c at 0x10923c820>] inner - _results= {} resolve: ['c'] ts.get_ready() returned nodes: ('c',) resolve_nodes ('c',) (current results = {}) awaitables: [<coroutine object Complex.c at 0x10923c6d0>] inner - _results= {} LOG: c inner - _results= {'c': None} LOG: c End of resolve(), returning {'c': None} LOG: b ts.get_ready() returned nodes: ('a',) resolve_nodes ('a',) (current results = {'c': None, 'b': None}) awaitables: [<coroutine object Complex.a at 0x10923c6d0>] inner - _results= {'c': None, 'b': None} LOG: a End of resolve(), returning {'c': None, 'b': None, 'a': None} LOG: go F =================================================================================================== FAILURES =================================================================================================== _________________________________________________________________________________________________ test_complex 
_________________________________________________________________________________________________ @pytest.mark.asyncio async def test_complex(): result = await Complex().go() # 'c' should only be called once > assert result == ["c", "b", "a", "go"] E AssertionError: assert ['c', 'c', 'b', 'a', 'go'] == ['c', 'b', 'a', 'go'] E At index 1 diff: 'c' != 'b' E Left contains one more item: 'go' E Use -v to get the full diff tests/test_asyncdi.py:48: AssertionError ================== short test summary info ================================ FAILED tests/test_asyncdi.py::test_complex - AssertionError: assert ['c', 'c', 'b', 'a', 'go'] == ['c', 'b', 'a', 'go'] ``` I figured out why this is happening: `a` requires `b` and `c`, and `b` also requires `c`. The code decides to run `b` and `c` in parallel. If `c` completes first, then when `b` runs it gets to use the already-calculated result for `c` - so it doesn't need to call `c` again. But if `b` gets to that point before `c` has finished, it also needs to call `c` itself - so `c` ends up running twice. | {"total_count": 0, "+1": 0, "-1": 0, "laugh": 0, "hooray": 0, "confused": 0, "heart": 0, "rocket": 0, "eyes": 0} | 648435885 |