一直想深入即时通信开发,于是在上次假期的时候就写了一个模型,最初选择的是PHP,原因是我有些PHP基础,但是做到服务器多线程处理客户端发送的消息的时候遇到了大问题:PHP默认版本是不支持多线程的,支持多线程的版本网友们的评论是说只能用来玩玩,而且电脑本地环境搭建PHP-ZTS又比较麻烦,于是就选择使用C++开发。截止记录该笔记,有很多优化思路还没有实现。 [TOC] ## 思维导图  ## 过程 首先从遇到的问题来说说我的开发过程吧: - 多线程 最初的时候C/S都没有使用多线程,C只能收一条消息发一条消息,而S也在多用户上线建立连接后发第一个包的时候线程被read堵塞,于是改成主线程只负责监听消息,收发在触发的时候创建线程里完成,结果出现读Socket消息的时候混乱,然后我当时考虑的方法是将读消息放到主线程去,读出来之后再放到其它线程操作。 这样做之后S端主线程堵塞的现象有所改善,但依旧没有解决问题。这时我想到的是我在做iOS开发时的队列,于是我创建消息模型,当监听到需要读消息的指令的时候将模型添加到队列里,后续由一个while循环一直监控队列里的任务数量,有消息就处理掉,没有就继续循环。然后将发消息也用队列处理。 结果CPU占用问题出现了,因为监听队列的while循环是和死循环,所以CPU吃不下,然后我想到的是发通知,但是以我对C++的了解还没到那个程度,然后换成信号量、线程锁,当队列空了的时候就让线程等待,有消息模型加到对了的时候启动线程,于是CPU占用问题解决。 - 收到的消息不完整 开始是用C++开发出点对点的即时通信,然后继续开发的时候总是发现出现消息长度太长的时候收到的消息不完整。一时没有找到原因,于是换PHP做服务端,C++的客户端,结果依旧,于是得到的结论是发送的消息不全(C、S都用PHP消息就是完整的)。最后返回去看read和send函数,结果是我在调用send函数的时候传递的C++的string的地址行不通,要将C++的string转成C语言的char指针,于是收消息不完整的问题就解决了。 - C端下线关闭套接字的时候访问到空指针S端crash 当时S端用select监听所有的套接字,然后遍历客户端集合找出有消息的套接字,C端是直接用while循环通过read阻塞线程读取消息。当C端下线关闭套接字的时候S端也关闭,然后将对应的套接字从集合里移除,结果将其从集合移除接下来的遍历集合就出现一个空指针了,然后就crash了。 我的解决方法是创建一个临时的集合用来保存下线的C端,等C端的集合遍历完成之后再移除并关闭套接字。 - 新用户加入的时候主线程锁死 我刚开始的时候是先判断S端负责监听连接的套接字是否在有变化的集合里,如果在就将新的套接字加到用户集合里,然后因在收到活跃套接字消息之后在遍历所有客户端套接字之前客户端的集合有了新用户加入,结果就导致偶尔又在读消息那里阻塞线程。(消息队列处理优化后读消息依旧放在主线程,因为判断C端下线需要通过读消息) 然后我就想如果有用户加入就不遍历C端集合,直接重新开始监听,至于这一轮监听收到的消息继续放在缓存区,等下一个循环再读取,然后问题解决。 - 断线重连 将整个过程封装起来,然后在这个封装的外面嵌一个循环。C端出现监听输入的线程越来越多。解决方法是在main函数启动套接字之前就创建线程,或者在创建线程的时候判断线程ID,如果用来保存该线程ID的对象为空就创建,否则就跳过。 - 数据处理 这一块遇到的问太多了,但都不是什么技术性问题就不多加描述,我传输数据的格式是将消息对象按照一些约定封装成json格式的字符串,然后将字符串base64编码后传输(base64编码是为了去除特殊字符在传输过程中造成的意外)。字符串序列化用的是[nlohmann/json](https://github.com/nlohmann/json),字符串序列化及反序列化这块花费了比较多的时间,因为谁也不能保证字符串是否一定是json格式。 ## 源码 ### S端 ```c++ #include #include #include #include #include #include #include #include #include #include #include "../message/message_model.h" #include "../config.h" #define MAX_CLIENT_COUNT 1000 using namespace std; using namespace messager; set clients; //负责用户通信的socket列表 map users; //用户信息 map users_socket; //保存用户的套接字 pthread_mutex_t read_mut = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t read_cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t write_mut = PTHREAD_MUTEX_INITIALIZER; pthread_cond_t write_cond = PTHREAD_COND_INITIALIZER; void processingMessage(MessageModel model); //读消息队列 queue read_queue; void *runReadQueue(void *argc); //添加读消息任务,通知读消息的线程 void addReadmodel(MessageModel model); //发消息队列 queue write_queue; void *runWriteQueue(void *argc); //添加发消息任务,通知发消息的线程 void addWriteModel(MessageModel model); void structureJoinMessage(MessageModel *model) { //通知所有客户端有用户上线 string client_id = model->client_id; model->sockets = clients; model->type = msg; model->from = "system"; model->message = "用户->" + client_id + " 上线了!"; cout<<"用户"<sockets.size()< temp_clients = set(clients); temp_clients.erase(socket); model.sockets = temp_clients; model.from = "system"; model.message = "用户->" + client_id + "离线了!"; cout<<"用户->" + client_id + "离线了!在线用户:"< temp_clear_clients; //保存离线的客户 //遍历所有存活的套接字,找出活跃的套接字,将读消息任务加入读消息任务队列 int user_count = clients.size(); for (auto clienP = clients.begin(); clienP != clients.end(); ++clienP) { int socket = *clienP; if (FD_ISSET(socket, &socket_fds)){ char buf[kBuffsSize]; memset(&buf, 0, kBuffsSize); read(socket, &buf, kBuffsSize); if(string(buf).length() == 0){ temp_clear_clients.insert(socket); string client_id = users[socket]; if (client_id.length() > 0){ sendLeaveMessage(socket); } }else{ MessageModel model; model.socket = socket; model.message = buf; //添加处理消息队列 addReadmodel(model); } } } for (auto clienP = temp_clear_clients.begin(); clienP != temp_clear_clients.end(); ++clienP) { int socket = *clienP; string client_id = users[socket]; // cout<<"清理套接字"< #include #include #include #include #include #include #include #include #include #include "../message/message_model.h" #include "../config.h" using namespace std; using namespace messager; int client_socket; string client_id = ""; char buf[kBuffsSize]; bool is_line; //在线状态 pthread_t send_id; void doShake(Message_Type type); void *createSendMessagePthread(void *) { cout<<"请输入内容:"< 0) { doShake(join); } } } } void echoMessage(MessageModel model) { model.processMessage(); switch (model.type) { case Message_Type::bind: { client_id = model.client_id; cout<<"客户编号:==> "<"< cout<<"收到一条非法结构的消息:"<"< 0){ tmep_model.message = buf; // cout<<"收到原始数据"< #include #include using namespace std; const size_t kBuffsSize = 1024 * 10; //读区消息的最长长度 namespace messager { void coutError(int error); enum Message_Type { unknow = -1, //异常消息 msg = 0, //普通消息 join = 1, //用户加入 bind = 2, //绑定信息 ctl = 3, //控制信息 beat = 4, //用户心跳 toUser = 5, //点对点消息 // leave = 2, //用户离开 }; class MessageModel { public: //数据发送者的套接字,消息的所有者,如果不需要发送可以设置消息的所有者套接字为-1(不建议) int socket = -1; //数据接收者的套接字集合,如果设置了这个集合那么发送消息会忽略socket字段 set sockets; //消息类型 Message_Type type = msg; //消息所有者的id,在不设置from字段的时候取这个字段作为消息的发送者,可以在发送消息之前直接指定某些类型消息的发送者 string client_id; //消息内容,在没有处理之前是原数据,处理之后变成数据里的body字段的内容 //发送消息的时候则表示数据中body字段的值,构造发送消息的时候会先将内容序列号,保持特殊字符的完整性 //如果消息体就是两个双引号,json序列化的时候就会变成空字符串了,双引号之间如果都是空格也发送不了 //构造消息之前会将消息前后的空格删除 string message; //消息的发送者 string from; //消息的接收者,预留 string to; //特殊字段,用来保存心跳及用户加入时间戳的保存 string tag; //发送消息 void sendMessage(); //处理消息,将一个原数据解析出来,放置到消息中的字段对应的属性里 void processMessage(); private: //对象反序列化并按特定格式组成的json格式字符串 string send_msg; //构造json,特定格式在该函数里设定,需要和processMessage函数里取消息规则匹配 void constructMessage(); }; } ``` #### message_model.cpp ```c++ #include #include #include "message_model.h" #include "../tool/tool.h" #include "../tool/json.hpp" using namespace nlohmann; namespace messager { //套接字错误打印 void coutError(int error) { cout<message = base64_decode(this->message); json result = jsonWithString(this->message); // cout<<"解码后的数据:"<()) : msg; }else{ msg_type = msg; } this->type = msg_type; time_t new_time = time(NULL); if (body.is_string()) { this->message = body; }else{ this->message = body.dump(); } if (result.find("from") != result.end()) this->from = result["from"]; if (result.find("to") != result.end()) this->to = result["to"]; if (msg_type == join || msg_type == beat) { //以收到的消息的时间节点做tag,减少数据传输量。*缩短字段的名字也可以减少数据传输量,而且能算是一种加密。 this->tag = to_string(new_time); } switch (msg_type) { case Message_Type::join: { if (!this->from.empty()) { this->client_id = this->from; }else { this->client_id = md5String(this->tag + to_string(new_time) + to_string(rand() % 10000)); } } break; case Message_Type::bind: { if (body.find("clientId") != body.end()) this->client_id = body["clientId"]; if (this->client_id.empty()) this->client_id = md5String(to_string(new_time)); } break; default:{ } break; } }else{ //数据格式非法,设置消息为未知类型 //数据非法也有可能是一次收到多个消息的数据了 this->type = unknow; } } void MessageModel::constructMessage() { string from = this->from; string to = this->to; string content = this->message; if (from.empty()){ from = this->client_id; } json result, json_header, json_body; if (from.length() > 0) result["from"] = from; if (to.length() > 0) result["to"] = to; if (!content.empty()) result["body"] = jsonWithString(content); if (this->type != msg) result["type"] = this->type; this->send_msg = result.dump(); } //发送消息 void MessageModel::sendMessage() { bool need_send_message = true; //去掉首尾多余的空格 this->message.erase(0,this->message.find_first_not_of(" ")); this->message.erase(this->message.find_last_not_of(" ") + 1); switch (this->type) { case Message_Type::bind: { this->message = "{\"clientId\":\"" + this->client_id + "\"}"; } break; case Message_Type::join: case Message_Type::beat: { this->message = "tag"; } break; default:{ need_send_message = !this->message.empty(); } break; } // cout<<"开始发送消息"<constructMessage(); // cout<message<send_msg); // cout<message<sockets.empty()){ send(this->socket, msg_str, strlen(msg_str), 0); }else{ for (auto ket = this->sockets.begin(); ket != this->sockets.end(); ket++){ int socket = *ket; send(socket, msg_str, strlen(msg_str), 0); } } } } } ``` ### 一些配置管理 #### config.h ```C++ #include "tool/json.hpp" using namespace std; using namespace nlohmann; const int kServerWaitMin = 30; //服务器套接字活跃检测时间间隔 const int kClientWaitMin = 1; //客户端套接字活跃检测时间间隔、心跳间隔 struct sockaddr_in getAddress(); struct sockaddr_in getAddress(int argc, char *argv[]); ``` #### config.cpp ```c++ #include #include #include #include #include #include "config.h" using namespace std; struct sockaddr_in getAddress(int argc, char *argv[]) { int index = 1; struct sockaddr_in serverAddress; string host; string port; map arg; if (argc % 2 == 0) { cout<<"参数错误\ndefault:\n\t\thost 127.0.0.1 port 10132\n\neg:\n\t\t host 127.0.0.1 port 10086\n\t\t host 192.168.1.1\n\t\t port 10010\n"<= 3){ while (index < argc) { string key = argv[index ++]; string value = argv[index ++]; arg[key] = value; } } if (arg.size() >= 1) { if (arg.find("host") != arg.end() && arg.find("port") != arg.end()) { host = arg["host"]; port = arg["port"]; }else if (arg.find("host") != arg.end()) { host = arg["host"]; port = "10132"; }else if (arg.find("port") != arg.end()) { host = "127.0.0.1"; port = arg["port"]; } } if (host.empty() || port.empty()) { serverAddress = getAddress(); }else{ memset(&serverAddress, 0, sizeof(serverAddress)); serverAddress.sin_family = AF_INET; serverAddress.sin_addr.s_addr = inet_addr(host.c_str()); serverAddress.sin_port = htons(stoi(port)); } return serverAddress; } struct sockaddr_in getAddress() { char buf[1024] = {0}; getcwd(buf, 1024); string path = buf; path += "/../address.json"; ifstream ifs(path); json document; ifs>>document; if (document.empty()){ ofstream out(path); out<<"{\"host\":\"127.0.0.1\",\"port\":10132}"; out.close(); return getAddress(); } string host = document["host"]; int port = document["port"]; // string host = "127.0.0.1"; // int port = 10132; struct sockaddr_in serverAddress; memset(&serverAddress, 0, sizeof(serverAddress)); serverAddress.sin_family = AF_INET; serverAddress.sin_addr.s_addr = inet_addr(host.c_str()); serverAddress.sin_port = htons(port); return serverAddress; } ``` ### MD5、Base64及json工具 [tool.zip](//blog.tcoding.cn/usr/uploads/2018/04/2393731013.zip) >完整的代码压缩包:[socket.zip](//blog.tcoding.cn/usr/uploads/2018/04/407659026.zip) - 还有很多优化思路,不过没有长假的零碎时间不敢实现,我会停不下来,然后熬夜的。这应该算是我的IM及消息推送系统的雏形,慢慢完善。更新可能不会很及时。
没有评论