IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> python控制EnergyPlus方法(linux) -> 正文阅读

[Python知识库]python控制EnergyPlus方法(linux)

python控制EnergyPlus方法(linux)

简介:使用python控制bcvtb与energyplus进行交互,参考 https://github.com/zhangzhizza/Gym-Eplus ,抠掉了其中gym库的内容

一.安装软件,bcvtb1.6 和 energyplus8.7

1.bcvtb,https://simulationresearch.lbl.gov/bcvtb/

①.官网提到的依赖库(好像可以不装),https://simulationresearch.lbl.gov/bcvtb/releases/latest/doc/manual/tit-sofReqDev.xhtml
sudo apt-get install libxml2-dev libexpat-dev doxygen graphviz
sudo apt-get install docbook docbook-xsl libsaxon-java libxalan2-java docbook-xsl-saxon dblatex pdftk

②.安装jdk,https://www.oracle.com/java/technologies/javase-downloads.html
sudo dpkg -i jdk-11.0.12_linux-x64_bin.deb

③.安装bcvtb,https://simulationresearch.lbl.gov/bcvtb/Download
java -jar bcvtb-install-linux64-v1.6.0.jar

卸载
cd bcvtb/Uninstaller
java -jar uninstaller.jar

2.energy plus,https://energyplus.net/downloads
sh EnergyPlus-8.7.0-78a111df4a-Linux-x86_64.sh

二.创建新的虚拟环境

pip install pipreqs -i HTTPS://mirrors.aliyun.com/pypi/simple/ # 安装 pipreqs
pip freeze > requirements.txt # 打包 requirements

virtualenv --python=/usr/bin/python3 virt_env # 创建环境
source virt_env/bin/activate # 激活环境
pip install -r requirements.txt -i HTTPS://mirrors.aliyun.com/pypi/simple/ # 安装依赖

三.调用库函数跑demo

# bcvtb 和 EnergyPlus 交互的库
# 参考 https://github.com/zhangzhizza/Gym-Eplus,抠掉了gym的内容,函数全部整合到这一个脚本
# 修改日期:2021.7.30

import socket, signal, _thread, threading
import os, copy, time, subprocess, traceback, datetime
import logging
from shutil import copyfile, rmtree
from xml.etree.ElementTree import Element, SubElement, tostring


YEAR = 1991
CWD = os.getcwd()
LOG_LEVEL_MAIN = 'INFO'
LOG_LEVEL_EPLS = 'ERROR'
LOG_FMT = "[%(asctime)s] %(name)s %(levelname)s:%(message)s"

WEEKDAY_ENCODING = {'monday': 0,
                    'tuesday': 1,
                    'wednesday': 2,
                    'thursday': 3,
                    'friday': 4,
                    'saturday': 5,
                    'sunday': 6}


