|
|
|
@ -19,9 +19,14 @@
|
|
|
|
|
|
|
|
|
|
use crate::mqtt::mqtt_message::*;
|
|
|
|
|
use crate::mqtt::mqtt_property::*;
|
|
|
|
|
use nom::combinator::rest;
|
|
|
|
|
use nom::number::streaming::*;
|
|
|
|
|
use nom::*;
|
|
|
|
|
use nom7::bits::{bits, streaming::take as take_bits};
|
|
|
|
|
use nom7::bytes::streaming::take_while_m_n;
|
|
|
|
|
use nom7::combinator::{complete, cond, verify};
|
|
|
|
|
use nom7::error::Error;
|
|
|
|
|
use nom7::multi::{length_data, many0, many1};
|
|
|
|
|
use nom7::number::streaming::*;
|
|
|
|
|
use nom7::sequence::tuple;
|
|
|
|
|
use nom7::{Err, IResult, Needed};
|
|
|
|
|
use num_traits::FromPrimitive;
|
|
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
@ -54,53 +59,43 @@ fn convert_varint(continued: Vec<u8>, last: u8) -> u32 {
|
|
|
|
|
|
|
|
|
|
// DATA TYPES
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_mqtt_string<String>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
length: be_u16
|
|
|
|
|
>> content: take!(length)
|
|
|
|
|
>> (
|
|
|
|
|
String::from_utf8_lossy(content).to_string()
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_mqtt_variable_integer<u32>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
// take at most 4 bytes in total, so as not to overflow u32
|
|
|
|
|
continued_part: take_while_m_n!(0, 3, is_continuation_bit_set)
|
|
|
|
|
>> non_continued_part: verify!(be_u8, |&val| !is_continuation_bit_set(val))
|
|
|
|
|
>> (
|
|
|
|
|
convert_varint(continued_part.to_vec(), non_continued_part)
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_mqtt_binary_data<Vec<u8>>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
length: be_u16
|
|
|
|
|
>> data: take!(length)
|
|
|
|
|
>> (
|
|
|
|
|
data.to_vec()
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_mqtt_string_pair<(String, String)>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
name: parse_mqtt_string
|
|
|
|
|
>> value: parse_mqtt_string
|
|
|
|
|
>> (
|
|
|
|
|
(name, value)
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_mqtt_string(i: &[u8]) -> IResult<&[u8], String> {
|
|
|
|
|
let (i, content) = length_data(be_u16)(i)?;
|
|
|
|
|
Ok((i, String::from_utf8_lossy(content).to_string()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_mqtt_variable_integer(i: &[u8]) -> IResult<&[u8], u32> {
|
|
|
|
|
let (i, continued_part) = take_while_m_n(0, 3, is_continuation_bit_set)(i)?;
|
|
|
|
|
let (i, non_continued_part) = verify(be_u8, |&val| !is_continuation_bit_set(val))(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
convert_varint(continued_part.to_vec(), non_continued_part),
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_mqtt_binary_data(i: &[u8]) -> IResult<&[u8], Vec<u8>> {
|
|
|
|
|
let (i, data) = length_data(be_u16)(i)?;
|
|
|
|
|
Ok((i, data.to_vec()))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_mqtt_string_pair(i: &[u8]) -> IResult<&[u8], (String, String)> {
|
|
|
|
|
let (i, name) = parse_mqtt_string(i)?;
|
|
|
|
|
let (i, value) = parse_mqtt_string(i)?;
|
|
|
|
|
Ok((i, (name, value)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// MESSAGE COMPONENTS
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_property<MQTTProperty>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
identifier: parse_mqtt_variable_integer
|
|
|
|
|
>> value: call!(parse_qualified_property, identifier)
|
|
|
|
|
>> (
|
|
|
|
|
value
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_property(i: &[u8]) -> IResult<&[u8], MQTTProperty> {
|
|
|
|
|
let (i, identifier) = parse_mqtt_variable_integer(i)?;
|
|
|
|
|
let (i, value) = parse_qualified_property(i, identifier)?;
|
|
|
|
|
Ok((i, value))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQTTProperty>>> {
|
|
|
|
@ -137,15 +132,12 @@ fn parse_properties(input: &[u8], precond: bool) -> IResult<&[u8], Option<Vec<MQ
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn parse_fixed_header_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8)> {
|
|
|
|
|
bits!(
|
|
|
|
|
i,
|
|
|
|
|
tuple!(
|
|
|
|
|
take_bits!(4u8),
|
|
|
|
|
take_bits!(1u8),
|
|
|
|
|
take_bits!(2u8),
|
|
|
|
|
take_bits!(1u8)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
bits::<_, _, Error<(&[u8], usize)>, _, _>(tuple((
|
|
|
|
|
take_bits(4u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
take_bits(2u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
)))(i)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
@ -153,11 +145,11 @@ fn parse_message_type(code: u8) -> MQTTTypeCode {
|
|
|
|
|
match code {
|
|
|
|
|
0..=15 => {
|
|
|
|
|
if let Some(t) = FromPrimitive::from_u8(code) {
|
|
|
|
|
return t
|
|
|
|
|
return t;
|
|
|
|
|
} else {
|
|
|
|
|
return MQTTTypeCode::UNASSIGNED
|
|
|
|
|
return MQTTTypeCode::UNASSIGNED;
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
_ => {
|
|
|
|
|
// unreachable state in parser: we only pass values parsed from take_bits!(4u8)
|
|
|
|
|
debug_validate_fail!("can't have message codes >15 from 4 bits");
|
|
|
|
@ -166,107 +158,104 @@ fn parse_message_type(code: u8) -> MQTTTypeCode {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_fixed_header<FixedHeader>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
flags: parse_fixed_header_flags
|
|
|
|
|
>> remaining_length: parse_mqtt_variable_integer
|
|
|
|
|
>> (
|
|
|
|
|
FixedHeader {
|
|
|
|
|
message_type: parse_message_type(flags.0),
|
|
|
|
|
dup_flag: flags.1 != 0,
|
|
|
|
|
qos_level: flags.2 as u8,
|
|
|
|
|
retain: flags.3 != 0,
|
|
|
|
|
remaining_length: remaining_length,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_fixed_header(i: &[u8]) -> IResult<&[u8], FixedHeader> {
|
|
|
|
|
let (i, flags) = parse_fixed_header_flags(i)?;
|
|
|
|
|
let (i, remaining_length) = parse_mqtt_variable_integer(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
FixedHeader {
|
|
|
|
|
message_type: parse_message_type(flags.0),
|
|
|
|
|
dup_flag: flags.1 != 0,
|
|
|
|
|
qos_level: flags.2 as u8,
|
|
|
|
|
retain: flags.3 != 0,
|
|
|
|
|
remaining_length,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn parse_connect_variable_flags(i: &[u8]) -> IResult<&[u8], (u8, u8, u8, u8, u8, u8, u8)> {
|
|
|
|
|
bits!(
|
|
|
|
|
bits::<_, _, Error<(&[u8], usize)>, _, _>(tuple((
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
take_bits(2u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
take_bits(1u8),
|
|
|
|
|
)))(i)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_connect(i: &[u8]) -> IResult<&[u8], MQTTConnectData> {
|
|
|
|
|
let (i, protocol_string) = parse_mqtt_string(i)?;
|
|
|
|
|
let (i, protocol_version) = be_u8(i)?;
|
|
|
|
|
let (i, flags) = parse_connect_variable_flags(i)?;
|
|
|
|
|
let (i, keepalive) = be_u16(i)?;
|
|
|
|
|
let (i, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
let (i, client_id) = parse_mqtt_string(i)?;
|
|
|
|
|
let (i, will_properties) = parse_properties(i, protocol_version == 5 && flags.4 != 0)?;
|
|
|
|
|
let (i, will_topic) = cond(flags.4 != 0, parse_mqtt_string)(i)?;
|
|
|
|
|
let (i, will_message) = cond(flags.4 != 0, parse_mqtt_binary_data)(i)?;
|
|
|
|
|
let (i, username) = cond(flags.0 != 0, parse_mqtt_string)(i)?;
|
|
|
|
|
let (i, password) = cond(flags.1 != 0, parse_mqtt_binary_data)(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTConnectData {
|
|
|
|
|
protocol_string,
|
|
|
|
|
protocol_version,
|
|
|
|
|
username_flag: flags.0 != 0,
|
|
|
|
|
password_flag: flags.1 != 0,
|
|
|
|
|
will_retain: flags.2 != 0,
|
|
|
|
|
will_qos: flags.3 as u8,
|
|
|
|
|
will_flag: flags.4 != 0,
|
|
|
|
|
clean_session: flags.5 != 0,
|
|
|
|
|
keepalive,
|
|
|
|
|
client_id,
|
|
|
|
|
will_topic,
|
|
|
|
|
will_message,
|
|
|
|
|
username,
|
|
|
|
|
password,
|
|
|
|
|
properties,
|
|
|
|
|
will_properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn parse_connack(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTConnackData> {
|
|
|
|
|
let (i, topic_name_compression_response) = be_u8(i)?;
|
|
|
|
|
let (i, return_code) = be_u8(i)?;
|
|
|
|
|
let (i, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
tuple!(
|
|
|
|
|
take_bits!(1u8),
|
|
|
|
|
take_bits!(1u8),
|
|
|
|
|
take_bits!(1u8),
|
|
|
|
|
take_bits!(2u8),
|
|
|
|
|
take_bits!(1u8),
|
|
|
|
|
take_bits!(1u8),
|
|
|
|
|
take_bits!(1u8)
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
MQTTConnackData {
|
|
|
|
|
session_present: (topic_name_compression_response & 1) != 0,
|
|
|
|
|
return_code,
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_connect<MQTTConnectData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
protocol_string: parse_mqtt_string
|
|
|
|
|
>> protocol_version: be_u8
|
|
|
|
|
>> flags: parse_connect_variable_flags
|
|
|
|
|
>> keepalive: be_u16
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> client_id: parse_mqtt_string
|
|
|
|
|
>> will_properties: call!(parse_properties, protocol_version == 5 && flags.4 != 0)
|
|
|
|
|
>> will_topic: cond!(flags.4 != 0, parse_mqtt_string)
|
|
|
|
|
>> will_message: cond!(flags.4 != 0, parse_mqtt_binary_data)
|
|
|
|
|
>> username: cond!(flags.0 != 0, parse_mqtt_string)
|
|
|
|
|
>> password: cond!(flags.1 != 0, parse_mqtt_binary_data)
|
|
|
|
|
>> (
|
|
|
|
|
MQTTConnectData {
|
|
|
|
|
protocol_string: protocol_string,
|
|
|
|
|
protocol_version: protocol_version,
|
|
|
|
|
username_flag: flags.0 != 0,
|
|
|
|
|
password_flag: flags.1 != 0,
|
|
|
|
|
will_retain: flags.2 != 0,
|
|
|
|
|
will_qos: flags.3 as u8,
|
|
|
|
|
will_flag: flags.4 != 0,
|
|
|
|
|
clean_session: flags.5 != 0,
|
|
|
|
|
keepalive: keepalive,
|
|
|
|
|
client_id: client_id,
|
|
|
|
|
will_topic: will_topic,
|
|
|
|
|
will_message: will_message,
|
|
|
|
|
username: username,
|
|
|
|
|
password: password,
|
|
|
|
|
properties: properties,
|
|
|
|
|
will_properties: will_properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named_args!(pub parse_connack(protocol_version: u8)<MQTTConnackData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
topic_name_compression_response: be_u8
|
|
|
|
|
>> retcode: be_u8
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> (
|
|
|
|
|
MQTTConnackData {
|
|
|
|
|
session_present: (topic_name_compression_response & 1) != 0,
|
|
|
|
|
return_code: retcode,
|
|
|
|
|
properties: properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named_args!(pub parse_publish(protocol_version: u8, has_id: bool)<MQTTPublishData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
topic: parse_mqtt_string
|
|
|
|
|
>> message_id: cond!(has_id, be_u16)
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> message: rest
|
|
|
|
|
>> (
|
|
|
|
|
MQTTPublishData {
|
|
|
|
|
topic: topic,
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
message: message.to_vec(),
|
|
|
|
|
properties: properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
pub fn parse_publish(
|
|
|
|
|
i: &[u8], protocol_version: u8, has_id: bool,
|
|
|
|
|
) -> IResult<&[u8], MQTTPublishData> {
|
|
|
|
|
let (i, topic) = parse_mqtt_string(i)?;
|
|
|
|
|
let (i, message_id) = cond(has_id, be_u16)(i)?;
|
|
|
|
|
let (message, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTPublishData {
|
|
|
|
|
topic,
|
|
|
|
|
message_id,
|
|
|
|
|
message: message.to_vec(),
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn parse_msgidonly(
|
|
|
|
|
input: &[u8],
|
|
|
|
|
protocol_version: u8,
|
|
|
|
|
) -> IResult<&[u8], MQTTMessageIdOnly> {
|
|
|
|
|
fn parse_msgidonly(input: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTMessageIdOnly> {
|
|
|
|
|
if protocol_version < 5 {
|
|
|
|
|
// before v5 we don't even have to care about reason codes
|
|
|
|
|
// and properties, lucky us
|
|
|
|
@ -283,7 +272,7 @@ fn parse_msgidonly(
|
|
|
|
|
return Ok((
|
|
|
|
|
rem,
|
|
|
|
|
MQTTMessageIdOnly {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
message_id,
|
|
|
|
|
reason_code: Some(0),
|
|
|
|
|
properties: None,
|
|
|
|
|
},
|
|
|
|
@ -298,7 +287,7 @@ fn parse_msgidonly(
|
|
|
|
|
return Ok((
|
|
|
|
|
rem,
|
|
|
|
|
MQTTMessageIdOnly {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
message_id,
|
|
|
|
|
reason_code: Some(reason_code),
|
|
|
|
|
properties: None,
|
|
|
|
|
},
|
|
|
|
@ -309,9 +298,9 @@ fn parse_msgidonly(
|
|
|
|
|
return Ok((
|
|
|
|
|
rem,
|
|
|
|
|
MQTTMessageIdOnly {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
message_id,
|
|
|
|
|
reason_code: Some(reason_code),
|
|
|
|
|
properties: properties,
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
@ -325,91 +314,84 @@ fn parse_msgidonly(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_msgidonly_v3<MQTTMessageIdOnly>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
message_id: be_u16
|
|
|
|
|
>> (
|
|
|
|
|
MQTTMessageIdOnly {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
reason_code: None,
|
|
|
|
|
properties: None,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_subscribe_topic<MQTTSubscribeTopicData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
topic: parse_mqtt_string
|
|
|
|
|
>> qos: be_u8
|
|
|
|
|
>> (
|
|
|
|
|
MQTTSubscribeTopicData {
|
|
|
|
|
topic_name: topic,
|
|
|
|
|
qos: qos,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named_args!(pub parse_subscribe(protocol_version: u8)<MQTTSubscribeData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
message_id: be_u16
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> topics: many1!(complete!(parse_subscribe_topic))
|
|
|
|
|
>> (
|
|
|
|
|
MQTTSubscribeData {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
topics: topics,
|
|
|
|
|
properties: properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named_args!(pub parse_suback(protocol_version: u8)<MQTTSubackData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
message_id: be_u16
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> qoss: rest
|
|
|
|
|
>> (
|
|
|
|
|
MQTTSubackData {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
qoss: qoss.to_vec(),
|
|
|
|
|
properties: properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named_args!(pub parse_unsubscribe(protocol_version: u8)<MQTTUnsubscribeData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
message_id: be_u16
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> topics: many0!(complete!(parse_mqtt_string))
|
|
|
|
|
>> (
|
|
|
|
|
MQTTUnsubscribeData {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
topics: topics,
|
|
|
|
|
properties: properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
named_args!(pub parse_unsuback(protocol_version: u8)<MQTTUnsubackData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
message_id: be_u16
|
|
|
|
|
>> properties: call!(parse_properties, protocol_version == 5)
|
|
|
|
|
>> reason_codes: many0!(complete!(be_u8))
|
|
|
|
|
>> (
|
|
|
|
|
MQTTUnsubackData {
|
|
|
|
|
message_id: message_id,
|
|
|
|
|
properties: properties,
|
|
|
|
|
reason_codes: Some(reason_codes),
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_msgidonly_v3(i: &[u8]) -> IResult<&[u8], MQTTMessageIdOnly> {
|
|
|
|
|
let (i, message_id) = be_u16(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTMessageIdOnly {
|
|
|
|
|
message_id,
|
|
|
|
|
reason_code: None,
|
|
|
|
|
properties: None,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_subscribe_topic(i: &[u8]) -> IResult<&[u8], MQTTSubscribeTopicData> {
|
|
|
|
|
let (i, topic_name) = parse_mqtt_string(i)?;
|
|
|
|
|
let (i, qos) = be_u8(i)?;
|
|
|
|
|
Ok((i, MQTTSubscribeTopicData { topic_name, qos }))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn parse_subscribe(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTSubscribeData> {
|
|
|
|
|
let (i, message_id) = be_u16(i)?;
|
|
|
|
|
let (i, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
let (i, topics) = many1(complete(parse_subscribe_topic))(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTSubscribeData {
|
|
|
|
|
message_id,
|
|
|
|
|
topics,
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn parse_suback(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTSubackData> {
|
|
|
|
|
let (i, message_id) = be_u16(i)?;
|
|
|
|
|
let (qoss, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTSubackData {
|
|
|
|
|
message_id,
|
|
|
|
|
qoss: qoss.to_vec(),
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn parse_unsubscribe(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTUnsubscribeData> {
|
|
|
|
|
let (i, message_id) = be_u16(i)?;
|
|
|
|
|
let (i, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
let (i, topics) = many0(complete(parse_mqtt_string))(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTUnsubscribeData {
|
|
|
|
|
message_id,
|
|
|
|
|
topics,
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn parse_unsuback(i: &[u8], protocol_version: u8) -> IResult<&[u8], MQTTUnsubackData> {
|
|
|
|
|
let (i, message_id) = be_u16(i)?;
|
|
|
|
|
let (i, properties) = parse_properties(i, protocol_version == 5)?;
|
|
|
|
|
let (i, reason_codes) = many0(complete(be_u8))(i)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTUnsubackData {
|
|
|
|
|
message_id,
|
|
|
|
|
properties,
|
|
|
|
|
reason_codes: Some(reason_codes),
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
|
fn parse_disconnect(
|
|
|
|
|
input: &[u8],
|
|
|
|
|
remaining_len: usize,
|
|
|
|
|
protocol_version: u8,
|
|
|
|
|
input: &[u8], remaining_len: usize, protocol_version: u8,
|
|
|
|
|
) -> IResult<&[u8], MQTTDisconnectData> {
|
|
|
|
|
if protocol_version < 5 {
|
|
|
|
|
return Ok((
|
|
|
|
@ -452,7 +434,7 @@ fn parse_disconnect(
|
|
|
|
|
rem,
|
|
|
|
|
MQTTDisconnectData {
|
|
|
|
|
reason_code: Some(reason_code),
|
|
|
|
|
properties: properties,
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
));
|
|
|
|
|
}
|
|
|
|
@ -463,19 +445,22 @@ fn parse_disconnect(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
named!(#[inline], pub parse_auth<MQTTAuthData>,
|
|
|
|
|
do_parse!(
|
|
|
|
|
reason_code: be_u8
|
|
|
|
|
>> properties: call!(parse_properties, true)
|
|
|
|
|
>> (
|
|
|
|
|
MQTTAuthData {
|
|
|
|
|
reason_code: reason_code,
|
|
|
|
|
properties: properties,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
));
|
|
|
|
|
|
|
|
|
|
pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) -> IResult<&[u8], MQTTMessage> {
|
|
|
|
|
#[inline]
|
|
|
|
|
pub fn parse_auth(i: &[u8]) -> IResult<&[u8], MQTTAuthData> {
|
|
|
|
|
let (i, reason_code) = be_u8(i)?;
|
|
|
|
|
let (i, properties) = parse_properties(i, true)?;
|
|
|
|
|
Ok((
|
|
|
|
|
i,
|
|
|
|
|
MQTTAuthData {
|
|
|
|
|
reason_code,
|
|
|
|
|
properties,
|
|
|
|
|
},
|
|
|
|
|
))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn parse_message(
|
|
|
|
|
input: &[u8], protocol_version: u8, max_msg_size: usize,
|
|
|
|
|
) -> IResult<&[u8], MQTTMessage> {
|
|
|
|
|
// Parse the fixed header first. This is identical across versions and can
|
|
|
|
|
// be between 2 and 5 bytes long.
|
|
|
|
|
match parse_fixed_header(input) {
|
|
|
|
@ -494,7 +479,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
|
|
|
|
|
// type.
|
|
|
|
|
if len > max_msg_size {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::TRUNCATED(MQTTTruncatedData {
|
|
|
|
|
original_message_type: message_type,
|
|
|
|
|
skipped_length: len + skiplen,
|
|
|
|
@ -510,7 +495,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
|
|
|
|
|
// have enough data in the input buffer to handle the full
|
|
|
|
|
// message. Signal this by returning an Incomplete IResult value.
|
|
|
|
|
if fullrem.len() < len {
|
|
|
|
|
return Err(Err::Incomplete(Needed::Size(len - fullrem.len())));
|
|
|
|
|
return Err(Err::Incomplete(Needed::new(len - fullrem.len())));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Parse the contents of the buffer into a single message.
|
|
|
|
@ -523,119 +508,122 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
|
|
|
|
|
MQTTTypeCode::CONNECT => match parse_connect(rem) {
|
|
|
|
|
Ok((_rem, conn)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::CONNECT(conn),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::CONNACK => match parse_connack(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, connack)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::CONNACK(connack),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::PUBLISH => match parse_publish(rem, protocol_version, header.qos_level > 0) {
|
|
|
|
|
Ok((_rem, publish)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
op: MQTTOperation::PUBLISH(publish),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::PUBACK | MQTTTypeCode::PUBREC | MQTTTypeCode::PUBREL | MQTTTypeCode::PUBCOMP => {
|
|
|
|
|
match parse_msgidonly(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, msgidonly)) => {
|
|
|
|
|
MQTTTypeCode::PUBLISH => {
|
|
|
|
|
match parse_publish(rem, protocol_version, header.qos_level > 0) {
|
|
|
|
|
Ok((_rem, publish)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
op: match message_type {
|
|
|
|
|
MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
|
|
|
|
|
MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
|
|
|
|
|
MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
|
|
|
|
|
MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
|
|
|
|
|
_ => MQTTOperation::UNASSIGNED,
|
|
|
|
|
},
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::PUBLISH(publish),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
MQTTTypeCode::PUBACK
|
|
|
|
|
| MQTTTypeCode::PUBREC
|
|
|
|
|
| MQTTTypeCode::PUBREL
|
|
|
|
|
| MQTTTypeCode::PUBCOMP => match parse_msgidonly(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, msgidonly)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header,
|
|
|
|
|
op: match message_type {
|
|
|
|
|
MQTTTypeCode::PUBACK => MQTTOperation::PUBACK(msgidonly),
|
|
|
|
|
MQTTTypeCode::PUBREC => MQTTOperation::PUBREC(msgidonly),
|
|
|
|
|
MQTTTypeCode::PUBREL => MQTTOperation::PUBREL(msgidonly),
|
|
|
|
|
MQTTTypeCode::PUBCOMP => MQTTOperation::PUBCOMP(msgidonly),
|
|
|
|
|
_ => MQTTOperation::UNASSIGNED,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::SUBSCRIBE => match parse_subscribe(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, subs)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::SUBSCRIBE(subs),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::SUBACK => match parse_suback(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, suback)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::SUBACK(suback),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::UNSUBSCRIBE => match parse_unsubscribe(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, unsub)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::UNSUBSCRIBE(unsub),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::UNSUBACK => match parse_unsuback(rem, protocol_version) {
|
|
|
|
|
Ok((_rem, unsuback)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::UNSUBACK(unsuback),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::PINGREQ | MQTTTypeCode::PINGRESP => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: match message_type {
|
|
|
|
|
MQTTTypeCode::PINGREQ => MQTTOperation::PINGREQ,
|
|
|
|
|
MQTTTypeCode::PINGRESP => MQTTOperation::PINGRESP,
|
|
|
|
|
_ => MQTTOperation::UNASSIGNED,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
return Ok((&input[skiplen+len..], msg));
|
|
|
|
|
return Ok((&input[skiplen + len..], msg));
|
|
|
|
|
}
|
|
|
|
|
MQTTTypeCode::DISCONNECT => match parse_disconnect(rem, len, protocol_version) {
|
|
|
|
|
Ok((_rem, disco)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::DISCONNECT(disco),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
|
MQTTTypeCode::AUTH => match parse_auth(rem) {
|
|
|
|
|
Ok((_rem, auth)) => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::AUTH(auth),
|
|
|
|
|
};
|
|
|
|
|
Ok((&input[skiplen+len..], msg))
|
|
|
|
|
Ok((&input[skiplen + len..], msg))
|
|
|
|
|
}
|
|
|
|
|
Err(e) => Err(e),
|
|
|
|
|
},
|
|
|
|
@ -644,7 +632,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
|
|
|
|
|
// crafted MQTT traffic.
|
|
|
|
|
_ => {
|
|
|
|
|
let msg = MQTTMessage {
|
|
|
|
|
header: header,
|
|
|
|
|
header,
|
|
|
|
|
op: MQTTOperation::UNASSIGNED,
|
|
|
|
|
};
|
|
|
|
|
return Ok((&rem[len..], msg));
|
|
|
|
@ -660,6 +648,7 @@ pub fn parse_message(input: &[u8], protocol_version: u8, max_msg_size: usize) ->
|
|
|
|
|
#[cfg(test)]
|
|
|
|
|
mod tests {
|
|
|
|
|
use super::*;
|
|
|
|
|
use nom7::error::ErrorKind;
|
|
|
|
|
|
|
|
|
|
fn test_mqtt_parse_variable_fail(buf0: &[u8]) {
|
|
|
|
|
let r0 = parse_mqtt_variable_integer(buf0);
|
|
|
|
@ -668,7 +657,7 @@ mod tests {
|
|
|
|
|
panic!("Result should not have been ok.");
|
|
|
|
|
}
|
|
|
|
|
Err(Err::Error(err)) => {
|
|
|
|
|
assert_eq!(err.1, error::ErrorKind::Verify);
|
|
|
|
|
assert_eq!(err.code, ErrorKind::Verify);
|
|
|
|
|
}
|
|
|
|
|
_ => {
|
|
|
|
|
panic!("Result should be an error.");
|
|
|
|
|