IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 操作系统实验信号量机制_生产者消费者_哲学家进餐问题_读者写者问题 -> 正文阅读

[大数据]操作系统实验信号量机制_生产者消费者_哲学家进餐问题_读者写者问题

在这里插入图片描述
1.消费者生产者




import java.util.LinkedList;
import java.util.Queue;
/*
 * 生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。
 * 与此同时,消费者也在缓冲区消耗这些数据。
 * 该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
 * 
 * 
 * 简单来说就是生产者不断的生产资源,消费者不断生产资源,
 * 但是生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据
 * 
 * 
 * 互斥关系:产者和消费者是同步运行的,肯定会遇到两者同时用缓存区的时候,
 * 如果同时用的话,那么产品数,和消费数就会不一致,所以两者是互斥关系,对于这样的问题就必须对缓冲区进行互斥。
 * 
 * 
 * 
 */
public class 消费者生产者 {
public static void main(String[] args) {
	Buffer buffer  = new Buffer();
	Consumer consumer = new Consumer(buffer);
	Producer producer = new Producer(buffer);
	producer.start();
	consumer.start();

}
}
class Producer extends Thread{
	private Buffer buffer;
	
	public Producer(Buffer buffer) {
		this.buffer = buffer;
	}

	public void run() {
		for(int i=0;i<10;i++) {
			try {
				buffer.add(i);
				Thread.sleep(500);//模拟生产者需要生产时间
				System.out.println("生产"+i);
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			
		}
		}
}
class Consumer extends Thread{
	private Buffer buffer;
	
	public Consumer(Buffer buffer) {
		this.buffer = buffer;
	}

	@Override
	public void run() {

		for(int i=0;i<10;i++) {
			try {
				int val =buffer.pull(); // 消费者不断消费就可以
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				
				e.printStackTrace();
			}
			 System.out.println("消费"+i);
		}
	}
	
}
//这里创建一个有限的缓冲区,即临界资源,用于实现生产者和消费者的互斥
//synchronized是Java中的关键字,是一种同步锁 

class Buffer{
	private Queue<Integer> queue = new LinkedList<>();
	// 创建一个队列模拟临界资源
	private int size=5;
	// 在往生产者里边生产的时候,
	//如果缓冲区已经满了,呢么让生产者进程等待
	public synchronized void add(int val) throws InterruptedException {
	if(queue.size()>size) {
		wait();// 不让生产者继续生产,
	}
	queue.add(val);
	notify();// 通知消费者去消费
	}
	//在消费者消费资源的时候,
	//如果缓冲区是空的,呢么让消费者进程等待
	public synchronized int pull() throws InterruptedException {
		if(queue.size()==0) {
			wait();// 没有资源可以消费,阻塞消费者进程
		}
		int val=queue.poll();
		notify();
		return val;
	}
}

2.哲学家进餐问题

package 实验一;
//每个哲学家都拿着左手的筷子,呢么就会造成死锁
//为了解决这个问题只当哲学家发现左右两边的筷子都可用时,才拿起筷子,
//否则等待。哲学家编号及筷子编号,哲学家i左手边的筷子编号为(i+1)%5,右手边的筷子编号为i。
public class 哲学家进餐问题 {
	public static void main(String[] args) {
		//创建五个哲学家线程
		Philosopher p1=new Philosopher(0);
		Philosopher p2=new Philosopher(1);
		Philosopher p3=new Philosopher(2);
		Philosopher p4=new Philosopher(3);
		Philosopher p5=new Philosopher(4);
		/*
		 *   public Thread(Runnable target, String name) {
        this(null, target, name, 0);
    }
		 */
		new Thread(p1,"0").start();
		new Thread(p2,"1").start();    
		new Thread(p3,"2").start();
		new Thread(p4,"3").start();
		new Thread(p5,"4").start();
	}
}

 class Philosopher extends Thread{
	private int index;
	public static Chopstick chop=new Chopstick();
	