class IdfParser(object):
    """
    功能:改写idf文件
    """
    def __init__(self, idf_dir, version='8_7'):
        self._idf_dir = idf_dir
        # idf_dict is:
        # {idf_class_name:[obj_content_str, obj_content_str]}
        self._idf_dict = {}
        self._version = version
        self._parser_idf()

    def _parser_idf(self):
        with open(self._idf_dir, 'r') as idf_file:
            idf_lines = idf_file.readlines()
            is_obj_start = False
            obj_content = ''
            obj_name = ''
            for idf_line in idf_lines:
                idf_line_prcd = idf_line.split('\n')[0].split('!')[0].strip()
                if is_obj_start == False:
                    if len(idf_line_prcd) > 0:
                        if idf_line_prcd[-1] == ',':
                            obj_name = idf_line_prcd[:-1]
                            is_obj_start = True
                        elif idf_line_prcd[-1] == ';':
                            obj_name = idf_line_prcd[0:idf_line_prcd.find(',')]
                            obj_content = idf_line_prcd[idf_line_prcd.find(',') + 1:];
                            if obj_name in self._idf_dict:
                                self._idf_dict[obj_name].append(obj_content)
                            else:
                                self._idf_dict[obj_name] = [obj_content]
                            # Reset obj temp fields
                            is_obj_start = False
                            obj_content = ''
                            obj_name = ''
                else:
                    obj_content += idf_line
                    if len(idf_line_prcd) > 0:
                        if idf_line_prcd[-1] == ';':
                            if obj_name in self._idf_dict:
                                self._idf_dict[obj_name].append(obj_content)
                            else:
                                self._idf_dict[obj_name] = [obj_content]
                            # Reset obj temp fields
                            is_obj_start = False
                            obj_content = ''
                            obj_name = ''

    def write_idf(self, to_write_dir):
        to_write_str = ''
        # Construct the string to write
        for idf_obj_name in self._idf_dict:
            obj_contents = self._idf_dict[idf_obj_name]
            for obj_content in obj_contents:
                to_write_str += idf_obj_name + ',\n'
                to_write_str += obj_content + '\n'
        with open(to_write_dir, 'w') as idf_file:
            idf_file.write(to_write_str)

    def write_object_in_idf(self, to_write_dir, object_name):
        to_write_str = ''
        # Construct the string to write
        obj_contents = self._idf_dict[object_name]
        for obj_content in obj_contents:
            to_write_str += object_name + ',\n'
            to_write_str += obj_content + '\n'
        with open(to_write_dir, 'w') as idf_file:
            idf_file.write(to_write_str)

    def remove_objects_all(self, class_name):
        self._idf_dict.pop(class_name)

    def get_obj_reference_count(self, obj_name):
        ref_ct = 0
        for key, value in self._idf_dict.items():
            for obj in value:
                obj_lines = obj.split(',')[1:]  # Exclude the obj name itself from the reference
                for obj_line in obj_lines:
                    # Remove \n
                    nl_sps = obj_line.split('\n')
                    nl_free = nl_sps[1] if len(nl_sps) > 2 else nl_sps[-1]  # Handle the line with ;
                    effc_obj_line = nl_free.split(';')[0].strip()
                    if obj_name == effc_obj_line:
                        ref_ct += 1
        return ref_ct

    def remove_object(self, class_name, obj_name):
        try:
            tgt_objects = self._idf_dict[class_name]
            tgt_idx = 0
            for obj in tgt_objects:
                obj_name_this = self.get_object_name(obj)
                if obj_name_this == obj_name:
                    break
                else:
                    tgt_idx += 1
            self._idf_dict[class_name].pop(tgt_idx)
        except Exception as e:
            print('Func: remove_object, args:(%s, %s), error: %s' % (class_name, obj_name, traceback.format_exc()))

    def get_object_name(self, object_content):
        obj_name = object_content.split(',')[0].split('\n')[-1].strip()
        return obj_name

    def get_schedule_type_init_value(self, schedule_name):
        schedule_content = None
        for cmp_schedule_content in self._idf_dict['Schedule:Compact']:
            if self.get_object_name(cmp_schedule_content) == schedule_name:
                schedule_content = cmp_schedule_content
                break
        schedule_content = schedule_content.split(';')[0].split(',')
        schedule_type = schedule_content[1].split('\n')[-1].strip()
        # Schedule init value
        for schedule_line_i in schedule_content[2:]:
            try:
                init_value = float(schedule_line_i.split('\n')[-1].strip())
                break
            except Exception as e:
                pass
        return (schedule_type, init_value)

    def get_all_compact_schedules_names(self):
        returned_list = []
        for cmp_schedule_content in self._idf_dict['Schedule:Compact']:
            returned_list.append(self.get_object_name(cmp_schedule_content))
        return returned_list

    def localize_schedule(self, local_file_path):
        file_name = local_file_path.split(os.sep)[-1]
        file_dir = local_file_path[:local_file_path.rfind(os.sep)]
        sch_file_contents = self._idf_dict['Schedule:File']
        content_i = 0
        for sch_file_obj in copy.deepcopy(sch_file_contents):
            if file_name in sch_file_obj:
                file_name_st_idx = sch_file_obj.rfind(file_name)
                full_path_st_idx = sch_file_obj.rfind(',', 0, file_name_st_idx)
                sch_file_obj = sch_file_obj[0:full_path_st_idx] + ',\n' + file_dir + '/' + sch_file_obj[
                                                                                           file_name_st_idx:]
                sch_file_contents[content_i] = sch_file_obj
            content_i += 1
        self._idf_dict['Schedule:File'] = sch_file_contents

    def is_contain_filesch(self):
        result = 'Schedule:File' in self._idf_dict
        return (result)

    def add_objects(self, dict_to_add):
        for key in dict_to_add:
            objects_to_add = dict_to_add[key]
            if key in self._idf_dict:
                self._idf_dict[key].extend(objects_to_add)
            else:
                self._idf_dict[key] = objects_to_add

    def add_dxf_output(self):
        self._idf_dict['Output:Surfaces:Drawing'] = ['DXF,!- Report Type\n' +
                                                     'Triangulate3DFace;\n']

    def set_minimum_run(self):
        self._idf_dict['SimulationControl'] = ['Yes,!- Do Zone Sizing Calculation\n' +
                                               'No,!- Do System Sizing Calculation\n' +
                                               'No,!- Do Plant Sizing Calculation\n' +
                                               'No,!- Run Simulation for Sizing Periods\n' +
                                               'No;!- Run Simulation for Weather File Run Periods\n'];
        if 'Schedule:File' in self._idf_dict:
            self._idf_dict.pop('Schedule:File', None)

    @property
    def idf_dict(self):
        return self._idf_dict


