本文介绍了线程间共享引用的生命周期问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个启动工作线程的线程,预计所有线程都将永远存在.每个工作线程维护它自己的 Socket 列表.

某些操作要求我遍历当前处于活动状态的所有套接字,但我在尝试创建包含指向另一个列表拥有的套接字的指针的套接字主列表时遇到了生命周期问题.

use std::{str, thread};使用 std::thread::JoinHandle;使用 std::io::{读取,写入};使用 std::net::{TcpListener, TcpStream};使用 std::sync::{Arc, Mutex};使用 std::ops::DerefMut;使用 std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};使用 self::socketlist::SocketList;使用 self::mastersocketlist::MasterSocketList;酒吧结构套接字{用户:字符串,流:TcpStream,}mod 套接字列表 {使用 self::SocketList::{Node, End};使用 super::Socket;pub 枚举 SocketList {节点(套接字,框),结尾,}impl SocketList {pub fn new() ->套接字列表{结尾}pub fn add(self, socket: Socket) ->套接字列表{节点(套接字,盒子::新(自我))}pub fn newest<'a>(&'a mut self) ->结果<&'a Socket, String>{匹配*自我{Node(ref mut socket, ref mut next) =>好的(插座),结束 =>Err("没有可用的套接字".to_string()),}}}}mod mastersocketlist {使用 self::MasterSocketList::{Node, End};使用 super::Socket;pub enum MasterSocketList{节点(BoxMasterSocketList{pub fn new() ->MasterSocketList{结尾}pub fn add(self, socket: &'a Socket) ->MasterSocketList{MasterSocketList::Node(Box::new(&socket), Box::new(self))}}}pub 结构 SlotManager {代理:JoinHandle,prox_tx:发送方,}impl SlotManager {pub fn new() ->插槽管理器{let (tx, rx): (Sender, Receiver) = channel();让 tx_clone = tx.clone();让 prox = thread::spawn(move || SlotManager::event_loop(tx, rx));插槽管理器{代理:代理,prox_tx: tx_clone,}}pub fn sender(&self) ->发送方{self.prox_tx.clone()}fn event_loop(tx: Sender, rx: Receiver) {让 socket_list = Arc::new(Mutex::new(MasterSocketList::new()));让 mut slot = Slot::new(socket_list.clone());环形 {匹配 rx.try_recv() {好的(流)=>slot.new_connection(流),错误(e) =>{}}}}}酒吧结构插槽{代理:JoinHandle,prox_tx:发送方,}impl 插槽 {pub fn new(master_socket_list: Arc) ->投币口 {let (tx, rx): (Sender, Receiver) = channel();让 tx_clone = tx.clone();让 prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));投币口 {代理:代理,prox_tx: tx_clone,}}pub fn new_connection(&self, 流: TcpStream) {self.prox_tx.send(stream);}fn event_loop(tx: Sender,rx:接收器,master_socket_list:Arc){让 mut sockets = SocketList::new();环形 {//检查新连接匹配 rx.try_recv() {好的(流)=>{让 mut 套接字 = 套接字 {用户:默认".to_string(),流:流,};sockets = sockets.add(socket);让 mut msl_guard = 匹配 master_socket_list.lock() {好的(守卫)=>警卫,错误(中毒)=>中毒.into_inner(),};让 mut msl_handle = msl_guard.deref_mut();*msl_handle = msl_handle.add(sockets.newest().unwrap());}错误(e) =>{}}}}}fn 主(){让 mut slot_manager = SlotManager::new();让 listener = TcpListener::bind("127.0.0.1:1234").unwrap();对于 listener.incoming() { 中的流匹配流{好的(流)=>{让发件人 = slot_manager.sender();线程::生成(移动 || {发送者.发送(流);//process_new_connection(流,发送方)});}错误(e) =>println!("连接错误:{}", e),}}下降(听者);}

我收到的错误...

error[E0477]:类型`[closure@src/main.rs:107:34:107:86 tx:std::sync::mpsc::Sender, rx:std::sync::mpsc::Receiver, master_socket_list:std::sync::Arc

我什至不知道我想要做的是否可以作为安全代码.

我希望 mastersocketlist 包含一个指向套接字的指针,其中套接字的生命周期由创建它的线程定义.我相信这就是所有这些错误的含义,但我不知道如何提供正确的生命周期注释来修复它.

解决方案

Rust 的一大优点是跨函数的类型检查完全由函数签名完成.这意味着您可以用 unimplemented!() 替换大部分函数体并保留类型检查错误.

重复这个过程几次,你最终不会调用很多函数 - 删除它们.内联模块和减少结构/枚举也有帮助.

在某些时候,您的错误将消失 - 解决问题的第一个线索!坚持下去,你会得到一个很小的复制:

use std::sync::{Arc, Mutex};使用 std::thread;pub enum MasterSocketList{一个(&'a u8),}pub结构槽;impl 插槽 {pub fn new(master_socket_list: Arc) ->投币口 {线程::生成(移动 || {master_socket_list;});未实现!();}}fn main() {}

查看错误,依旧匹配:

error[E0477]:类型`[closure@src/main.rs:12:23:14:10 master_socket_list:std::sync::Arc<std::sync::Mutex<MasterSocketList<'a>>>]` 不满足要求的生命周期-->src/main.rs:12:9|12 |线程::生成(移动 || {|^^^^^^^^^^^^^^|= 注意:类型必须满足静态生存期

让我们检查线程的签名文档::spawn:

pub fn spawn(f: F) ->JoinHandle在哪里F: FnOnce() ->,F:发送+'静态,T:发送+'静态,

这里的关键是 F: Send + 'static - 你给 spawn 的闭包 必须只包含持续整个程序生命周期的引用.这是因为spawn 可以创建成为分离 的线程.一旦分离,线程就可以永远存在,所以所有引用必须至少存在那么长时间,否则你会得到悬空引用,这是一件坏事!Rust 再次拯救了这一天!

如果您想保证线程会在已知点终止,您可以使用作用域线程,例如由 scoped-threadpoolcrossbeam.>

如果您的代码没有包含生命周期的变量,请使用某种类型的共享所有权,例如 Arc 与确保只有一个线程可以改变变量的东西配对,例如 Mutex 就足够了.这允许每个线程拥有共享值,最终在最后一个线程退出时删除它.有关详细信息,请参阅如何在线程之间共享可变对象?.

I've got a thread that launches worker threads, all are expected to live forever. Each worker thread maintains it's own list of Sockets.

Some operations require that I traverse all sockets currently alive, but I'm having trouble with lifetimes trying to create a master list of sockets containing a pointer to a socket owned by another list.

use std::{str, thread};
use std::thread::JoinHandle;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::ops::DerefMut;
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
use self::socketlist::SocketList;
use self::mastersocketlist::MasterSocketList;

pub struct Socket {
    user: String,
    stream: TcpStream,
}

mod socketlist {
    use self::SocketList::{Node, End};
    use super::Socket;

    pub enum SocketList {
        Node(Socket, Box<SocketList>),
        End,
    }

    impl SocketList {
        pub fn new() -> SocketList {
            End
        }

        pub fn add(self, socket: Socket) -> SocketList {
            Node(socket, Box::new(self))
        }

        pub fn newest<'a>(&'a mut self) -> Result<&'a Socket, String> {
            match *self {
                Node(ref mut socket, ref mut next) => Ok(socket),
                End => Err("No socket available".to_string()),
            }
        }
    }
}

mod mastersocketlist {
    use self::MasterSocketList::{Node, End};
    use super::Socket;

    pub enum MasterSocketList<'a> {
        Node(Box<&'a Socket>, Box<MasterSocketList<'a>>),
        End,
    }

    impl<'a> MasterSocketList<'a> {
        pub fn new() -> MasterSocketList<'a> {
            End
        }

        pub fn add(self, socket: &'a Socket) -> MasterSocketList<'a> {
            MasterSocketList::Node(Box::new(&socket), Box::new(self))
        }
    }
}

