本专栏文章是Linux多线程同步(互斥锁、条件变量、信号量、自旋锁、读写锁)专题视频的源代码,视频地址:
1、book47.cpp,互斥锁实现数据库连接池的客户端程序。
/*
* 程序名:demo47.cpp,此程序演示采用freecplus框架的CTcpClient类实现socket通信的客户端。
* 作者:C语言技术网() 日期:20190525
*/
#include "../_freecplus.h"
int main(int argc,char *argv[])
{
printf("pid=%dn",getpid());
CTcpClient TcpClient; // 创建客户端的对象。
if (TcpClient.ConnectToServer("172.21.0.3",5858)==false) // 向服务端发起连接请求。
{
printf("TcpClient.ConnectToServer("172.21.0.3",5858) failed.n"); return -1;
}
char strbuffer[1024]; // 存放数据的缓冲区。
for (int ii=0;ii
{
memset(strbuffer,0,sizeof(strbuffer));
snprintf(strbuffer,50,"(%d)这是第%d个超级女生,编号d。",getpid(),ii+1,ii+1);
printf("发送:%sn",strbuffer);
if (TcpClient.Write(strbuffer)==false) break; // 向服务端发送请求报文。
memset(strbuffer,0,sizeof(strbuffer));
if (TcpClient.Read(strbuffer,20)==false) break; // 接收服务端的回应报文。
printf("接收:%sn",strbuffer);
sleep(5);
}
// 程序直接退出,析构函数会释放资源。
}
2、serverdb.cpp,互斥锁实现数据库连接池的服务端程序。
/*
* 程序名:serverdb.cpplinux 信号量 互斥锁,此程序演示多线程的网络服务端的数据库连接池。
* 作者:C语言技术网() 日期:20190525
* 编译指令:g++ -g -Wno-write-strings -o serverdb serverdb.cpp -I/oracle/home/rdbms/public -L/oracle/home/lib -L. -lclntsh /freecplus/db/oracle/_ooci.cpp /freecplus/_freecplus.cpp -lpthread -lm -lc
*/
#include "/freecplus/_freecplus.h"
#include "/freecplus/db/oracle/_ooci.h"
pthread_mutex_t mutexs[100]; // 用于数据库连接池的锁。
connection conns[100]; // 数据库连接池。
bool initconns(); // 初始化数据库连接池。
connection *getconn(); // 从连接池中获取一个数据库连接。
void freeconn(connection *in_conn); // 释放数据库连接。
void freeconns(); // 释放数据库连接池。
void *pthmain(void *arg);
CTcpServer TcpServer; // 创建服务端对象。
vector vpthid; // 存放线程id的容器。
void mainexit(int sig); // 信号2和15的处理函数。
// 线程清理函数。
void pthmainexit(void *arg);
CLogFile logfile;
int main(int argc,char *argv[])
{
signal(2,mainexit); signal(15,mainexit); // 捕获信号2和15
logfile.Open("/tmp/serverdb.log","a+");
if (TcpServer.InitServer(5858)==false) // 初始化TcpServer的通信端口。
{
logfile.Write("TcpServer.InitServer(5858) failed.n"); return -1;
}
if (initconns()==false) // 初始化数据库连接池。
{
logfile.Write("initconns() failed.n"); return -1;
}
while (true)
{
if (TcpServer.Accept()==false) // 等待客户端连接。
{
logfile.Write("TcpServer.Accept() failed.n"); return -1;
}
logfile.Write("客户端(%s)已连接。n",TcpServer.GetIP());
pthread_t pthid;
if (pthread_create(&pthid,NULL,pthmain,(void *)(long)TcpServer.m_connfd)!=0)
{ logfile.Write("pthread_create failed.n"); return -1; }
vpthid.push_back(pthid); // 把线程id保存到vpthid容器中。
}
return 0;
}
void *pthmain(void *arg)
{
pthread_cleanup_push(pthmainexit,arg); // 设置线程清理函数。
pthread_detach(pthread_self()); // 分离线程。
pthread_setcanceltype(PTHREAD_CANCEL_DISABLE,NULL); // 设置取消方式为立即取消。
int sockfd=(int)(long)arg; // 与客户端的socket连接。
int ibuflen=0;
char strbuffer[1024]; // 存放数据的缓冲区。
while (true)
{
memset(strbuffer,0,sizeof(strbuffer));
if (TcpRead(sockfd,strbuffer,&ibuflen,300)==false) break; // 接收客户端发过来的请求报文。
logfile.Write("接收:%sn",strbuffer);
connection *conn=getconn(); // 获取一个数据库连接。
// 处理业务
sleep(2);
freeconn(conn); // 释放一个数据库连接。
strcat(strbuffer,"ok"); // 在客户端的报文后加上"ok"。
logfile.Write("发送:%sn",strbuffer);
if (TcpWrite(sockfd,strbuffer)==false) break; // 向客户端回应报文。
}
logfile.Write("客户端已断开。n"); // 程序直接退出linux 信号量 互斥锁,析构函数会释放资源。
pthread_cleanup_pop(1);
pthread_exit(0);
}
// 信号2和15的处理函数。
void mainexit(int sig)
{
logfile.Write("mainexit begin.n");
// 关闭监听的socket。
TcpServer.CloseListen();
// 取消全部的线程。
for (int ii=0;ii
{
logfile.Write("cancel %ldn",vpthid[ii]);
pthread_cancel(vpthid[ii]);
}
// 释放数据库连接池。
freeconns();
logfile.Write("mainexit end.n");
exit(0);
}
// 线程清理函数。
void pthmainexit(void *arg)
{
logfile.Write("pthmainexit begin.n");
// 关闭与客户端的socket。
close((int)(long)arg);
// 从vpthid中删除本线程的id。
for (int ii=0;ii
{
if (vpthid[ii]==pthread_self())
{
vpthid.erase(vpthid.begin()+ii);
}
}
logfile.Write("pthmainexit end.n");
}
// 初始化数据库连接池。
bool initconns()
{
for (int ii=0;ii
{
if (conns[ii].connecttodb("scott/tiger","Simplified Chinese_China.ZHS16GBK") != 0)
{
logfile.Write("connect database failed.n%sn",conns[ii].m_cda.message); return false;
}
}
for (int ii=0;ii
return true;
}
// 从连接池中获取一个数据库连接。
connection *getconn()
{
for (int ii=0;ii
{
if (pthread_mutex_trylock(&mutexs[ii])==0)
{
logfile.Write("get a conn[%d] ok.n",ii);
return &conns[ii];
}
}
return NULL;
}
// 释放数据库连接。
void freeconn(connection *in_conn)
{
for (int ii=0;ii
{
if (in_conn==&conns[ii]) pthread_mutex_unlock(&mutexs[ii]);
}
}
// 释放数据库连接池。
void freeconns()
{
for (int ii=0;ii
{
conns[ii].disconnect(); pthread_mutex_destroy(&mutexs[ii]);
}
}
3、book17.cpp,用互斥锁和条件变量实现高速缓存。
// 本程序演示用互斥锁和条件变量实现高速缓存。
#include
#include
#include
#include
#include
#include
#include
using namespace std;
int mesgid=1; // 消息的记数器。
// 缓存消息的结构体。
struct st_message
{
int mesgid;
char message[1024];
} stmesg;
vector vcache; // 用vector容器做缓存。
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 声名并初始化条件变量。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 声名并初始化互斥锁。
// 消费者、出队线程主函数。
void *outcache(void *arg)
{
struct st_message stmesg;
while (true)
{
pthread_mutex_lock(&mutex); // 加锁。
// 如果缓存为空,等待。 // 条件变量虚假唤醒。
while (vcache.size() == 0)
{
pthread_cond_wait(&cond,&mutex);
}
// 从缓存中获取第一条记录linux教程,然后删除该记录。
memcpy(&stmesg,&vcache[0],sizeof(struct st_message)); // 内存拷贝。
vcache.erase(vcache.begin());
pthread_mutex_unlock(&mutex); // 解锁。
// 以下是处理业务的代码。
printf("phid=%ld,mesgid=%dn",pthread_self(),stmesg.mesgid);
usleep(100);
}
}
// 生产者、把生产的数据存入缓存。
void incache(int sig)
{
struct st_message stmesg;
memset(&stmesg,0,sizeof(struct st_message));
pthread_mutex_lock(&mutex); // 加锁。
// 生产数据,放入缓存。
stmesg.mesgid=mesgid++; vcache.push_back(stmesg); // 内存拷贝。
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
pthread_mutex_unlock(&mutex); // 解锁。
pthread_cond_broadcast(&cond); // 触发条件,激活全部的线程。
}
int main()
{
signal(15,incache); // 接收15的信号linux手机软件,调用生产者函数。
pthread_t thid1,thid2,thid3;
pthread_create(&thid1,NULL,outcache,NULL);
pthread_create(&thid2,NULL,outcache,NULL);
pthread_create(&thid3,NULL,outcache,NULL);
pthread_join(thid1,NULL);
pthread_join(thid2,NULL);
pthread_join(thid3,NULL);
return 0;
}
4、book42.cpp,用互斥锁和信号量实现高速缓存。
// 本程序演示用互斥锁和信号量实现高速缓存。
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
int mesgid=1; // 消息的记数器。
// 缓存消息的结构体。
struct st_message
{
int mesgid;
char message[1024];
} stmesg;
vector vcache; // 用vector容器做缓存。
sem_t sem; // 声明信号量。
pthread_mutex_t mutex; // 声名并初始化互斥锁。
// 消费者、出队线程主函数。
void *outcache(void *arg)
{
struct st_message stmesg;
while (true)
{
while (vcache.size() == 0)
{
sem_wait(&sem); // 如果缓存中没有数据,等待信号。
printf("%ld wait ok.n",pthread_self());
}
pthread_mutex_lock(&mutex); // 加锁。
if (vcache.size() == 0) // 判断缓存中是否有数据。
{
pthread_mutex_unlock(&mutex); continue; // 解锁,continue。
}
// 从缓存中获取第一条记录,然后删除该记录。
memcpy(&stmesg,&vcache[0],sizeof(struct st_message));
vcache.erase(vcache.begin());
pthread_mutex_unlock(&mutex); // 解锁。
// 以下是处理业务的代码。
printf("phid=%ld,mesgid=%dn",pthread_self(),stmesg.mesgid);
usleep(100);
}
}
// 生产者、把生产的数据存入缓存。
void incache(int sig)
{
struct st_message stmesg;
memset(&stmesg,0,sizeof(struct st_message));
pthread_mutex_lock(&mutex); // 加锁。
// 生产数据,放入缓存。
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
stmesg.mesgid=mesgid++; vcache.push_back(stmesg);
pthread_mutex_unlock(&mutex); // 解锁。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
sem_post(&sem); // 信号加1。
}
int main()
{
signal(15,incache); // 接收15的信号,调用生产者函数。
sem_init(&sem,0,0); // 初始化信号量。
pthread_mutex_init(&mutex,NULL); // 初始化互斥锁。
pthread_t thid1,thid2,thid3;
pthread_create(&thid1,NULL,outcache,NULL);
pthread_create(&thid2,NULL,outcache,NULL);
pthread_create(&thid3,NULL,outcache,NULL);
pthread_join(thid1,NULL);
pthread_join(thid2,NULL);
pthread_join(thid3,NULL);
return 0;
}