调用Mosquitto实现mqtt物联网通信

By admin at 2021-09-06 • 1人收藏 • 598人看过

感谢: indertust 分享代码


MQTT工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型(publish/subscribe)消息协议.

此处使用的开源组件官方: https://mosquitto.org

目前代码还有几个问题未解决, 先分享出来供大家参考.


Mosquitto.aardio库代码如下:

//Mosquitto

/*
参考:
https://mosquitto.org/man/mosquitto-8.html
https://mosquitto.org/man/libmosquitto-3.html
*/
class Mosquitto{
	ctor(tParam){
		table.mixin(this, tParam)
		
		if(type(this.clean_session) != type.boolean){
			this.clean_session = true
		}
		
		if(type(this.keepalive) != type.number){
			this.keepalive = 60
		}

		if(this.host == null){
			this.host = '127.0.0.1'
		}
		
		if(type(this.port) != type.number){
			this.port = 1883
		}
		
		if(this.subTopic == null){
			this.subTopic = '$SYS/broker/messages/received'
		}
		
		this.listener = thread.command(this.winform)
		
		this.listener.setInfo = function(info){
			this.info = info
		}
		
		this.listener.onMessage = function(topic, message){
			if(type(this.onMessage) == type.function){
				this.onMessage(topic, message)
			}
		}
		
		this.listener.onConnect = function(result){
			if(type(this.onConnect) == type.function){
				this.onConnect(result)
			}
		}

		this.listener.onSubscribe = function(mid){
			if(type(this.onSubscribe) == type.function){
				this.onSubscribe(mid)
			}
		}
		
		this.listener.onPublish = function(mid){
			if(type(this.onPublish) == type.function){
				this.onPublish(mid)
			}
		}
		
		this.listener.onLog = function(str){
			if(type(this.onLog) == type.function){
				this.onLog(str)
			}
		}
		
		this.threadHandle = thread.create(
			function(host, port, subTopic, keepalive, clean_session, id){
				import thread.command
				import Mosquitto
				import console
				
				var listener = thread.command()
				
				var mm = Mosquitto.mosquitto_message()
				
				if(id){
					id = string.random(12)
				}
				
				my_message_callback = function(mosq, userdata, message){
					var msg = raw.convert(message,mm)
					
					if(msg.payloadlen > 0){
    					thread.command.onMessage(msg.topic, msg.payload)
    				}
				}
				var my_message_callback_cdecl = raw.tocdecl(my_message_callback, "void(pointer, pointer, pointer)" )

				my_connect_callback = function(mosq, userdata, result){
					if(!result){
						thread.command.setInfo('连接成功')
						thread.command.onConnect(result)
						if(subTopic){
							var ret, mid = Mosquitto.subscribe(mosq, 0, subTopic, 2);
							var qos = 0
							var retain = false
							
							var message = 'I am here, ' ++ string.random(10)
							
							Mosquitto.publish(mosq, mid, subTopic, #message, message, qos, retain)
						}

					}else{
						thread.command.setInfo('连接失败')
					}
				}
				var my_connect_callback_cdecl = raw.tocdecl(my_connect_callback, "void(pointer, pointer, int)" )
				
				my_subscribe_callback = function(mosq, userdata, mid, qos_count, granted_qos){
					thread.command.onSubscribe(mid)
				}
				var my_subscribe_callback_cdecl = raw.tocdecl(my_subscribe_callback, "void(pointer, pointer, int, int, int &)" )
				
				my_log_callback = function(mosq, userdata, level, str){
					thread.command.onLog(str)
				}
				var my_log_callback_cdecl = raw.tocdecl(my_log_callback, "void(pointer, pointer, int, string)" )
				
				
				//publish_callback_set= dll.api("mosquitto_publish_callback_set","void(pointer mosq, void (&on_publish)(struct &, pointer , int))");

				my_publish_callback = function(mosq, obj, mid){
					thread.command.onPublish(mid)
				}
				var my_publish_callback_cdecl = raw.tocdecl(my_publish_callback, "void(pointer, pointer, int)" )
				
				Mosquitto.init()
				
				var mosq, id = Mosquitto.new(id, clean_session, null)
				
				if(mosq){
					Mosquitto.log_callback_set(mosq, my_log_callback_cdecl);
					Mosquitto.connect_callback_set(mosq, my_connect_callback_cdecl);
					Mosquitto.message_callback_set(mosq, my_message_callback_cdecl);
					Mosquitto.subscribe_callback_set(mosq, my_subscribe_callback_cdecl);
					Mosquitto.publish_callback_set(mosq, my_publish_callback_cdecl);
				}else{
					thread.command.setInfo('内存不足')
				}
				
				if(Mosquitto.connect(mosq, host, port, keepalive)){
					thread.command.setInfo('连接失败')
				}else {
					thread.command.setInfo('连接成功')

					listener.pub = function(topic, mid, message, qos, retain){
						Mosquitto.publish(mosq, mid, topic, #message, message, qos, retain)
					}

					Mosquitto.loop_forever(mosq, -1, 1);
				
					Mosquitto.destroy(mosq);
					Mosquitto.lib_cleanup();
				}
				
			},this.host, this.port, this.subTopic, this.keepalive, this.clean_session, this.id
			
		)
		
	};
	

	sub = function(topic){
		//暂时不用,阻塞
	};
	
	pub = function(topic, mid, message, qos, retain){
		//暂时不用,阻塞
		//thread.command.pub(topic, mid, message, qos, retain)
	};

	
	close = function(){
		if(this.threadHandle){
			raw.closehandle(this.threadHandle)
		}
	}
	
}

namespace Mosquitto{
    table = ..table;
    string = ..string;
    io = ..io;
    raw = ..raw;
    import thread;
    import thread.command;
    
    dllPath = "/mosquitto.dll"

	MOSQ_ERR_CONN_PENDING = -1;
	MOSQ_ERR_SUCCESS = 0;
	MOSQ_ERR_NOMEM = 1;
	MOSQ_ERR_PROTOCOL = 2;
	MOSQ_ERR_INVAL = 3;
	MOSQ_ERR_NO_CONN = 4;
	MOSQ_ERR_CONN_REFUSED = 5;
	MOSQ_ERR_NOT_FOUND = 6;
	MOSQ_ERR_CONN_LOST = 7;
	MOSQ_ERR_TLS = 8;
	MOSQ_ERR_PAYLOAD_SIZE = 9;
	MOSQ_ERR_NOT_SUPPORTED = 10;
	MOSQ_ERR_AUTH = 11;
	MOSQ_ERR_ACL_DENIED = 12;
	MOSQ_ERR_UNKNOWN = 13;
	MOSQ_ERR_ERRNO = 14;
	MOSQ_ERR_EAI = 15;
	MOSQ_ERR_PROXY = 16
	
	class mosquitto_message {
		int mid;
		string topic;
		string payload;
		int payloadlen;
		int qos;
		bool retain;
	};
    
    if(io.exist(dllPath)){
		dll = raw.loadDll(dllPath,,"cdecl")

		if(dll){
			version= dll.api("mosquitto_lib_version","int(int &major, int &minor, int &revision)");
			
			getVersion = function(){
				var major, minor, revision = 0,0,0
				var ret, major, minor, revision = version(major, minor, revision)
				return string.format("%d.%d.%d", major, minor, revision), ret
			}

			init= dll.api("mosquitto_lib_init","int(void)");
			
			cleanup= dll.api("mosquitto_lib_cleanup","int(void)");
			
			new = dll.api("mosquitto_new","struct (string id, bool clean_session, pointer obj)");
			
			destroy= dll.api("mosquitto_destroy","void(pointer mosq)");
			
			username_pw_set= dll.api("mosquitto_username_pw_set","int(pointer mosq, string username, string password)");
			
			connect= dll.api("mosquitto_connect","int(pointer mosq, string host, int port, int keepalive)");
			
			disconnect= dll.api("mosquitto_disconnect","int(pointer mosq)");
			
			publish= dll.api("mosquitto_publish","int(pointer mosq, int &mid, string topic, int payloadlen, string payload, int qos, bool retain)");
			
			subscribe= dll.api("mosquitto_subscribe","int(pointer mosq, int &mid, string sub, int qos)");
			
			unsubscribe= dll.api("mosquitto_unsubscribe","int(pointer mosq, int &mid, string sub)");
			
			message_copy= dll.api("mosquitto_message_copy","int(struct_message &dst,  struct_message &src)");
			
			message_free= dll.api("mosquitto_message_free","void(struct_message &&message)");
			
			loop= dll.api("mosquitto_loop","int(pointer mosq, int timeout, int max_packets)");
			
			loop_forever= dll.api("mosquitto_loop_forever","int(pointer mosq, int timeout, int max_packets)");
			
			loop_start= dll.api("mosquitto_loop_start","int(pointer mosq)");
			
			loop_stop= dll.api("mosquitto_loop_stop","int(pointer mosq, bool force)");
	
			connect_callback_set= dll.api("mosquitto_connect_callback_set","void(pointer mosq, pointer callback)");
			
			disconnect_callback_set= dll.api("mosquitto_disconnect_callback_set","void(pointer mosq, pointer callback)");
			
			publish_callback_set= dll.api("mosquitto_publish_callback_set","void(pointer mosq, pointer callback)");

			message_callback_set= dll.api("mosquitto_message_callback_set","void(pointer mosq, pointer callback)");

			subscribe_callback_set= dll.api("mosquitto_subscribe_callback_set","void(pointer mosq, pointer callback)");

			unsubscribe_callback_set= dll.api("mosquitto_unsubscribe_callback_set","void(pointer mosq, pointer callback)");
			
			log_callback_set= dll.api("mosquitto_log_callback_set","void(pointer mosq, pointer callback)");

			strerror= dll.api("mosquitto_strerror","string(int mosq_errno)");	
		}
	}
}


/**intellisense()
Mosquitto = Mosquitto
Mosquitto(.(tParam) = 创建 Mosquitto
Mosquitto() = !Mosquitto.
end intellisense**/

/**intellisense(!Mosquitto)
end intellisense**/


mainform.aardio调用代码如下:

import win.ui;
/*DSG{{*/
mainForm = win.form(text="MosquittoTest";right=672;bottom=450)
mainForm.add(
buttonPublish={cls="button";text="Publish";left=564;top=103;right=659;bottom=137;z=6};
buttonStart={cls="button";text="Start";left=564;top=21;right=659;bottom=55;z=1};
buttonStop={cls="button";text="Stop";left=564;top=62;right=659;bottom=96;z=4};
buttonTest={cls="button";text="说明";left=564;top=144;right=659;bottom=178;z=2};
editLog={cls="edit";left=9;top=178;right=548;bottom=441;autohscroll=false;edge=1;multiline=1;vscroll=1;z=3};
editParam={cls="edit";left=9;top=8;right=548;bottom=169;autohscroll=false;edge=1;multiline=1;vscroll=1;z=5}
)
/*}}*/

/*
import console; 
console.open()

execute("mode con cols=100 lines=500")
execute("color 0A")

var oh = console.getOutPutHandle()
var ih = console.getInputHandle()
console.modifyMode(ih,0,0x40)

p = function(...){
	console.print(time(), ...)	
}

d = console.dump
dj = console.dumpJson
vd = console.varDump
*/

import Mosquitto
var mosquitto

onMessage = function(topic, message){
	mainForm.editLog.print('收到消息', topic, message)
}

onConnect = function(result){
	mainForm.editLog.print('连接', result)
}

onSubscribe = function(mid){
	mainForm.editLog.print('订阅 mid=', mid)
}

onPublish= function(mid){
	mainForm.editLog.print('发布 mid=', mid)
}

onLog=onLog = function(str){
	mainForm.editLog.print('日志', str)
}

mainForm.buttonStart.oncommand = function(id,event){
	mainForm.editLog.print('version', Mosquitto.getVersion())
    var sparam = mainForm.editParam.text
    var param = eval(sparam)
	mosquitto = Mosquitto(param)
}

mainForm.buttonTest.oncommand = function(id,event){
	var msg = /*
0、下载安装 https://mosquitto.org/download/ 
1、先启动服务:mosquitto.exe -v -p 61613
2、运行本程序,点 "Start" 按钮
3、可用下面的命令发布消息测试
	mosquitto_pub.exe -h 127.0.0.1 -p 61613 -t $SYS/broker/messages/received -m message_you_wanna_send
4、主要为收消息而做,可能 Mosquitto.loop_forever 阻塞了,无法发消息
*/
	mainForm.editLog.print(msg)
}

mainForm.onClose = function(hwnd,message,wParam,lParam){
    if(mosquitto){
        mosquitto.close()
        mosquitto = null
    }
}

mainForm.buttonStop.oncommand = function(id,event){
    if(mosquitto){
        mosquitto.close()
        mosquitto = null
    }
}

var strPublish = /*
{
    topic = '$SYS/broker/messages/received';
    mid = 0;
    message = string.random(20);
    qos = 0;
    retain = false;
}
*/

var strConnect = /*
{
	ip = '127.0.0.1';
	port = 61613;
	onMessage = onMessage;
	onConnect = onConnect;
	onSubscribe = onSubscribe;
	onPublish = onPublish;
	onLog = onLog;
	winform = mainForm;
}
*/

mainForm.editParam.text = strConnect

mainForm.buttonPublish.oncommand = function(id,event){
/*
    if(mosquitto){
        var sparam = mainForm.editParam.text
        var param = eval(sparam)
        mosquitto.pub(param.topic, param.mid, param.message, param.qos, param.retain)
    }
*/
}

mainForm.editParam.oncommand = function(id,event){
	
}

mainForm.show();
return win.loopMessage();


工程下载:

MosquittoTest 2021-9-5 1413.rar


登录后方可回帖

登 录
信息栏
公 告:

专注分享

谢绝纯提问

谢谢合作!
本站域名:HtmLayout.Cn
aardio可以快速开发上位机,本站主要记录了学习过程中遇到的问题和解决办法及aardio代码分享

这里主要专注于aardio学习交流和经验分享.
纯私人站,当笔记本用的,学到哪写到哪.

Aardio 官方站:Aardio官方
Aardio最新功能:Aardio官方更新日志
本 站 主 站:Stm32cube中文网
Sciter中文在线文档Sciter在线学习文档
空间赞助:才仁机械
打赏本站
Loading...