IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> Linux 基于信号量的进程间通信 -> 正文阅读

[系统运维]Linux 基于信号量的进程间通信


前言

进程间通信,故名思意是在不同进程间传递消息,因为每个进程都有自己独立的虚拟地址空间,方式主要有如下几种:管道(有名管道和无名管道),共享内存,内存映射,消息队列, 信号,信号量,套接字等方式


一、常见进程间通信方式的原理

  1. 无名管道(PIPE):只能用在具有亲缘关系的进程中比如父子进程,基本原理是使用内核缓冲区作为通信介质,亲缘关系的进程具有缓冲区的读写描述符,一个进程写入时,另一个进程只能读,是半双工的。并且因为数据是内核区的,每次读写都需要在内核和用于区之间交换数据。
  2. 有名管道(FIFO): 可以在无亲缘关系的进程中,基本原理与无名管道相同,当是具有文件实体,用于标识内核缓冲区。
  3. 共享内存:不同于管道基于内核缓冲区进行进程间通信,共享内存是开辟在不同进程的共享区域中(段),不同进程间操作共享区域数据基本不需要内核介入。共享内存是最快的IPC方式。
  4. 文件映射:基于文件共享的想法,不过操作时是将文件数据映射到内存中,写入时需要将数据同步到硬盘中,因此即使进程退出,数据也保留在磁盘中。
  5. 信号:Linux中信号机制
  6. 信号量:信号和信号量都是内核区的对象,本质上与管道差不多,携带的信息更少,一般用于进程同步而不是传递数据。
  7. 套接字:不同主机间的进程间通信,将传输层中的两台主机抽象成两个端点,这两个端点就是套接字。

二、基于信号量的进程间通信

1. 功能

使用进程实现生产者和消费者模型,进程同步伪代码如下:

/**
empty: 空闲资源,如果为负数表示当前有多少生产者进程等待空闲资源, 如果为正数表示还剩多少资源可用
full: 可用资源,如果为正数表示当前有多少消费者等待可用资源,如果为负数表示有多少消费者进程等待可用资源
mutex: 锁,信号量初值为1
*/
void Producer()
{
	P(empty)  
	P(mutex)
	临界区
    V(mutex)
    V(full)
}

void Customer()
{
	P(full)
	P(mutex)
	临界区
	P(mutex)
	v(full)
}

PRODUCER个生产者依次向share.txt中写入1到n的数字字符串,长度为10,CUSTOMER个生产者依次从share.txt中读出写入的数字字符串,并记录到log.txt。

2.代码

代码如下(示例):


#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <errno.h>
#include <signal.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <sys/sem.h>
#include <sys/shm.h>

/**
    interprocess communication with semphore
    (1) semget(key_t, nsems, sem_flag)
    (2) semctl(semid, sem_num, cmd, semun) SETVAL, GETVAL
    (3) semop(semid, &sem_buf, npos) 
*/

/**
 * shmget
*/

#define CONSUMER    5
#define PRODUCER    2
#define LOGFILE     "log.txt"
#define SHAREDFILE  "share.txt"
#define BUFSIZE     10
#define COUNTER     10

#define handle_error(msg) do {perror(msg); exit(EXIT_FAILURE);} while(0)

static int empty, full, mutex;
static int rposid, dataid;

//int data = 0;   
// int rpos = 0;

union semun
{
    int              val;    /* Value for SETVAL */
    struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
    unsigned short  *array;  /* Array for GETALL, SETALL */
    struct seminfo  *__buf;  /* Buffer for IPC_INFO
                                (Linux-specific) */
};

void sem_wait(int semid) //P
{
    struct sembuf sof;
    sof.sem_num = 0;  //sem_num th of sem set
    sof.sem_op = -1;  //adj
    sof.sem_flg = SEM_UNDO;

    if(semop(semid, &sof, 1) != 0)
        perror("semop");
}

void sem_post(int semid)
{
    struct sembuf sof;
    sof.sem_num = 0;
    sof.sem_op = 1;
    sof.sem_flg = SEM_UNDO;

    if(semop(semid, &sof, 1) != 0)
        perror("semop");
}


void producer(int id)
{
    int fd;
    char buf[BUFSIZE] = {0};
    int* data = shmat(dataid, NULL, SHM_RND);
    while(1)
    {

        // if(*data >= COUNTER)
        //     break;

        sem_wait(empty);
        sem_wait(mutex);

        fd = open(SHAREDFILE, O_RDWR | O_APPEND, 0666);    
        
        sprintf(buf, "%d", (*data)++);
        //printf("%s\n", buf);
        write(fd, buf, BUFSIZE);

        close(fd);

        sem_post(mutex);
        sem_post(full);
    }

    if(shmdt(data) == -1)
        handle_error("shmdt");

    printf("producer endding!\n");
    while(1);

    
}