pub struct SlotManager {
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,
}

impl SlotManager {
    pub fn new() -> SlotManager {
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || SlotManager::event_loop(tx, rx));

        SlotManager {
            prox: prox,
            prox_tx: tx_clone,
        }
    }

    pub fn sender(&self) -> Sender<TcpStream> {
        self.prox_tx.clone()
    }

    fn event_loop(tx: Sender<TcpStream>, rx: Receiver<TcpStream>) {
        let socket_list = Arc::new(Mutex::new(MasterSocketList::new()));
        let mut slot = Slot::new(socket_list.clone());
        loop {
            match rx.try_recv() {
                Ok(stream) => slot.new_connection(stream),
                Err(e) => {}
            }
        }
    }
}

pub struct Slot {
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,
}

impl Slot {
    pub fn new(master_socket_list: Arc<Mutex<MasterSocketList>>) -> Slot {
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));

        Slot {
            prox: prox,
            prox_tx: tx_clone,
        }
    }

    pub fn new_connection(&self, stream: TcpStream) {
        self.prox_tx.send(stream);
    }

    fn event_loop(tx: Sender<TcpStream>,
                  rx: Receiver<TcpStream>,
                  master_socket_list: Arc<Mutex<MasterSocketList>>) {

        let mut sockets = SocketList::new();
        loop {
            // Check for new connections
            match rx.try_recv() {
                Ok(stream) => {
                    let mut socket = Socket {
                        user: "default".to_string(),
                        stream: stream,
                    };
                    sockets = sockets.add(socket);

                    let mut msl_guard = match master_socket_list.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => poisoned.into_inner(),
                    };
                    let mut msl_handle = msl_guard.deref_mut();
                    *msl_handle = msl_handle.add(sockets.newest().unwrap());
                }
                Err(e) => {}
            }
        }
    }
}

