theta package#
Store#
- class Store(executor: ThreadPoolExecutor)#
Bases:
object
Store for arbitrary data with an executor for submitting callback tasks.
Register callbacks and create writers associated to a key; when a writer writes to the store, it will submit tasks to the executor to invoke all the callbacks for that key with the newly written value.
Retrieve a writer for some key with
store.writer(key)
.The callback functions provided must take a single positional argument, and will be called with the new value added to the store. Callback return values are available as futures from
Writer.write()
.from concurrent.futures import ThreadPoolExecutor from theta import Store executor = ThreadPoolExecutor() store = Store(executor) # q holds a queue of data, d holds the most recent value q = deque() d = {'value': None} # store a reference to the callback to remove later q_id = store.add_callback("data", q.append) # reference not needed since this callback won't be removed store.add_callback("data", lambda val: d.update(value=val)) writer = store.writer("data") for i in range(5): writer.write(i) store.remove_callback(q_id) writer.write(5) # value of 5 was only sent to the unnamed callback print("q", q) print("d", d)
deque([0, 1, 2, 3, 4]) {'value': 5}
- Parameters:
executor – Thread pool to submit callback tasks, used when data is added to the store.
- add_callback(key: Any, f: Callable[[Any], Any]) str #
Add a callback to execute on new values written under the key.
Returns a string identifier for the callback, which can be used to remove it with
remove_callback()
.- Parameters:
key – The key which links the callback to the writer. Can be any immutable type (same restriction as a dictionary key).
f – Callable executed as f(value) for each new value written to the store under this key. Return values are available via futures that the writer returns. Exceptions encountered while running are logged, then ignored.
- remove_callback(id: str) None #
Remove the specified callback.
- Raises:
KeyError – If there is no callback with the given id.
- class Writer(key: Any, submit: Callable[[Any], List[Future[Any]]])#
Bases:
object
A writer for a specific key in the Store.
Should be instantiated via
writer = store.writer(key)
.- Parameters:
key – The key for the writer.
submit – The callback function to submit a value to the store.
- write(value: Any) List[Future[Any]] #
Write a value to the store, executing all registered callbacks. Returns the futures from the submitted callbacks.
This method is non-blocking, however the futures to the callbacks are returned to wait on, if desired. For example:
writer = store.writer("key") fs = writer.write("value") concurrent.futures.wait(fs)
Queues#
- class EvictingQueue(size: Optional[int] = None)#
Bases:
Generic
[T
]Thread-safe FIFO queue with blocking gets and evicting, non-blocking puts.
Basic reads are as simple as using
get()
or iterating over the queue. Iteration will exhaust oncestop()
is called. Can also be used as a buffer withflush()
.from theta import EvictingQueue q = EvictingQueue(5) ... first_item = q.get() for item in q: process(item)
To break iteration on a timeout between gets, use
iter_timeout()
.for item in q.iter_timeout(1.5): # will end after 1.5 seconds has elapsed without a new value process(item) cleanup()
To handle timeouts and stopping separately, manually
get()
in a loop.while True: try: item = q.get(timeout=0.1) process(item) except QueueTimeout: handle_timeout() except QueueStopped: cleanup() break
Use as a buffer by flushing the queue on demand.
# some other thread adding items with `q.put()` while True: time.sleep(1) process_many(q.flush())
If size is None, the queue is unbounded.
- empty() bool #
Check if the queue is empty.
- flush() List[T] #
Consume and return all values currently in the queue.
- get(timeout: Optional[float] = None) T #
Blocking get with an optional timeout, in seconds. Pops the oldest item off the queue. Blocks indefinitely if the timeout is None.
- Raises:
QueueStopped – if the queue has been stopped.
QueueTimeout – if the timeout expires before an item is available.
- iter_timeout(timeout: Optional[float] = None) Iterator[T] #
Iterate over values as they become available in FIFO order. The iterator exhausts if the timeout (in seconds) expires before a new value is put on the queue, or if the queue is stopped. If the timeout is None, this method is equivalent to
__iter__()
.
- peek() T #
Peek at the newest value in the queue, without removing it from the queue.
- Raises:
IndexError – if the queue is empty.
- peekleft() T #
Peek at the oldest value in the queue, without removing it from the queue.
- Raises:
IndexError – if the queue is empty.
- put(value: T) None #
Put an item on the queue, evicting the oldest item if the queue is full. This method is non-blocking.
- stopped() bool #
Check if the queue has been stopped.
- class QueueStopped#
Bases:
Exception
Raised when a queue has been flagged to stop.
- class QueueTimeout#
Bases:
Exception
Raised when a timeout has expired for a blocking get.
Thread Utilities#
- class StoppableThread(*args: Any, stop_event: Optional[Event] = None, **kwargs: Any)#
Bases:
Thread
A thread which can be requested to stop running externally.
Stop requests are handled by a stop event which can be shared externally.
Note that stoppable threads require cooperation. Subclasses that implement the usual
run()
method for long-running tasks can check if the thread isrunning()
or can usewait()
for tasks that run on an interval.This class can be used like a normal thread with a target function to run. The stop event can be constructed externally and passed in, so the target function can utilize the event.
import logging import threading import time from theta import StoppableThread logging.basicConfig(level=logging.INFO) def sleeper(stop_event): t = 0 while not stop_event.wait(1): t += 1 logging.info("Slept for between %d and %d seconds", t, t + 1) event = threading.Event() thread = StoppableThread(target=sleeper, args=(event,), stop_event=event) thread.start() time.sleep(2.1) thread.stop() thread.join()
This class can also be subclassed. Methods are provided to interact with the stop event. Subclassing works the same as with a
threading.Thread
; implement therun()
function, which gets called onstart()
.class SleeperThread(StoppableThread): def run(self): t = 0 while not self.wait(1): t += 1 logging.info("Slept for between %d and %d seconds", t, t + 1) thread = SleeperThread() thread.start() time.sleep(2.1) thread.stop() thread.join()
- Parameters:
stop_event – Thread stopping event, which can be externally set. If None, creates a new
threading.Event
.
- running() bool #
Checks if the thread has not been requested to stop.
- stop() None #
Set the stop flag for the thread. Safe to call multiple times.
- stop_event: Event#
- stopped() bool #
Checks if the thread has been requested to stop.
- wait(interval: Optional[float]) bool #
Wait on the stop event for an interval (in seconds), waking up if set. If the interval is None, blocks indefinitely. Returns True if the flag was interrupted and set, otherwise False (i.e., a timeout occurred).