mirror of
https://github.com/ARM-software/workload-automation.git
synced 2025-01-19 12:24:32 +00:00
592 lines
19 KiB
Python
592 lines
19 KiB
Python
|
"""Multiple-producer-multiple-consumer signal-dispatching.
|
||
|
|
||
|
``dispatcher`` is the core of Louie, providing the primary API and the
|
||
|
core logic for the system.
|
||
|
|
||
|
Internal attributes:
|
||
|
|
||
|
- ``WEAKREF_TYPES``: Tuple of types/classes which represent weak
|
||
|
references to receivers, and thus must be dereferenced on retrieval
|
||
|
to retrieve the callable object
|
||
|
|
||
|
- ``connections``::
|
||
|
|
||
|
{ senderkey (id) : { signal : [receivers...] } }
|
||
|
|
||
|
- ``senders``: Used for cleaning up sender references on sender
|
||
|
deletion::
|
||
|
|
||
|
{ senderkey (id) : weakref(sender) }
|
||
|
|
||
|
- ``senders_back``: Used for cleaning up receiver references on receiver
|
||
|
deletion::
|
||
|
|
||
|
{ receiverkey (id) : [senderkey (id)...] }
|
||
|
"""
|
||
|
|
||
|
import os
|
||
|
import weakref
|
||
|
|
||
|
try:
|
||
|
set
|
||
|
except NameError:
|
||
|
from sets import Set as set, ImmutableSet as frozenset
|
||
|
|
||
|
from louie import error
|
||
|
from louie import robustapply
|
||
|
from louie import saferef
|
||
|
from louie.sender import Any, Anonymous
|
||
|
from louie.signal import All
|
||
|
from prioritylist import PriorityList
|
||
|
|
||
|
|
||
|
# Support for statistics.
if __debug__:
    # Running totals; ``connect``, ``disconnect``, ``send`` and
    # ``send_minimal`` each bump their counter once per call.
    connects = 0
    disconnects = 0
    sends = 0

    def print_stats():
        """Print the connect/disconnect/send counters to stdout."""
        # Format first, then print.  Writing ``print (...) % (...)`` only
        # works as a Python 2 print statement; when ``print`` is a
        # function the ``%`` would be applied to its return value.  The
        # form below behaves the same on Python 2 and Python 3.
        print(('\n'
               'Louie connects: %i\n'
               'Louie disconnects: %i\n'
               'Louie sends: %i\n'
               '\n') % (connects, disconnects, sends))

    # Opt-in reporting: dump the counters at interpreter exit when the
    # PYDISPATCH_STATS environment variable is present.
    if 'PYDISPATCH_STATS' in os.environ:
        import atexit
        atexit.register(print_stats)
|
||
|
|
||
|
|
||
|
|
||
|
# Weak-reference wrapper types.  ``live_receivers`` calls any receiver that
# is an instance of one of these to obtain the underlying callable.
WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)

# Routing table: { senderkey (id) : { signal : [receivers...] } }
connections = {}
# { senderkey (id) : weakref(sender) }; the weakref's callback invokes
# ``_remove_sender`` for that key.
senders = {}
# { receiverkey (id) : [senderkey (id)...] }; ``_remove_receiver`` uses it
# to find the senders a receiver is registered with.
senders_back = {}
# Installed plugin objects; the dispatcher calls ``is_live(receiver)`` and
# ``wrap_receiver(receiver)`` on each of them.
plugins = []
|
||
|
|
||
|
def reset():
    """Discard all routing state and installed plugins.

    Every module-level table is rebound to a brand-new empty container;
    the previous container objects themselves are left untouched.

    Intended for unit tests.  Avoid calling it in normal operation.
    """
    global connections, senders, senders_back, plugins
    connections, senders, senders_back = {}, {}, {}
    plugins = []
