From 3e558175f9d6198dd508124ee52a5b5649fbe449 Mon Sep 17 00:00:00 2001 From: Dmitry Borisenko <49808844+DmitryBorisenko33@users.noreply.github.com> Date: Mon, 10 Jan 2022 23:37:21 +0100 Subject: [PATCH] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20MQ?= =?UTF-8?q?TT,=20=D0=BD=D0=B5=20=D1=80=D0=B0=D0=B1=D0=BE=D1=87=D0=B0=D1=8F?= =?UTF-8?q?=20=D0=B2=D0=B5=D1=80=D1=81=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/Const.h | 12 +- include/Global.h | 19 ++ include/MqttClient.h | 31 +++ include/classes/NotAsync.h | 32 +++ include/main.h | 1 + platformio.ini | 1 + src/Global.cpp | 18 ++ src/MqttClient.cpp | 410 +++++++++++++++++++++++++++++++++++++ src/StandWebServer.cpp | 3 +- src/classes/NotAsync.cpp | 30 +++ src/main.cpp | 3 + 11 files changed, 558 insertions(+), 2 deletions(-) create mode 100644 include/MqttClient.h create mode 100644 include/classes/NotAsync.h create mode 100644 src/MqttClient.cpp create mode 100644 src/classes/NotAsync.cpp diff --git a/include/Const.h b/include/Const.h index 12895457..c6f18da2 100644 --- a/include/Const.h +++ b/include/Const.h @@ -14,6 +14,9 @@ //#define REST_FILE_OPERATIONS +#define MQTT_RECONNECT_INTERVAL 20000 +#define CHANGE_BROKER_AFTER 5 + #ifdef esp8266_4mb #define USE_LITTLEFS true #endif @@ -30,4 +33,11 @@ enum TimerTask_t { WIFI_SCAN, UPTIME, SYGNAL, TIMES, - MYTEST }; \ No newline at end of file + MYTEST }; + +//задачи которые надо протащить через loop +enum NotAsyncActions { + do_ZERO, + do_MQTTPARAMSCHANGED, + do_LAST, +}; \ No newline at end of file diff --git a/include/Global.h b/include/Global.h index afa6402d..375cfa9e 100644 --- a/include/Global.h +++ b/include/Global.h @@ -6,6 +6,7 @@ #include #include #include +#include #ifdef ESP32 #include "WiFi.h" @@ -52,6 +53,7 @@ extern TickerScheduler ts; extern WiFiClient espClient; +extern PubSubClient mqtt; #ifdef ASYNC_WEB_SERVER extern AsyncWebServer server; #endif @@ -77,6 +79,23 @@ extern String settingsFlashJson; extern String paramsFlashJson; extern String paramsHeapJson; +// Mqtt +extern String mqttServer; +extern int mqttPort; +extern String mqttPrefix; +extern String mqttUser; +extern String mqttPass; + +extern String mqttRootDevice; +extern String chipId; +extern String prex; +extern String all_widgets; +extern String scenario; + +extern int mqttConnectAttempts; +extern bool changeBroker; +extern int currentBroker; + // extern DynamicJsonDocument settingsFlashJsonDoc; // extern DynamicJsonDocument paramsFlashJsonDoc; // extern DynamicJsonDocument paramsHeapJsonDoc; \ No newline at end of file diff --git a/include/MqttClient.h b/include/MqttClient.h new file mode 100644 index 00000000..3612b1af --- /dev/null +++ b/include/MqttClient.h @@ -0,0 +1,31 @@ +#pragma once +#include "Const.h" +#include "classes/NotAsync.h" +#include "Global.h" +#include "Utils/WiFiUtils.h" + +void mqttInit(); +void selectBroker(); +void getMqttData1(); +void getMqttData2(); +bool isSecondBrokerSet(); +boolean mqttConnect(); +void mqttReconnect(); +void mqttLoop(); +void mqttSubscribe(); + +boolean publish(const String& topic, const String& data); +boolean publishData(const String& topic, const String& data); +boolean publishChart(const String& topic, const String& data); +boolean publishControl(String id, String topic, String state); +boolean publishChart_test(const String& topic, const String& data); +boolean publishStatus(const String& topic, const String& data); +boolean publishEvent(const String& topic, const String& data); +boolean publishInfo(const String& topic, const String& data); +boolean publishAnyJsonKey(const String& topic, const String& key, const String& data); + +void publishWidgets(); +void publishState(); + +void mqttCallback(char* topic, uint8_t* payload, size_t length); +const String getStateStr(); diff --git a/include/classes/NotAsync.h b/include/classes/NotAsync.h new file mode 100644 index 00000000..1d5278cc --- /dev/null +++ b/include/classes/NotAsync.h @@ -0,0 +1,32 @@ +#pragma once +#include +#include + +#include + +typedef std::function NotAsyncCb; + +struct NotAsyncItem { + bool test; + NotAsyncCb cb; + void* cb_arg; + volatile bool is_used = false; +}; + +class NotAsync { + private: + uint8_t size; + uint8_t task = 0; + NotAsyncItem* items = NULL; + void handle(NotAsyncCb f, void* arg); + + public: + NotAsync(uint8_t size); + ~NotAsync(); + + void add(uint8_t i, NotAsyncCb, void* arg); + void make(uint8_t task); + void loop(); +}; + +extern NotAsync* myNotAsyncActions; \ No newline at end of file diff --git a/include/main.h b/include/main.h index b0f1f37e..75bd2b15 100644 --- a/include/main.h +++ b/include/main.h @@ -6,3 +6,4 @@ #include "AsyncWebServer.h" #include "StandWebServer.h" #include "classes/sendJson.h" +#include "classes/NotAsync.h" diff --git a/platformio.ini b/platformio.ini index 63a306ca..37f45400 100644 --- a/platformio.ini +++ b/platformio.ini @@ -18,6 +18,7 @@ lib_deps_external = bblanchon/ArduinoJson @6.18.0 me-no-dev/ESP Async WebServer Links2004/WebSockets + knolleary/PubSubClient ;lib_deps_internal = diff --git a/src/Global.cpp b/src/Global.cpp index 7e974a56..d45bff58 100644 --- a/src/Global.cpp +++ b/src/Global.cpp @@ -6,6 +6,7 @@ TickerScheduler ts(MYTEST + 1); WiFiClient espClient; +PubSubClient mqtt(espClient); #ifdef ASYNC_WEB_SERVER AsyncWebServer server(80); #endif @@ -32,6 +33,23 @@ String settingsFlashJson = "{}"; //переменная в которой хр String paramsFlashJson = "{}"; //переменная в которой хранятся все параметры, находится в оперативной памяти и синхронизированна с flash памятью String paramsHeapJson = "{}"; //переменная в которой хранятся все параметры, находится в оперативной памяти только +// Mqtt +String mqttServer = ""; +int mqttPort = 0; +String mqttPrefix = ""; +String mqttUser = ""; +String mqttPass = ""; + +String chipId = ""; +String prex = ""; +String all_widgets = ""; +String scenario = ""; +String mqttRootDevice = ""; + +int mqttConnectAttempts = 0; +bool changeBroker = false; +int currentBroker = 1; + // DynamicJsonDocument settingsFlashJsonDoc(JSON_BUFFER_SIZE); // DynamicJsonDocument paramsFlashJsonDoc(JSON_BUFFER_SIZE); // DynamicJsonDocument paramsHeapJsonDoc(JSON_BUFFER_SIZE); \ No newline at end of file diff --git a/src/MqttClient.cpp b/src/MqttClient.cpp new file mode 100644 index 00000000..423a253e --- /dev/null +++ b/src/MqttClient.cpp @@ -0,0 +1,410 @@ +#include "MqttClient.h" + +void mqttInit() { + myNotAsyncActions->add( + do_MQTTPARAMSCHANGED, [&](void*) { + mqttReconnect(); + }, + nullptr); + + mqtt.setCallback(mqttCallback); + + ts.add( + WIFI_MQTT_CONNECTION_CHECK, MQTT_RECONNECT_INTERVAL, + [&](void*) { + if (WiFi.status() == WL_CONNECTED) { + SerialPrint("I", F("WIFI"), F("OK")); + if (mqtt.connected()) { + SerialPrint("I", F("MQTT"), "OK, broker No " + String(currentBroker)); + // setLedStatus(LED_OFF); + } else { + SerialPrint("E", F("MQTT"), F("✖ Connection lost")); + mqttConnect(); + } + } else { + SerialPrint("E", F("WIFI"), F("✖ Lost WiFi connection")); + ts.remove(WIFI_MQTT_CONNECTION_CHECK); + startAPMode(); + } + }, + nullptr, true); + + // myNotAsyncActions->add( + // do_sendScenMQTT, [&](void*) { + // String scen = readFile(String(DEVICE_SCENARIO_FILE), 2048); + // publishInfo("scen", scen); + // }, + // nullptr); +} + +void mqttDisconnect() { + SerialPrint("I", F("MQTT"), F("disconnected")); + mqtt.disconnect(); +} + +void mqttReconnect() { + mqttDisconnect(); + mqttConnect(); +} + +void mqttLoop() { + if (!isNetworkActive() || !mqtt.connected()) { + return; + } + mqtt.loop(); +} + +void mqttSubscribe() { + SerialPrint("I", F("MQTT"), F("subscribed")); + SerialPrint("I", "MQTT", mqttRootDevice); + mqtt.subscribe(mqttPrefix.c_str()); + mqtt.subscribe((mqttRootDevice + "/+/control").c_str()); + mqtt.subscribe((mqttRootDevice + "/update").c_str()); + + if (jsonReadBool(settingsFlashJson, "MqttIn")) { + mqtt.subscribe((mqttPrefix + "/+/+/event").c_str()); + mqtt.subscribe((mqttPrefix + "/+/+/order").c_str()); + mqtt.subscribe((mqttPrefix + "/+/+/info").c_str()); + } +} + +void selectBroker() { + if (changeBroker) { + changeBroker = false; + if (currentBroker == 1) { + getMqttData2(); + } else if (currentBroker == 2) { + getMqttData1(); + } + } else { + if (currentBroker == 1) { + getMqttData1(); + } else if (currentBroker == 2) { + getMqttData2(); + } + } +} + +void getMqttData1() { + currentBroker = 1; + mqttServer = jsonReadStr(settingsFlashJson, F("mqttServer")); + mqttPort = jsonReadInt(settingsFlashJson, F("mqttPort")); + mqttPrefix = jsonReadStr(settingsFlashJson, F("mqttPrefix")); + mqttUser = jsonReadStr(settingsFlashJson, F("mqttUser")); + mqttPass = jsonReadStr(settingsFlashJson, F("mqttPass")); + prex = mqttPrefix + "/" + chipId; +} + +void getMqttData2() { + currentBroker = 2; + mqttServer = jsonReadStr(settingsFlashJson, F("mqttServer2")); + mqttPort = jsonReadInt(settingsFlashJson, F("mqttPort2")); + mqttPrefix = jsonReadStr(settingsFlashJson, F("mqttPrefix2")); + mqttUser = jsonReadStr(settingsFlashJson, F("mqttUser2")); + mqttPass = jsonReadStr(settingsFlashJson, F("mqttPass2")); + prex = mqttPrefix + "/" + chipId; +} + +bool isSecondBrokerSet() { + bool res = true; + if (jsonReadStr(settingsFlashJson, F("mqttServer2")) == "") { + res = false; + } + if (jsonReadStr(settingsFlashJson, F("mqttPrefix2")) == "") { + res = false; + } + return res; +} + +boolean mqttConnect() { + selectBroker(); + bool res = false; + if (mqttServer == "") { + SerialPrint("E", "MQTT", F("mqttServer empty")); + return res; + } + SerialPrint("I", "MQTT", "connection started to broker No " + String(currentBroker)); + + mqttRootDevice = mqttPrefix + "/" + chipId; + + SerialPrint("I", "MQTT", "broker " + mqttServer + ":" + String(mqttPort, DEC)); + SerialPrint("I", "MQTT", "topic " + mqttRootDevice); + // setLedStatus(LED_FAST); + mqtt.setServer(mqttServer.c_str(), mqttPort); + + if (!mqtt.connected()) { + bool connected = false; + if (mqttUser != "" && mqttPass != "") { + connected = mqtt.connect(chipId.c_str(), mqttUser.c_str(), mqttPass.c_str()); + SerialPrint("I", F("MQTT"), F("Go to connection with login and password")); + } else if (mqttUser == "" && mqttPass == "") { + connected = mqtt.connect(chipId.c_str()); + SerialPrint("I", F("MQTT"), F("Go to connection without login and password")); + } else { + SerialPrint("E", F("MQTT"), F("✖ Login or password missing")); + return res; + } + + if (connected) { + SerialPrint("I", F("MQTT"), F("✔ connected")); + if (currentBroker == 1) jsonWriteStr(settingsFlashJson, F("warning4"), F("

