Source code for main

"""
main.py
====================================
Main entry-point of this project
"""
import datetime
import queue
import configparser
import os
import threading
import json
import yaml

from influx_base_client import InfluxDBBaseClient
from mqtt_base_client import BaseClient
from paho.mqtt.client import topic_matches_sub
from flask import Flask, abort, url_for
from flask_restful import Api, Resource, reqparse, marshal

from models import TopicMappingV1, MQTTSubscriptionV1
from views import subscription_fields, topic_mapping_fields
from callbacks import Callbacks
from helper import run_async

import logging
log = logging.getLogger(__name__)


APP_CONFIG_FILE=(os.getenv('APP_CONFIG_FILE', '../config/dev/config.ini'))
PRE_MAPPING_DIR=(os.getenv('PRE_MAPPING_DIR', '../config/dev/mappings'))

# Read Config
config = configparser.ConfigParser()
log.info(APP_CONFIG_FILE)
config.read(APP_CONFIG_FILE)

# Define Queues
sub_queue = queue.Queue()  # Queue for all topic to sub
pub_queue = queue.Queue()  # Queue for msg to pub
msg_queue = queue.Queue()  # Queue with incoming messages
persistent_queue = queue.Queue() # Queue for influxDB persistent manager

topic_mapping_list = []
subscriptions_list = []



