"""
mqtt_base_client.py
====================================
Base MQTT Client
"""
import paho.mqtt.client as mqtt
from helper import run_async
import _thread
import logging
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
[docs]def on_subscribe_callback(client, userdata, mid, granted_qos):
"""
on_subscribe callback
:param client:
:param userdata:
:param mid:
:param granted_qos:
:return:
"""
log.info("subscribe topic")
[docs]def on_connect_callback(client, userdata, flags, rc):
"""
on_connect callback
:param client:
:param userdata:
:param flags:
:param rc:
:return:
"""
log.info("Connected with result code " + str(rc))
[docs]def on_message_callback(client, userdata, msg):
"""
on_message callback
:param client:
:param userdata:
:param msg:
:return:
"""
# logging.info("on_message_callback:" + str(msg))
client.msg_queue.put(msg)
[docs]def on_publish_callback(client, userdata, mid):
"""
on_publish Callback
:param client:
:param userdata:
:param mid:
:return:
"""
log.info('on_publish_callback')
[docs]class BaseClient(object):
"""
Base Class for MQTT-Client
"""
client = None
def __init__(self, broker="iot.eclipse.org", port=1883, sub_queue=None, pub_queue=None, msg_queue=None, password=None, username=None):
"""
Init BaseClient
:param broker:
:param port:
:param sub_queue:
:param pub_queue:
:param msg_queue:
:param password:
:param username:
"""
log.info("run: __init__()")
self.client = mqtt.Client()
self.client.sub_queue = sub_queue
self.client.pub_queue = pub_queue
self.client.msg_queue = msg_queue
self.client.on_connect = on_connect_callback
self.client.on_message = on_message_callback
self.client.on_publish = on_publish_callback
self.client.on_subscribe = on_subscribe_callback
if (password is not None) and (username is not None):
self.client.username_pw_set(username=username,password=password)
try:
self.client.connect(broker, port)
self.client.loop_start()
self.start_sub_queue()
self.start_pub_queue()
except ConnectionRefusedError as e:
log.error(e)
log.info('Exiting main...')
_thread.interrupt_main()
exit(0)
except OSError as e:
log.error(e)
log.info('Exiting main ...')
_thread.interrupt_main()
exit(0)
except KeyboardInterrupt as e:
log.error(e)
log.info('Exiting main ...')
_thread.interrupt_main()
exit(0)
[docs] @run_async
def start_sub_queue(self):
"""
Start subscribe queue for handel asynchronous callbacks
:return:
"""
log.info("run: start_sub_queue()")
while True:
topic = self.client.sub_queue.get()
self.client.subscribe(topic)
[docs] @run_async
def start_pub_queue(self):
"""
Start publish queue for handel asynchronous callbacks
:return:
"""
log.info("run: start_pub_queue()")
while True:
msg = self.client.pub_queue.get()
self.client.publish(topic=msg['topic'], payload=msg['payload'])