def create_env(source_idf_path, add_idf_path):
    """
    功能:创建 EnergyPlus 运行需要的 .env 模型文件
    """
    # Create a new idf file with the addtional contents
    source_idf = IdfParser(source_idf_path)
    add_idf = IdfParser(add_idf_path)
    # Remove the original output variable
    source_idf.remove_objects_all('Output:Variable')
    # Remove the schedules in the original idf
    tgt_class_name_in_add = 'ExternalInterface:Schedule'
    tgt_sch_names_in_org = [source_idf.get_object_name(add_content)
                            for add_content in
                            add_idf.idf_dict[tgt_class_name_in_add]]
    tgt_class_name_in_org = 'Schedule:Compact'
    for to_rm_obj_name in tgt_sch_names_in_org:
        source_idf.remove_object(tgt_class_name_in_org, to_rm_obj_name)
    # Check whether or not the tgt_sch_names have been actually used in the
    # source idf
    for tgt_sch_name in tgt_sch_names_in_org:
        tgt_sch_ref_ct = source_idf.get_obj_reference_count(tgt_sch_name)
        if tgt_sch_ref_ct < 1:
            print('WARNING!!!!! The target schedule %s '
                  'may not be used the source IDF.' % tgt_sch_name)

    # Add the addition to the source idf
    source_idf.add_objects(add_idf.idf_dict)
    # Write the new idf out. The name has '.env' before the file idf
    # extension
    new_idf_name = source_idf_path + '.env'
    source_idf.write_idf(new_idf_name)


class Logger():
    def getLogger(self, name, level, formatter):
        logger = logging.getLogger(name)
        consoleHandler = logging.StreamHandler()
        consoleHandler.setFormatter(logging.Formatter(formatter))
        logger.addHandler(consoleHandler)
        logger.setLevel(level)
        logger.propagate = False
        return logger


def get_delta_seconds(st_year, st_mon, st_day, ed_mon, ed_day):
    """
    funtion: 计算时间差(秒),[start_year:start_month:start_day:0:0:0] 到 [start_year:end_mon:end_day:24:0:0]
    """
    startTime = datetime.datetime(st_year, st_mon, st_day, 0, 0, 0)
    endTime = datetime.datetime(st_year, ed_mon, ed_day, 23, 0, 0) + datetime.timedelta(0, 3600)
    delta_sec = (endTime - startTime).total_seconds()
    return delta_sec


