mqtt: convert transaction list to vecdeque

Ticket: 5430
pull/7930/head
Jeff Lucovsky 4 years ago committed by Victor Julien
parent 18c616394e
commit ba37574ca5

@ -28,6 +28,7 @@ use nom;
use std;
use std::ffi::{CStr,CString};
use std::mem::transmute;
use std::collections::VecDeque;
// Used as a special pseudo packet identifier to denote the first CONNECT
// packet in a connection. Note that there is no risk of collision with a
@ -114,7 +115,7 @@ impl Drop for MQTTTransaction {
pub struct MQTTState {
tx_id: u64,
pub protocol_version: u8,
transactions: Vec<MQTTTransaction>,
transactions: VecDeque<MQTTTransaction>,
connected: bool,
skip_request: usize,
skip_response: usize,
@ -126,7 +127,7 @@ impl MQTTState {
Self {
tx_id: 0,
protocol_version: 0,
transactions: Vec::new(),
transactions: VecDeque::new(),
connected: false,
skip_request: 0,
skip_response: 0,
@ -208,18 +209,18 @@ impl MQTTState {
if self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::DoubleConnect);
self.transactions.push(tx);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(MQTT_CONNECT_PKT_ID);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
MQTTOperation::PUBLISH(ref publish) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
match msg.header.qos_level {
@ -228,23 +229,23 @@ impl MQTTState {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
},
1..=2 => {
if let Some(pkt_id) = publish.message_id {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id as u32);
self.transactions.push(tx);
self.transactions.push_back(tx);
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingMsgId);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
},
@ -252,7 +253,7 @@ impl MQTTState {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let pkt_id = subscribe.message_id as u32;
@ -262,17 +263,17 @@ impl MQTTState {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
},
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push(tx);
self.transactions.push_back(tx);
},
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
},
@ -280,7 +281,7 @@ impl MQTTState {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let pkt_id = unsubscribe.message_id as u32;
@ -290,17 +291,17 @@ impl MQTTState {
// response
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
},
1..=2 => {
let mut tx = self.new_tx(msg, toclient);
tx.pkt_id = Some(pkt_id);
self.transactions.push(tx);
self.transactions.push_back(tx);
},
_ => {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::InvalidQosLevel);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
},
@ -313,7 +314,7 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingConnect);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
MQTTOperation::PUBREC(ref v)
@ -321,7 +322,7 @@ impl MQTTState {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
@ -329,7 +330,7 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
MQTTOperation::PUBACK(ref v)
@ -337,7 +338,7 @@ impl MQTTState {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(v.message_id as u32) {
@ -347,14 +348,14 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingPublish);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
MQTTOperation::SUBACK(ref suback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(suback.message_id as u32) {
@ -364,14 +365,14 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingSubscribe);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
MQTTOperation::UNSUBACK(ref unsuback) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
if let Some(tx) = self.get_tx_by_pkt_id(unsuback.message_id as u32) {
@ -381,43 +382,43 @@ impl MQTTState {
} else {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::MissingUnsubscribe);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
},
MQTTOperation::UNASSIGNED => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
MQTTState::set_event(&mut tx, MQTTEvent::UnassignedMsgtype);
self.transactions.push(tx);
self.transactions.push_back(tx);
},
MQTTOperation::TRUNCATED(_) => {
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
},
MQTTOperation::AUTH(_)
| MQTTOperation::DISCONNECT(_) => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
},
MQTTOperation::PINGREQ
| MQTTOperation::PINGRESP => {
if !self.connected {
let mut tx = self.new_tx(msg, toclient);
MQTTState::set_event(&mut tx, MQTTEvent::UnintroducedMessage);
self.transactions.push(tx);
self.transactions.push_back(tx);
return;
}
let mut tx = self.new_tx(msg, toclient);
tx.complete = true;
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}
}
@ -576,7 +577,7 @@ impl MQTTState {
}
tx.complete = true;
MQTTState::set_event(&mut tx, event);
self.transactions.push(tx);
self.transactions.push_back(tx);
}
}

Loading…
Cancel
Save