aiocoap.util.queuewithend module

This is a relic from before the __aiter__ protocol was established; it will be phased out before aiocoap 1.0 is released.

class aiocoap.util.queuewithend.AsyncIterable

Bases: object

can_peek()

Return True when a result is ready to be fetched with .get_nowait(), and False when no more items can be fetched.

get_nowait()

Fetch the next item. This must only be called once after can_peek has returned True.

class aiocoap.util.queuewithend.QueueWithEnd(maxsize=0)

Bases: aiocoap.util.queuewithend.AsyncIterable

A QueueWithEnd shares a Queue’s behavior in that it gets fed with put and consumed with get_nowait. Contrary to a Queue, this is designed to be consumed only by one entity, which uses the coroutine can_peek to make sure the get_nowait will succeed.

Another difference between a Queue and a QueueWithEnd is that the latter can also terminate (which is indicated by can_peek returning False and set by the finish coroutine) and raise exceptions (which raise from the get_nowait function and are set by the put_exception coroutine).

Type

alias of QueueWithEnd.Type

can_peek()
get_nowait()
put(value)
put_exception(value)
finish()
classmethod cogenerator(maxsize=0)

Coroutine decorator that passes a callable asyncyield into the function as the first argument and returns a QueueWithEnd. It is implicitly finished when the coroutine returns.

>>> @QueueWithEnd.cogenerator()
>>> def count_slowly(asyncyield, count_to=count_to):
...     for i in range(count_to):
...         yield from asyncio.sleep(1)
...         yield from asyncyield(i + 1)
>>> counter = count_slowly(10)
>>> while (yield from counter.can_peek()):
...     i = counter.get_nowait()
...     print("Current count is %d"%i)
classmethod merge(queues)

Asyncio’s as_completed does not work with QueueWithEnd objects for the same reason it can’t replace it (missing end-of-loop indication); the merge classmethod can be used instead to fetch results indiscriminately from queues as they are completed:

>>> @QueueWithEnd.cogenerator()
>>> def count(asyncyield):
...     for i in range(3):
...         yield from asyncyield(i + 1)
...         yield from time.sleep(0.1 * i)
>>> firstcount = count()
>>> secondcount = count()
>>> merged = QueueWithEnd.merged([firstcount, secondcount])
>>> while (yield from merged.can_peek()):
...     print(merged.get_nowait())
1
2
1
2
3
3
more()
value
consume()