void consumer(int id)
{

    int fd, log;
    char buf[BUFSIZE] = {0};
    char logbuf[20] = {0};
    int* rpos;
    int trpos = 0;
    while(1)
    {    
        // printf("full: %d \n", semctl(full, 0, GETVAL));
        // printf("mutex: %d \n", semctl(mutex, 0, GETVAL));

        rpos = shmat(rposid, NULL, SHM_RND);
        if(rpos == (void*)-1)
            handle_error("shmat");

        // if(*rpos >= 10 * COUNTER)
        //     break;

        sem_wait(full);
        sem_wait(mutex);

        fd = open(SHAREDFILE, O_RDONLY, 0666);
        if(fd == -1)
            handle_error("open");

        log = open(LOGFILE, O_RDWR | O_APPEND, 0666);
        if(log == -1)
            handle_error("open");
            
        lseek(fd, *rpos, SEEK_SET);

        if(read(fd, buf, BUFSIZE) == -1)
            handle_error("read");

        (*rpos) += BUFSIZE;
        trpos  = *rpos;
        
        sprintf(logbuf, "%d: %s\n", id, buf);
        
        if(write(log, logbuf, sizeof(logbuf)) == -1)
            handle_error("write");

        printf("%s \n", logbuf);
        printf("%d \n", *rpos);

        sem_post(mutex);
        sem_post(empty);
        
        close(fd);
        close(log);

        if(shmdt((void*)rpos) != 0)
            handle_error("shmdt");
    }

    printf("consumer %d endding!\n", id);
}

int main()
{
    int ret;
    int fd;
    char buf[2];
    int i = 0;

    //获取信号
    empty = semget((key_t)1122, 1, IPC_CREAT | 0666);
    if(empty == -1)
        handle_error("empty");
    full = semget((key_t)2233, 1, IPC_CREAT | 0666);
    if(full == -1)
        handle_error("full");
    mutex = semget((key_t)3344, 1, IPC_CREAT | 0666);
    if(mutex == -1)
        handle_error("mutex");
    rposid = shmget((key_t)4455, 4, IPC_CREAT | 0666);
    if(rposid == -1)
        handle_error("shmget");
    dataid = shmget((key_t)5566, 4, IPC_CREAT | 0666);
    if(dataid == -1)
        handle_error("shmget");

    //初始化信号
    union semun arg;
    arg.val = 5;
    if(semctl(empty, 0, SETVAL, arg) == -1) //semid, semnum-th, flag
        perror("semctl");
    arg.val = 0;
    semctl(full, 0, SETVAL, arg);
    arg.val = 1;
    semctl(mutex, 0, SETVAL, arg);

    int* data = (int*)shmat(dataid, NULL, SHM_RND);
    *data = 0;
    printf("rdata: %d\n", *data);
    if(shmdt((void*)data) != 0)
        handle_error("shmdt");

    int* rpos = (int*)shmat(rposid, NULL, SHM_RND);
    *rpos = 0;
    printf("rpos: %d\n", *rpos);
    if(shmdt((void*)rpos) != 0)
        handle_error("shmdt");

    fd = open(LOGFILE, O_CREAT | O_RDWR | O_TRUNC, 0666);
    close(fd);

    fd = open(SHAREDFILE, O_CREAT | O_RDWR | O_TRUNC, 0666);
    close(fd);


    while(i++ < CONSUMER)
    {
        ret = fork();
        if(ret == 0)
        {
            printf("consumer %d is running\n", i);
            consumer(i);
            return 0;
        }
    }

    i = 0;
    while(i++ < PRODUCER)
    {
        ret = fork();
        if(ret == 0)
        {
            printf("producer %d is running\n", i);
            producer(i);
            return 0;
        }
    }


    int st;
    for(int i = 0; i < CONSUMER; ++i)
    {
        wait(&st);
        printf("st: %d\n", WCOREDUMP(st));
    }


    semctl(empty, 0, IPC_RMID);
    semctl(full,  0,  IPC_RMID);
    semctl(mutex, 0, IPC_RMID);
    shmctl(rposid, IPC_RMID, NULL);
    
    printf("end\n");

    return 0;
}

3. 细节

进程间通信的信号量没有P和V操作的函数,手动实现,其中sem_wait和sem_post是P和V操作。
因为是进程间通信,其虚拟地址不同,虽然上述代码是有亲缘关系的,但是也适用于无亲缘关系见通信


总结

因为经常写线程间通信,因此例如rpos记录当前读的位置,一度被我当作全局变量来进行进程间通信,导致失败很多次。创建log和share文件时候需要加上0666的读写权限,默认是没有写权限的。

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-05-04 07:32:52  更:2022-05-04 07:33:10 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/4 18:03:27-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码