|
||
|
|
||
|
|
||
|
def connect(receiver, signal=All, sender=Any, weak=True, priority=0):
    """Connect ``receiver`` to ``sender`` for ``signal``.

    - ``receiver``: A callable Python object which is to receive
      messages/signals/events.  Receivers must be hashable objects.

      If weak is ``True``, then receiver must be weak-referencable (more
      precisely ``saferef.safe_ref()`` must be able to create a
      reference to the receiver).

      Receivers are fairly flexible in their specification, as the
      machinery in the ``robustapply`` module takes care of most of the
      details regarding figuring out appropriate subsets of the sent
      arguments to apply to a given receiver.

      Note: If ``receiver`` is itself a weak reference (a callable), it
      will be de-referenced by the system's machinery, so *generally*
      weak references are not suitable as receivers, though some use
      might be found for the facility whereby a higher-level library
      passes in pre-weakrefed receiver references.

    - ``signal``: The signal to which the receiver should respond.

      If ``All``, receiver will receive all signals from the indicated
      sender (which might also be ``Any``, but is not necessarily
      ``Any``).

      Otherwise must be a hashable Python object other than ``None``
      (``DispatcherTypeError`` raised on ``None``).

    - ``sender``: The sender to which the receiver should respond.

      If ``Any``, receiver will receive the indicated signals from any
      sender.

      If ``Anonymous``, receiver will only receive indicated signals
      from ``send``/``send_exact`` which do not specify a sender, or
      specify ``Anonymous`` explicitly as the sender.

      Otherwise can be any python object.

    - ``weak``: Whether to use weak references to the receiver.

      By default, the module will attempt to use weak references to
      the receiver objects.  If this parameter is ``False``, then strong
      references will be used.

    - ``priority``: specifies the priority by which a receiver should
      get notified.  It is handed unchanged to ``PriorityList.add``.

    Returns ``None``, may raise ``DispatcherTypeError``.
    """
    if signal is None:
        raise error.DispatcherTypeError(
            'Signal cannot be None (receiver=%r sender=%r)'
            % (receiver, sender))
    if weak:
        # The on_delete callback unregisters the receiver once the
        # referent goes away.
        receiver = saferef.safe_ref(receiver, on_delete=_remove_receiver)
    senderkey = id(sender)
    if senderkey in connections:
        signals = connections[senderkey]
    else:
        connections[senderkey] = signals = {}
    # Keep track of senders for cleanup.
    # Is Anonymous something we want to clean up?
    if sender not in (None, Anonymous, Any):
        def remove(_dead_ref, senderkey=senderkey):
            # Weakref callback: drop every route for the dead sender.
            _remove_sender(senderkey=senderkey)
        # Skip objects that can not be weakly referenced, which means
        # they won't be automatically cleaned up, but that's too bad.
        try:
            weak_sender = weakref.ref(sender, remove)
        except TypeError:
            pass
        else:
            senders[senderkey] = weak_sender
    receiver_id = id(receiver)
    # Get the current receiver list; drop any existing registration of
    # this receiver from it (including its back-reference) so that a
    # repeated connect does not register the receiver twice.
    if signal in signals:
        receivers = signals[signal]
        _remove_old_back_refs(senderkey, signal, receiver, receivers)
    else:
        receivers = signals[signal] = PriorityList()
    # Record the back-reference receiver -> sender.  This bookkeeping is
    # deliberately best effort: a failure here must not stop the
    # receiver from being registered below.
    try:
        current = senders_back.get(receiver_id)
        if current is None:
            senders_back[receiver_id] = current = []
        if senderkey not in current:
            current.append(senderkey)
    except Exception:
        pass
    receivers.add(receiver, priority)
    # Update stats.
    if __debug__:
        global connects
        connects += 1
