theta package#

Store#

class Store(executor: ThreadPoolExecutor)#

Bases: object

Store for arbitrary data with an executor for submitting callback tasks.

Register callbacks and create writers associated with a key. When a writer writes to the store, the store submits tasks to the executor that invoke every callback registered for that key with the newly written value.

Retrieve a writer for some key with store.writer(key).

The callback functions provided must take a single positional argument, and will be called with the new value added to the store. Callback return values are available as futures from Writer.write().

from collections import deque
from concurrent.futures import ThreadPoolExecutor
from theta import Store

executor = ThreadPoolExecutor()
store = Store(executor)

# q holds a queue of data, d holds the most recent value
q = deque()
d = {'value': None}

# store a reference to the callback to remove later
q_id = store.add_callback("data", q.append)

# reference not needed since this callback won't be removed
store.add_callback("data", lambda val: d.update(value=val))

writer = store.writer("data")
for i in range(5):
    writer.write(i)

store.remove_callback(q_id)
writer.write(5)  # value of 5 was only sent to the unnamed callback

print("q", q)
print("d", d)
q deque([0, 1, 2, 3, 4])
d {'value': 5}

Parameters:

executor – Thread pool to submit callback tasks, used when data is added to the store.

add_callback(key: Any, f: Callable[[Any], Any]) str#

Add a callback to execute on new values written under the key.

Returns a string identifier for the callback, which can be used to remove it with remove_callback().

Parameters:
  • key – The key which links the callback to the writer. Can be any hashable type (same restriction as a dictionary key).

  • f – Callable executed as f(value) for each new value written to the store under this key. Return values are available via futures that the writer returns. Exceptions encountered while running are logged, then ignored.
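The behavior described for f (one task per callback, results exposed as futures, exceptions logged and then ignored) can be illustrated with the standard library alone. This is a minimal sketch, not theta's implementation: the names guarded and submit_all are hypothetical, and the choice to resolve a failed callback's future to None is an assumption that the text above does not confirm.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List

logger = logging.getLogger("store_sketch")  # hypothetical logger name


def guarded(f: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap f so that exceptions are logged and swallowed.

    Assumption: a failed callback resolves its future to None.
    """
    def wrapper(value: Any) -> Any:
        try:
            return f(value)
        except Exception:
            logger.exception("callback failed for value %r", value)
            return None
    return wrapper


def submit_all(executor: ThreadPoolExecutor,
               callbacks: List[Callable[[Any], Any]],
               value: Any) -> List[Future]:
    """Submit one task per callback and return the futures in order."""
    return [executor.submit(guarded(f), value) for f in callbacks]


with ThreadPoolExecutor(max_workers=2) as executor:
    # The second callback divides by zero; its error is logged, not raised.
    futures = submit_all(executor, [lambda v: v * 2, lambda v: 1 / v], 0)
    results = [fut.result() for fut in futures]
```

Wrapping each callback keeps one failing callback from affecting the others that share the key.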

remove_callback(id: str) None#

Remove the specified callback.

Raises:

KeyError – If there is no callback with the given id.
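The identifier contract (add_callback() returns a string, remove_callback() raises KeyError for an unknown id) can be modeled with a plain dictionary. The class below is a hypothetical stand-in for illustration only; how theta actually generates and stores ids is not stated here, and uuid4 is an assumption.

```python
import uuid
from typing import Any, Callable, Dict, List, Tuple


class CallbackRegistry:
    """Hypothetical stand-in for the callback bookkeeping; not theta's Store."""

    def __init__(self) -> None:
        # Maps callback id -> (key, callable).
        self._callbacks: Dict[str, Tuple[Any, Callable[[Any], Any]]] = {}

    def add_callback(self, key: Any, f: Callable[[Any], Any]) -> str:
        callback_id = uuid.uuid4().hex  # assumption: any unique string works
        self._callbacks[callback_id] = (key, f)
        return callback_id

    def remove_callback(self, callback_id: str) -> None:
        # del raises KeyError when the id is unknown.
        del self._callbacks[callback_id]

    def callbacks_for(self, key: Any) -> List[Callable[[Any], Any]]:
        return [f for k, f in self._callbacks.values() if k == key]


registry = CallbackRegistry()
cb_id = registry.add_callback("data", print)
registry.remove_callback(cb_id)

try:
    registry.remove_callback(cb_id)  # already removed
    second_removal_failed = False
except KeyError:
    second_removal_failed = True
```

Callers that may remove the same callback twice should be prepared to catch KeyError.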

writer(key: Any) Writer#

Creates or retrieves a writer for the specified key.

class Writer(key: Any, submit: Callable[[Any], List[Future[Any]]])#

Bases: object

A writer for a specific key in the Store.

Should be instantiated via writer = store.writer(key).

Parameters:
  • key – The key for the writer.

  • submit – The callback function to submit a value to the store.

write(value: Any) List[Future[Any]]#

Write a value to the store, executing all registered callbacks. Returns the futures from the submitted callbacks.

This method is non-blocking. The futures for the submitted callbacks are returned so that the caller can wait on them if desired. For example:

import concurrent.futures

writer = store.writer("key")
fs = writer.write("value")
concurrent.futures.wait(fs)
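Since write() returns ordinary futures, the usual concurrent.futures tools apply to them. The sketch below builds the list of futures by hand as a stand-in for the return value of writer.write(), so it runs without theta; square and negate are hypothetical callbacks.

```python
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait


def square(x: int) -> int:
    return x * x


def negate(x: int) -> int:
    return -x


with ThreadPoolExecutor(max_workers=2) as executor:
    # Stand-in for: fs = writer.write(3)
    fs = [executor.submit(square, 3), executor.submit(negate, 3)]

    # Block until every callback has finished, or give up after 5 seconds.
    done, not_done = wait(fs, timeout=5, return_when=ALL_COMPLETED)

    # Read results in submission order.
    results = [f.result() for f in fs]
```

Passing a timeout to wait() bounds how long the caller blocks; anything still pending is reported in not_done.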

Queues#

class EvictingQueue(size: Optional[int] = None)#

Bases: Generic[T]

Thread-safe FIFO queue with blocking gets and evicting, non-blocking puts.

Basic reads are as simple as calling get() or iterating over the queue. Iteration ends once stop() is called. The queue can also be used as a buffer with flush().

from theta import EvictingQueue

q = EvictingQueue(5)
...

first_item = q.get()
for item in q:
    process(item)

To break iteration on a timeout between gets, use iter_timeout().

for item in q.iter_timeout(1.5):
    # ends after 1.5 seconds have elapsed without a new value
    process(item)
cleanup()

To handle timeouts and stopping separately, manually get() in a loop.

while True:
    try:
        item = q.get(timeout=0.1)
        process(item)
    except QueueTimeout:
        handle_timeout()
    except QueueStopped:
        cleanup()
        break

Use as a buffer by flushing the queue on demand.

# some other thread adding items with `q.put()`
while True:
    time.sleep(1)
    process_many(q.flush())

If size is None, the queue is unbounded.

empty() bool#

Check if the queue is empty.

flush() List[T]#

Consume and return all values currently in the queue.
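A flush of this kind is typically a copy-then-clear performed under a lock, so that no item is lost or returned twice. The following is a rough sketch of that idea using a deque and a threading.Lock; it is an assumption about the general technique, not a description of how EvictingQueue is implemented.

```python
import threading
from collections import deque
from typing import Deque, List

lock = threading.Lock()
items: Deque[int] = deque([1, 2, 3])


def flush() -> List[int]:
    """Atomically take everything currently buffered."""
    with lock:
        drained = list(items)
        items.clear()
        return drained


first = flush()   # everything that was buffered
second = flush()  # nothing left to drain
```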

get(timeout: Optional[float] = None) T#

Blocking get with an optional timeout, in seconds. Pops the oldest item off the queue. Blocks indefinitely if the timeout is None.

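Raises:

  • QueueTimeout – if the timeout expires before an item becomes available.

  • QueueStopped – if the queue is stopped while waiting.

iter_timeout(timeout: Optional[float] = None) Iterator[T]#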

Iterate over values as they become available in FIFO order. The iterator exhausts if the timeout (in seconds) expires before a new value is put on the queue, or if the queue is stopped. If the timeout is None, this method is equivalent to __iter__().
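The same "iterate until a gap longer than the timeout" pattern can be shown with the standard library's queue.Queue. The generator below is a hypothetical analogue written for illustration; it does not use theta and does not model the stop behavior.

```python
import queue
from typing import Iterator, Optional, TypeVar

T = TypeVar("T")


def iter_with_timeout(q: "queue.Queue[T]", timeout: Optional[float]) -> Iterator[T]:
    """Yield items in FIFO order until no item arrives within `timeout` seconds."""
    while True:
        try:
            yield q.get(timeout=timeout)
        except queue.Empty:
            return  # the gap between items exceeded the timeout


q: "queue.Queue[int]" = queue.Queue()
for i in range(3):
    q.put(i)

# Drains the three buffered items, then ends after 0.05 s with no new value.
received = list(iter_with_timeout(q, 0.05))
```

With a timeout of None this generator would block forever on an empty queue, which is why a separate stop mechanism is needed for unbounded iteration.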

peek() T#

Peek at the newest value in the queue, without removing it from the queue.

Raises:

IndexError – if the queue is empty.

peekleft() T#

Peek at the oldest value in the queue, without removing it from the queue.

Raises:

IndexError – if the queue is empty.
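The distinction between peek() (newest) and peekleft() (oldest) mirrors indexing the two ends of a collections.deque, which also raises IndexError when empty. The snippet below uses a plain deque as an analogy only.

```python
from collections import deque

items = deque([10, 20, 30])  # 10 was added first, 30 last

oldest = items[0]    # analogous to peekleft()
newest = items[-1]   # analogous to peek()

try:
    deque()[-1]      # indexing an empty deque
    empty_raises = False
except IndexError:
    empty_raises = True
```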

put(value: T) None#

Put an item on the queue, evicting the oldest item if the queue is full. This method is non-blocking.
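The eviction rule matches the behavior of a bounded collections.deque: appending to a full deque silently drops the item at the opposite end. The snippet below demonstrates the rule with a deque of maxlen 3; it is an analogy and makes no claim about EvictingQueue's internals.

```python
from collections import deque

buffer = deque(maxlen=3)
for i in range(5):
    buffer.append(i)  # never blocks; 0 and 1 are evicted along the way

snapshot = list(buffer)  # [2, 3, 4]
```

Only the three most recent values survive, so a slow consumer sees fresh data at the cost of skipped items.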

stop() None#

Propagate a QueueStopped exception to all threads blocking on get().
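One common way to wake every blocked consumer is a threading.Condition with a stop flag and notify_all(). The classes below (MiniQueue, Stopped) are hypothetical stand-ins written to illustrate that technique. Whether theta works this way, and whether it returns remaining items after a stop as this sketch does, is an assumption.

```python
import threading
from collections import deque
from typing import Any, List


class Stopped(Exception):
    """Hypothetical stand-in for QueueStopped."""


class MiniQueue:
    """Hypothetical minimal queue showing stop propagation only."""

    def __init__(self) -> None:
        self._items: deque = deque()
        self._cond = threading.Condition()
        self._stopped = False

    def put(self, value: Any) -> None:
        with self._cond:
            self._items.append(value)
            self._cond.notify()

    def get(self) -> Any:
        with self._cond:
            while not self._items:
                if self._stopped:
                    raise Stopped()
                self._cond.wait()
            return self._items.popleft()

    def stop(self) -> None:
        with self._cond:
            self._stopped = True
            self._cond.notify_all()  # wake every thread blocked in get()


q = MiniQueue()
outcomes: List[str] = []


def consumer() -> None:
    try:
        q.get()
    except Stopped:
        outcomes.append("stopped")


threads = [threading.Thread(target=consumer) for _ in range(3)]
for t in threads:
    t.start()
q.stop()
for t in threads:
    t.join(timeout=5)
```

Because get() rechecks the flag inside its loop, a consumer that calls get() after stop() also raises instead of blocking.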

stopped() bool#

Check if the queue has been stopped.

class QueueStopped#

Bases: Exception

Raised when a queue has been flagged to stop.

class QueueTimeout#

Bases: Exception

Raised when a timeout has expired for a blocking get.

Thread Utilities#

class StoppableThread(*args: Any, stop_event: Optional[Event] = None, **kwargs: Any)#

Bases: Thread

A thread which can be requested to stop running externally.

Stop requests are handled by a stop event which can be shared externally.

Note that stoppable threads require cooperation. Subclasses that implement the usual run() method for long-running tasks can check if the thread is running() or can use wait() for tasks that run on an interval.

This class can be used like a normal thread with a target function to run. The stop event can be constructed externally and passed in, so the target function can utilize the event.

import logging
import threading
import time
from theta import StoppableThread

logging.basicConfig(level=logging.INFO)

def sleeper(stop_event):
    t = 0
    while not stop_event.wait(1):
        t += 1
    logging.info("Slept for between %d and %d seconds", t, t + 1)

event = threading.Event()
thread = StoppableThread(target=sleeper, args=(event,), stop_event=event)
thread.start()
time.sleep(2.1)
thread.stop()
thread.join()

This class can also be subclassed. Methods are provided to interact with the stop event. Subclassing works the same as with a threading.Thread: override the run() method, which is invoked when start() is called.

class SleeperThread(StoppableThread):
    def run(self):
        t = 0
        while not self.wait(1):
            t += 1
        logging.info("Slept for between %d and %d seconds", t, t + 1)

thread = SleeperThread()
thread.start()
time.sleep(2.1)
thread.stop()
thread.join()

Parameters:

stop_event – Thread stopping event, which can be externally set. If None, creates a new threading.Event.
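Because the stop event is an ordinary threading.Event, a single event can coordinate several workers. The sketch below uses plain threading.Thread objects in place of StoppableThread so that it runs without theta; the worker function and the ticks dictionary are hypothetical.

```python
import threading

stop_event = threading.Event()
ticks = {"a": 0, "b": 0}


def worker(name: str) -> None:
    # Event.wait returns False on timeout and True once the event is set.
    while not stop_event.wait(0.01):
        ticks[name] += 1


threads = [threading.Thread(target=worker, args=(name,)) for name in ticks]
for t in threads:
    t.start()

stop_event.set()  # one call asks every worker to stop
for t in threads:
    t.join(timeout=5)

all_stopped = not any(t.is_alive() for t in threads)
```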

running() bool#

Checks if the thread has not been requested to stop.

stop() None#

Set the stop flag for the thread. Safe to call multiple times.

stop_event: Event#

stopped() bool#

Checks if the thread has been requested to stop.

wait(interval: Optional[float]) bool#

Wait on the stop event for an interval (in seconds), waking up early if the event is set. If the interval is None, blocks until the event is set. Returns True if the stop event was set, otherwise False (i.e., the interval elapsed without a stop request).
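The return convention is the same as threading.Event.wait(), which the following snippet demonstrates directly on an Event. It assumes only that wait() delegates to the stop event, as the description above suggests.

```python
import threading

stop_event = threading.Event()

# Not set yet: the wait times out and returns False.
timed_out_result = stop_event.wait(0.01)

stop_event.set()
stop_event.set()  # setting an already-set event is harmless

# Already set: the wait returns True immediately.
interrupted_result = stop_event.wait(0.01)
```

This is why `while not self.wait(interval):` works as a loop condition: the loop body runs once per interval until a stop is requested.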