summaryrefslogtreecommitdiff
path: root/tests/twisted/dispatcher/dispatch-text.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/twisted/dispatcher/dispatch-text.py')
-rw-r--r--tests/twisted/dispatcher/dispatch-text.py542
1 files changed, 542 insertions, 0 deletions
diff --git a/tests/twisted/dispatcher/dispatch-text.py b/tests/twisted/dispatcher/dispatch-text.py
new file mode 100644
index 0000000..b1f6ce6
--- /dev/null
+++ b/tests/twisted/dispatcher/dispatch-text.py
@@ -0,0 +1,542 @@
+# Copyright (C) 2009 Nokia Corporation
+# Copyright (C) 2009 Collabora Ltd.
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+# 02110-1301 USA
+
+import dbus
+"""Regression test for dispatching an incoming Text channel.
+"""
+
+import dbus
+import dbus.bus
+import dbus.service
+
+from servicetest import EventPattern, tp_name_prefix, tp_path_prefix, \
+ call_async, sync_dbus
+from mctest import exec_test, SimulatedConnection, SimulatedClient, \
+ create_fakecm_account, enable_fakecm_account, SimulatedChannel, \
+ expect_client_setup
+import constants as cs
+
+def test(q, bus, mc):
+ params = dbus.Dictionary({"account": "someguy@example.com",
+ "password": "secrecy"}, signature='sv')
+ cm_name_ref, account = create_fakecm_account(q, bus, mc, params)
+ conn = enable_fakecm_account(q, bus, mc, account, params)
+
+ text_fixed_properties = dbus.Dictionary({
+ cs.CHANNEL + '.TargetHandleType': cs.HT_CONTACT,
+ cs.CHANNEL + '.ChannelType': cs.CHANNEL_TYPE_TEXT,
+ }, signature='sv')
+
+ empathy_bus = dbus.bus.BusConnection()
+ empathy_bus.set_exit_on_disconnect(False) # we'll disconnect later
+ kopete_bus = dbus.bus.BusConnection()
+ q.attach_to_bus(empathy_bus)
+ q.attach_to_bus(kopete_bus)
+ # Two clients want to observe, approve and handle channels
+ empathy = SimulatedClient(q, empathy_bus, 'Empathy',
+ observe=[text_fixed_properties], approve=[text_fixed_properties],
+ handle=[text_fixed_properties], bypass_approval=False)
+ kopete = SimulatedClient(q, kopete_bus, 'Kopete',
+ observe=[text_fixed_properties], approve=[text_fixed_properties],
+ handle=[text_fixed_properties], bypass_approval=False)
+
+ # wait for MC to download the properties
+ expect_client_setup(q, [empathy, kopete])
+
+ # subscribe to the OperationList interface (MC assumes that until this
+ # property has been retrieved once, nobody cares)
+
+ cd = bus.get_object(cs.CD, cs.CD_PATH)
+ cd_props = dbus.Interface(cd, cs.PROPERTIES_IFACE)
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') == []
+
+ channel_properties = dbus.Dictionary(text_fixed_properties,
+ signature='sv')
+ channel_properties[cs.CHANNEL + '.TargetID'] = 'juliet'
+ channel_properties[cs.CHANNEL + '.TargetHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'juliet')
+ channel_properties[cs.CHANNEL + '.InitiatorID'] = 'juliet'
+ channel_properties[cs.CHANNEL + '.InitiatorHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'juliet')
+ channel_properties[cs.CHANNEL + '.Requested'] = False
+ channel_properties[cs.CHANNEL + '.Interfaces'] = dbus.Array(signature='s')
+
+ chan = SimulatedChannel(conn, channel_properties)
+ chan.announce()
+
+ # A channel dispatch operation is created
+
+ e = q.expect('dbus-signal',
+ path=cs.CD_PATH,
+ interface=cs.CD_IFACE_OP_LIST,
+ signal='NewDispatchOperation')
+
+ cdo_path = e.args[0]
+ cdo_properties = e.args[1]
+
+ assert cdo_properties[cs.CDO + '.Account'] == account.object_path
+ assert cdo_properties[cs.CDO + '.Connection'] == conn.object_path
+ assert cs.CDO + '.Interfaces' in cdo_properties
+
+ handlers = cdo_properties[cs.CDO + '.PossibleHandlers'][:]
+ handlers.sort()
+ assert handlers == [cs.tp_name_prefix + '.Client.Empathy',
+ cs.tp_name_prefix + '.Client.Kopete'], handlers
+
+ assert cs.CD_IFACE_OP_LIST in cd_props.Get(cs.CD, 'Interfaces')
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') ==\
+ [(cdo_path, cdo_properties)]
+
+ cdo = bus.get_object(cs.CD, cdo_path)
+ cdo_iface = dbus.Interface(cdo, cs.CDO)
+ cdo_props_iface = dbus.Interface(cdo, cs.PROPERTIES_IFACE)
+
+ assert cdo_props_iface.Get(cs.CDO, 'Interfaces') == \
+ cdo_properties[cs.CDO + '.Interfaces']
+ assert cdo_props_iface.Get(cs.CDO, 'Connection') == conn.object_path
+ assert cdo_props_iface.Get(cs.CDO, 'Account') == account.object_path
+ assert cdo_props_iface.Get(cs.CDO, 'Channels') == [(chan.object_path,
+ channel_properties)]
+ assert cdo_props_iface.Get(cs.CDO, 'PossibleHandlers') == \
+ cdo_properties[cs.CDO + '.PossibleHandlers']
+
+ # Both Observers are told about the new channel
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ )
+ assert e.args[0] == account.object_path, e.args
+ assert e.args[1] == conn.object_path, e.args
+ assert e.args[3] == cdo_path, e.args
+ assert e.args[4] == [], e.args # no requests satisfied
+ channels = e.args[2]
+ assert len(channels) == 1, channels
+ assert channels[0][0] == chan.object_path, channels
+ assert channels[0][1] == channel_properties, channels
+
+ assert k.args == e.args
+
+ # Both Observers indicate that they are ready to proceed
+ q.dbus_return(k.message, bus=empathy_bus, signature='')
+ q.dbus_return(e.message, bus=kopete_bus, signature='')
+
+ # The Approvers are next
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ )
+
+ assert e.args == [[(chan.object_path, channel_properties)],
+ cdo_path, cdo_properties]
+ assert k.args == e.args
+
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+ q.dbus_return(k.message, bus=kopete_bus, signature='')
+
+ # Both Approvers now have a flashing icon or something, trying to get the
+ # user's attention
+
+ # Using an invalid Handler name should fail
+ call_async(q, cdo_iface, 'HandleWith',
+ cs.tp_name_prefix + '.The.Moon.On.A.Stick')
+ q.expect('dbus-error', method='HandleWith')
+ call_async(q, cdo_iface, 'HandleWith',
+ cs.tp_name_prefix + '.Client.the moon on a stick')
+ q.expect('dbus-error', method='HandleWith')
+
+ # The user responds to Empathy first
+ call_async(q, cdo_iface, 'HandleWith',
+ cs.tp_name_prefix + '.Client.Empathy')
+
+ # Empathy is asked to handle the channels
+ e = q.expect('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.HANDLER, method='HandleChannels',
+ handled=False)
+
+ # Empathy accepts the channels
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+
+ q.expect_many(
+ EventPattern('dbus-return', method='HandleWith'),
+ EventPattern('dbus-signal', interface=cs.CDO, signal='Finished'),
+ EventPattern('dbus-signal', interface=cs.CD_IFACE_OP_LIST,
+ signal='DispatchOperationFinished'),
+ )
+
+ # Now there are no more active channel dispatch operations
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') == []
+
+ # Approve another channel using HandleWithTime()
+ channel_properties = dbus.Dictionary(text_fixed_properties,
+ signature='sv')
+ channel_properties[cs.CHANNEL + '.TargetID'] = 'lucien'
+ channel_properties[cs.CHANNEL + '.TargetHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'lucien')
+ channel_properties[cs.CHANNEL + '.InitiatorID'] = 'lucien'
+ channel_properties[cs.CHANNEL + '.InitiatorHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'lucien')
+ channel_properties[cs.CHANNEL + '.Requested'] = False
+ channel_properties[cs.CHANNEL + '.Interfaces'] = dbus.Array(signature='s')
+
+ chan = SimulatedChannel(conn, channel_properties)
+ chan.announce()
+
+ # A channel dispatch operation is created
+
+ e = q.expect('dbus-signal',
+ path=cs.CD_PATH,
+ interface=cs.CD_IFACE_OP_LIST,
+ signal='NewDispatchOperation')
+
+ cdo_path = e.args[0]
+ cdo_properties = e.args[1]
+
+ assert cdo_properties[cs.CDO + '.Account'] == account.object_path
+ assert cdo_properties[cs.CDO + '.Connection'] == conn.object_path
+ assert cs.CDO + '.Interfaces' in cdo_properties
+
+ handlers = cdo_properties[cs.CDO + '.PossibleHandlers'][:]
+ handlers.sort()
+ assert handlers == [cs.tp_name_prefix + '.Client.Empathy',
+ cs.tp_name_prefix + '.Client.Kopete'], handlers
+
+ assert cs.CD_IFACE_OP_LIST in cd_props.Get(cs.CD, 'Interfaces')
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') ==\
+ [(cdo_path, cdo_properties)]
+
+ cdo = bus.get_object(cs.CD, cdo_path)
+ cdo_iface = dbus.Interface(cdo, cs.CDO)
+ cdo_props_iface = dbus.Interface(cdo, cs.PROPERTIES_IFACE)
+
+ assert cdo_props_iface.Get(cs.CDO, 'Interfaces') == \
+ cdo_properties[cs.CDO + '.Interfaces']
+ assert cdo_props_iface.Get(cs.CDO, 'Connection') == conn.object_path
+ assert cdo_props_iface.Get(cs.CDO, 'Account') == account.object_path
+ assert cdo_props_iface.Get(cs.CDO, 'Channels') == [(chan.object_path,
+ channel_properties)]
+ assert cdo_props_iface.Get(cs.CDO, 'PossibleHandlers') == \
+ cdo_properties[cs.CDO + '.PossibleHandlers']
+
+ # Both Observers are told about the new channel
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ )
+ assert e.args[0] == account.object_path, e.args
+ assert e.args[1] == conn.object_path, e.args
+ assert e.args[3] == cdo_path, e.args
+ assert e.args[4] == [], e.args # no requests satisfied
+ channels = e.args[2]
+ assert len(channels) == 1, channels
+ assert channels[0][0] == chan.object_path, channels
+ assert channels[0][1] == channel_properties, channels
+
+ assert k.args == e.args
+
+ # Both Observers indicate that they are ready to proceed
+ q.dbus_return(k.message, bus=empathy_bus, signature='')
+ q.dbus_return(e.message, bus=kopete_bus, signature='')
+
+ # The Approvers are next
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ )
+
+ assert e.args == [[(chan.object_path, channel_properties)],
+ cdo_path, cdo_properties]
+ assert k.args == e.args
+
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+ q.dbus_return(k.message, bus=kopete_bus, signature='')
+
+ # Both Approvers now have a flashing icon or something, trying to get the
+ # user's attention
+
+ # The user responds to Empathy first
+ call_async(q, cdo_iface, 'HandleWithTime',
+ cs.tp_name_prefix + '.Client.Empathy', 13)
+
+ # Empathy is asked to handle the channels
+ e = q.expect('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.HANDLER, method='HandleChannels',
+ handled=False)
+
+ account_path, conn_path, channels, requests, action_time, info = e.args
+ assert action_time == 13
+
+ # Empathy accepts the channels
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+
+ q.expect_many(
+ EventPattern('dbus-return', method='HandleWithTime'),
+ EventPattern('dbus-signal', interface=cs.CDO, signal='Finished'),
+ EventPattern('dbus-signal', interface=cs.CD_IFACE_OP_LIST,
+ signal='DispatchOperationFinished'),
+ )
+
+ # Now there are no more active channel dispatch operations
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') == []
+
+ # From now on, it is an error to get HandleChannels (because we're
+ # testing Claim())
+ forbidden = [
+ EventPattern('dbus-method-call', method='HandleChannels'),
+ ]
+ q.forbid_events(forbidden)
+
+ # Another channel
+ channel_properties = dbus.Dictionary(text_fixed_properties,
+ signature='sv')
+ channel_properties[cs.CHANNEL + '.TargetID'] = 'mercutio'
+ channel_properties[cs.CHANNEL + '.TargetHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'mercutio')
+ channel_properties[cs.CHANNEL + '.InitiatorID'] = 'mercutio'
+ channel_properties[cs.CHANNEL + '.InitiatorHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'mercutio')
+ channel_properties[cs.CHANNEL + '.Requested'] = False
+ channel_properties[cs.CHANNEL + '.Interfaces'] = dbus.Array(signature='s')
+
+ claimed_chan = SimulatedChannel(conn, channel_properties)
+ claimed_chan.announce()
+
+ # A channel dispatch operation is created
+
+ e = q.expect('dbus-signal',
+ path=cs.CD_PATH,
+ interface=cs.CD_IFACE_OP_LIST,
+ signal='NewDispatchOperation')
+
+ cdo_path = e.args[0]
+ cdo_properties = e.args[1]
+
+ assert cdo_properties[cs.CDO + '.Account'] == account.object_path
+ assert cdo_properties[cs.CDO + '.Connection'] == conn.object_path
+ assert cs.CDO + '.Interfaces' in cdo_properties
+
+ handlers = cdo_properties[cs.CDO + '.PossibleHandlers'][:]
+ handlers.sort()
+ assert handlers == [cs.tp_name_prefix + '.Client.Empathy',
+ cs.tp_name_prefix + '.Client.Kopete'], handlers
+
+ assert cs.CD_IFACE_OP_LIST in cd_props.Get(cs.CD, 'Interfaces')
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') ==\
+ [(cdo_path, cdo_properties)]
+
+ cdo = bus.get_object(cs.CD, cdo_path)
+ cdo_iface = dbus.Interface(cdo, cs.CDO)
+
+ # Both Observers are told about the new channel
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ )
+ assert e.args[0] == account.object_path, e.args
+ assert e.args[1] == conn.object_path, e.args
+ assert e.args[3] == cdo_path, e.args
+ assert e.args[4] == [], e.args # no requests satisfied
+ channels = e.args[2]
+ assert len(channels) == 1, channels
+ assert channels[0][0] == claimed_chan.object_path, channels
+ assert channels[0][1] == channel_properties, channels
+
+ assert k.args == e.args
+
+ # Both Observers indicate that they are ready to proceed
+ q.dbus_return(k.message, bus=kopete_bus, signature='')
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+
+ # The Approvers are next
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ )
+
+ assert e.args == [[(claimed_chan.object_path, channel_properties)],
+ cdo_path, cdo_properties]
+ assert k.args == e.args
+
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+ q.dbus_return(k.message, bus=kopete_bus, signature='')
+
+ # Both Approvers now have a flashing icon or something, trying to get the
+ # user's attention
+
+ # The user responds to Empathy first, and Empathy decides it wants the
+ # channel for itself
+ empathy_cdo = empathy_bus.get_object(cdo.bus_name, cdo.object_path)
+ empathy_cdo_iface = dbus.Interface(empathy_cdo, cs.CDO)
+ call_async(q, empathy_cdo_iface, 'Claim')
+
+ q.expect_many(
+ EventPattern('dbus-signal', path=cdo_path, signal='Finished'),
+ EventPattern('dbus-signal', path=cs.CD_PATH,
+ signal='DispatchOperationFinished', args=[cdo_path]),
+ EventPattern('dbus-return', method='Claim'),
+ )
+
+ # Now there are no more active channel dispatch operations
+ assert cd_props.Get(cs.CD_IFACE_OP_LIST, 'DispatchOperations') == []
+
+ # A third channel
+ channel_properties = dbus.Dictionary(text_fixed_properties,
+ signature='sv')
+ channel_properties[cs.CHANNEL + '.TargetID'] = 'benvolio'
+ channel_properties[cs.CHANNEL + '.TargetHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'benvolio')
+ channel_properties[cs.CHANNEL + '.InitiatorID'] = 'benvolio'
+ channel_properties[cs.CHANNEL + '.InitiatorHandle'] = \
+ conn.ensure_handle(cs.HT_CONTACT, 'benvolio')
+ channel_properties[cs.CHANNEL + '.Requested'] = False
+ channel_properties[cs.CHANNEL + '.Interfaces'] = dbus.Array(signature='s')
+
+ third_chan = SimulatedChannel(conn, channel_properties)
+ third_chan.announce()
+
+ # third_chan should not be closed
+ q.unforbid_events(forbidden)
+ forbidden.append(EventPattern('dbus-method-call', method='Close',
+ path=third_chan.object_path))
+ q.forbid_events(forbidden)
+
+ # A channel dispatch operation is created
+
+ e = q.expect('dbus-signal',
+ path=cs.CD_PATH,
+ interface=cs.CD_IFACE_OP_LIST,
+ signal='NewDispatchOperation')
+
+ cdo_path = e.args[0]
+ cdo_properties = e.args[1]
+
+ cdo = bus.get_object(cs.CD, cdo_path)
+ cdo_iface = dbus.Interface(cdo, cs.CDO)
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.OBSERVER, method='ObserveChannels',
+ handled=False),
+ )
+ q.dbus_return(k.message, bus=kopete_bus, signature='')
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+
+ e, k = q.expect_many(
+ EventPattern('dbus-method-call',
+ path=empathy.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ EventPattern('dbus-method-call',
+ path=kopete.object_path,
+ interface=cs.APPROVER, method='AddDispatchOperation',
+ handled=False),
+ )
+ q.dbus_return(e.message, bus=empathy_bus, signature='')
+ q.dbus_return(k.message, bus=kopete_bus, signature='')
+
+ # Kopete closes this one
+ kopete_cdo = kopete_bus.get_object(cdo.bus_name, cdo.object_path)
+ kopete_cdo_iface = dbus.Interface(kopete_cdo, cs.CDO)
+ call_async(q, kopete_cdo_iface, 'Claim')
+
+ q.expect_many(
+ EventPattern('dbus-signal', path=cdo_path, signal='Finished'),
+ EventPattern('dbus-signal', path=cs.CD_PATH,
+ signal='DispatchOperationFinished', args=[cdo_path]),
+ EventPattern('dbus-return', method='Claim'),
+ )
+
+ # Empathy crashes
+ empathy.release_name()
+
+ e = q.expect('dbus-signal',
+ signal='NameOwnerChanged',
+ predicate=(lambda e:
+ e.args[0] == empathy.bus_name and e.args[2] == ''),
+ )
+ empathy_unique_name = e.args[1]
+
+ empathy_bus.flush()
+ empathy_bus.close()
+
+ # In response, the channels that were being handled by Empathy are closed.
+ # Kopete's channel is *not* closed.
+ q.expect_many(
+ EventPattern('dbus-signal',
+ signal='NameOwnerChanged',
+ predicate=(lambda e:
+ e.args[0] == empathy_unique_name and e.args[2] == '')),
+ EventPattern('dbus-method-call',
+ path=chan.object_path, method='Close'),
+ EventPattern('dbus-method-call',
+ path=claimed_chan.object_path, method='Close'),
+ )
+
+ sync_dbus(bus, q, mc)
+ q.unforbid_events(forbidden)
+
+if __name__ == '__main__':
+ exec_test(test, {})