|
||
|
|
||
|
|
||
|
def disconnect(receiver, signal=All, sender=Any, weak=True):
    """Disconnect ``receiver`` from ``sender`` for ``signal``.

    - ``receiver``: The registered receiver to disconnect.

    - ``signal``: The registered signal to disconnect.

    - ``sender``: The registered sender to disconnect.

    - ``weak``: The weakref state to disconnect; when true the receiver
      is wrapped with ``saferef.safe_ref`` before it is looked up.

    This undoes a ``connect`` call made with the same
    ``(receiver, signal, sender, weak)`` combination: conceptually that
    tuple is the key removed from the routing tables, although the
    actual bookkeeping is a little more involved.

    Note: Calling ``disconnect`` is not needed to clean up after an
    object is deleted; routes for deleted objects are removed
    automatically.  Use it only to stop routing to a live object.

    Returns ``None``.  Raises ``DispatcherTypeError`` when ``signal`` is
    ``None`` and ``DispatcherKeyError`` when nothing is registered for
    the given sender/signal pair.
    """
    if signal is None:
        raise error.DispatcherTypeError(
            'Signal cannot be None (receiver=%r sender=%r)'
            % (receiver, sender))
    target = receiver
    if weak:
        target = saferef.safe_ref(receiver)
    senderkey = id(sender)
    try:
        registered = connections[senderkey][signal]
    except KeyError:
        raise error.DispatcherKeyError(
            'No receivers found for signal %r from sender %r'
            % (signal, sender)
        )
    try:
        # Takes the receiver out of ``registered`` as well as dropping
        # its back-reference.
        # NOTE(review): ``_remove_old_back_refs`` handles the ValueError
        # of a failed ``index`` lookup itself and returns False, so this
        # handler only fires for a ValueError raised elsewhere in it;
        # disconnecting an unknown receiver is otherwise silent.
        _remove_old_back_refs(senderkey, signal, target, registered)
    except ValueError:
        raise error.DispatcherKeyError(
            'No connection to receiver %s for signal %s from sender %s'
            % (target, signal, sender)
        )
    # Prune the signal (and possibly the sender) if now empty.
    _cleanup_connections(senderkey, signal)
    # Update stats.
    if __debug__:
        global disconnects
        disconnects += 1
|
||
|
|
||
|
|
||
|
def get_receivers(sender=Any, signal=All):
    """Look up the receivers registered for exactly ``sender``/``signal``.

    The result comes straight from the ``connections`` table, keyed by
    ``id(sender)`` and then by ``signal``.  When either key is missing a
    fresh empty list is returned instead.

    Note: Because the result may or may not be the object stored in
    the table, treat it only as an iterable / truth value and never
    append to it.

    To obtain the actual receiver objects, normally use
    ``live_receivers(get_receivers(...))``.
    """
    try:
        per_signal = connections[id(sender)]
        found = per_signal[signal]
    except KeyError:
        found = []
    return found
|
||
|
|
||
|
|
||
|
def live_receivers(receivers):
    """Yield the resolved, live receivers from ``receivers``.

    A generator over the given sequence.  Entries that are instances
    of ``WEAKREF_TYPES`` are called to dereference them; entries that
    are (or resolve to) ``None`` are skipped.  A remaining receiver is
    yielded only if every installed plugin's ``is_live`` accepts it.
    """
    for candidate in receivers:
        if isinstance(candidate, WEAKREF_TYPES):
            # Resolve the weak reference to its target (or None).
            candidate = candidate()
        if candidate is None:
            continue
        # Ask the plugins in order; the first veto skips the receiver.
        for plugin in plugins:
            if not plugin.is_live(candidate):
                break
        else:
            yield candidate
|
||
|
|
||
|
|
||
|
def get_all_receivers(sender=Any, signal=All):
    """Yield every receiver that should get ``signal`` from ``sender``.

    Four registrations are combined, in this order: the exact
    sender/signal pair, all signals from this sender, this signal from
    any sender, and all signals from any sender.  Each receiver is
    produced at most once.
    """
    already_yielded = set()
    # All four lookups are performed up front, most specific first.
    receiver_lists = (
        get_receivers(sender, signal),  # *this* signal, *this* sender
        get_receivers(sender, All),     # *all* signals, *this* sender
        get_receivers(Any, signal),     # *this* signal, *any* sender
        get_receivers(Any, All),        # *all* signals, *any* sender
    )
    for receiver_list in receiver_lists:
        for candidate in receiver_list:
            if not candidate:
                # filter out dead instance-method weakrefs
                continue
            try:
                if candidate in already_yielded:
                    continue
                already_yielded.add(candidate)
                yield candidate
            except TypeError:
                # dead weakrefs raise TypeError on hash...
                pass
