1
0
mirror of https://github.com/ARM-software/workload-automation.git synced 2025-01-19 20:34:30 +00:00
2015-03-10 13:09:31 +00:00

592 lines
19 KiB
Python

"""Multiple-producer-multiple-consumer signal-dispatching.
``dispatcher`` is the core of Louie, providing the primary API and the
core logic for the system.
Internal attributes:
- ``WEAKREF_TYPES``: Tuple of types/classes which represent weak
references to receivers, and thus must be dereferenced on retrieval
to retrieve the callable object
- ``connections``::
{ senderkey (id) : { signal : [receivers...] } }
- ``senders``: Used for cleaning up sender references on sender
deletion::
{ senderkey (id) : weakref(sender) }
- ``senders_back``: Used for cleaning up receiver references on receiver
deletion::
{ receiverkey (id) : [senderkey (id)...] }
"""
import os
import weakref
try:
set
except NameError:
from sets import Set as set, ImmutableSet as frozenset
from louie import error
from louie import robustapply
from louie import saferef
from louie.sender import Any, Anonymous
from louie.signal import All
from prioritylist import PriorityList
# Support for statistics.
if __debug__:
connects = 0
disconnects = 0
sends = 0
def print_stats():
print ('\n'
'Louie connects: %i\n'
'Louie disconnects: %i\n'
'Louie sends: %i\n'
'\n') % (connects, disconnects, sends)
if 'PYDISPATCH_STATS' in os.environ:
import atexit
atexit.register(print_stats)
WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)
connections = {}
senders = {}
senders_back = {}
plugins = []
def reset():
"""Reset the state of Louie.
Useful during unit testing. Should be avoided otherwise.
"""
global connections, senders, senders_back, plugins
connections = {}
senders = {}
senders_back = {}
plugins = []
def connect(receiver, signal=All, sender=Any, weak=True, priority=0):
"""Connect ``receiver`` to ``sender`` for ``signal``.
- ``receiver``: A callable Python object which is to receive
messages/signals/events. Receivers must be hashable objects.
If weak is ``True``, then receiver must be weak-referencable (more
precisely ``saferef.safe_ref()`` must be able to create a
reference to the receiver).
Receivers are fairly flexible in their specification, as the
machinery in the ``robustapply`` module takes care of most of the
details regarding figuring out appropriate subsets of the sent
arguments to apply to a given receiver.
Note: If ``receiver`` is itself a weak reference (a callable), it
will be de-referenced by the system's machinery, so *generally*
weak references are not suitable as receivers, though some use
might be found for the facility whereby a higher-level library
passes in pre-weakrefed receiver references.
- ``signal``: The signal to which the receiver should respond.
If ``All``, receiver will receive all signals from the indicated
sender (which might also be ``All``, but is not necessarily
``All``).
Otherwise must be a hashable Python object other than ``None``
(``DispatcherError`` raised on ``None``).
- ``sender``: The sender to which the receiver should respond.
If ``Any``, receiver will receive the indicated signals from any
sender.
If ``Anonymous``, receiver will only receive indicated signals
from ``send``/``send_exact`` which do not specify a sender, or
specify ``Anonymous`` explicitly as the sender.
Otherwise can be any python object.
- ``weak``: Whether to use weak references to the receiver.
By default, the module will attempt to use weak references to
the receiver objects. If this parameter is ``False``, then strong
references will be used.
- ``priority``: specifies the priority by which a reciever should
get notified
Returns ``None``, may raise ``DispatcherTypeError``.
"""
if signal is None:
raise error.DispatcherTypeError(
'Signal cannot be None (receiver=%r sender=%r)'
% (receiver, sender))
if weak:
receiver = saferef.safe_ref(receiver, on_delete=_remove_receiver)
senderkey = id(sender)
if connections.has_key(senderkey):
signals = connections[senderkey]
else:
connections[senderkey] = signals = {}
# Keep track of senders for cleanup.
# Is Anonymous something we want to clean up?
if sender not in (None, Anonymous, Any):
def remove(object, senderkey=senderkey):
_remove_sender(senderkey=senderkey)
# Skip objects that can not be weakly referenced, which means
# they won't be automatically cleaned up, but that's too bad.
try:
weak_sender = weakref.ref(sender, remove)
senders[senderkey] = weak_sender
except:
pass
receiver_id = id(receiver)
# get current set, remove any current references to
# this receiver in the set, including back-references
if signals.has_key(signal):
receivers = signals[signal]
_remove_old_back_refs(senderkey, signal, receiver, receivers)
else:
receivers = signals[signal] = PriorityList()
try:
current = senders_back.get(receiver_id)
if current is None:
senders_back[receiver_id] = current = []
if senderkey not in current:
current.append(senderkey)
except:
pass
receivers.add(receiver, priority)
# Update stats.
if __debug__:
global connects
connects += 1
def disconnect(receiver, signal=All, sender=Any, weak=True):
"""Disconnect ``receiver`` from ``sender`` for ``signal``.
- ``receiver``: The registered receiver to disconnect.
- ``signal``: The registered signal to disconnect.
- ``sender``: The registered sender to disconnect.
- ``weak``: The weakref state to disconnect.
``disconnect`` reverses the process of ``connect``, the semantics for
the individual elements are logically equivalent to a tuple of
``(receiver, signal, sender, weak)`` used as a key to be deleted
from the internal routing tables. (The actual process is slightly
more complex but the semantics are basically the same).
Note: Using ``disconnect`` is not required to cleanup routing when
an object is deleted; the framework will remove routes for deleted
objects automatically. It's only necessary to disconnect if you
want to stop routing to a live object.
Returns ``None``, may raise ``DispatcherTypeError`` or
``DispatcherKeyError``.
"""
if signal is None:
raise error.DispatcherTypeError(
'Signal cannot be None (receiver=%r sender=%r)'
% (receiver, sender))
if weak:
receiver = saferef.safe_ref(receiver)
senderkey = id(sender)
try:
signals = connections[senderkey]
receivers = signals[signal]
except KeyError:
raise error.DispatcherKeyError(
'No receivers found for signal %r from sender %r'
% (signal, sender)
)
try:
# also removes from receivers
_remove_old_back_refs(senderkey, signal, receiver, receivers)
except ValueError:
raise error.DispatcherKeyError(
'No connection to receiver %s for signal %s from sender %s'
% (receiver, signal, sender)
)
_cleanup_connections(senderkey, signal)
# Update stats.
if __debug__:
global disconnects
disconnects += 1
def get_receivers(sender=Any, signal=All):
"""Get list of receivers from global tables.
This function allows you to retrieve the raw list of receivers
from the connections table for the given sender and signal pair.
Note: There is no guarantee that this is the actual list stored in
the connections table, so the value should be treated as a simple
iterable/truth value rather than, for instance a list to which you
might append new records.
Normally you would use ``live_receivers(get_receivers(...))`` to
retrieve the actual receiver objects as an iterable object.
"""
try:
return connections[id(sender)][signal]
except KeyError:
return []
def live_receivers(receivers):
"""Filter sequence of receivers to get resolved, live receivers.
This is a generator which will iterate over the passed sequence,
checking for weak references and resolving them, then returning
all live receivers.
"""
for receiver in receivers:
if isinstance(receiver, WEAKREF_TYPES):
# Dereference the weak reference.
receiver = receiver()
if receiver is not None:
# Check installed plugins to make sure this receiver is
# live.
live = True
for plugin in plugins:
if not plugin.is_live(receiver):
live = False
break
if live:
yield receiver
def get_all_receivers(sender=Any, signal=All):
"""Get list of all receivers from global tables.
This gets all receivers which should receive the given signal from
sender, each receiver should be produced only once by the
resulting generator.
"""
yielded = set()
for receivers in (
# Get receivers that receive *this* signal from *this* sender.
get_receivers(sender, signal),
# Add receivers that receive *all* signals from *this* sender.
get_receivers(sender, All),
# Add receivers that receive *this* signal from *any* sender.
get_receivers(Any, signal),
# Add receivers that receive *all* signals from *any* sender.
get_receivers(Any, All),
):
for receiver in receivers:
if receiver: # filter out dead instance-method weakrefs
try:
if not receiver in yielded:
yielded.add(receiver)
yield receiver
except TypeError:
# dead weakrefs raise TypeError on hash...
pass
def send(signal=All, sender=Anonymous, *arguments, **named):
"""Send ``signal`` from ``sender`` to all connected receivers.
- ``signal``: (Hashable) signal value; see ``connect`` for details.
- ``sender``: The sender of the signal.
If ``Any``, only receivers registered for ``Any`` will receive the
message.
If ``Anonymous``, only receivers registered to receive messages
from ``Anonymous`` or ``Any`` will receive the message.
Otherwise can be any Python object (normally one registered with
a connect if you actually want something to occur).
- ``arguments``: Positional arguments which will be passed to *all*
receivers. Note that this may raise ``TypeError`` if the receivers
do not allow the particular arguments. Note also that arguments
are applied before named arguments, so they should be used with
care.
- ``named``: Named arguments which will be filtered according to the
parameters of the receivers to only provide those acceptable to
the receiver.
Return a list of tuple pairs ``[(receiver, response), ...]``
If any receiver raises an error, the error propagates back through
send, terminating the dispatch loop, so it is quite possible to
not have all receivers called if a raises an error.
"""
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
responses = []
for receiver in live_receivers(get_all_receivers(sender, signal)):
# Wrap receiver using installed plugins.
original = receiver
for plugin in plugins:
receiver = plugin.wrap_receiver(receiver)
response = robustapply.robust_apply(
receiver, original,
signal=signal,
sender=sender,
*arguments,
**named
)
responses.append((receiver, response))
# Update stats.
if __debug__:
global sends
sends += 1
return responses
def send_minimal(signal=All, sender=Anonymous, *arguments, **named):
"""Like ``send``, but does not attach ``signal`` and ``sender``
arguments to the call to the receiver."""
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
responses = []
for receiver in live_receivers(get_all_receivers(sender, signal)):
# Wrap receiver using installed plugins.
original = receiver
for plugin in plugins:
receiver = plugin.wrap_receiver(receiver)
response = robustapply.robust_apply(
receiver, original,
*arguments,
**named
)
responses.append((receiver, response))
# Update stats.
if __debug__:
global sends
sends += 1
return responses
def send_exact(signal=All, sender=Anonymous, *arguments, **named):
"""Send ``signal`` only to receivers registered for exact message.
``send_exact`` allows for avoiding ``Any``/``Anonymous`` registered
handlers, sending only to those receivers explicitly registered
for a particular signal on a particular sender.
"""
responses = []
for receiver in live_receivers(get_receivers(sender, signal)):
# Wrap receiver using installed plugins.
original = receiver
for plugin in plugins:
receiver = plugin.wrap_receiver(receiver)
response = robustapply.robust_apply(
receiver, original,
signal=signal,
sender=sender,
*arguments,
**named
)
responses.append((receiver, response))
return responses
def send_robust(signal=All, sender=Anonymous, *arguments, **named):
"""Send ``signal`` from ``sender`` to all connected receivers catching
errors
- ``signal``: (Hashable) signal value, see connect for details
- ``sender``: The sender of the signal.
If ``Any``, only receivers registered for ``Any`` will receive the
message.
If ``Anonymous``, only receivers registered to receive messages
from ``Anonymous`` or ``Any`` will receive the message.
Otherwise can be any Python object (normally one registered with
a connect if you actually want something to occur).
- ``arguments``: Positional arguments which will be passed to *all*
receivers. Note that this may raise ``TypeError`` if the receivers
do not allow the particular arguments. Note also that arguments
are applied before named arguments, so they should be used with
care.
- ``named``: Named arguments which will be filtered according to the
parameters of the receivers to only provide those acceptable to
the receiver.
Return a list of tuple pairs ``[(receiver, response), ... ]``
If any receiver raises an error (specifically, any subclass of
``Exception``), the error instance is returned as the result for
that receiver.
"""
# Call each receiver with whatever arguments it can accept.
# Return a list of tuple pairs [(receiver, response), ... ].
responses = []
for receiver in live_receivers(get_all_receivers(sender, signal)):
original = receiver
for plugin in plugins:
receiver = plugin.wrap_receiver(receiver)
try:
response = robustapply.robust_apply(
receiver, original,
signal=signal,
sender=sender,
*arguments,
**named
)
except Exception, err:
responses.append((receiver, err))
else:
responses.append((receiver, response))
return responses
def _remove_receiver(receiver):
"""Remove ``receiver`` from connections."""
if not senders_back:
# During module cleanup the mapping will be replaced with None.
return False
backKey = id(receiver)
for senderkey in senders_back.get(backKey, ()):
try:
signals = connections[senderkey].keys()
except KeyError:
pass
else:
for signal in signals:
try:
receivers = connections[senderkey][signal]
except KeyError:
pass
else:
try:
receivers.remove(receiver)
except Exception:
pass
_cleanup_connections(senderkey, signal)
try:
del senders_back[backKey]
except KeyError:
pass
def _cleanup_connections(senderkey, signal):
"""Delete empty signals for ``senderkey``. Delete ``senderkey`` if
empty."""
try:
receivers = connections[senderkey][signal]
except:
pass
else:
if not receivers:
# No more connected receivers. Therefore, remove the signal.
try:
signals = connections[senderkey]
except KeyError:
pass
else:
del signals[signal]
if not signals:
# No more signal connections. Therefore, remove the sender.
_remove_sender(senderkey)
def _remove_sender(senderkey):
"""Remove ``senderkey`` from connections."""
_remove_back_refs(senderkey)
try:
del connections[senderkey]
except KeyError:
pass
# Senderkey will only be in senders dictionary if sender
# could be weakly referenced.
try:
del senders[senderkey]
except:
pass
def _remove_back_refs(senderkey):
"""Remove all back-references to this ``senderkey``."""
try:
signals = connections[senderkey]
except KeyError:
signals = None
else:
for signal, receivers in signals.iteritems():
for receiver in receivers:
_kill_back_ref(receiver, senderkey)
def _remove_old_back_refs(senderkey, signal, receiver, receivers):
"""Kill old ``senders_back`` references from ``receiver``.
This guards against multiple registration of the same receiver for
a given signal and sender leaking memory as old back reference
records build up.
Also removes old receiver instance from receivers.
"""
try:
index = receivers.index(receiver)
# need to scan back references here and remove senderkey
except ValueError:
return False
else:
old_receiver = receivers[index]
del receivers[index]
found = 0
signals = connections.get(signal)
if signals is not None:
for sig, recs in connections.get(signal, {}).iteritems():
if sig != signal:
for rec in recs:
if rec is old_receiver:
found = 1
break
if not found:
_kill_back_ref(old_receiver, senderkey)
return True
return False
def _kill_back_ref(receiver, senderkey):
"""Do actual removal of back reference from ``receiver`` to
``senderkey``."""
receiverkey = id(receiver)
senders = senders_back.get(receiverkey, ())
while senderkey in senders:
try:
senders.remove(senderkey)
except:
break
if not senders:
try:
del senders_back[receiverkey]
except KeyError:
pass
return True