This commit is contained in:
Dmitry Borisenko
2022-02-11 00:22:59 +01:00
parent 02a087ccdd
commit 8d86bb222a
8 changed files with 75 additions and 134 deletions

View File

@@ -105,10 +105,6 @@ extern String prex;
extern String all_widgets; extern String all_widgets;
extern String scenario; extern String scenario;
extern int mqttConnectAttempts;
extern bool changeBroker;
extern int currentBroker;
// extern DynamicJsonDocument settingsFlashJsonDoc; // extern DynamicJsonDocument settingsFlashJsonDoc;
// extern DynamicJsonDocument paramsFlashJsonDoc; // extern DynamicJsonDocument paramsFlashJsonDoc;
// extern DynamicJsonDocument paramsHeapJsonDoc; // extern DynamicJsonDocument paramsHeapJsonDoc;

View File

@@ -6,7 +6,7 @@
void mqttInit(); void mqttInit();
void selectBroker(); void selectBroker();
void getMqttData1(); void getMqttData();
void getMqttData2(); void getMqttData2();
bool isSecondBrokerSet(); bool isSecondBrokerSet();
boolean mqttConnect(); boolean mqttConnect();

View File

@@ -1,6 +1,7 @@
#pragma once #pragma once
#include "Global.h" #include "Global.h"
#include "MqttClient.h"
boolean isNetworkActive(); boolean isNetworkActive();
void routerConnect(); void routerConnect();

View File

@@ -58,10 +58,6 @@ String all_widgets = "";
String scenario = ""; String scenario = "";
String mqttRootDevice = ""; String mqttRootDevice = "";
int mqttConnectAttempts = 0;
bool changeBroker = false;
int currentBroker = 1;
// DynamicJsonDocument settingsFlashJsonDoc(JSON_BUFFER_SIZE); // DynamicJsonDocument settingsFlashJsonDoc(JSON_BUFFER_SIZE);
// DynamicJsonDocument paramsFlashJsonDoc(JSON_BUFFER_SIZE); // DynamicJsonDocument paramsFlashJsonDoc(JSON_BUFFER_SIZE);
// DynamicJsonDocument paramsHeapJsonDoc(JSON_BUFFER_SIZE); // DynamicJsonDocument paramsHeapJsonDoc(JSON_BUFFER_SIZE);

View File

