1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
use futures::SinkExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tracing::{debug, error, info};
use crate::proto::codec::IrcCodec;
use crate::proto::message::IrcMessage;
/// A handle to send messages to the server.
/// Cheaply cloneable — pass it wherever you need to write.
#[derive(Clone)]
pub struct Sender {
tx: mpsc::UnboundedSender<IrcMessage>,
}
impl Sender {
pub fn send(&self, msg: IrcMessage) {
// Only fails if the connection task has shut down
let _ = self.tx.send(msg);
}
}
/// Establish a TCP connection and return:
/// - A `Sender` handle for writing messages
/// - An `mpsc::UnboundedReceiver<IrcMessage>` for reading incoming messages
///
/// Two background tasks are spawned:
/// - A **writer task**: drains the sender channel and writes to the TCP stream
/// - A **reader task**: reads from the TCP stream and forwards to the inbox
///
/// This split means the caller never has to hold a lock to send a message.
pub async fn connect(
addr: &str,
) -> Result<(Sender, mpsc::UnboundedReceiver<IrcMessage>), std::io::Error> {
info!("Connecting to {}", addr);
let stream = TcpStream::connect(addr).await?;
info!("TCP connected to {}", addr);
let framed = Framed::new(stream, IrcCodec::new());
let (mut sink, mut stream) = futures::StreamExt::split(framed);
// Channel for outbound messages (caller → writer task)
let (out_tx, mut out_rx) = mpsc::unbounded_channel::<IrcMessage>();
// Channel for inbound messages (reader task → caller)
let (in_tx, in_rx) = mpsc::unbounded_channel::<IrcMessage>();
// Writer task: takes messages from out_rx and sends them to the server
tokio::spawn(async move {
while let Some(msg) = out_rx.recv().await {
debug!("--> {}", crate::proto::serializer::serialize(&msg));
if let Err(e) = sink.send(msg).await {
error!("Write error: {}", e);
break;
}
}
debug!("Writer task shut down");
});
// Reader task: receives messages from the server and forwards to in_tx
tokio::spawn(async move {
use futures::StreamExt;
while let Some(result) = stream.next().await {
match result {
Ok(msg) => {
debug!("<-- {}", crate::proto::serializer::serialize(&msg));
if in_tx.send(msg).is_err() {
break; // Receiver dropped, shut down
}
}
Err(e) => {
error!("Read error: {}", e);
break;
}
}
}
debug!("Reader task shut down");
});
Ok((Sender { tx: out_tx }, in_rx))
}
|