微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何从 JSON 数组延迟反序列化? 问题描述到目前为止我的方法最后一个问题

如何解决如何从 JSON 数组延迟反序列化? 问题描述到目前为止我的方法最后一个问题

问题描述

使用 serde_json 将很长的对象数组反序列化为 Vec<T> 可能需要很长时间,因为必须预先将整个数组读入内存。我想迭代数组中的项目,以避免前期处理和内存需求。

到目前为止我的方法

StreamDeserializer 不能直接使用,因为它只能迭代背靠背放置的自定界类型。所以到目前为止我所做的是编写一个自定义结构来实现 Read,包装另一个 Read 但省略开始和结束方括号,以及任何逗号。

例如,读取器会将 JSON [{"name": "foo"},{"name": "bar"},{"name": "baz"}] 转换为 {"name": "foo"} {"name": "bar"} {"name": "baz"},以便它可以与 StreamDeserializer 一起使用。

这是完整的代码

use std::io;

/// An implementation of `Read` that transforms JSON input where the outermost
/// structure is an array. The enclosing brackets and commas are removed,/// causing the items to be adjacent to one another. This works with
/// [`serde_json::StreamDeserializer`].
pub(crate) struct ArrayStreamReader<T> {
    inner: T,depth: Option<usize>,inside_string: bool,escape_next: bool,}

impl<T: io::Read> ArrayStreamReader<T> {
    pub(crate) fn new_buffered(inner: T) -> io::BufReader<Self> {
        io::BufReader::new(ArrayStreamReader {
            inner,depth: None,inside_string: false,escape_next: false,})
    }
}

#[inline]
fn do_copy(dst: &mut [u8],src: &[u8],len: usize) {
    if len == 1 {
        dst[0] = src[0]; // Avoids memcpy call.
    } else {
        dst[..len].copy_from_slice(&src[..len]);
    }
}

impl<T: io::Read> io::Read for ArrayStreamReader<T> {
    fn read(&mut self,buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }

        let mut tmp = vec![0u8; buf.len()];

        // The outer loop is here in case every byte was skipped,which can happen
        // easily if `buf.len()` is 1. In this situation,the operation is retried
        // until either no bytes are read from the inner stream,or at least 1 byte
        // is written to `buf`.
        loop {
            let byte_count = self.inner.read(&mut tmp)?;
            if byte_count == 0 {
                return if self.depth.is_some() {
                    Err(io::ErrorKind::UnexpectedEof.into())
                } else {
                    Ok(0)
                };
            }

            let mut tmp_pos = 0;
            let mut buf_pos = 0;
            for (i,b) in tmp.iter().cloned().enumerate() {
                if self.depth.is_none() {
                    match b {
                        b'[' => {
                            tmp_pos = i + 1;
                            self.depth = Some(0);
                        },b if b.is_ascii_whitespace() => {},b'\0' => break,_ => return Err(io::ErrorKind::InvalidData.into()),}
                    continue;
                }

                if self.inside_string {
                    match b {
                        _ if self.escape_next => self.escape_next = false,b'\\' => self.escape_next = true,b'"' if !self.escape_next => self.inside_string = false,_ => {},}
                    continue;
                }

                let depth = self.depth.unwrap();
                match b {
                    b'[' | b'{' => self.depth = Some(depth + 1),b']' | b'}' if depth > 0 => self.depth = Some(depth - 1),b'"' => self.inside_string = true,b'}' if depth == 0 => return Err(io::ErrorKind::InvalidData.into()),b',' | b']' if depth == 0 => {
                        let len = i - tmp_pos;
                        do_copy(&mut buf[buf_pos..],&tmp[tmp_pos..],len);
                        tmp_pos = i + 1;
                        buf_pos += len;

                        // Then write a space to separate items.
                        buf[buf_pos] = b' ';
                        buf_pos += 1;

                        if b == b']' {
                            // Reached the end of outer array. If another array
                            // follows,the stream will continue.
                            self.depth = None;
                        }
                    },}
            }

            if tmp_pos < byte_count {
                let len = byte_count - tmp_pos;
                do_copy(&mut buf[buf_pos..],len);
                buf_pos += len;
            }

            if buf_pos > 0 {
                // If at least some data was read,return with the amount. Otherwise,the outer
                // loop will try again.
                return Ok(buf_pos);
            }
        }
    }
}

它是这样使用的:

use std::io;

use serde::Deserialize;

#[derive(Deserialize)]
struct Item {
    name: String,}

fn main() -> io::Result<()> {
    let json = br#"[{"name": "foo"},{"name": "bar"}]"#;
    let wrapped = ArrayStreamReader::new_buffered(&json[..]);
    let first_item: Item = serde_json::Deserializer::from_reader(wrapped)
        .into_iter()
        .next()
        .unwrap()?;
    assert_eq!(first_item.name,"foo");
    Ok(())
}

最后一个问题

一定有更好的方法来做到这一点,对吗?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。