@@ -1,24 +1,19 @@
#include "MqttClient.h" #include "MqttClient.h"
void mqttInit() { void mqttInit() {
myNotAsyncActions->add(
do_MQTTPARAMSCHANGED, [&](void*) {
mqttReconnect();
},
nullptr);
mqtt.setCallback(mqttCallback); mqtt.setCallback(mqttCallback);
ts.add( ts.add(
WIFI_MQTT_CONNECTION_CHECK, MQTT_RECONNECT_INTERVAL, WIFI_MQTT_CONNECTION_CHECK, MQTT_RECONNECT_INTERVAL,
[&](void*) { [&](void*) {
if (WiFi.status() == WL_CONNECTED) { if (WiFi.status() == WL_CONNECTED) {
SerialPrint("I", F("WIFI"), F("OK")); SerialPrint("I", F("WIFI"), F("OK"));
if (mqtt.connected()) { if (mqtt.connected()) {
SerialPrint("I", F("MQTT"), "OK, broker No " + String(currentBroker)); SerialPrint("I", F("MQTT"), "OK");
jsonWriteInt_(errorsHeapJson, F("mqtt"), mqtt.state());
// setLedStatus(LED_OFF); // setLedStatus(LED_OFF);
} else { } else {
SerialPrint("E", F("MQTT"), F("✖ Connection lost")); SerialPrint("E", F("MQTT"), F("✖ Connection lost"));
jsonWriteInt_(errorsHeapJson, F("mqtt"), mqtt.state());
mqttConnect(); mqttConnect();
} }
} else { } else {
@@ -28,13 +23,59 @@ void mqttInit() {
} }
}, },
nullptr, true); nullptr, true);
}
// myNotAsyncActions->add( void mqttLoop() {
// do_sendScenMQTT, [&](void*) { if (!isNetworkActive() || !mqtt.connected()) {
// String scen = readFile(String(DEVICE_SCENARIO_FILE), 2048); return;
// publishInfo("scen", scen); }
// }, mqtt.loop();
// nullptr); }
boolean mqttConnect() {
getMqttData();
bool res = false;
if (mqttServer == "") {
SerialPrint("E", "MQTT", F("mqttServer empty"));
jsonWriteInt_(errorsHeapJson, F("mqtt"), 6);
return res;
}
SerialPrint("I", "MQTT", "connection started");
SerialPrint("I", "MQTT", "broker " + mqttServer + ":" + String(mqttPort, DEC));
SerialPrint("I", "MQTT", "topic " + mqttRootDevice);
// setLedStatus(LED_FAST);
mqtt.setServer(mqttServer.c_str(), mqttPort);
if (!mqtt.connected()) {
bool connected = false;
if (mqttUser != "" && mqttPass != "") {
connected = mqtt.connect(chipId.c_str(), mqttUser.c_str(), mqttPass.c_str());
SerialPrint("I", F("MQTT"), F("Go to connection with login and password"));
} else if (mqttUser == "" && mqttPass == "") {
connected = mqtt.connect(chipId.c_str());
SerialPrint("I", F("MQTT"), F("Go to connection without login and password"));
} else {
SerialPrint("E", F("MQTT"), F("✖ Login or password missed"));
jsonWriteInt_(errorsHeapJson, F("mqtt"), 7);
return res;
}
if (connected) {
SerialPrint("I", F("MQTT"), F("✔ connected"));
jsonWriteInt_(errorsHeapJson, F("mqtt"), mqtt.state());
// setLedStatus(LED_OFF);
mqttSubscribe();
res = true;
} else {
SerialPrint("E", F("MQTT"), "🡆 Could't connect, retry in " + String(MQTT_RECONNECT_INTERVAL / 1000) + "s");
jsonWriteInt_(errorsHeapJson, F("mqtt"), mqtt.state());
// setLedStatus(LED_FAST);
}
}
return res;
} }
void mqttDisconnect() { void mqttDisconnect() {
@@ -47,16 +88,16 @@ void mqttReconnect() {
mqttConnect(); mqttConnect();
} }
void mqttLoop() { void getMqttData() {
if (!isNetworkActive() || !mqtt.connected()) { mqttServer = jsonReadStr(settingsFlashJson, F("mqttServer"));
return; mqttPort = jsonReadInt(settingsFlashJson, F("mqttPort"));
} mqttUser = jsonReadStr(settingsFlashJson, F("mqttUser"));
mqtt.loop(); mqttPass = jsonReadStr(settingsFlashJson, F("mqttPass"));
} }
void mqttSubscribe() { void mqttSubscribe() {
SerialPrint("I", F("MQTT"), F("subscribed")); SerialPrint("I", F("MQTT"), F("subscribed"));
SerialPrint("I", "MQTT", mqttRootDevice); SerialPrint("I", F("MQTT"), mqttRootDevice);
mqtt.subscribe(mqttPrefix.c_str()); mqtt.subscribe(mqttPrefix.c_str());
mqtt.subscribe((mqttRootDevice + "/+/control").c_str()); mqtt.subscribe((mqttRootDevice + "/+/control").c_str());
mqtt.subscribe((mqttRootDevice + "/update").c_str()); mqtt.subscribe((mqttRootDevice + "/update").c_str());
@@ -68,105 +109,6 @@ void mqttSubscribe() {
} }
} }
void selectBroker() {
if (changeBroker) {
changeBroker = false;
if (currentBroker == 1) {
getMqttData2();
} else if (currentBroker == 2) {
getMqttData1();
}
} else {
if (currentBroker == 1) {
getMqttData1();
} else if (currentBroker == 2) {
getMqttData2();
}
}
}
void getMqttData1() {
currentBroker = 1;
mqttServer = jsonReadStr(settingsFlashJson, F("mqttServer"));
mqttPort = jsonReadInt(settingsFlashJson, F("mqttPort"));
mqttUser = jsonReadStr(settingsFlashJson, F("mqttUser"));
mqttPass = jsonReadStr(settingsFlashJson, F("mqttPass"));
// prex = mqttPrefix + "/" + chipId;
}
void getMqttData2() {
currentBroker = 2;
mqttServer = jsonReadStr(settingsFlashJson, F("mqttServer2"));
mqttPort = jsonReadInt(settingsFlashJson, F("mqttPort2"));
mqttUser = jsonReadStr(settingsFlashJson, F("mqttUser2"));
mqttPass = jsonReadStr(settingsFlashJson, F("mqttPass2"));
// prex = mqttPrefix + "/" + chipId;
}
bool isSecondBrokerSet() {
bool res = true;
if (jsonReadStr(settingsFlashJson, F("mqttServer2")) == "") {
res = false;
}
if (jsonReadStr(settingsFlashJson, F("mqttPrefix2")) == "") {
res = false;
}
return res;
}
boolean mqttConnect() {
selectBroker();
bool res = false;
if (mqttServer == "") {
SerialPrint("E", "MQTT", F("mqttServer empty"));
return res;
}
SerialPrint("I", "MQTT", "connection started to broker No " + String(currentBroker));
SerialPrint("I", "MQTT", "broker " + mqttServer + ":" + String(mqttPort, DEC));
SerialPrint("I", "MQTT", "topic " + mqttRootDevice);
// setLedStatus(LED_FAST);
mqtt.setServer(mqttServer.c_str(), mqttPort);
if (!mqtt.connected()) {
bool connected = false;
if (mqttUser != "" && mqttPass != "") {
connected = mqtt.connect(chipId.c_str(), mqttUser.c_str(), mqttPass.c_str());
SerialPrint("I", F("MQTT"), F("Go to connection with login and password"));
} else if (mqttUser == "" && mqttPass == "") {
connected = mqtt.connect(chipId.c_str());
SerialPrint("I", F("MQTT"), F("Go to connection without login and password"));
} else {
SerialPrint("E", F("MQTT"), F("✖ Login or password missing"));
return res;
}
if (connected) {
SerialPrint("I", F("MQTT"), F("✔ connected"));
// if (currentBroker == 1) jsonWriteStr(settingsFlashJson, F("warning4"), F("<div style='margin-top:10px;margin-bottom:10px;'><font color='black'><p style='border: 1px solid #DCDCDC; border-radius: 3px; background-color: #8ef584; padding: 10px;'>Подключено к основному брокеру</p></font></div>"));
// if (currentBroker == 2) jsonWriteStr(settingsFlashJson, F("warning4"), F("<div style='margin-top:10px;margin-bottom:10px;'><font color='black'><p style='border: 1px solid #DCDCDC; border-radius: 3px; background-color: #8ef584; padding: 10px;'>Подключено к резервному брокеру</p></font></div>"));
// setLedStatus(LED_OFF);
mqttSubscribe();
res = true;
} else {
mqttConnectAttempts++;
SerialPrint("E", F("MQTT"), "🡆 Attempt No: " + String(mqttConnectAttempts) + " could't connect, retry in " + String(MQTT_RECONNECT_INTERVAL / 1000) + "s");
// setLedStatus(LED_FAST);
// jsonWriteStr(settingsFlashJson, F("warning4"), F("<div style='margin-top:10px;margin-bottom:10px;'><font color='black'><p style='border: 1px solid #DCDCDC; border-radius: 3px; background-color: #fa987a; padding: 10px;'>Не подключено брокеру</p></font></div>"));
if (mqttConnectAttempts >= CHANGE_BROKER_AFTER) {
mqttConnectAttempts = 0;
if (isSecondBrokerSet()) {
changeBroker = true;
SerialPrint("E", F("MQTT"), "✖ Broker fully missed (" + String(CHANGE_BROKER_AFTER) + " attempts passed), try connect to another one");
} else {
SerialPrint("E", F("MQTT"), F("Secound broker not seted"));
}
}
}
}
return res;
}
void mqttCallback(char* topic, uint8_t* payload, size_t length) { void mqttCallback(char* topic, uint8_t* payload, size_t length) {
String topicStr = String(topic); String topicStr = String(topic);
// SerialPrint("I", "=>MQTT", topicStr); // SerialPrint("I", "=>MQTT", topicStr);
@@ -404,3 +346,6 @@ const String getStateStr() {
break; break;
} }
} }
// 6 сервер не задан
// 7 Логин или пароль отсутствует

