做了一个开机卡,计划实现的功能:
远程开启家里的台式机(后续再配合远程控制软件使用)。
其实这种产品市面上已经有了,但是还是想自己做一个。
支持三种唤醒方式:
- 模拟 PW_KEY,触发开机(这种方式最简单直接,也能实现关机和强制关机)
- PCIe唤醒,需要在BIOS中开启PCI-E/PCI设备唤醒,通过WAKE#引脚唤醒。
- WOL网卡唤醒,同样需要BIOS开启网卡唤醒(同时支持唤醒局域网其他PC)
以下是MQTT Server的部分代码,
该MQTT Server基于Vert.x实现:
package cn.roastchicken.mqttserver;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.ThreadingModel;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
public class MainVerticle extends AbstractVerticle {
/** Topic filter -> clients that subscribed with that filter. */
private final Map<String, Set<Client>> topicSubscribers = new ConcurrentHashMap<>();
/**
 * Client id -> id of the pending keep-alive timeout timer.
 * Concurrent map because cleanupClient() reads and writes it both from the MQTT
 * endpoint handlers and from MessagePublisherVerticle, which is deployed with the
 * WORKER threading model.
 */
private final Map<String, Long> clientTimers = new ConcurrentHashMap<>();
/** Client id -> endpoint registered for that id when the connection was accepted. */
private final Map<String, MqttEndpoint> clientEndpoints = new ConcurrentHashMap<>();
/**
 * Subscriber entry wrapping an MQTT endpoint.
 * Equality and hashing are based solely on the endpoint's client identifier, so two
 * Client instances created for the same identifier are interchangeable in a Set.
 */
private static class Client {
    private final MqttEndpoint endpoint;

    public Client(MqttEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (o.getClass() != getClass()) {
            return false;
        }
        String ownId = endpoint.clientIdentifier();
        String otherId = ((Client) o).endpoint.clientIdentifier();
        return ownId == null ? otherId == null : ownId.equals(otherId);
    }

    @Override
    public int hashCode() {
        String id = endpoint.clientIdentifier();
        return id == null ? 0 : id.hashCode();
    }

    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Client{");
        text.append("endpoint=").append(endpoint).append(",\t");
        text.append("clientIdentifier=").append(endpoint.clientIdentifier());
        text.append('}');
        return text.toString();
    }
}
/**
 * Deploys the worker verticle that fans messages out to subscribers, then starts the
 * MQTT server on port 5883 and wires the per-connection handlers.
 *
 * @param startPromise completed once the server is listening, failed with the cause
 *                     when the port cannot be bound, so the deployment result of this
 *                     verticle reflects whether the server is actually up
 */
@Override
public void start(Promise<Void> startPromise) throws Exception {
    DeploymentOptions publisherOptions = new DeploymentOptions()
        .setThreadingModel(ThreadingModel.WORKER)
        .setWorkerPoolSize(Runtime.getRuntime().availableProcessors() * 2);
    vertx.deployVerticle(new MainVerticle.MessagePublisherVerticle(), publisherOptions);

    MqttServerOptions options = new MqttServerOptions();
    MqttServer mqttServer = MqttServer.create(vertx, options);
    mqttServer.endpointHandler(endpoint -> {
        endpoint.autoKeepAlive(true);
        System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] request to connect, clean session = " + endpoint.isCleanSession());
        // Print the connection details of the client
        logClientInfo(endpoint);
        String clientId = endpoint.clientIdentifier();
        clientEndpoints.put(clientId, endpoint); // remember which endpoint serves this client id
        int keepAliveTimeSeconds = endpoint.keepAliveTimeSeconds();
        endpoint.accept(false);
        // Arm the heartbeat timeout timer
        resetClientTimer(clientId, keepAliveTimeSeconds, endpoint);
        // Subscription requests
        endpoint.subscribeHandler(subscribe -> {
            resetClientTimer(clientId, keepAliveTimeSeconds, endpoint);
            handleSubscription(endpoint, subscribe);
        });
        // Unsubscription requests
        endpoint.unsubscribeHandler(unsubscribe -> {
            resetClientTimer(clientId, keepAliveTimeSeconds, endpoint);
            handleUnsubscription(endpoint, unsubscribe);
        });
        // Published messages
        endpoint.publishHandler(message -> {
            resetClientTimer(clientId, keepAliveTimeSeconds, endpoint);
            handlePublish(endpoint, message);
        });
        // Client sent DISCONNECT
        endpoint.disconnectHandler(v -> {
            System.out.println("MQTT client [" + endpoint.clientIdentifier() + "] disconnected");
            cleanupClient(endpoint);
        });
        // Connection closed without a DISCONNECT packet (network drop, client crash).
        // Only clean up while this endpoint is still the one registered for the id, so
        // a late close of an old connection cannot tear down a session that has
        // already reconnected under the same client id.
        endpoint.closeHandler(v -> {
            if (clientEndpoints.get(clientId) == endpoint) {
                cleanupClient(endpoint);
            }
        });
        // PING requests
        endpoint.pingHandler(v -> resetClientTimer(clientId, keepAliveTimeSeconds, endpoint));
    }).listen(5883).onComplete(ar -> {
        if (ar.succeeded()) {
            System.out.println("MQTT server is listening on port " + ar.result().actualPort());
            startPromise.complete();
        } else {
            System.out.println("Error on starting the server");
            ar.cause().printStackTrace();
            startPromise.fail(ar.cause());
        }
    });
}
/**
 * Restarts the heartbeat timeout for a client: cancels the pending timer (if any) and
 * arms a new one that cleans the client up after three keep-alive periods plus 30 s.
 *
 * @param clientId             key under which the timer id is stored
 * @param keepAliveTimeSeconds keep-alive announced by the client; a value of 0 or less
 *                             means the client disabled keep-alive, so no timeout is armed
 * @param endpoint             endpoint handed to cleanupClient() when the timer fires
 */