	public Philosopher(int index) {
		this.index = index;
	}
	public synchronized void thinking(){
		System.out.println("哲学家"+index+"在思考");
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	public synchronized void eating(){
		System.out.println("哲学家"+index+"在吃饭");
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		while(true){
			//思考之后,去拿筷子吃饭
			thinking();
			chop.takeChopsticks(index);
			//吃完之后,去思考
			eating();
			chop.putChopsticks(index);
		}
	}
}
 // 哲学家拿筷子和放筷子是互斥的
 class Chopstick {
 	public boolean[] isUsing=new boolean[5];// 用来记录筷子的占用情况
 	public synchronized void takeChopsticks(int index){
 		while(isUsing[index]||isUsing[(index+1)%5]){//当左右手有一个筷子是已经被占用的时候,就要阻塞这个哲学家进程
 			try {
 				wait();
 			} catch (InterruptedException e) {
 				e.printStackTrace();
 			}
 		}
 		//进行到这,说明这个哲学家左右手的筷子都是空闲的
 		isUsing[index]=true;
 		isUsing[(index+1)%5]=true;
 		System.out.println("哲学家"+index+"拿起筷子");
 	}
 	public synchronized void putChopsticks(int index){
 		isUsing[index]=false;
 		isUsing[(index+1)%5]=false;
 		System.out.println("哲学家"+index+"放下筷子");
 		notify();// 放下筷子吃完饭 唤醒这个哲学家进程去拿筷子吃饭
 	}
 }

3.读者写者问题

package 实验一;
import java.util.concurrent.Semaphore;
/*
 * 写进程与写进程之间必须互斥的写入数据

 * 写进程与读进程之间必须互斥的访问共享数据
 * 读进程与读进程之间可以同时访问数据,不需要实现互斥的访问共享数据
 *
*
 * 写进程与写进程之间必须互斥的写入数据
 * 写进程与读进程之间必须互斥的访问共享数据
 * 读进程与读进程之间可以同时访问数据,不需要实现互斥的访问共享数据
 */


/*
 * Semaphore(信号量)是用来控制同时访问特定资源的线程数量,通过协调各个线程以保证合理地使用公共资源。
 * void acquire() :从信号量获取一个许可,如果无可用许可前将一直阻塞等待
 * 
 * 其构造函数:
 * public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    permits 初始许可数,也就是最大访问线程数
    
    int availablePermits(): 获取当前信号量可用的许可
    void release():释放一个许可
 */
public class 读者写者问题 {
    public static void main(String[] args) throws InterruptedException {
        int writerNumber=3;
        int readerNumber=7;
        Disk disks=new Disk();
        Thread[] writer_threads=new Thread[writerNumber];
        Writer[] writers=new Writer[writerNumber];
        Thread[] reader_threads=new Thread[readerNumber];
        Reader[] readers=new Reader[readerNumber];
         

        /*启动部分写者*/
        for (int i=0;i<writerNumber/2;i++)
        {
            writers[i]=new Writer(disks);
            writer_threads[i]=new Thread(writers[i],Integer.toString(i));
            writer_threads[i].start();
        }
        /*暂停等待写者启动*/
        Thread.sleep(100);
        /*启动读者*/
        for (int i=0;i<readerNumber;i++)
        {
            readers[i]=new Reader(disks);
            reader_threads[i]=new Thread(readers[i],Integer.toString(i));
            reader_threads[i].start();
        }
        for (int i=writerNumber/2;i<writerNumber;i++)
        {
            writers[i]=new Writer(disks);
            writer_threads[i]=new Thread(writers[i],Integer.toString(i));
            writer_threads[i].start();
        }

    }
}
/**
 * 模拟被读和被写的对象
 */
 class Disk extends Thread{
    private String data_str;    /*读写数据*/
    private int reader_count;   /*读者数量*/
    private Semaphore write_mutex;  /*写者信号量*/
    private Semaphore read_mutex;   /*读者信号量*/
    private Semaphore read_count_mutex; /*修改读者计数的信号量 用于修改reader_count时使用的pv操作*/
    Disk()
    {
        write_mutex=new Semaphore(1);
        read_mutex=new Semaphore(1000);
        read_count_mutex=new Semaphore(1);
    }
    public void start_read() throws InterruptedException {
        //有写者时忙等
        while (write_mutex.availablePermits()==0){ }//int availablePermits(): 获取当前信号量可用的许可
        // 修改读者计数pv操作
        read_count_mutex.acquire();//申请
        reader_count++;
        read_count_mutex.release();//释放
        /*获取信号量*/
        read_mutex.acquire();
    }
    public void finish_read() throws InterruptedException {
        /*修改读者计数 (pv操作)*/
        read_count_mutex.acquire();
        reader_count--;
        read_count_mutex.release();
        read_mutex.release();//释放读者的信号量
    }

    public void start_write() throws InterruptedException {
        /*获取写者信号量*/
        write_mutex.acquire();
        /*当有读者时忙等*/
        while (getReader_count()!=0){}
    }

    public void finish_write()
    {
        /*释放写者信号量*/
        write_mutex.release();
    }
    public String read() throws InterruptedException {
        return data_str;
    }
    public String write(String new_data) throws InterruptedException {
        this.data_str=new_data;
        return data_str;
    }
    public int getReader_count() {
        return reader_count;
    }
    public void setReader_count(int reader_count) {
        this.reader_count = reader_count;
    }
}


 class Reader extends Thread {
    private Disk disk;

    public Reader(Disk disk) {
        this.disk = disk;
    }

    @Override
    public void run() {
        for (int i=10;i>=0;i--)
        {
            /*读操作*/
            try {
                read();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

           //模拟真实的读操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void read() throws InterruptedException {
        disk.start_read();
        /*读数据并打印*/
        String str=disk.read();
        System.out.println("读操作:"+Thread.currentThread().getId()+" 现在的数据为:"+str);
        disk.finish_read();
    }

}

 class Writer extends Thread{
    private Disk disk;
    public Writer(Disk disk) {
        this.disk = disk;
    }
    public void run() {
        for (int i=10;i>=0;i--)
        {
            try {
                write();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void write() throws InterruptedException {
        disk.start_write();
        String str=disk.write("data+"+Thread.currentThread().getId());
        System.out.println("写操作:"+Thread.currentThread().getId()+" 现在的数据为:"+str);
        disk.finish_write();// 释放写者信号量,让别的写者可以接着写入
    }
}




  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-05 11:25:14  更:2022-05-05 11:29:41 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:04:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码