View File

@@ -22,18 +22,18 @@ void periodicTasksInit() {
void printGlobalVarSize() { void printGlobalVarSize() {
size_t settingsFlashJsonSize = settingsFlashJson.length(); size_t settingsFlashJsonSize = settingsFlashJson.length();
SerialPrint(F("i"), F("settingsFlashJson"), String(settingsFlashJsonSize)); // SerialPrint(F("i"), F("settingsFlashJson"), String(settingsFlashJsonSize));
size_t errorsHeapJsonSize = errorsHeapJson.length(); size_t errorsHeapJsonSize = errorsHeapJson.length();
SerialPrint(F("i"), F("settingsFlashJson"), String(errorsHeapJsonSize)); // SerialPrint(F("i"), F("settingsFlashJson"), String(errorsHeapJsonSize));
size_t paramsFlashJsonSize = paramsFlashJson.length(); size_t paramsFlashJsonSize = paramsFlashJson.length();
SerialPrint(F("i"), F("settingsFlashJson"), String(paramsFlashJsonSize)); // SerialPrint(F("i"), F("settingsFlashJson"), String(paramsFlashJsonSize));
size_t paramsHeapJsonSize = paramsHeapJson.length(); size_t paramsHeapJsonSize = paramsHeapJson.length();
SerialPrint(F("i"), F("settingsFlashJson"), String(paramsHeapJsonSize)); // SerialPrint(F("i"), F("settingsFlashJson"), String(paramsHeapJsonSize));
size_t halfBuffer = JSON_BUFFER_SIZE / 2; size_t halfBuffer = JSON_BUFFER_SIZE / 2;
if (settingsFlashJsonSize > halfBuffer || errorsHeapJsonSize > halfBuffer || paramsFlashJsonSize > halfBuffer || paramsHeapJsonSize > halfBuffer) { if (settingsFlashJsonSize > halfBuffer || errorsHeapJsonSize > halfBuffer || paramsFlashJsonSize > halfBuffer || paramsHeapJsonSize > halfBuffer) {
SerialPrint(F("EE"), F("Json"), "Insufficient buffer size!!!"); SerialPrint(F("EE"), F("Json"), F("Insufficient buffer size!!!"));
jsonWriteInt(errorsHeapJson, "jsbuf", 1); jsonWriteInt(errorsHeapJson, "jsbuf", 1);
} }
} }

View File

@@ -106,6 +106,9 @@ void webSocketEvent(uint8_t num, WStype_t type, uint8_t* payload, size_t length)
if (headerStr == "/reboot|") { if (headerStr == "/reboot|") {
ESP.restart(); ESP.restart();
} }
if (headerStr == "/mqtt|") {
mqttReconnect();
}
} break; } break;

View File

@@ -41,7 +41,7 @@ void routerConnect() {
SerialPrint("i", "WIFI", "http://" + WiFi.localIP().toString()); SerialPrint("i", "WIFI", "http://" + WiFi.localIP().toString());
jsonWriteStr(settingsFlashJson, "ip", WiFi.localIP().toString()); jsonWriteStr(settingsFlashJson, "ip", WiFi.localIP().toString());
// mqttInit(); mqttInit();
} }
SerialPrint("i", F("WIFI"), F("Network Init")); SerialPrint("i", F("WIFI"), F("Network Init"));
} }