# -*- coding: utf-8 -*-
import abc
import asyncio
import logging
import sys
import time
from asyncio import Event
from typing import Union
import aiosasl
import aioxmpp
from aiosasl import AuthenticationFailure
from aioxmpp import ibr, XMPPCancelError, XMPPAuthError
from aioxmpp.dispatcher import SimpleMessageDispatcher
from loguru import logger
from spade.container import Container
from spade.message import Message
from spade.presence import PresenceManager
from spade_pubsub import PubSubMixin
class AbstractArtifact(metaclass=abc.ABCMeta):
    """
    Abstract base of artifacts.

    It only declares the two plugin hooks; both are no-ops here and are
    meant to be overloaded by plugins/mixins.
    """

    async def _hook_plugin_before_connection(self, *args, **kwargs):
        """
        Overload this method to hook a plugin before connection is done.
        The default implementation ignores its arguments and does nothing.
        """
        return None

    async def _hook_plugin_after_connection(self, *args, **kwargs):
        """
        Overload this method to hook a plugin after connection is done.
        The default implementation ignores its arguments and does nothing.
        """
        return None
[docs]class Artifact(PubSubMixin, AbstractArtifact):
def __init__(self, jid, password, pubsub_server=None, verify_security=False):
    """
    Creates an artifact

    Args:
      jid (str): The identifier of the artifact in the form username@server
      password (str): The password to connect to the server
      pubsub_server (str, optional): address of the PubSub service. When it
        is falsy, ``pubsub.<domain of the jid>`` is used.
      verify_security (bool): Whether to verify or not the SSL certificates
    """
    self.jid = aioxmpp.JID.fromstr(jid)
    self.password = password
    self.verify_security = verify_security
    self.pubsub_server = pubsub_server or f"pubsub.{self.jid.domain}"

    # knowledge base accessed through set()/get()
    self._values = {}

    # connection state; filled in by _async_start()/_async_connect()
    self.conn_coro = None
    self.stream = None
    self.client = None
    self.message_dispatcher = None
    self.presence = None

    # every artifact registers itself in the container and shares its loop
    self.container = Container()
    self.container.register(self)
    self.loop = self.container.loop

    # asyncio.Queue lost its ``loop`` argument in Python 3.10 (passing it
    # raises TypeError there); it is only needed on older interpreters,
    # where the queue binds to a loop at construction time.
    if sys.version_info >= (3, 10):
        self.queue = asyncio.Queue()
    else:
        self.queue = asyncio.Queue(loop=self.loop)

    # set once the artifact has been started, cleared by kill()/_async_stop()
    self._alive = Event()
def set_loop(self, loop):
    """
    Stores the event loop the artifact must use

    Args:
      loop: the event loop to be kept in ``self.loop``
    """
    self.loop = loop
def set_container(self, container):
    """
    Attaches the artifact to a container

    Args:
      container (spade.container.Container): the container the artifact
        is attached to from now on
    """
    self.container = container
def start(self, auto_register=True):
    """
    Tells the container to start this artifact.

    Args:
      auto_register (bool): register the artifact in the server (Default value = True)

    Returns:
      whatever ``container.start_agent`` returns for this artifact
    """
    container = self.container
    return container.start_agent(agent=self, auto_register=auto_register)
async def _async_start(self, auto_register=True):
    """
    Starts the artifact from a coroutine. In order, it:

      * awaits the before-connection plugin hook
      * if auto_register: registers the artifact in the server
      * builds the XMPP client, the message dispatcher and the presence manager
      * connects to the server and registers the chat message callback
      * awaits the after-connection plugin hook
      * creates the pubsub node named after the artifact's bare jid
      * awaits setup(), marks the artifact as alive and schedules run()

    Args:
      auto_register (bool, optional): register the artifact in the server (Default value = True)

    Raises:
      XMPPAuthError: when the pubsub node cannot be created for lack of permission
    """
    await self._hook_plugin_before_connection()

    if auto_register:
        await self._async_register()

    security_layer = aioxmpp.make_security_layer(
        self.password, no_verify=not self.verify_security
    )
    self.client = aioxmpp.PresenceManagedClient(
        self.jid,
        security_layer,
        loop=self.loop,
        logger=logging.getLogger(self.jid.localpart),
    )

    # obtain an instance of the message dispatching service
    self.message_dispatcher = self.client.summon(SimpleMessageDispatcher)

    # presence service
    self.presence = PresenceManager(self)

    await self._async_connect()

    # chat messages from any sender end up in _message_received
    self.message_dispatcher.register_callback(
        aioxmpp.MessageType.CHAT,
        None,
        self._message_received,
    )

    await self._hook_plugin_after_connection()

    # pubsub initialization: the node is named after the bare jid
    self._node = str(self.jid.bare())
    try:
        await self.pubsub.create(self.pubsub_server, self._node)
    except XMPPCancelError:
        logger.info(f"Node {self._node} already registered")
    except XMPPAuthError as auth_error:
        logger.error(f"Artifact {self._node} is not allowed to publish properties.")
        raise auth_error

    await self.setup()
    self._alive.set()
    # run() is scheduled on the artifact's loop, it is not awaited here
    asyncio.run_coroutine_threadsafe(self.run(), loop=self.loop)