class EplusEnv():
    """
    funtion: EnergyPlus 仿真运行的类,重点在 self.step() 函数
    """
    def __init__(self,
                 eplus_path,
                 weather_path,
                 bcvtb_path,
                 variable_path,
                 idf_path,
                 env_name,
                 act_repeat=1,
                 max_ep_data_store_num=1):

        self._env_name = env_name
        self._thread_name = threading.current_thread().getName()
        self.logger_main = Logger().getLogger('EPLUS_ENV_%s_%s_ROOT' % (env_name, self._thread_name),LOG_LEVEL_MAIN, LOG_FMT)

        # Set the environment variable for bcvtb
        os.environ['BCVTB_HOME'] = bcvtb_path

        # Create a socket for communication with the EnergyPlus
        self.logger_main.debug('Creating socket for communication...')
        s = socket.socket()
        host = socket.gethostname()  # Get local machine name
        s.bind((host, 0))  # Bind to the host and any available port
        sockname = s.getsockname()
        port = sockname[1]  # Get the port number
        s.listen(60)  # Listen on request
        self.logger_main.debug('Socket is listening on host %s port %d' % (sockname))

        self._env_working_dir_parent = self._get_eplus_working_folder(CWD, '-%s-res' % (env_name))
        os.makedirs(self._env_working_dir_parent)
        self._host = host
        self._port = port
        self._socket = s
        self._eplus_path = eplus_path
        self._weather_path = weather_path
        self._variable_path = variable_path
        self._idf_path = idf_path
        self._episode_existed = False

        (self._eplus_run_st_mon,
         self._eplus_run_st_day,
         self._eplus_run_ed_mon,
         self._eplus_run_ed_day,
         self._eplus_run_st_weekday,
         self._eplus_run_stepsize) = self._get_eplus_run_info(idf_path)

        self._eplus_run_stepsize = 3600 / self._eplus_run_stepsize

        # Stepsize in second
        self._eplus_one_epi_len = self._get_one_epi_len(self._eplus_run_st_mon,
                                                        self._eplus_run_st_day,
                                                        self._eplus_run_ed_mon,
                                                        self._eplus_run_ed_day)
        self._epi_num = 0
        self._act_repeat = act_repeat
        self._max_res_to_keep = max_ep_data_store_num
        self._last_action = [20.0]


    def reset(self):
        ret = []
        # End the last episode if exists
        if self._episode_existed:
            self._end_episode()
            self.logger_main.info('Last EnergyPlus process has been closed. ')
            self._epi_num += 1
        # Create EnergyPlus simulaton process
        self.logger_main.info('Creating EnergyPlus simulation environment...')
        eplus_working_dir = self._get_eplus_working_folder(
            self._env_working_dir_parent, '-sub_run')
        os.makedirs(eplus_working_dir)  # Create the Eplus working directory
        # Remove redundant past working directories
        self._rm_past_history_dir(eplus_working_dir, '-sub_run')
        eplus_working_idf_path = (eplus_working_dir +
                                  '/' +
                                  self._get_file_name(self._idf_path))
        eplus_working_var_path = (eplus_working_dir +
                                  '/' +
                                  'variables.cfg')  # Variable file must be with this name
        eplus_working_out_path = (eplus_working_dir +
                                  '/' +
                                  'output')
        copyfile(self._idf_path, eplus_working_idf_path)
        # Copy the idf file to the working directory
        copyfile(self._variable_path, eplus_working_var_path)
        # Copy the variable.cfg file to the working dir
        self._create_socket_cfg(self._host,
                                self._port,
                                eplus_working_dir)
        # Create the socket.cfg file in the working dir
        self.logger_main.info('EnergyPlus working directory is in %s' % (eplus_working_dir))
        eplus_process = self._create_eplus(self._eplus_path,
                                           self._weather_path,
                                           eplus_working_idf_path,
                                           eplus_working_out_path,
                                           eplus_working_dir)
        self.logger_main.debug('EnergyPlus process is still running ? %r' % self._get_is_subprocess_running(eplus_process))
        self._eplus_process = eplus_process

        # Log the Eplus output
        eplus_logger = Logger().getLogger('EPLUS_ENV_%s_%s-EPLUSPROCESS_EPI_%d'
                                          % (self._env_name, self._thread_name, self._epi_num), LOG_LEVEL_EPLS, LOG_FMT)
        _thread.start_new_thread(self._log_subprocess_info, (eplus_process.stdout, eplus_logger))
        _thread.start_new_thread(self._log_subprocess_err, (eplus_process.stderr, eplus_logger))

        # Establish connection with EnergyPlus
        # Establish connection with client.
        conn, addr = self._socket.accept()
        self.logger_main.debug('Got connection from %s at port %d.' % (addr))
        # Start the first data exchange
        rcv_1st = conn.recv(2048).decode(encoding='ISO-8859-1')
        self.logger_main.debug('Got the first message successfully: ' + rcv_1st)
        version, flag, nDb, nIn, nBl, curSimTim, Dblist = self._disassembleMsg(rcv_1st)
        ret.append(curSimTim)
        ret.append(Dblist)
        # Remember the message header, useful when send data back to EnergyPlus
        self._eplus_msg_header = [version, flag]
        self._curSimTim = curSimTim

        # Check if episode terminates
        is_terminal = False
        if curSimTim >= self._eplus_one_epi_len:
            is_terminal = True
        ret.append(is_terminal)
        # Change some attributes
        self._conn = conn
        self._eplus_working_dir = eplus_working_dir
        self._episode_existed = True
        # Process for episode terminal
        if is_terminal:
            self._end_episode()

        return tuple(ret)

    def step(self, action):
        # Check terminal
        if self._curSimTim >= self._eplus_one_epi_len:
            return None
        ret = []
        # Send to the EnergyPlus
        act_repeat_i = 0
        is_terminal = False
        curSimTim = self._curSimTim
        # Now just hard code to the energy, the last item in state observation
        integral_item_list = []
        while act_repeat_i < self._act_repeat and (not is_terminal):
            self.logger_main.debug('Perform one step.')
            header = self._eplus_msg_header
            runFlag = 0  # 0 is normal flag
            tosend = self._assembleMsg(header[0], runFlag, len(action), 0, 0, curSimTim, action)
            self._conn.send(tosend.encode())
            # Recieve from the EnergyPlus
            rcv = self._conn.recv(2048).decode(encoding='ISO-8859-1')
            self.logger_main.debug('Got message successfully: ' + rcv)
            # Process received msg
            version, flag, nDb, nIn, nBl, curSimTim, Dblist = self._disassembleMsg(rcv)
            # Hard code that the last item is the integral item
            integral_item_list.append(Dblist[-1])
            if curSimTim >= self._eplus_one_epi_len:
                is_terminal = True
                # Remember the last action
                self._last_action = action
            act_repeat_i += 1
        # Construct the return. The return is the state observation of the last
        # step plus the integral item
        ret.append(curSimTim)
        Dblist[-1] = 1.0 * sum(integral_item_list) / len(integral_item_list)
        ret.append(Dblist)
        # Add terminal status
        ret.append(is_terminal)
        # Change some attributes
        self._curSimTim = curSimTim
        return ret

    def _rm_past_history_dir(self, cur_eplus_working_dir, dir_sig):
        cur_dir_name, cur_dir_id = cur_eplus_working_dir.split(dir_sig)
        cur_dir_id = int(cur_dir_id)
        if cur_dir_id - self._max_res_to_keep > 0:
            rm_dir_id = cur_dir_id - self._max_res_to_keep
            rm_dir_full_name = cur_dir_name + dir_sig + str(rm_dir_id)
            rmtree(rm_dir_full_name)

    def _create_eplus(self, eplus_path, weather_path, idf_path, out_path, eplus_working_dir):
        eplus_process = subprocess.Popen('%s -w %s -d %s %s'
                                         % (eplus_path + '/energyplus',
                                            weather_path,
                                            out_path, idf_path),
                                         shell=True,
                                         cwd=eplus_working_dir,
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE,
                                         preexec_fn=os.setsid)
        return eplus_process

    def _get_eplus_working_folder(self, parent_dir, dir_sig='-run'):
        os.makedirs(parent_dir, exist_ok=True)
        experiment_id = 0
        for folder_name in os.listdir(parent_dir):
            if not os.path.isdir(os.path.join(parent_dir, folder_name)):
                continue
            try:
                folder_name = int(folder_name.split(dir_sig)[-1])
                if folder_name > experiment_id:
                    experiment_id = folder_name
            except:
                pass
        experiment_id += 1

        parent_dir = os.path.join(parent_dir, 'Out')
        parent_dir = parent_dir + '%s%d' % (dir_sig, experiment_id)
        return parent_dir

    def _create_socket_cfg(self, host, port, write_dir):
        top = Element('BCVTB-client')
        ipc = SubElement(top, 'ipc')
        socket = SubElement(ipc, 'socket', {'port': str(port), 'hostname': host, })
        xml_str = tostring(top, encoding='ISO-8859-1').decode()

        with open(write_dir + '/' + 'socket.cfg', 'w+') as socket_file:
            socket_file.write(xml_str)

    def _get_file_name(self, file_path):
        path_list = file_path.split('/')
        return path_list[-1]

    def _log_subprocess_info(self, out, logger):
        for line in iter(out.readline, b''):
            logger.info(line.decode())

    def _log_subprocess_err(self, out, logger):
        for line in iter(out.readline, b''):
            logger.error(line.decode())

    def _get_is_subprocess_running(self, subprocess):
        if subprocess.poll() is None:
            return True
        else:
            return False

    def get_is_eplus_running(self):
        return self._get_is_subprocess_running(self._eplus_process)

    def end_env(self):
        self._end_episode()
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()

    def end_episode(self):
        self._end_episode()

    def _end_episode(self):
        # Send the final msg to EnergyPlus
        header = self._eplus_msg_header
        flag = 1.0  # Terminate flag is 1.0, specified by EnergyPlus
        action = self._last_action
        action_size = len(self._last_action)
        tosend = self._assembleMsg(header[0], flag, action_size, 0, 0, self._curSimTim, action)
        self.logger_main.debug('Send the final msg to Eplus.')
        self._conn.send(tosend.encode())
        # Recieve the final msg from Eplus
        rcv = self._conn.recv(2048).decode(encoding='ISO-8859-1')
        self.logger_main.debug('Final msh from Eplus: %s', rcv)
        self._conn.send(tosend.encode())  # Send again, don't know why
        # time.sleep(0.2) # Rest for a while so EnergyPlus finish post processing
        # Remove the connection
        self._conn.close()
        self._conn = None
        # Process the output
        # self._run_eplus_outputProcessing();
        time.sleep(1)  # Sleep the thread so EnergyPlus has time to do the
        # post processing

        # Kill subprocess
        os.killpg(self._eplus_process.pid, signal.SIGTERM)
        self._episode_existed = False

    def _run_eplus_outputProcessing(self):
        eplus_outputProcessing_process = \
            subprocess.Popen('%s'
                             % (self._eplus_path + '/PostProcess/ReadVarsESO'),
                             shell=True,
                             cwd=self._eplus_working_dir + '/output',
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             preexec_fn=os.setsid)

    def _assembleMsg(self, version, flag, nDb, nIn, nBl, curSimTim, Dblist):
        ret = ''
        ret += '%d' % (version)
        ret += ' '
        ret += '%d' % (flag)
        ret += ' '
        ret += '%d' % (nDb)
        ret += ' '
        ret += '%d' % (nIn)
        ret += ' '
        ret += '%d' % (nBl)
        ret += ' '
        ret += '%20.15e' % (curSimTim)
        ret += ' '
        for i in range(len(Dblist)):
            ret += '%20.15e' % (Dblist[i])
            ret += ' '
        ret += '\n'

        return ret

    def _disassembleMsg(self, rcv):
        rcv = rcv.split(' ')
        version = int(rcv[0])
        flag = int(rcv[1])
        nDb = int(rcv[2])
        nIn = int(rcv[3])
        nBl = int(rcv[4])
        curSimTim = float(rcv[5])
        Dblist = []
        for i in range(6, len(rcv) - 1):
            Dblist.append(float(rcv[i]))

        return (version, flag, nDb, nIn, nBl, curSimTim, Dblist)

    def _get_eplus_run_info(self, idf_path):
        ret = []
        with open(idf_path, encoding='ISO-8859-1') as idf:
            contents = idf.readlines()
        # Run period
        tgtIndex = None
        for i in range(len(contents)):
            line = contents[i]
            effectiveContent = line.strip().split(
                '!')[0]  # Ignore contents after '!'
            effectiveContent = effectiveContent.strip().split(',')[0]
            # Remove tailing ','
            if effectiveContent.lower() == 'runperiod':
                tgtIndex = i
                break
        for i in range(2, 6):
            ret.append(int(contents[tgtIndex + i].strip()
                           .split('!')[0]
                           .strip()
                           .split(',')[0]
                           .strip()
                           .split(';')[0]))
        # Start weekday
        ret.append(WEEKDAY_ENCODING[contents[tgtIndex + i + 1].strip()
                   .split('!')[0]
                   .strip()
                   .split(',')[0]
                   .strip()
                   .split(';')[0]
                   .strip()
                   .lower()])
        # Step size
        line_count = 0
        for line in contents:
            effectiveContent = line.strip().split(
                '!')[0]  # Ignore contents after '!'
            effectiveContent = effectiveContent.strip().split(',')
            if effectiveContent[0].strip().lower() == 'timestep':
                if len(effectiveContent) > 1 and len(effectiveContent[1]) > 0:
                    ret.append(int(effectiveContent[1]
                                   .split(';')[0]
                                   .strip()))
                else:
                    ret.append(int(contents[line_count + 1].strip()
                                   .split('!')[0]
                                   .strip()
                                   .split(',')[0]
                                   .strip()
                                   .split(';')[0]))
                break
            line_count += 1
        return tuple(ret)

    def _get_one_epi_len(self, st_mon, st_day, ed_mon, ed_day):
        return get_delta_seconds(YEAR, st_mon, st_day, ed_mon, ed_day)

    @property
    def start_year(self):
        return YEAR

    @property
    def start_mon(self):
        return self._eplus_run_st_mon

    @property
    def start_day(self):
        return self._eplus_run_st_day

    @property
    def start_weekday(self):
        return self._eplus_run_st_weekday

    @property
    def env_name(self):
        return self._env_name

    @property
    def max_res_to_keep(self):
        return self._max_res_to_keep

    @max_res_to_keep.setter  # setter装饰器,调用后进行该预处理
    def max_res_to_keep(self, value):
        self._max_res_to_keep = value