Подключено к основному брокеру

")); + if (currentBroker == 2) jsonWriteStr(settingsFlashJson, F("warning4"), F("

Подключено к резервному брокеру

")); + // setLedStatus(LED_OFF); + mqttSubscribe(); + res = true; + } else { + mqttConnectAttempts++; + SerialPrint("E", F("MQTT"), "🡆 Attempt No: " + String(mqttConnectAttempts) + " could't connect, retry in " + String(MQTT_RECONNECT_INTERVAL / 1000) + "s"); + // setLedStatus(LED_FAST); + jsonWriteStr(settingsFlashJson, F("warning4"), F("

Не подключено брокеру

")); + if (mqttConnectAttempts >= CHANGE_BROKER_AFTER) { + mqttConnectAttempts = 0; + if (isSecondBrokerSet()) { + changeBroker = true; + SerialPrint("E", F("MQTT"), "✖ Broker fully missed (" + String(CHANGE_BROKER_AFTER) + " attempts passed), try connect to another one"); + } else { + SerialPrint("E", F("MQTT"), F("Secound broker not seted")); + } + } + } + } + return res; +} + +void mqttCallback(char* topic, uint8_t* payload, size_t length) { + String topicStr = String(topic); + // SerialPrint("I", "=>MQTT", topicStr); + String payloadStr; + payloadStr.reserve(length + 1); + for (size_t i = 0; i < length; i++) { + payloadStr += (char)payload[i]; + } + + // SerialPrint("I", "=>MQTT", payloadStr); + + if (payloadStr.startsWith("HELLO")) { + SerialPrint("I", F("MQTT"), F("Full update")); + publishWidgets(); + publishState(); +#ifdef GATE_MODE + publishTimes(); +#endif +#ifdef EnableLogging + choose_log_date_and_send(); +#endif + } + + // else if (topicStr.indexOf("control") != -1) { + // String key = selectFromMarkerToMarker(topicStr, "/", 3); + // + // String order; + // order += key; + // order += " "; + // order += payloadStr; + // order += ","; + // + // loopCmdAdd(order); + // + // SerialPrint("I", F("=>MQTT"), "Msg from iotmanager app: " + key + " " + payloadStr); + //} + // + // else if (topicStr.indexOf("event") != -1) { + // if (!jsonReadBool(settingsFlashJson, "MqttIn")) { + // return; + // } + // if (topicStr.indexOf(chipId) == -1) { + // String devId = selectFromMarkerToMarker(topicStr, "/", 2); + // String key = selectFromMarkerToMarker(topicStr, "/", 3); + // SerialPrint("I", F("=>MQTT"), "Received event from other device: '" + devId + "' " + key + " " + payloadStr); + // String event = key + " " + payloadStr + ","; + // eventBuf += event; + // } + //} + // + // else if (topicStr.indexOf("order") != -1) { + // if (!jsonReadBool(settingsFlashJson, "MqttIn")) { + // return; + // } + // String devId = selectFromMarkerToMarker(topicStr, "/", 2); + // String key = selectFromMarkerToMarker(topicStr, "/", 3); + // SerialPrint("I", F("=>MQTT"), "Received direct order " + key + " " + payloadStr); + // String order = key + " " + payloadStr + ","; + // loopCmdAdd(order); + // SerialPrint("I", "Order add", order); + //} + // + // else if (topicStr.indexOf("info") != -1) { + // if (topicStr.indexOf("scen") != -1) { + // writeFile(String(DEVICE_SCENARIO_FILE), payloadStr); + // loadScenario(); + // SerialPrint("I", F("=>MQTT"), F("Scenario received")); + // } + //} +} + +boolean publish(const String& topic, const String& data) { + if (mqtt.beginPublish(topic.c_str(), data.length(), false)) { + mqtt.print(data); + return mqtt.endPublish(); + } + return false; +} + +boolean publishData(const String& topic, const String& data) { + String path = mqttRootDevice + "/" + topic; + if (!publish(path, data)) { + SerialPrint("E", F("MQTT"), F("on publish data")); + return false; + } + return true; +} + +boolean publishChart(const String& topic, const String& data) { + String path = mqttRootDevice + "/" + topic + "/status"; + if (!publish(path, data)) { + SerialPrint("E", F("MQTT"), F("on publish chart")); + return false; + } + return true; +} + +boolean publishControl(String id, String topic, String state) { + String path = mqttPrefix + "/" + id + "/" + topic + "/control"; + return mqtt.publish(path.c_str(), state.c_str(), false); +} + +boolean publishChart_test(const String& topic, const String& data) { + String path = mqttRootDevice + "/" + topic + "/status"; + return mqtt.publish(path.c_str(), data.c_str(), false); +} + +boolean publishStatus(const String& topic, const String& data) { + String path = mqttRootDevice + "/" + topic + "/status"; + String json = "{}"; + jsonWriteStr(json, "status", data); + return mqtt.publish(path.c_str(), json.c_str(), false); +} + +boolean publishAnyJsonKey(const String& topic, const String& key, const String& data) { + String path = mqttRootDevice + "/" + topic + "/status"; + String json = "{}"; + jsonWriteStr(json, key, data); + return mqtt.publish(path.c_str(), json.c_str(), false); +} + +boolean publishEvent(const String& topic, const String& data) { + String path = mqttRootDevice + "/" + topic + "/event"; + return mqtt.publish(path.c_str(), data.c_str(), false); +} + +boolean publishInfo(const String& topic, const String& data) { + String path = mqttRootDevice + "/" + topic + "/info"; + return mqtt.publish(path.c_str(), data.c_str(), false); +} + +#ifdef LAYOUT_IN_RAM +void publishWidgets() { + if (all_widgets != "") { + int counter = 0; + String line; + int psn_1 = 0; + int psn_2; + do { + psn_2 = all_widgets.indexOf("\r\n", psn_1); //\r\n + line = all_widgets.substring(psn_1, psn_2); + line.replace("\n", ""); + line.replace("\r\n", ""); + // jsonWriteStr(line, "id", String(counter)); + // jsonWriteStr(line, "pageId", String(counter)); + counter++; + sendMQTT("config", line); + Serial.println("[V] " + line); + psn_1 = psn_2 + 1; + } while (psn_2 + 2 < all_widgets.length()); + getMemoryLoad("I after send all widgets"); + } +} +#endif + +#ifndef LAYOUT_IN_RAM +void publishWidgets() { + auto file = seekFile("layout.txt"); + if (!file) { + SerialPrint("E", F("MQTT"), F("no file layout.txt")); + return; + } + while (file.available()) { + String payload = file.readStringUntil('\n'); + SerialPrint("I", F("MQTT"), "widgets: " + payload); + publishData("config", payload); + } + file.close(); +} +#endif + +void publishState() { + //// берет строку json и ключи превращает в топики а значения колючей в них посылает + // String str; + // if (paramsHeapJson != "{}") { + // str += paramsHeapJson; + // } + // if (paramsFlashJson != "{}") { + // str += "," + paramsFlashJson; + // } + // str.replace("{", ""); + // str.replace("}", ""); + // str.replace("\"", ""); + // str += ","; + // + // while (str.length() != 0) { + // String tmp = selectToMarker(str, ","); + // + // String topic = selectToMarker(tmp, ":"); + // String state = deleteBeforeDelimiter(tmp, ":"); + // + // if (topic != "" && state != "") { + // if (topic != "timenow") { + // publishStatus(topic, state); + // } + // } + // str = deleteBeforeDelimiter(str, ","); + //} +} + +const String getStateStr() { + switch (mqtt.state()) { + case -4: + return F("no respond"); + break; + case -3: + return F("connection was broken"); + break; + case -2: + return F("connection failed"); + break; + case -1: + return F("client disconnected"); + break; + case 0: + return F("client connected"); + break; + case 1: + return F("doesn't support the requested version"); + break; + case 2: + return F("rejected the client identifier"); + break; + case 3: + return F("unable to accept the connection"); + break; + case 4: + return F("wrong username/password"); + break; + case 5: + return F("not authorized to connect"); + break; + default: + return F("unspecified"); + break; + } +} diff --git a/src/StandWebServer.cpp b/src/StandWebServer.cpp index b214f51e..fd07158b 100644 --- a/src/StandWebServer.cpp +++ b/src/StandWebServer.cpp @@ -215,7 +215,8 @@ void webSocketEvent(uint8_t num, WStype_t type, uint8_t* payload, size_t length) if (payloadStr.startsWith("/config")) { //если прилетел url страницы /config то отправим пакеты с меткой /config.json if (mySendJson) mySendJson->sendFile("/config.json", num); } - if (payloadStr.startsWith("/gifnoc.json")) { //если прилетел измененный пакет с меткой /gifnoc то перепишем файл + + if (payloadStr.startsWith("/gifnoc.json")) { //если прилетел измененный пакет с меткой /gifnoc (config наоборот) то перепишем файл payloadStr.replace("/gifnoc.json", ""); writeFile(F("config.json"), payloadStr); } diff --git a/src/classes/NotAsync.cpp b/src/classes/NotAsync.cpp new file mode 100644 index 00000000..b0c9fe4e --- /dev/null +++ b/src/classes/NotAsync.cpp @@ -0,0 +1,30 @@ +#include "classes/NotAsync.h" + +NotAsync::NotAsync(uint8_t size) { + this->items = new NotAsyncItem[size]; + this->size = size; +} + +NotAsync::~NotAsync() {} + +void NotAsync::add(uint8_t i, NotAsyncCb f, void* arg) { + this->items[i].cb = f; + this->items[i].cb_arg = arg; + this->items[i].is_used = true; +} + +void NotAsync::loop() { + if (this->items[task].is_used) { + handle(this->items[task].cb, this->items[task].cb_arg); + task = 0; + } +} + +void NotAsync::make(uint8_t task) { + this->task = task; +} + +void NotAsync::handle(NotAsyncCb f, void* arg) { + f(arg); +} +NotAsync* myNotAsyncActions; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index da945782..6c533114 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,6 +6,9 @@ void setup() { Serial.println(); Serial.println(F("--------------started----------------")); + //создание экземпляров классов + myNotAsyncActions = new NotAsync(do_LAST); + //инициализация файловой системы fileSystemInit();