async def _async_connect(self):  # pragma: no cover
    """
    Connect and authenticate to the XMPP server. Async mode.

    The context manager returned by ``client.connected()`` is kept in
    ``self.conn_coro`` and entered by hand, so that it can be exited
    later on when the artifact is stopped.

    Raises:
      AuthenticationFailure: when the server rejects the credentials
    """
    try:
        connection = self.client.connected()
        self.conn_coro = connection
        self.stream = await type(connection).__aenter__(connection)
        jid_text = str(self.jid)
        logger.info(f"Artifact {jid_text} connected and authenticated.")
    except aiosasl.AuthenticationFailure:
        raise AuthenticationFailure(
            "Could not authenticate the artifact. Check user and password or use auto_register=True"
        )
async def _async_register(self):  # pragma: no cover
    """
    Register the artifact in the XMPP server from a coroutine.

    Opens an XML stream to the server (without credentials) and sends an
    in-band registration query with the artifact's local part and password.
    """
    security_layer = aioxmpp.make_security_layer(
        None, no_verify=not self.verify_security
    )
    registration = ibr.Query(self.jid.localpart, self.password)
    _transport, xml_stream, _features = await aioxmpp.node.connect_xmlstream(
        self.jid, security_layer, loop=self.loop
    )
    await ibr.register(xml_stream, registration)
[docs] async def setup(self):
"""
Setup artifact before startup.
This coroutine may be overloaded.
"""
await asyncio.sleep(0)
def kill(self):
    """
    Marks the artifact as not alive by clearing its alive flag.
    """
    alive_flag = self._alive
    alive_flag.clear()
async def run(self):
    """
    Main body of the artifact.

    This coroutine SHOULD be overloaded; the default implementation
    always fails.

    Raises:
      NotImplementedError: when it has not been overloaded
    """
    raise NotImplementedError
@property
def name(self):
    """ Returns the name of the artifact (the local part of its jid, i.e. the string before the '@') """
    local_part = self.jid.localpart
    return local_part
def stop(self):
    """
    Stop the artifact

    The alive flag is cleared through kill() and then _async_stop() is
    run to completion on the artifact's loop.

    Returns:
      the value returned by ``loop.run_until_complete`` for ``_async_stop()``
    """
    self.kill()
    shutdown = self._async_stop()
    return self.loop.run_until_complete(shutdown)
async def _async_stop(self):
    """
    Stops the artifact: sets its presence to unavailable, disconnects from
    the XMPP server and clears the alive flag.

    Whether to disconnect is decided from the connection state
    (``self.stream``, which is only set once the connection has been
    entered) and not from the alive flag: stop() calls kill() before
    awaiting this coroutine, so the flag is already cleared by then and
    checking it would leave the client connected forever.
    """
    if self.presence:
        self.presence.set_unavailable()

    # Disconnect from XMPP server
    if self.stream is not None:
        self.client.stop()
        # forget the stream first so that a second call does not try to
        # leave the connection context twice
        self.stream = None
        await self.conn_coro.__aexit__(*sys.exc_info())
        logger.info("Client disconnected.")

    self._alive.clear()
def is_alive(self):
    """
    Checks if the artifact is alive.

    Returns:
      bool: whether the alive flag of the artifact is currently set
    """
    alive_flag = self._alive
    return alive_flag.is_set()