def get_abs_path(rel_path):
    return_path = rel_path
    if rel_path[0] != '/':
        return_path = os.getcwd() + '/' + rel_path  # os.getcwd() return to current work directory
    return return_path


def init_model(env_name,
               eplus_path,
               weather_path,
               bcvtb_path,
               base_idf_path,
               add_idf_path,
               variable_path):

    idf_env_path = base_idf_path + '.env'

    # abs path
    eplus_path = get_abs_path(eplus_path)
    weather_path = get_abs_path(weather_path)
    bcvtb_path = get_abs_path(bcvtb_path)
    base_idf_path = get_abs_path(base_idf_path)
    add_idf_path = get_abs_path(add_idf_path)
    variable_path = get_abs_path(variable_path)
    idf_env_path = get_abs_path(idf_env_path)

    # Create EnergyPlus Env
    # env_creator = creator.EplusEnvCreator()
    create_env(base_idf_path, add_idf_path)

    # init EplusEnv()
    changban_env = EplusEnv(eplus_path,
                            weather_path,
                            bcvtb_path,
                            variable_path,
                            idf_env_path,
                            env_name,)

    return changban_env

from EplusUtils import init_model

if __name__ == '__main__':

    # config paths
    env_name = 'myenv'
    eplus_path = 'software/EnergyPlus-8-7-0/'  # ep安装路径
    bcvtb_path = 'software/bcvtb'  # bcvtb安装路径
    weather_path = 'weather.epw'  # 天气

    model_path = 'models/eplus/flow/'
    base_idf_path = model_path + 'changban_flow.idf'
    add_idf_path = model_path + 'add_flow.idf'
    variable_path = model_path + 'variables_flow.cfg'

    # init energyplus env
    changban_env = init_model(env_name,
                              eplus_path,
                              weather_path,
                              bcvtb_path,
                              base_idf_path,
                              add_idf_path,
                              variable_path)
    # env reset
    time, states, end = changban_env.reset()

    # actions value
    actions = [1]
    
    while not end:
        time, states, end = changban_env.step(actions)

        step_num = step_num + 1
        print('step:', step_num)

    changban_env.end_env()
  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2021-07-31 16:35:50  更:2021-07-31 16:37:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年5日历 -2024/5/2 16:43:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码