aboutsummaryrefslogtreecommitdiffstats
path: root/src/connection/mod.rs
blob: 10e8ec9efcf07a1017d98b0156e43a7b490a1219 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use futures::SinkExt;
use tokio::net::TcpStream;
use tokio::sync::mpsc;
use tokio_util::codec::Framed;
use tracing::{debug, error, info};

use crate::proto::codec::IrcCodec;
use crate::proto::message::{Command, IrcMessage};

/// A handle to send messages to the server.
/// Cheaply cloneable — pass it wherever you need to write.
#[derive(Clone)]
pub struct Sender {
    tx: mpsc::UnboundedSender<IrcMessage>,
}

impl Sender {
    /// Send a raw `IrcMessage` to the server.
    pub fn send(&self, msg: IrcMessage) {
        // Only fails if the connection task has shut down
        let _ = self.tx.send(msg);
    }

    /// Send a PRIVMSG to a channel or user.
    pub fn privmsg(&self, target: &str, text: &str) {
        self.send(IrcMessage::new(
            Command::Privmsg,
            vec![target.to_string(), text.to_string()],
        ));
    }

    /// Join a channel.
    pub fn join(&self, channel: &str) {
        self.send(IrcMessage::new(Command::Join, vec![channel.to_string()]));
    }

    /// Part a channel.
    pub fn part(&self, channel: &str, reason: Option<&str>) {
        let mut params = vec![channel.to_string()];
        if let Some(r) = reason {
            params.push(r.to_string());
        }
        self.send(IrcMessage::new(Command::Part, params));
    }

    /// Change nick.
    pub fn nick(&self, new_nick: &str) {
        self.send(IrcMessage::new(Command::Nick, vec![new_nick.to_string()]));
    }
}

/// Establish a TCP connection and return:
/// - A `Sender` handle for writing messages
/// - An `mpsc::UnboundedReceiver<IrcMessage>` for reading incoming messages
///
/// Two background tasks are spawned:
/// - A **writer task**: drains the sender channel and writes to the TCP stream
/// - A **reader task**: reads from the TCP stream and forwards to the inbox
///
/// This split means the caller never has to hold a lock to send a message.
pub async fn connect(
    addr: &str,
) -> Result<(Sender, mpsc::UnboundedReceiver<IrcMessage>), std::io::Error> {
    info!("Connecting to {}", addr);
    let stream = TcpStream::connect(addr).await?;
    info!("TCP connected to {}", addr);

    let framed = Framed::new(stream, IrcCodec::new());
    let (mut sink, mut stream) = futures::StreamExt::split(framed);

    // Channel for outbound messages (caller → writer task)
    let (out_tx, mut out_rx) = mpsc::unbounded_channel::<IrcMessage>();

    // Channel for inbound messages (reader task → caller)
    let (in_tx, in_rx) = mpsc::unbounded_channel::<IrcMessage>();

    // Writer task: takes messages from out_rx and sends them to the server
    tokio::spawn(async move {
        while let Some(msg) = out_rx.recv().await {
            debug!("--> {}", crate::proto::serializer::serialize(&msg));
            if let Err(e) = sink.send(msg).await {
                error!("Write error: {}", e);
                break;
            }
        }
        debug!("Writer task shut down");
    });

    // Reader task: receives messages from the server and forwards to in_tx
    tokio::spawn(async move {
        use futures::StreamExt;
        while let Some(result) = stream.next().await {
            match result {
                Ok(msg) => {
                    debug!("<-- {}", crate::proto::serializer::serialize(&msg));
                    if in_tx.send(msg).is_err() {
                        break; // Receiver dropped, shut down
                    }
                }
                Err(e) => {
                    error!("Read error: {}", e);
                    break;
                }
            }
        }
        debug!("Reader task shut down");
    });

    Ok((Sender { tx: out_tx }, in_rx))
}