前言
进程间通信,故名思意是在不同进程间传递消息,因为每个进程都有自己独立的虚拟地址空间,方式主要有如下几种:管道(有名管道和无名管道),共享内存,内存映射,消息队列, 信号,信号量,套接字等方式
一、常见进程间通信方式的原理
- 无名管道(PIPE):只能用在具有亲缘关系的进程中比如父子进程,基本原理是使用内核缓冲区作为通信介质,亲缘关系的进程具有缓冲区的读写描述符,一个进程写入时,另一个进程只能读,是半双工的。并且因为数据是内核区的,每次读写都需要在内核和用于区之间交换数据。
- 有名管道(FIFO): 可以在无亲缘关系的进程中,基本原理与无名管道相同,当是具有文件实体,用于标识内核缓冲区。
- 共享内存:不同于管道基于内核缓冲区进行进程间通信,共享内存是开辟在不同进程的共享区域中(段),不同进程间操作共享区域数据基本不需要内核介入。共享内存是最快的IPC方式。
- 文件映射:基于文件共享的想法,不过操作时是将文件数据映射到内存中,写入时需要将数据同步到硬盘中,因此即使进程退出,数据也保留在磁盘中。
- 信号:Linux中信号机制
- 信号量:信号和信号量都是内核区的对象,本质上与管道差不多,携带的信息更少,一般用于进程同步而不是传递数据。
- 套接字:不同主机间的进程间通信,将传输层中的两台主机抽象成两个端点,这两个端点就是套接字。
二、基于信号量的进程间通信
1. 功能
使用进程实现生产者和消费者模型,进程同步伪代码如下:
void Producer()
{
P(empty)
P(mutex)
临界区
V(mutex)
V(full)
}
void Customer()
{
P(full)
P(mutex)
临界区
P(mutex)
v(full)
}
PRODUCER个生产者依次向share.txt中写入1到n的数字字符串,长度为10,CUSTOMER个生产者依次从share.txt中读出写入的数字字符串,并记录到log.txt。
2.代码
代码如下(示例):
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <sys/sem.h>
#include <sys/shm.h>
#define CONSUMER 5
#define PRODUCER 2
#define LOGFILE "log.txt"
#define SHAREDFILE "share.txt"
#define BUFSIZE 10
#define COUNTER 10
#define handle_error(msg) do {perror(msg); exit(EXIT_FAILURE);} while(0)
static int empty, full, mutex;
static int rposid, dataid;
union semun
{
int val;
struct semid_ds *buf;
unsigned short *array;
struct seminfo *__buf;
};
void sem_wait(int semid)
{
struct sembuf sof;
sof.sem_num = 0;
sof.sem_op = -1;
sof.sem_flg = SEM_UNDO;
if(semop(semid, &sof, 1) != 0)
perror("semop");
}
void sem_post(int semid)
{
struct sembuf sof;
sof.sem_num = 0;
sof.sem_op = 1;
sof.sem_flg = SEM_UNDO;
if(semop(semid, &sof, 1) != 0)
perror("semop");
}
void producer(int id)
{
int fd;
char buf[BUFSIZE] = {0};
int* data = shmat(dataid, NULL, SHM_RND);
while(1)
{
sem_wait(empty);
sem_wait(mutex);
fd = open(SHAREDFILE, O_RDWR | O_APPEND, 0666);
sprintf(buf, "%d", (*data)++);
write(fd, buf, BUFSIZE);
close(fd);
sem_post(mutex);
sem_post(full);
}
if(shmdt(data) == -1)
handle_error("shmdt");
printf("producer endding!\n");
while(1);
}
void consumer(int id)
{
int fd, log;
char buf[BUFSIZE] = {0};
char logbuf[20] = {0};
int* rpos;
int trpos = 0;
while(1)
{
rpos = shmat(rposid, NULL, SHM_RND);
if(rpos == (void*)-1)
handle_error("shmat");
sem_wait(full);
sem_wait(mutex);
fd = open(SHAREDFILE, O_RDONLY, 0666);
if(fd == -1)
handle_error("open");
log = open(LOGFILE, O_RDWR | O_APPEND, 0666);
if(log == -1)
handle_error("open");
lseek(fd, *rpos, SEEK_SET);
if(read(fd, buf, BUFSIZE) == -1)
handle_error("read");
(*rpos) += BUFSIZE;
trpos = *rpos;
sprintf(logbuf, "%d: %s\n", id, buf);
if(write(log, logbuf, sizeof(logbuf)) == -1)
handle_error("write");
printf("%s \n", logbuf);
printf("%d \n", *rpos);
sem_post(mutex);
sem_post(empty);
close(fd);
close(log);
if(shmdt((void*)rpos) != 0)
handle_error("shmdt");
}
printf("consumer %d endding!\n", id);
}
int main()
{
int ret;
int fd;
char buf[2];
int i = 0;
empty = semget((key_t)1122, 1, IPC_CREAT | 0666);
if(empty == -1)
handle_error("empty");
full = semget((key_t)2233, 1, IPC_CREAT | 0666);
if(full == -1)
handle_error("full");
mutex = semget((key_t)3344, 1, IPC_CREAT | 0666);
if(mutex == -1)
handle_error("mutex");
rposid = shmget((key_t)4455, 4, IPC_CREAT | 0666);
if(rposid == -1)
handle_error("shmget");
dataid = shmget((key_t)5566, 4, IPC_CREAT | 0666);
if(dataid == -1)
handle_error("shmget");
union semun arg;
arg.val = 5;
if(semctl(empty, 0, SETVAL, arg) == -1)
perror("semctl");
arg.val = 0;
semctl(full, 0, SETVAL, arg);
arg.val = 1;
semctl(mutex, 0, SETVAL, arg);
int* data = (int*)shmat(dataid, NULL, SHM_RND);
*data = 0;
printf("rdata: %d\n", *data);
if(shmdt((void*)data) != 0)
handle_error("shmdt");
int* rpos = (int*)shmat(rposid, NULL, SHM_RND);
*rpos = 0;
printf("rpos: %d\n", *rpos);
if(shmdt((void*)rpos) != 0)
handle_error("shmdt");
fd = open(LOGFILE, O_CREAT | O_RDWR | O_TRUNC, 0666);
close(fd);
fd = open(SHAREDFILE, O_CREAT | O_RDWR | O_TRUNC, 0666);
close(fd);
while(i++ < CONSUMER)
{
ret = fork();
if(ret == 0)
{
printf("consumer %d is running\n", i);
consumer(i);
return 0;
}
}
i = 0;
while(i++ < PRODUCER)
{
ret = fork();
if(ret == 0)
{
printf("producer %d is running\n", i);
producer(i);
return 0;
}
}
int st;
for(int i = 0; i < CONSUMER; ++i)
{
wait(&st);
printf("st: %d\n", WCOREDUMP(st));
}
semctl(empty, 0, IPC_RMID);
semctl(full, 0, IPC_RMID);
semctl(mutex, 0, IPC_RMID);
shmctl(rposid, IPC_RMID, NULL);
printf("end\n");
return 0;
}
3. 细节
进程间通信的信号量没有P和V操作的函数,手动实现,其中sem_wait和sem_post是P和V操作。 因为是进程间通信,其虚拟地址不同,虽然上述代码是有亲缘关系的,但是也适用于无亲缘关系见通信
总结
因为经常写线程间通信,因此例如rpos记录当前读的位置,一度被我当作全局变量来进行进程间通信,导致失败很多次。创建log和share文件时候需要加上0666的读写权限,默认是没有写权限的。
|