aboutsummaryrefslogtreecommitdiffstats
path: root/src/connection/mod.rs
blob: bc837c0c20667f4d95efd91f1116399d3dfa142d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
use futures::SinkExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tracing::{debug, error, info};

use crate::proto::codec::IrcCodec;
use crate::proto::message::IrcMessage;

/// A handle for queueing outbound messages to the server.
///
/// Wraps the sending half of the unbounded outbound channel created by
/// [`connect`]; the receiving half is drained by the writer task spawned
/// there. Cheaply cloneable — pass it wherever you need to write.
#[derive(Clone, Debug)]
pub struct Sender {
    /// Sending half of the outbound message channel.
    tx: mpsc::UnboundedSender<IrcMessage>,
}

impl Sender {
    pub fn send(&self, msg: IrcMessage) {
        // Only fails if the connection task has shut down
        let _ = self.tx.send(msg);
    }
}

/// Open a TCP connection to `addr` and wire it up to a pair of channels.
///
/// On success the returned tuple contains:
/// - a [`Sender`] handle used to queue messages for the server, and
/// - an `mpsc::UnboundedReceiver<IrcMessage>` yielding messages read from
///   the server.
///
/// The socket is wrapped in a `Framed` transport using `IrcCodec` and split
/// into a write half and a read half, each owned by its own spawned task:
/// - the **writer task** pulls messages off the outbound channel and sends
///   them over the socket; it stops once every `Sender` has been dropped or
///   a write fails;
/// - the **reader task** pulls decoded messages off the socket and forwards
///   them to the inbound channel; it stops when the stream ends, a read
///   fails, or the inbound receiver has been dropped.
///
/// Because writes go through a channel, callers never need to take a lock
/// in order to send a message.
///
/// # Errors
///
/// Returns the `std::io::Error` produced by `TcpStream::connect` if the TCP
/// connection cannot be established.
pub async fn connect(
    addr: &str,
) -> Result<(Sender, mpsc::UnboundedReceiver<IrcMessage>), std::io::Error> {
    info!("Connecting to {}", addr);
    let socket = TcpStream::connect(addr).await?;
    info!("TCP connected to {}", addr);

    // outbound: caller -> writer task; inbound: reader task -> caller.
    let (outbound_tx, mut outbound_rx) = mpsc::unbounded_channel::<IrcMessage>();
    let (inbound_tx, inbound_rx) = mpsc::unbounded_channel::<IrcMessage>();

    // Split the framed transport so each task owns exactly one direction.
    let transport = Framed::new(socket, IrcCodec::new());
    let (mut writer, mut reader) = futures::StreamExt::split(transport);

    // Writer task: forward every queued outbound message to the server.
    tokio::spawn(async move {
        while let Some(outgoing) = outbound_rx.recv().await {
            debug!("--> {}", crate::proto::serializer::serialize(&outgoing));
            let written = writer.send(outgoing).await;
            if let Err(e) = written {
                error!("Write error: {}", e);
                break;
            }
        }
        debug!("Writer task shut down");
    });

    // Reader task: hand every message received from the server to the caller.
    tokio::spawn(async move {
        use futures::StreamExt;
        loop {
            let incoming = match reader.next().await {
                Some(Ok(incoming)) => incoming,
                Some(Err(e)) => {
                    error!("Read error: {}", e);
                    break;
                }
                // The stream is exhausted; nothing more to read.
                None => break,
            };
            debug!("<-- {}", crate::proto::serializer::serialize(&incoming));
            if inbound_tx.send(incoming).is_err() {
                // The receiver was dropped, so nobody is listening any more.
                break;
            }
        }
        debug!("Reader task shut down");
    });

    Ok((Sender { tx: outbound_tx }, inbound_rx))
}