[docs]def create_app(): """ Start Main Application :return: Flask-App Instance """ log.info("run create_app()") app = Flask(__name__) start_mqtt_client() start_persistent_client() create_mapping() return app
[docs]def start_persistent_client(): """ Start an InfluxDB-Client in a new Thread. :return: """ log.info("run: create_persistent_client()") try: INFLUX_HOST = config['influx']['host'] INFLUX_PORT = int(config['influx']['port']) INFLUX_DB = config['influx']['db'] kwargs = {'host': INFLUX_HOST, 'port': INFLUX_PORT, 'database': INFLUX_DB, 'persistent_queue': persistent_queue} log.info(kwargs) t_influxdb_client = threading.Thread(name='influxdb-client', target=InfluxDBBaseClient, kwargs=kwargs) t_influxdb_client.daemon = True try: t_influxdb_client.start() except (KeyboardInterrupt, SystemExit): print("terminate influxdb client thread") except KeyError as e: log.error(e)
[docs]def start_mqtt_client(): """ Start an MQTT-Client in a new Thread. :return: """ log.info("run: create_mqtt_client()") try: MQTT_HOST = config['broker']['host'] MQTT_PORT = int(config['broker']['port']) MQTT_PASSWORD = config['broker']['password'] MQTT_USER = config['broker']['user'] kwargs = {'broker': MQTT_HOST, 'port': MQTT_PORT, 'sub_queue': sub_queue, 'pub_queue': pub_queue, 'msg_queue': msg_queue, 'username': MQTT_USER, 'password': MQTT_PASSWORD} log.info(kwargs) # Start MQTT-Client Thread t_mqtt_client = threading.Thread(name='MQTT-Client', target=BaseClient, kwargs=kwargs) t_mqtt_client.daemon = True try: t_mqtt_client.start() except (KeyboardInterrupt, SystemExit): print("terminate mqtt client thread") except KeyError as e: log.error(e)
[docs]def create_mapping(): """ Create MQTT-Resources Mapping :return: """ log.info(PRE_MAPPING_DIR) # TODO for file in os.listdir(PRE_MAPPING_DIR): print(file) if file.endswith(".yaml") or file.endswith(".yml"): yaml_file = (os.path.join(PRE_MAPPING_DIR, file)) print(yaml_file) with open(yaml_file, 'r') as stream: yamls = yaml.load_all(stream) for y in yamls: try: kind = y['kind'] # TODO switch case alternative! if kind == "MQTTSubscriptionV1": try: sub = MQTTSubscriptionV1.load_yaml(yaml=y) subscriptions_list.append(sub) except yaml.YAMLError as exc: log.error(exc) # TODO exceptions description# if kind == "MQTTSubscriptionV2": try: sub = MQTTSubscriptionV1.load_yaml(yaml=y) subscriptions_list.append(sub) except yaml.YAMLError as exc: log.error(exc) # TODO exceptions description# except KeyError as exc: log.error("KeyError: " + str(exc)) for subscription in subscriptions_list: log.info(subscription.__repr__()) sub_queue.put(subscription.subregex)
[docs]def run_event_handlers(subscribtion, msg): """ Run defined Event-Handler for incoming massages :param subscribtion: :param msg: :return: """ try: default_manager = getattr(Callbacks, subscribtion.event_handlers['default_manager']) if default_manager is not None: default_manager(msg) except AttributeError: log.info("no default_manager function ") except KeyError: log.info("no default_manager function define ") try: persistent_manager = getattr(Callbacks, subscribtion.event_handlers['persistent_manager']) if persistent_manager is not None: persistent_manager(msg=msg, queue=persistent_queue) except KeyError: log.info("no persistent_manager ") except AttributeError: log.info("no persistent_manager function define")
[docs]@run_async def run_msq_queue(): """ Run Queue for handle Resource-Mapping :return: """ while True: to_add = True # If True a new Topic mapping will be create and append to list msg = msg_queue.get() topic_sub = None for topic_mapping in topic_mapping_list: if topic_mapping.topic == msg.topic: to_add = False topic_mapping.last_update = str(datetime.datetime.now()) try: topic_mapping.last_value = json.loads(msg.payload.decode()) except json.decoder.JSONDecodeError as e: log.info(e) topic_mapping.last_value = msg.payload.decode() # TODO for all handlers in Event_handlers: do handler(msg) for subscription in subscriptions_list: if subscription.uuid == topic_mapping.sub_uuid: topic_sub = subscription run_event_handlers(topic_sub, msg) if to_add: for subscription in subscriptions_list: if topic_matches_sub(subscription.subregex, msg.topic): topic_sub = subscription topic_mapping = TopicMappingV1(topic=msg.topic, sub_uuid=topic_sub.uuid) try: topic_mapping.last_value = json.loads(msg.payload.decode()) except json.decoder.JSONDecodeError as e: log.info(e) topic_mapping.last_value = msg.payload.decode() topic_mapping_list.append(topic_mapping) # TODO run event_handlers (first time) run_event_handlers(topic_sub, msg)
app = create_app() api = Api(app)
[docs]class Home(Resource): """ Home """
[docs] def get(self): """ :return: """ return { 'msg': 'Welcome to rest-mqtt-proxy prototype!', 'urls': { 'api_v1': url_for('api_v1', _external=True), 'api_v2': 'TODO' } }
[docs]class ApiV1(Resource): """ ApiV1 """
[docs] def get(self): """ :return: """ return {'api_version': 1, 'urls': { 'topic_mapping_list': url_for('api_topic_mapping_list', _external=True), 'subscriptions_list': url_for('api_subscription_list', _external=True) }}
[docs]class TopicMappingListApiV1(Resource): """ TopicMappingListApiV1 """
[docs] def get(self): """ :return: """ return {'topic_mapping_list': [marshal(topic, topic_mapping_fields) for topic in topic_mapping_list]}
[docs]class SubscriptionsListApiV1(Resource): """ SubscriptionsListApiV1 """
[docs] def get(self): """ :return: """ return {'subscriptions_list': [marshal(subscription, subscription_fields) for subscription in subscriptions_list]}
[docs]class SubscriptionDetailsApiV1(Resource): """ SubscriptionDetailsApiV1 """
[docs] def get(self, uuid): """ :param uuid: :return: """ for subscription in subscriptions_list: if subscription.uuid == uuid: return marshal(subscription, subscription_fields), # TODO use marshal abort(404)
[docs]class TopicValueApiV1(Resource): """ TopicValueApiV1 """ def __init__(self): self.reqparse = reqparse.RequestParser() self.reqparse.add_argument('last_value', type=str, location='json') super(TopicValueApiV1, self).__init__()
[docs] def get(self, uuid): """ :param uuid: :return: """ for mapping in topic_mapping_list: if mapping.uuid == uuid: # return marshal(mapping, topic_fields), # TODO use marshal return mapping.last_value, 200 abort(404)
[docs] def put(self, uuid): """ :param uuid: :return: """ for mapping in topic_mapping_list: if mapping.uuid == uuid: args = self.reqparse.parse_args() if args['last_value']: mapping.last_value = args['last_value'] message = dict() message['topic'] = mapping.topic message['payload'] = args['last_value'] pub_queue.put(message) return mapping.last_value, 200 abort(404)
[docs] def delete(self, uuid): """ :param uuid: :return: """ for topic in topic_mapping_list: if topic.uuid == uuid: topic_mapping_list.remove(topic) return {'result': True}, 200 abort(404)
[docs]class TopicDetailsApiV1(Resource): """ TopicDetailsApiV1 """
[docs] def get(self, uuid): """ :param uuid: :return: """ for mapping in topic_mapping_list: if mapping.uuid == uuid: return marshal(mapping, topic_mapping_fields), # TODO use marshal abort(404)
[docs]class MappingValueApiV1(Resource): """ MappingValueApiV1 """ def __init__(self): self.reqparse = reqparse.RequestParser() self.reqparse.add_argument('last_value', type=str, location='json') super(MappingValueApiV1, self).__init__()
[docs] def get(self, topic): """ :param topic: :return: """ for mapping in topic_mapping_list: if mapping.topic == topic: # return {'topic': marshal(mapping, topic_fields)} # return marshal(mapping, topic_fields), 200 # , envelope=str(mapping.uuid) return mapping.last_value, 200 abort(404)
[docs]class MappingDetailsApiV1(Resource): """ MappingDetailsApiV1 """ def __init__(self): self.reqparse = reqparse.RequestParser() self.reqparse.add_argument('last_value', type=str, location='json') super(MappingDetailsApiV1, self).__init__()
[docs] def get(self, topic): """ :param topic: :return: """ for mapping in topic_mapping_list: if mapping.topic == topic: return marshal(mapping, topic_mapping_fields), 200 # , envelope=str(mapping.uuid) abort(404)
# Add API Resources api.add_resource(Home, '/', endpoint='api_home') api.add_resource(ApiV1, '/api/v1', endpoint='api_v1') # list all topics api.add_resource(TopicMappingListApiV1, '/api/v1/topics', endpoint='api_topic_mapping_list') # Topics by uuid api.add_resource(TopicDetailsApiV1, '/api/v1/topic/details/<uuid:uuid>', endpoint='api_topic_details') api.add_resource(TopicValueApiV1, '/api/v1/topic/value/<uuid:uuid>', endpoint='api_topic_value') # Topics by name api.add_resource(MappingDetailsApiV1, '/api/v1/topic/details/<path:topic>', endpoint='api_mapping_details') api.add_resource(MappingValueApiV1, '/api/v1/topic/value/<path:topic>', endpoint='api_mapping_value') # Subscriptions api.add_resource(SubscriptionsListApiV1, '/api/v1/subscriptions', endpoint='api_subscription_list') api.add_resource(SubscriptionDetailsApiV1, '/api/v1/subscription/details/<uuid:uuid>', endpoint='api_subscription_details') run_msq_queue() if __name__ == '__main__': try: print("test") app.run(host='0.0.0.0') except (KeyboardInterrupt, SystemExit): print("terminate mqtt client thread")