11use bytes:: { BufMut , Bytes } ;
22use futures:: { Stream , StreamExt } ;
33use http_body:: Frame ;
4+ use linkedbytes:: Node ;
45use pilota:: { LinkedBytes , pb:: Message } ;
56
67use super :: { DefaultEncoder , PREFIX_LEN } ;
@@ -24,26 +25,23 @@ where
2425 futures_util:: pin_mut!( source) ;
2526
2627 loop {
27- let mut buf = LinkedBytes :: with_capacity( BUFFER_SIZE ) ;
28- let mut compressed_buf = if compression_encoding. is_some( ) {
29- LinkedBytes :: with_capacity( BUFFER_SIZE )
30- } else {
31- LinkedBytes :: new( )
32- } ;
3328 match source. next( ) . await {
3429 Some ( Ok ( item) ) => {
35- let reserve_node_idx = {
36- buf. reserve( PREFIX_LEN ) ;
37- unsafe {
38- buf. advance_mut( PREFIX_LEN ) ;
39- }
40- buf. split( )
30+ let mut buf = LinkedBytes :: with_capacity( BUFFER_SIZE ) ;
31+ let mut compressed_buf = if compression_encoding. is_some( ) {
32+ LinkedBytes :: with_capacity( BUFFER_SIZE )
33+ } else {
34+ LinkedBytes :: new( )
4135 } ;
4236
37+ buf. reserve( PREFIX_LEN ) ;
38+ unsafe {
39+ buf. advance_mut( PREFIX_LEN ) ;
40+ }
41+
4342 let mut encoder=DefaultEncoder :: default ( ) ;
4443
4544 if let Some ( config) =compression_encoding{
46- compressed_buf. reset( ) ;
4745 encoder. encode( item, & mut compressed_buf)
4846 . map_err( |err| Status :: internal( format!( "Error encoding: {err}" ) ) ) ?;
4947 compress( config, & mut compressed_buf. concat( ) , buf. bytes_mut( ) )
@@ -57,19 +55,34 @@ where
5755 let len = buf. len( ) - PREFIX_LEN ;
5856 assert!( len <= u32 :: MAX as usize ) ;
5957 {
60- match buf. get_list_mut( reserve_node_idx) . expect( "reserve_node_idx is valid" ) {
61- linkedbytes:: Node :: BytesMut ( bytes_mut) => {
62- let start = bytes_mut. len( ) - PREFIX_LEN ;
63- let mut buf = & mut bytes_mut[ start..] ;
64- buf. put_u8( compression_encoding. is_some( ) as u8 ) ;
65- buf. put_u32( len as u32 ) ;
66- }
67- _ => unreachable!( "reserve_node_idx is not a bytesmut" ) ,
58+ if let Some ( node) = buf. get_list_mut( 0 ) {
59+ match node {
60+ linkedbytes:: Node :: BytesMut ( bytes_mut) => {
61+ let start = bytes_mut. len( ) - PREFIX_LEN ;
62+ let mut dest = & mut bytes_mut[ start..] ;
63+ dest. put_u8( compression_encoding. is_some( ) as u8 ) ;
64+ dest. put_u32( len as u32 ) ;
65+ }
66+ _ => unreachable!( "reserve_node_idx is not a bytesmut" ) ,
67+ } ;
68+ } else {
69+ let mut dest = & mut buf. bytes_mut( ) [ ..PREFIX_LEN ] ;
70+ dest. put_u8( compression_encoding. is_some( ) as u8 ) ;
71+ dest. put_u32( len as u32 ) ;
6872 }
6973 }
7074
71- // remove the trailing empty bytes
72- yield Ok ( Frame :: data( buf. concat( ) . split_to( len + PREFIX_LEN ) . freeze( ) ) ) ;
75+ // send each node in linked bytes as a separate frame
76+ for node in buf. into_iter_list( ) {
77+ let bytes = match node {
78+ Node :: Bytes ( bytes) => bytes,
79+ Node :: BytesMut ( bytesmut) => bytesmut. freeze( ) ,
80+ Node :: FastStr ( faststr) => faststr. into_bytes( ) ,
81+ } ;
82+ if !bytes. is_empty( ) {
83+ yield Ok ( Frame :: data( bytes) ) ;
84+ }
85+ }
7386 } ,
7487 Some ( Err ( status) ) => yield Err ( status) ,
7588 None => break ,
@@ -121,6 +134,23 @@ pub mod tests {
121134 }
122135 }
123136
137+ #[ tokio:: test]
138+ async fn test_encode ( ) {
139+ let source = async_stream:: stream! {
140+ yield Ok ( EchoRequest { message: "Volo" . into( ) } ) ;
141+ } ;
142+
143+ let mut stream = encode ( source, None ) ;
144+ // frame
145+ let frame = stream. next ( ) . await . unwrap ( ) . unwrap ( ) ;
146+ assert ! ( frame. is_data( ) ) ;
147+ let data = frame. data_ref ( ) . unwrap ( ) ;
148+ assert_eq ! ( & data[ ..PREFIX_LEN ] , b"\x00 \x00 \x00 \x00 \x06 " ) ;
149+ assert_eq ! ( & data[ PREFIX_LEN ..] , b"\x0a \x04 Volo" ) ;
150+
151+ assert ! ( stream. next( ) . await . is_none( ) ) ;
152+ }
153+
124154 #[ cfg( feature = "gzip" ) ]
125155 #[ tokio:: test]
126156 async fn test_encode_gzip ( ) {
@@ -133,21 +163,25 @@ pub mod tests {
133163 } ;
134164
135165 let compression_encoding = Some ( CompressionEncoding :: Gzip ( Some ( GzipConfig :: default ( ) ) ) ) ;
136- let result = encode ( source, compression_encoding) . next ( ) . await . unwrap ( ) ;
166+ let mut stream = encode ( source, compression_encoding) ;
137167
138- assert ! ( result . is_ok ( ) ) ;
139- let frame = result . unwrap ( ) ;
168+ // frame
169+ let frame = stream . next ( ) . await . unwrap ( ) . unwrap ( ) ;
140170 assert ! ( frame. is_data( ) ) ;
141171 let data = frame. data_ref ( ) . unwrap ( ) ;
142- let mut data_mut = BytesMut :: from ( & data[ PREFIX_LEN ..] ) ;
172+ assert_eq ! ( & data[ ..PREFIX_LEN ] , b"\x01 \x00 \x00 \x00 \x1a " ) ;
173+
174+ let mut compressed_data = BytesMut :: from ( & data[ PREFIX_LEN ..] ) ;
143175 let mut uncompressed_data_mut = BytesMut :: new ( ) ;
144176 decompress (
145177 compression_encoding. unwrap ( ) ,
146- & mut data_mut ,
178+ & mut compressed_data ,
147179 & mut uncompressed_data_mut,
148180 )
149181 . unwrap ( ) ;
150182 assert_eq ! ( & uncompressed_data_mut[ ..] , b"\x0a \x04 Volo" ) ;
183+
184+ assert ! ( stream. next( ) . await . is_none( ) ) ;
151185 }
152186
153187 #[ cfg( feature = "zlib" ) ]
@@ -162,21 +196,25 @@ pub mod tests {
162196 } ;
163197
164198 let compression_encoding = Some ( CompressionEncoding :: Zlib ( Some ( ZlibConfig :: default ( ) ) ) ) ;
165- let result = encode ( source, compression_encoding) . next ( ) . await . unwrap ( ) ;
199+ let mut stream = encode ( source, compression_encoding) ;
166200
167- assert ! ( result . is_ok ( ) ) ;
168- let frame = result . unwrap ( ) ;
201+ // frame
202+ let frame = stream . next ( ) . await . unwrap ( ) . unwrap ( ) ;
169203 assert ! ( frame. is_data( ) ) ;
170204 let data = frame. data_ref ( ) . unwrap ( ) ;
171- let mut data_mut = BytesMut :: from ( & data[ PREFIX_LEN ..] ) ;
205+ assert_eq ! ( & data[ ..PREFIX_LEN ] , b"\x01 \x00 \x00 \x00 \x0e " ) ;
206+
207+ let mut compressed_data = BytesMut :: from ( & data[ PREFIX_LEN ..] ) ;
172208 let mut uncompressed_data_mut = BytesMut :: new ( ) ;
173209 decompress (
174210 compression_encoding. unwrap ( ) ,
175- & mut data_mut ,
211+ & mut compressed_data ,
176212 & mut uncompressed_data_mut,
177213 )
178214 . unwrap ( ) ;
179215 assert_eq ! ( & uncompressed_data_mut[ ..] , b"\x0a \x04 Volo" ) ;
216+
217+ assert ! ( stream. next( ) . await . is_none( ) ) ;
180218 }
181219
182220 #[ cfg( feature = "zstd" ) ]
@@ -191,20 +229,24 @@ pub mod tests {
191229 } ;
192230
193231 let compression_encoding = Some ( CompressionEncoding :: Zstd ( Some ( ZstdConfig :: default ( ) ) ) ) ;
194- let result = encode ( source, compression_encoding) . next ( ) . await . unwrap ( ) ;
232+ let mut stream = encode ( source, compression_encoding) ;
195233
196- assert ! ( result . is_ok ( ) ) ;
197- let frame = result . unwrap ( ) ;
234+ // frame
235+ let frame = stream . next ( ) . await . unwrap ( ) . unwrap ( ) ;
198236 assert ! ( frame. is_data( ) ) ;
199237 let data = frame. data_ref ( ) . unwrap ( ) ;
200- let mut data_mut = BytesMut :: from ( & data[ PREFIX_LEN ..] ) ;
238+ assert_eq ! ( & data[ ..PREFIX_LEN ] , b"\x01 \x00 \x00 \x00 \x0f " ) ;
239+
240+ let mut compressed_data = BytesMut :: from ( & data[ PREFIX_LEN ..] ) ;
201241 let mut uncompressed_data_mut = BytesMut :: new ( ) ;
202242 decompress (
203243 compression_encoding. unwrap ( ) ,
204- & mut data_mut ,
244+ & mut compressed_data ,
205245 & mut uncompressed_data_mut,
206246 )
207247 . unwrap ( ) ;
208248 assert_eq ! ( & uncompressed_data_mut[ ..] , b"\x0a \x04 Volo" ) ;
249+
250+ assert ! ( stream. next( ) . await . is_none( ) ) ;
209251 }
210252}
0 commit comments