|
||
|
|
||
|
|
||
|
def send(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` from ``sender`` to all connected receivers.

    - ``signal``: (Hashable) signal value; see ``connect`` for details.

    - ``sender``: The sender of the signal.

      If ``Any``, only receivers registered for ``Any`` will receive the
      message.

      If ``Anonymous``, only receivers registered to receive messages
      from ``Anonymous`` or ``Any`` will receive the message.

      Otherwise can be any Python object (normally one registered with
      a connect if you actually want something to occur).

    - ``arguments``: Positional arguments which will be passed to *all*
      receivers.  Note that this may raise ``TypeError`` if the receivers
      do not allow the particular arguments.  Note also that arguments
      are applied before named arguments, so they should be used with
      care.

    - ``named``: Named arguments which will be filtered according to the
      parameters of the receivers to only provide those acceptable to
      the receiver.

    Return a list of tuple pairs ``[(receiver, response), ...]``, where
    ``receiver`` is the callable after plugin wrapping.

    If any receiver raises an error, the error propagates back through
    send, terminating the dispatch loop, so it is quite possible to
    not have all receivers called if one of them raises an error.
    """
    results = []
    for target in live_receivers(get_all_receivers(sender, signal)):
        # Give each installed plugin, in order, the chance to wrap the
        # callable; the unwrapped one is still passed to robust_apply.
        wrapped = target
        for plugin in plugins:
            wrapped = plugin.wrap_receiver(wrapped)
        outcome = robustapply.robust_apply(
            wrapped, target,
            signal=signal,
            sender=sender,
            *arguments,
            **named
        )
        results.append((wrapped, outcome))
    # Update stats.
    if __debug__:
        global sends
        sends += 1
    return results
|
||
|
|
||
|
|
||
|
def send_minimal(signal=All, sender=Anonymous, *arguments, **named):
    """Like ``send``, but ``signal`` and ``sender`` are not passed on.

    Receivers are selected exactly as in ``send``; each one is then
    called with only ``arguments`` and ``named``.

    Return a list of tuple pairs ``[(receiver, response), ...]``, where
    ``receiver`` is the callable after plugin wrapping.
    """
    results = []
    for target in live_receivers(get_all_receivers(sender, signal)):
        # Give each installed plugin, in order, the chance to wrap the
        # callable; the unwrapped one is still passed to robust_apply.
        wrapped = target
        for plugin in plugins:
            wrapped = plugin.wrap_receiver(wrapped)
        outcome = robustapply.robust_apply(
            wrapped, target,
            *arguments,
            **named
        )
        results.append((wrapped, outcome))
    # Update stats.
    if __debug__:
        global sends
        sends += 1
    return results
|
||
|
|
||
|
|
||
|
def send_exact(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` only to receivers registered for exact message.

    Unlike ``send``, only the receivers registered for this particular
    ``signal`` on this particular ``sender`` are called; handlers
    registered through ``Any``/``Anonymous`` fallbacks are not
    consulted.

    Return a list of tuple pairs ``[(receiver, response), ...]``, where
    ``receiver`` is the callable after plugin wrapping.
    """
    results = []
    for target in live_receivers(get_receivers(sender, signal)):
        # Give each installed plugin, in order, the chance to wrap the
        # callable; the unwrapped one is still passed to robust_apply.
        wrapped = target
        for plugin in plugins:
            wrapped = plugin.wrap_receiver(wrapped)
        outcome = robustapply.robust_apply(
            wrapped, target,
            signal=signal,
            sender=sender,
            *arguments,
            **named
        )
        results.append((wrapped, outcome))
    return results
|
||
|
|
||
|
|
||
|
def send_robust(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` from ``sender`` to all connected receivers catching
    errors

    - ``signal``: (Hashable) signal value, see connect for details

    - ``sender``: The sender of the signal.

      If ``Any``, only receivers registered for ``Any`` will receive the
      message.

      If ``Anonymous``, only receivers registered to receive messages
      from ``Anonymous`` or ``Any`` will receive the message.

      Otherwise can be any Python object (normally one registered with
      a connect if you actually want something to occur).

    - ``arguments``: Positional arguments which will be passed to *all*
      receivers.  Note that this may raise ``TypeError`` if the receivers
      do not allow the particular arguments.  Note also that arguments
      are applied before named arguments, so they should be used with
      care.

    - ``named``: Named arguments which will be filtered according to the
      parameters of the receivers to only provide those acceptable to
      the receiver.

    Return a list of tuple pairs ``[(receiver, response), ... ]``, where
    ``receiver`` is the callable after plugin wrapping.

    If any receiver raises an error (specifically, any subclass of
    ``Exception``), the error instance is returned as the result for
    that receiver.  Errors raised while a plugin wraps a receiver are
    not caught.
    """
    # ``except Exception, err`` is a SyntaxError on Python 3 and the
    # ``as`` spelling needs Python 2.6+; sys.exc_info() is valid on every
    # version, which matters because this module still carries a
    # fallback import for interpreters without a builtin ``set``.
    import sys

    # Call each receiver with whatever arguments it can accept.
    # Return a list of tuple pairs [(receiver, response), ... ].
    responses = []
    for receiver in live_receivers(get_all_receivers(sender, signal)):
        # Wrap receiver using installed plugins.
        original = receiver
        for plugin in plugins:
            receiver = plugin.wrap_receiver(receiver)
        try:
            response = robustapply.robust_apply(
                receiver, original,
                signal=signal,
                sender=sender,
                *arguments,
                **named
            )
        except Exception:
            # Report the failure as this receiver's response and carry
            # on with the remaining receivers.
            responses.append((receiver, sys.exc_info()[1]))
        else:
            responses.append((receiver, response))
    return responses
|
||
|
|
||
|
|
||
|
def _remove_receiver(receiver):
    """Remove ``receiver`` from connections.

    Passed to ``saferef.safe_ref`` as the ``on_delete`` callback by
    ``connect``.  For every sender recorded in ``senders_back`` for
    this receiver, the receiver is removed from each of that sender's
    signal lists and empty entries are pruned; finally the receiver's
    ``senders_back`` entry is dropped.

    Returns ``False`` when ``senders_back`` is empty or unset,
    otherwise ``None``.
    """
    if not senders_back:
        # During module cleanup the mapping will be replaced with None.
        return False
    back_key = id(receiver)
    for senderkey in senders_back.get(back_key, ()):
        try:
            # Take an explicit snapshot of the signal keys:
            # ``_cleanup_connections`` below deletes entries from this
            # very dict, and iterating a live key view while deleting
            # raises RuntimeError on Python 3.
            signals = list(connections[senderkey])
        except KeyError:
            continue
        for signal in signals:
            try:
                receivers = connections[senderkey][signal]
            except KeyError:
                pass
            else:
                try:
                    receivers.remove(receiver)
                except Exception:
                    # Best effort: the receiver may simply not be in
                    # this particular list.
                    pass
            _cleanup_connections(senderkey, signal)
    try:
        del senders_back[back_key]
    except KeyError:
        pass
|
||
|
|
||
|
|
||
|
def _cleanup_connections(senderkey, signal):
    """Delete empty signals for ``senderkey``. Delete ``senderkey`` if
    empty.

    Does nothing when there is no entry for ``senderkey``/``signal`` or
    when that entry still holds receivers.  Returns ``None``.
    """
    try:
        signals = connections[senderkey]
        receivers = signals[signal]
    except (KeyError, TypeError):
        # KeyError: nothing registered.  TypeError: ``connections`` is
        # no longer a mapping (e.g. already torn down) or ``signal`` is
        # unhashable.  Either way there is nothing to prune.
        return
    if receivers:
        return
    # No more connected receivers. Therefore, remove the signal.
    del signals[signal]
    if not signals:
        # No more signal connections. Therefore, remove the sender.
        _remove_sender(senderkey)
|
||
|
|
||
|
|
||
|
def _remove_sender(senderkey):
    """Remove ``senderkey`` from connections.

    Drops the receivers' back-references to this sender first, then
    the sender's entry in ``connections`` and, if present, its weak
    reference in ``senders``.  Missing entries are ignored.
    """
    _remove_back_refs(senderkey)
    try:
        del connections[senderkey]
    except KeyError:
        pass
    # Senderkey will only be in senders dictionary if sender
    # could be weakly referenced.
    try:
        del senders[senderkey]
    except KeyError:
        pass
|
||
|
|
||
|
|
||
|
def _remove_back_refs(senderkey):
    """Remove all back-references to this ``senderkey``.

    Every receiver found in any of the sender's signal lists has
    ``senderkey`` removed from its ``senders_back`` entry.  Does
    nothing if the sender has no entry in ``connections``.
    """
    try:
        signals = connections[senderkey]
    except KeyError:
        return
    # ``values()`` exists on both Python 2 and 3 (``iteritems`` does
    # not), and the signal keys are not needed here.
    for receivers in signals.values():
        for receiver in receivers:
            _kill_back_ref(receiver, senderkey)
|
||
|
|
||
|
|
||
|
def _remove_old_back_refs(senderkey, signal, receiver, receivers):
    """Kill old ``senders_back`` references from ``receiver``.

    This guards against multiple registration of the same receiver for
    a given signal and sender leaking memory as old back reference
    records build up.

    Also removes old receiver instance from receivers.

    - ``senderkey``: ``id()`` of the sender the list belongs to.

    - ``signal``: The signal the list belongs to.

    - ``receiver``: The receiver to look for (matched with ``index``).

    - ``receivers``: The receiver list registered for the pair; it is
      modified in place.

    Returns ``True`` if the back-reference was removed.  Returns
    ``False`` if ``receiver`` was not in ``receivers``, or if the
    removed receiver object is still registered under another signal of
    the same sender (in which case its back-reference must stay).
    """
    try:
        index = receivers.index(receiver)
    except ValueError:
        return False
    old_receiver = receivers[index]
    del receivers[index]
    # ``connections`` is keyed by sender id, so the sender's other
    # signal lists live under ``senderkey``.  Looking the table up by
    # ``signal`` never matched, which meant the back-reference was
    # dropped even while the receiver was still routed for this sender.
    signals = connections.get(senderkey)
    if signals is not None:
        for sig, recs in signals.items():
            if sig != signal:
                for rec in recs:
                    # Identity, not equality: back-references are keyed
                    # by the id() of the stored receiver object.
                    if rec is old_receiver:
                        return False
    _kill_back_ref(old_receiver, senderkey)
    return True
|
||
|
|
||
|
|
||
|
def _kill_back_ref(receiver, senderkey):
    """Do actual removal of back reference from ``receiver`` to
    ``senderkey``.

    Every occurrence of ``senderkey`` is removed from the receiver's
    ``senders_back`` list; once that list is empty (or if there was
    none) the receiver's entry is deleted.  Always returns ``True``.
    """
    receiverkey = id(receiver)
    # Named ``sender_keys`` so that it does not shadow the module-level
    # ``senders`` table.
    sender_keys = senders_back.get(receiverkey, ())
    while senderkey in sender_keys:
        try:
            sender_keys.remove(senderkey)
        except ValueError:
            break
    if not sender_keys:
        try:
            del senders_back[receiverkey]
        except KeyError:
            pass
    return True
|
||
|
|
||
|
|