private void resetClientTimer(String clientId, int keepAliveTimeSeconds, MqttEndpoint endpoint) {
    Long previousTimer = clientTimers.remove(clientId);
    if (previousTimer != null) {
        vertx.cancelTimer(previousTimer);
    }
    if (keepAliveTimeSeconds <= 0) {
        // Keep-alive 0 turns the mechanism off; previously such clients were
        // dropped after 30 s of silence.
        return;
    }
    long timeoutMillis = (keepAliveTimeSeconds * 1000L * 3) + 30000;
    long timerId = vertx.setTimer(timeoutMillis, id -> {
        System.out.println("客户端 " + clientId + " 断开连接,由于心跳超时");
        cleanupClient(endpoint);
    });
    clientTimers.put(clientId, timerId);
}
/**
 * Releases everything held for one endpoint: closes it if still connected, removes its
 * subscriptions, and drops its timer and endpoint mapping.
 *
 * Subscriptions are removed by endpoint identity, and the timer / endpoint mapping are
 * only dropped when the client id is still registered to this endpoint (or to none).
 * Client equality is by client id, so without this a late cleanup of a stale
 * connection would also wipe the state of a newer connection using the same id.
 *
 * @param endpoint the endpoint to clean up
 */
private void cleanupClient(MqttEndpoint endpoint) {
    String clientId = endpoint.clientIdentifier();
    try {
        if (endpoint.isConnected()) {
            endpoint.close();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    topicSubscribers.forEach((topic, subscribers) ->
        subscribers.removeIf(subscriber -> subscriber.endpoint == endpoint));
    topicSubscribers.entrySet().removeIf(entry -> entry.getValue().isEmpty());

    MqttEndpoint registered = clientEndpoints.get(clientId);
    if (registered == null || registered == endpoint) {
        Long timerId = clientTimers.remove(clientId);
        if (timerId != null) {
            vertx.cancelTimer(timerId);
        }
        // Two-argument remove: no-op if another endpoint took over the id meanwhile.
        clientEndpoints.remove(clientId, endpoint);
    }
}
/**
 * Tests whether a topic name matches an MQTT topic filter.
 *
 * '+' matches exactly one level, '#' matches the current level and everything below it
 * (so "a/#" also matches "a"). Empty levels are significant: "a/" has two levels and is
 * not the same topic as "a". Topics starting with '$' are never matched by a filter
 * whose first level is a wildcard.
 *
 * @param topicFilter subscription filter, possibly containing '+' and '#'
 * @param topic       concrete topic name of a published message
 * @return true when the topic is selected by the filter
 */
private static boolean isMatchingTopic(String topicFilter, String topic) {
    // Limit -1 keeps trailing empty levels; plain split("/") silently drops them,
    // which made "a" match "a/" and kept "a/+" from matching "a/".
    String[] filterLevels = topicFilter.split("/", -1);
    String[] topicLevels = topic.split("/", -1);

    if (topic.startsWith("$") && (filterLevels[0].equals("#") || filterLevels[0].equals("+"))) {
        return false;
    }

    for (int i = 0; i < filterLevels.length; i++) {
        String filterLevel = filterLevels[i];
        // Multi-level wildcard: everything from here on matches
        if (filterLevel.equals("#")) {
            return true;
        }
        // Filter is deeper than the topic
        if (i >= topicLevels.length) {
            return false;
        }
        // Single-level wildcard accepts any value; otherwise the levels must be equal
        if (!filterLevel.equals("+") && !filterLevel.equals(topicLevels[i])) {
            return false;
        }
    }
    // Every filter level matched; the topic must not have further levels
    return filterLevels.length == topicLevels.length;
}
/**
 * Registers the client for every topic filter in a SUBSCRIBE packet and acknowledges
 * the packet, granting each filter the QoS that was requested.
 *
 * @param endpoint  the subscribing client
 * @param subscribe the SUBSCRIBE message
 */
private void handleSubscription(MqttEndpoint endpoint, MqttSubscribeMessage subscribe) {
    subscribe.topicSubscriptions().forEach(subscription -> {
        String topicName = subscription.topicName();
        System.out.println(endpoint.clientIdentifier() + " Subscription for " + topicName + " with QoS " + subscription.qualityOfService());
        Client client = new Client(endpoint);
        topicSubscribers.compute(topicName, (filter, existing) -> {
            Set<Client> subscribers = existing;
            if (subscribers == null) {
                // Concurrent set: the set is iterated by the worker-deployed publisher
                // verticle while MQTT handlers keep modifying it.
                subscribers = ConcurrentHashMap.newKeySet();
            }
            // Client equality is by client id and Set.add keeps an existing equal
            // element, so remove first: a re-subscribe after reconnecting must
            // replace the entry that still points at the old endpoint.
            subscribers.remove(client);
            subscribers.add(client);
            return subscribers;
        });
    });
    endpoint.subscribeAcknowledge(subscribe.messageId(),
        subscribe.topicSubscriptions().stream()
            .map(MqttTopicSubscription::qualityOfService)
            .collect(Collectors.toList()));
}
/**
 * Removes the client from every topic filter named in an UNSUBSCRIBE packet, drops
 * filters that are left without subscribers, and acknowledges the packet.
 *
 * @param endpoint    the unsubscribing client
 * @param unsubscribe the UNSUBSCRIBE message
 */
private void handleUnsubscription(MqttEndpoint endpoint, MqttUnsubscribeMessage unsubscribe) {
    for (String filter : unsubscribe.topics()) {
        System.out.println(endpoint.clientIdentifier() + " Unsubscription for " + filter);
        // Returning null from the remapping function removes the now-empty entry
        topicSubscribers.computeIfPresent(filter, (key, members) -> {
            members.remove(new Client(endpoint));
            return members.isEmpty() ? null : members;
        });
    }
    endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
}
/**
 * Logs an incoming PUBLISH and forwards it, wrapped in a JsonObject, to the
 * "message.publisher" event-bus address.
 *
 * @param endpoint the client that published the message
 * @param message  the received PUBLISH message
 */
private void handlePublish(MqttEndpoint endpoint, MqttPublishMessage message) {
    String incomingTopic = message.topicName();
    System.out.println("Received message on topic " + incomingTopic + ", by:" + endpoint.clientIdentifier());
    String payloadText = message.payload().toString(StandardCharsets.UTF_8);
    System.out.println("Payload : " + payloadText);
    System.out.println("QoS : " + message.qosLevel());

    // Pack the message fields into a JsonObject for the event bus
    try {
        JsonObject envelope = new JsonObject();
        envelope.put("clientId", endpoint.clientIdentifier());
        envelope.put("topic", message.topicName());
        envelope.put("payload", message.payload().getBytes()); // raw payload bytes
        envelope.put("qos", message.qosLevel().value());       // QoS level as its integer value
        envelope.put("isDup", message.isDup());
        envelope.put("isRetain", message.isRetain());
        envelope.put("messageId", message.messageId());
        vertx.eventBus().send("message.publisher", envelope);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Sends the acknowledgement the publishing client expects for the message's QoS level.
 * QoS 0 needs none; QoS 1 is answered with publishAcknowledge; QoS 2 is answered with
 * publishReceived, and a release handler is registered that replies with publishComplete.
 *
 * @param endpoint the client that published the message
 * @param message  the message whose QoS level and message id are used
 */
private static void handleQoSLevel(MqttEndpoint endpoint, MqttMessageWrapper message) {
    MqttQoS level = message.getQos();
    if (level == MqttQoS.AT_LEAST_ONCE) {
        // QoS 1: confirm with PUBACK
        endpoint.publishAcknowledge(message.getMessageId());
        return;
    }
    if (level == MqttQoS.EXACTLY_ONCE) {
        // QoS 2: first half of the four-step handshake, the rest runs in the release handler
        endpoint.publishReceived(message.getMessageId());
        endpoint.publishReleaseHandler(endpoint::publishComplete);
    }
    // QoS 0 (and any other value): nothing to acknowledge
}
/**
 * Prints the connection details of a client: credentials (password masked), connect
 * properties, the will message if present, and the keep-alive timeout.
 *
 * @param endpoint the connecting client
 */
private void logClientInfo(MqttEndpoint endpoint) {
    if (endpoint.auth() != null) {
        // Never write the real password to the log; only show whether one was supplied.
        String maskedPassword = endpoint.auth().getPassword() == null ? "null" : "******";
        System.out.println("[username = " + endpoint.auth().getUsername() + ", password = " + maskedPassword + "]");
    }
    System.out.println("[properties = " + endpoint.connectProperties() + "]");
    if (endpoint.will() != null) {
        System.out.println("[will topic = " + endpoint.will().getWillTopic() + " msg = " + new String(Optional.ofNullable(endpoint.will().getWillMessageBytes()).orElse(new byte[0]), StandardCharsets.UTF_8) +
            " QoS = " + endpoint.will().getWillQos() + " isRetain = " + endpoint.will().isWillRetain() + "]");
    }
    System.out.println("[keep alive timeout = " + endpoint.keepAliveTimeSeconds() + "]");
}
/**
 * Verticle that consumes messages from the "message.publisher" event-bus address and
 * delivers each one to every subscriber whose topic filter matches, then acknowledges
 * the message to the publishing client.
 */
public class MessagePublisherVerticle extends AbstractVerticle {
    @Override
    public void start() {
        // Listen on the "message.publisher" event-bus address
        vertx.eventBus().consumer("message.publisher", message -> {
            JsonObject data = (JsonObject) message.body();
            handleMessage(data);
        });
    }

    /**
     * Delivers one published message to all matching subscribers.
     *
     * @param data message fields as packed by the sender: clientId, topic, payload,
     *             qos, isDup, isRetain, messageId
     */
    private void handleMessage(JsonObject data) {
        String clientId = data.getString("clientId");
        MqttEndpoint endpoint = clientEndpoints.get(clientId); // endpoint of the publisher
        if (endpoint == null) {
            System.out.println("无法找到客户端 " + clientId + " 的 MqttEndpoint");
            return;
        }
        // Extract the message fields from the JsonObject
        String topic = data.getString("topic");
        Buffer payload = Buffer.buffer(data.getBinary("payload"));
        MqttQoS qos = MqttQoS.valueOf(data.getInteger("qos"));
        boolean isDup = data.getBoolean("isDup");
        boolean isRetain = data.getBoolean("isRetain");
        int messageId = data.getInteger("messageId");

        // Disconnected subscribers are collected here and cleaned up after the loops:
        // cleanupClient() removes entries from the subscriber sets, and doing that
        // while iterating the same set raised ConcurrentModificationException, which
        // aborted delivery and skipped the acknowledgement below.
        List<MqttEndpoint> staleEndpoints = new ArrayList<>();
        topicSubscribers.forEach((subscribedTopic, subscribers) -> {
            if (!isMatchingTopic(subscribedTopic, topic)) {
                return;
            }
            // Iterate over a snapshot so changes to the live set cannot break the loop
            for (Client subscriber : new ArrayList<>(subscribers)) {
                try {
                    if (subscriber.endpoint.isConnected()) {
                        // DUP and RETAIN describe the publisher -> server delivery and
                        // must not be copied onto messages forwarded to subscribers.
                        subscriber.endpoint.publish(topic, payload, qos, false, false);
                    } else {
                        staleEndpoints.add(subscriber.endpoint);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        for (MqttEndpoint stale : staleEndpoints) {
            cleanupClient(stale);
        }
        handleQoSLevel(endpoint, new MqttMessageWrapper(clientId, topic, payload, qos, isDup, isRetain, messageId));
    }
}
/**
 * Immutable value object carrying the fields of one published MQTT message together
 * with the id of the client that published it.
 */
public static class MqttMessageWrapper {
    private final String clientId;
    private final String topic;
    private final Buffer payload;
    private final MqttQoS qos;
    private final boolean isDup;
    private final boolean isRetain;
    private final int messageId;

    /**
     * @param clientId  id of the publishing client
     * @param topic     topic the message was published to
     * @param payload   message payload
     * @param qos       QoS level of the message
     * @param isDup     DUP flag of the message
     * @param isRetain  RETAIN flag of the message
     * @param messageId packet identifier of the message
     */
    public MqttMessageWrapper(String clientId, String topic, Buffer payload, MqttQoS qos, boolean isDup, boolean isRetain, int messageId) {
        this.messageId = messageId;
        this.isRetain = isRetain;
        this.isDup = isDup;
        this.qos = qos;
        this.payload = payload;
        this.topic = topic;
        this.clientId = clientId;
    }

    /** @return id of the publishing client */
    public String getClientId() {
        return this.clientId;
    }

    /** @return topic the message was published to */
    public String getTopic() {
        return this.topic;
    }

    /** @return message payload */
    public Buffer getPayload() {
        return this.payload;
    }

    /** @return QoS level of the message */
    public MqttQoS getQos() {
        return this.qos;
    }

    /** @return DUP flag of the message */
    public boolean isDup() {
        return this.isDup;
    }

    /** @return RETAIN flag of the message */
    public boolean isRetain() {
        return this.isRetain;
    }

    /** @return packet identifier of the message */
    public int getMessageId() {
        return this.messageId;
    }
}
}
以下是网络唤醒(WOL)部分的逻辑代码(Lua):
-- Build a Wake-on-LAN magic packet.
-- Every pair of hex digits found in `mac` becomes one raw byte; the packet is
-- 6 bytes of 0xFF followed by those bytes repeated 16 times.
-- @param mac string containing the MAC address as hex digit pairs
-- @return the packet as a binary string
function create_magic_packet(mac)
    local pieces = {}
    for pair in mac:gmatch("%x%x") do
        pieces[#pieces + 1] = string.char(tonumber(pair, 16))
    end
    local mac_raw = table.concat(pieces)
    local sync_header = ("\xFF"):rep(6)
    return sync_header .. mac_raw:rep(16)
end
-- Convert a dotted-quad IPv4 string into its integer value
-- (first octet in the most significant position).
-- @param ip string such as "192.168.1.10"
-- @return integer value of the address
local function ipToInt(ip)
    local a, b, c, d = ip:match("(%d+)%.(%d+)%.(%d+)%.(%d+)")
    a, b, c, d = tonumber(a), tonumber(b), tonumber(c), tonumber(d)
    local upper = (a << 24) + (b << 16)
    local lower = (c << 8) + d
    return upper + lower
end
-- Convert an integer into a dotted-quad IPv4 string, taking the four low-order
-- bytes from most significant to least significant.
-- @param int integer value of the address
-- @return string such as "192.168.1.10"
local function intToIp(int)
    local octets = {}
    for shift = 24, 0, -8 do
        octets[#octets + 1] = (int >> shift) & 0xFF
    end
    return table.concat(octets, ".")
end
-- Compute the broadcast address of a subnet: the network bits of `ip` are kept and
-- all host bits (the bits cleared in the mask, limited to 32 bits) are set.
-- @param ip         dotted-quad address inside the subnet
-- @param subnetMask dotted-quad subnet mask
-- @return dotted-quad broadcast address
local function calculateBroadcastAddress(ip, subnetMask)
    local address = ipToInt(ip)
    local mask = ipToInt(subnetMask)
    local network_part = address & mask
    local host_part = ~mask & 0xFFFFFFFF
    return intToIp(network_part | host_part)
end
-- Send one Wake-on-LAN magic packet for `mac` to `address` on UDP port 9 and report
-- the outcome on the MQTT publish topic.
-- Must run inside a task because it calls sys.wait.
-- The socket control object is released on every exit path; previously it was never
-- released, so each call (and every early return) leaked one control object.
-- @param mac     MAC address string of the machine to wake
-- @param address destination (broadcast) IP address
local function send_magic_packet_to_address(mac, address)
    -- Create a socket control object
    local ctrl = socket.create()
    if not ctrl then
        log.error("socket", "socket.create failed", address)
        return
    end
    -- Configure it for UDP; no local port is specified
    local is_udp = true
    socket.config(ctrl, nil, is_udp)
    -- Check that the network interface is up
    local link_success, link_status = socket.linkup(ctrl)
    if not link_success or not link_status then
        log.error("socket", "网卡未成功上线,无法发送魔法包")
        socket.release(ctrl)
        return
    end
    -- Build the magic packet
    local magic_packet = create_magic_packet(mac)
    -- Send the magic packet to the requested address
    log.info('发送魔法包到', address, magic_packet:toHex())
    sys.wait(200)
    socket.connect(ctrl, address, 9)
    sys.wait(2000)
    local succ = socket.tx(ctrl, magic_packet, address, 9)
    -- Report the result
    if succ then
        log.info("socket", "魔法包发送成功", mac)
        mqttc:publish(pub_topic, json.encode({type='NET_WEAKUP', event='success', address=address}), 1)
    else
        log.error("socket", "魔法包发送失败", mac)
        mqttc:publish(pub_topic, json.encode({type='NET_WEAKUP', event='failure', address=address}), 1)
    end
    sys.wait(2000)
    -- Close the socket and give the control object back
    socket.close(ctrl)
    socket.release(ctrl)
end
-- Send the magic packet for `mac` to every candidate broadcast address, in order:
-- the limited broadcast address, the global `broadcastAddress` (when it is set), and
-- two common home-network broadcast addresses.
-- Candidates are visited by index rather than with ipairs: when `broadcastAddress`
-- is nil the list has a hole, and ipairs would stop right after the first entry.
-- Addresses that occur twice are sent to only once.
-- @param mac MAC address string of the machine to wake
function send_magic_packet(mac)
    local candidates = {
        "255.255.255.255",
        broadcastAddress,
        "192.168.1.255",
        "192.168.0.255"
    }
    local already_sent = {}
    for index = 1, 4 do
        local address = candidates[index]
        if address ~= nil and not already_sent[address] then
            already_sent[address] = true
            send_magic_packet_to_address(mac, address)
        end
    end
end
-- Start a task that sends the Wake-on-LAN magic packets for `mac`.
-- @param mac MAC address string of the machine to wake
local function netwakeup(mac)
    local function wake_task()
        send_magic_packet(mac)
    end
    sys.taskInit(wake_task)
end
MQTT消息监听(节选):
elseif event == 'recv' then
    -- A message arrived: `data` holds the topic, `payload` the message body
    log.info('mqtt', 'topic', data, 'payload', payload)
    if data == sub_regpool then
        -- Registration poll: report this device's id and the state of pin PB8
        log.info('publish ', json.encode({id=client_id, on=gpio.get(pin.PB8)}))
        mqttc:publish(pub_topic, json.encode({id=client_id, on=gpio.get(pin.PB8)}), 1)
    elseif data == sub_topic then
        local result = json.decode(payload)
        if type(result) ~= 'table' then
            -- Payload is not a JSON object; indexing it would raise an error
            log.error('mqtt', 'invalid command payload', payload)
        elseif result.type == 'PING' then
            -- Reply types were swapped before: PING is answered with PONG,
            -- PRESS_KEY with PRESS_KEY, like the other commands
            mqttc:publish(pub_topic, json.encode({type='PONG', event='finish'}), 1)
        elseif result.type == 'PRESS_KEY' then
            pressKey()
            mqttc:publish(pub_topic, json.encode({type='PRESS_KEY', event='finish'}), 1)
        elseif result.type == 'PCIE_WEAKUP' then
            pciewakeup()
            mqttc:publish(pub_topic, json.encode({type='PCIE_WEAKUP', event='finish'}), 1)
        elseif result.type == 'NET_WEAKUP' then
            netwakeup(result.mac)
            mqttc:publish(pub_topic, json.encode({type='NET_WEAKUP', event='finish'}), 1)
        end
    end