databases, он какой-то кривой, или я кривой хз)) говорят в самой алхимии есть теперь асинхронность
@pytest_asyncio.fixture(scope="module") async def db_session(session_factory: sessionmaker, settings_app: AppSettings) -> AsyncSession: async with create_async_engine(settings_app.dsn, echo=False).connect() as connect: transaction = await connect.begin() session: AsyncSession = session_factory(bind=connect) await connect.begin_nested() @event.listens_for(session.sync_session, "after_transaction_end") def reopen_nested_transaction(session, transaction): if connect.closed: return if not connect.in_nested_transaction(): connect.sync_connection.begin_nested() yield session await transaction.rollback() await session.rollback() # await session.commit()
Обсуждают сегодня