fn main() {
    let mut slot_manager = SlotManager::new();
    let listener = TcpListener::bind("127.0.0.1:1234").unwrap();
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                let sender = slot_manager.sender();
                thread::spawn(move || {
                    sender.send(stream);
                    //process_new_connection(stream, sender)
                });
            }
            Err(e) => println!("Connection error: {}", e),
        }
    }
    drop(listener);
}

The errors that I receive...

error[E0477]: the type `[closure@src/main.rs:107:34: 107:86 tx:std::sync::mpsc::Sender<std::net::TcpStream>, rx:std::sync::mpsc::Receiver<std::net::TcpStream>, master_socket_list:std::sync::Arc<std::sync::Mutex<mastersocketlist::MasterSocketList<'_>>>]` does not fulfill the required lifetime
   --> src/main.rs:107:20
    |
107 |         let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));
    |                    ^^^^^^^^^^^^^
    |
    = note: type must outlive the static lifetime

I don't even know if what I am trying to is possible as safe code.

I wanting the mastersocketlist to contain a pointer to a socket where the socket's lifetime is defined by the thread that created it. I believe that's what all those errors mean, but I have no idea how to provide the proper lifetime annotations to fix it.

解决方案

A great thing about Rust is that the type-checking across functions is done solely by the function signature. That means you can replace most of the bodies of functions with unimplemented!() and preserve type-checking errors.

Repeat that process a few times, and you end up not calling a lot of functions - remove those. Inlining modules and reducing structs / enums can also help.

At some point your error will disappear - the first clue towards the problem! Keep at it, and you get a tiny reproduction:

use std::sync::{Arc, Mutex};
use std::thread;

pub enum MasterSocketList<'a> {
    One(&'a u8),
}

pub struct Slot;

impl Slot {
    pub fn new<'a>(master_socket_list: Arc<Mutex<MasterSocketList<'a>>>) -> Slot {
        thread::spawn(move || {
            master_socket_list;
        });
        unimplemented!();
    }
}

fn main() {}

Checking out the error, it still matches:

error[E0477]: the type `[closure@src/main.rs:12:23: 14:10 master_socket_list:std::sync::Arc<std::sync::Mutex<MasterSocketList<'a>>>]` does not fulfill the required lifetime
  --> src/main.rs:12:9
   |
12 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^
   |
   = note: type must satisfy the static lifetime

Let's check the docs for the signature of thread::spawn:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static,

The key point here is F: Send + 'static - the closure you give to spawn must only contain references that last the entire life of the program. This is because spawn can create threads that become detached. Once detached, the thread could live forever, so all references must live at least that long, otherwise you'd get dangling references, a bad thing! Rust saves the day, once again!

If you want to guarantee that the threads will terminate at a known point, you can use scoped threads, such as those provided by scoped-threadpool or crossbeam.

If your code didn't have a variable with a lifetime inside of it, using some type of shared ownership like Arc paired with something that will ensure only one thread can mutate the variable, like Mutex would have been sufficient. This allows each thread to own the shared value, finally dropping it whenever the last thread exits. See How do I share a mutable object between threads? for details.

这篇关于线程间共享引用的生命周期问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-02 11:24