def set(self, name, value):
    """
    Stores a knowledge item in the artifact knowledge base,
    replacing any item previously stored under the same name.

    Args:
      name (str): name of the item
      value (object): value of the item
    """
    knowledge = self._values
    knowledge[name] = value
def get(self, name):
    """
    Recovers a knowledge item from the artifact's knowledge base.

    Args:
      name (str): name of the item

    Returns:
      object: the object stored under ``name``, or None when there is none
    """
    # single lookup; dict.get already yields None for a missing key
    return self._values.get(name)
def _message_received(self, msg):
    """
    Callback run when an XMPP Message is received.
    The aioxmpp.Message is converted to spade.message.Message and
    queued in the artifact's mailbox.

    Args:
      msg (aioxmpp.Message): the message just received.

    Returns:
      concurrent.futures.Future: the future returned by
        ``asyncio.run_coroutine_threadsafe`` for the queueing of the message.
    """
    spade_msg = Message.from_node(msg)
    logger.debug(f"Got message: {spade_msg}")
    enqueue = self.queue.put(spade_msg)
    return asyncio.run_coroutine_threadsafe(enqueue, self.loop)
async def send(self, msg: Message):
    """
    Sends a message.

    A message without sender gets the artifact's jid as sender. Once the
    client has sent the message, it is flagged with ``msg.sent = True``.

    Args:
      msg (spade.message.Message): the message to be sent.
    """
    if not msg.sender:
        msg.sender = str(self.jid)
        logger.debug(f"Adding artifact's jid as sender to message: {msg}")

    stanza = msg.prepare()
    await self.client.send(stanza)
    msg.sent = True
async def receive(self, timeout: float = None) -> Union[Message, None]:
    """
    Receives a message for this artifact.

    With a truthy timeout it waits up to that many seconds for a message.
    Without timeout (None or 0) it does not wait: it only returns a
    message that is already in the mailbox.

    Args:
      timeout (float): number of seconds until return

    Returns:
      spade.message.Message: a Message or None
    """
    if not timeout:
        # no waiting: take what is already queued, if anything
        try:
            return self.queue.get_nowait()
        except asyncio.QueueEmpty:
            return None

    try:
        return await asyncio.wait_for(self.queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None
def mailbox_size(self) -> int:
    """
    Counts the messages waiting in the mailbox

    Returns:
      int: the number of messages in the mailbox
    """
    pending = self.queue.qsize()
    return pending
def join(self, timeout=None):
    """
    Waits until the artifact is no longer alive.

    When the event loop of the calling thread is the artifact's loop, a
    coroutine is returned and has to be awaited by the caller. Otherwise
    the calling thread is blocked, polling the alive flag.

    Args:
      timeout (float, optional): maximum number of seconds to wait;
        None waits forever (Default value = None)

    Returns:
      None after a blocking wait, or the ``_async_join`` coroutine

    Raises:
      TimeoutError: when the artifact is still alive after ``timeout`` seconds
    """
    try:
        in_coroutine = asyncio.get_event_loop() == self.loop
    except RuntimeError:  # pragma: no cover
        in_coroutine = False

    if in_coroutine:
        return self._async_join(timeout=timeout)

    # monotonic clock: the timeout must not be affected by system clock
    # adjustments, which time.time() is subject to
    t_start = time.monotonic()
    while self.is_alive():
        time.sleep(0.001)
        if timeout is not None and time.monotonic() - t_start > timeout:
            raise TimeoutError
async def _async_join(self, timeout):
    """
    Coroutine flavour of join(): waits until the artifact is no longer
    alive, yielding to the event loop between checks.

    Args:
      timeout (float): maximum number of seconds to wait; None waits forever

    Raises:
      TimeoutError: when the artifact is still alive after ``timeout`` seconds
    """
    # monotonic clock: the timeout must not be affected by system clock
    # adjustments, which time.time() is subject to
    t_start = time.monotonic()
    while self.is_alive():
        await asyncio.sleep(0.001)
        if timeout is not None and time.monotonic() - t_start > timeout:
            raise TimeoutError
async def publish(self, payload: str) -> None:
    """
    Publishes a payload in the pubsub node of the artifact.

    The node (``self._node``) is only defined once the artifact has been started.

    Args:
      payload (str): the content to be published
    """
    server, node = self.pubsub_server, self._node
    await self.pubsub.publish(server, node, payload)