利用thrift实现匹配系统的一些学习笔记(c++实现match_system部分
match_system
基于thrift,实现目标是模拟一个匹配系统,类似于游戏中两个用户的匹配
接收的参数:
- 一个
User
变量,代表一个用户 - 一个
string
变量,可选添加的额外信息,设置成string
类型,也是为了便于之后的维护,这样是支持以后传入一个json
表的,所以拓展性较好
version 1.0
初步实现添加用户信息和删除用户信息两个操作的传递,这一部分很简单,略讲
c++代码部分
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user
");
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user
");
return 0;
}
};
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
std::cout<<"Start match server ..."<<std::endl;
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}
这样当我们从match_cilent
端调用相关函数时,便可以远程调用match_server
端的相关函数,进行用户的添加和删除,并把用户的信息传递过去
version 2.0
使用多线程实现对client
发送来的指令进行处理,并且进行对用户的简单匹配,这一部分很重要,将详细展开讲
c++代码部分
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "match_server/Match.h"
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <vector>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::match_service;
using namespace std;
class Task{
public:
User user;
string type;
private:
};
class MessageQueue{
public:
queue<Task> q;
mutex m;
condition_variable cv;
private:
}message_queue;
class Pool{
public:
void save_result(int a, int b){
printf("Match result: %d %d
",a,b);
}
void match(){
while(users.size()>1){
auto a=users[0],b=users[1];
users.erase(users.begin());
users.erase(users.begin());
save_result(a.id, b.id);
}
}
void add(User user){
users.push_back(user);
}
void remove(User user){
for(uint32_t i = 0;i < users.size();i++)
if(user.id = users[i].id ){
users.erase(users.begin() + i);
break;
}
}
private:
vector<User> users;
}pool;
class MatchHandler : virtual public MatchIf {
public:
MatchHandler() {
// Your initialization goes here
}
int32_t add_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("add_user
");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "add"});
message_queue.cv.notify_all();
return 0;
}
int32_t remove_user(const User& user, const std::string& info) {
// Your implementation goes here
printf("remove_user
");
unique_lock<mutex> lck(message_queue.m);
message_queue.q.push({user, "remove"});
message_queue.cv.notify_all();
return 0;
}
};
void consume_task(){
while(true){
unique_lock<mutex> lck(message_queue.m);
if(message_queue.q.empty()){
message_queue.cv.wait(lck);
}else{
auto task=message_queue.q.front();
message_queue.q.pop();
lck.unlock();
if(task.type == "add")
pool.add(task.user);
else if(task.type == "remove")
pool.remove(task.user);
pool.match();
}
}
}
int main(int argc, char **argv) {
int port = 9090;
::std::shared_ptr<MatchHandler> handler(new MatchHandler());
::std::shared_ptr<TProcessor> processor(new MatchProcessor(handler));
::std::shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
::std::shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
::std::shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
std::cout<<"Start match server ..."<<std::endl;
thread matching_thread(consume_task);
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}
消费者:
consume_task()
生产者:
MatchHandler
中的add_user()
MatchHandler
中的remove_user()
资源:
message_queue
我们对于client
端发来的请求一共有两个步骤:
- 接收并存储请求
- 处理请求
所以有了以上这套生产-消费者系统
由于我们对用户池的操作,只有add_user()
和remove_user()
这两个操作,而这两个操作已经属于以上那一套生产-消费者模型,所以就不需要额外构建一套新的模型
接着,我们来讲讲代码上的实现
首先解释一下几个相关函数的大致意思(因为本人也没有完全系统学习相关c++文档,所以写一些个人理解,如有错误,欢迎各位读者大佬指出)
unique_lock<mutex> lck(message_queue.m)
:这个lck
变量代表一个互斥锁,创建之后,所有用message_queue.m
初始化得到锁的地方,只能有一处能够获得此锁,也就是只有一个地方能够继续往下执行,直到获得到的锁被释放。因此对于每个需要读取或者修改message_queue
的地方,都需要加上此锁,也就是前面的所有生产者和消费者:MatchHandler
中的add_user()
MatchHandler
中的remove_user()
consume_task()
message_queue.cv.wait(lck)
和message_queue.cv.notify_all()
:如果消息队列message_queue
为空,那么我们在consume_task
中,出于对效率的考虑,如果一直待在consume_task
的while
循环里,就会大量占用不必要的资源,这种情况有个专有名词叫做忙等。所以我们可以暂时把这个线程挂起,也就是“现在没有我要的资源,那我先休息,等有我需要的资源再叫(唤醒)我”的意思。从操作系统方面来解释,执行了message_queue.cv.wait(lck)
操作后,会把此线程加入到一个等待队列上,待条件满足了再执行message_queue.cv.notify_all()
唤醒它,因为这里我们只有一个线程可能会发生这种忙等现象,所有我们使message_queue.cv.notify_all()
和message_queue.cv.notify_one()
都是等效的,都是将consume_task()
中的while
那一段唤醒
在consume_task()
中,else
分支里我们取出下一个可以进行处理的task
后,就可以将lck
锁解锁掉,因为我们已经取出了队头task
,不在需要访问资源,此时别的线程就可以开始访问资源。所以将lck.unlock()
放在message_queue.q.pop()
之后,而不是pool.match()
之后,只是出于对效率的优化,两者的结果其实是一样的
Pool
的相关操作比较简单,不再赘述了