import sys
import h5py
import numpy as np
import logging
from multiprocessing import Process, Event
class SwmrReader(Process):
    """Child process that follows a dataset in an HDF5 file opened for SWMR reading.

    The reader blocks until ``event`` is set for the first time, opens
    ``fname`` read-only with ``swmr=True``, and then, each time ``event`` is
    set again, refreshes the dataset and logs its current shape.  The loop
    ends once no event arrives within ``timeout`` seconds.
    """

    def __init__(self, event, fname, dsetname, timeout=2.0):
        """Store the settings used later by :meth:`run`.

        :param event: event object used for signalling; ``run`` calls its
            ``wait(timeout)`` and ``clear()`` methods.
        :param fname: name of the HDF5 file to open.
        :param dsetname: name of the dataset to look up in the file.
        :param timeout: seconds to wait for each event before giving up.
        """
        super(SwmrReader, self).__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname
        self._timeout = timeout

    def run(self):
        """Wait for the initial event, then log the dataset shape on every event.

        :raises RuntimeError: if the initial event is not set within
            ``timeout`` seconds.
        """
        logging.basicConfig(format='%(levelname)10s %(asctime)s %(name)10s %(message)s', level=logging.DEBUG)
        self.log = logging.getLogger('reader')
        self.log.setLevel(logging.DEBUG)

        self.log.info("Waiting for initial event")
        # The wait must not live inside an ``assert``: with ``python -O`` the
        # whole statement is stripped and the file would be opened without
        # waiting for the initial event at all.
        if not self._event.wait(self._timeout):
            raise RuntimeError(
                "Timed out after %s s waiting for the initial event" % self._timeout
            )
        self._event.clear()

        self.log.info("Opening file %s", self._fname)
        # The context manager closes the file even when the sanity check or
        # the dataset lookup below raises.
        with h5py.File(self._fname, 'r', libver='latest', swmr=True) as f:
            # Pure invariant check with no side effects, so safe as an assert.
            assert f.swmr_mode
            dset = f[self._dsetname]

            # Monitor and read loop: stops when no event arrives in time.
            while self._event.wait(self._timeout):
                self._event.clear()
                self.log.debug("Refreshing dataset")
                dset.refresh()
                shape = dset.shape
                self.log.info("Read dset shape: %s", shape)
class SwmrWriter(Process):
    """Child process that creates an HDF5 file and appends to a dataset in SWMR mode.

    The writer creates ``fname``, creates a resizable one-dimensional dataset
    initialised with four integers, switches the file to SWMR mode and sets
    ``event`` once.  It then grows the dataset five times, writing the same
    four values into the newly added region, flushing the dataset and setting
    ``event`` after every write.
    """

    def __init__(self, event, fname, dsetname):
        """Store the settings used later by :meth:`run`.

        Logging is deliberately not configured here: the constructor runs in
        the creating process, and ``run`` configures logging itself.

        :param event: event object used for signalling; ``run`` calls its
            ``set()`` method.
        :param fname: name of the HDF5 file to create.
        :param dsetname: name of the dataset to create in the file.
        """
        super(SwmrWriter, self).__init__()
        self._event = event
        self._fname = fname
        self._dsetname = dsetname

    def run(self):
        """Create the file and dataset, enable SWMR mode and append five blocks."""
        logging.basicConfig(format='%(levelname)10s %(asctime)s %(name)10s %(message)s', level=logging.DEBUG)
        self.log = logging.getLogger('writer')
        self.log.setLevel(logging.DEBUG)

        self.log.info("Creating file %s", self._fname)
        # The context manager closes the file on both normal exit and error.
        with h5py.File(self._fname, 'w', libver='latest') as f:
            arr = np.array([1, 2, 3, 4])
            dset = f.create_dataset(self._dsetname, chunks=(2,), maxshape=(None,), data=arr)

            # Invariant checks without side effects around the mode switch.
            assert not f.swmr_mode
            self.log.info("SWMR mode")
            f.swmr_mode = True
            assert f.swmr_mode

            self.log.debug("Sending initial event")
            self._event.set()

            # Write loop: grow by one block per iteration and fill the tail.
            block_len = len(arr)
            for i in range(5):
                new_shape = ((i + 1) * block_len,)
                self.log.info("Resizing dset shape: %s", new_shape)
                dset.resize(new_shape)
                self.log.debug("Writing data")
                dset[i * block_len:] = arr
                self.log.debug("Flushing data")
                dset.flush()
                self.log.info("Sending event")
                self._event.set()
if __name__ == "__main__":
    # Script entry point: run one reader and one writer process that share a
    # single event, then wait for both to finish.
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(levelname)10s %(asctime)s %(name)10s %(message)s',
    )

    # Optional positional overrides: [file name] [dataset name].
    cli_args = sys.argv[1:]
    fname = cli_args[0] if len(cli_args) >= 1 else 'swmrmp.h5'
    dsetname = cli_args[1] if len(cli_args) >= 2 else 'data'

    event = Event()
    reader = SwmrReader(event, fname, dsetname)
    writer = SwmrWriter(event, fname, dsetname)

    # The reader is started first, then the writer.
    logging.info("Starting reader")
    reader.start()
    logging.info("Starting writer")
    writer.start()

    # Join the writer first, then the reader.
    logging.info("Waiting for writer to finish")
    writer.join()
    logging.info("Waiting for reader to finish")
    reader.join()
|