
Dist dataloader OP

Some issues

  • With multiple servers and a single client that has several samplers, going from no backup servers to having backup servers raises this error:

Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amax/gnn-tutorial/tests/distributed/test_mp_dataloader_byme.py", line 208, in start_server
    g.start()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 364, in start
    start_server(server_id=self.server_id,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc_server.py", line 77, in start_server
    req, _ = rpc.recv_request()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc.py", line 666, in recv_request
    raise DGLError('Got request sent to server {}, '
dgl._ffi.base.DGLError: Got request sent to server 1, different from my rank 2!

  • Reading the code, the client seems to mishandle server_id; adding os.environ['DGL_NUM_SERVER'] fixed the bug (the setup is sketched just before the Code section).
  • But that surfaces a new problem, one that had previously been solved by the shared_mem setting:

Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amax/gnn-tutorial/tests/distributed/test_mp_dataloader_byme.py", line 512, in start_server_v1
    g.start()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 365, in start
    start_server(server_id=self.server_id,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc_server.py", line 94, in start_server
    res = req.process_request(server_state)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_services.py", line 133, in process_request
    global_src, global_dst, global_eids = _sample_neighbors(local_g, partition_book,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/graph_services.py", line 58, in _sample_neighbors
    local_ids = F.astype(local_ids, local_g.idtype)
AttributeError: 'NoneType' object has no attribute 'idtype'

  • That problem is not solved here, and it does not look easy to solve, so I am keeping the original num_servers=1 setting.
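
For reference, a minimal sketch of the environment setup that made the first error go away; it mirrors what check_dataloader_v1 below does, and num_workers / num_servers stand for the test's configuration values:

import os

# Environment the client processes need before dgl.distributed.initialize().
os.environ['DGL_DIST_MODE'] = 'distributed'
os.environ['DGL_NUM_SAMPLER'] = str(num_workers)  # sampler processes per client
os.environ['DGL_NUM_SERVER'] = str(num_servers)   # servers per machine -- the missing piece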

Code

# Test DistDataLoader with multiple clients.
def example3_v1(tmpdir, num_servers, num_workers, num_machines, num_clients, dataloader_type):
    from utils import reset_envs
    from dgl.data import CitationGraphDataset
    reset_envs()
    g = CitationGraphDataset("cora")[0]
    check_dataloader_v1(g, tmpdir, num_servers, num_workers, num_machines, num_clients)

def check_dataloader_v1(g, tmpdir, num_servers, num_workers, num_machines, num_clients):
    from utils import generate_ip_config
    from dgl.distributed import partition_graph
    import multiprocessing as mp
    import os
    import time
    ip_config = "kv_ip_config.txt"
    generate_ip_config(ip_config, num_machines, num_servers)

    num_parts = num_machines
    num_hops = 1
    orig_nid, orig_eid = partition_graph(g, 'test_sampling', num_parts, tmpdir,
                                         num_hops=num_hops, part_method='metis',
                                         reshuffle=True, return_mapping=True)
    # print(orig_nid)
    # print(g.dstdata['feat'].size())
    # print(dgl.NID)
    if not isinstance(orig_nid, dict):
        orig_nid = {g.ntypes[0]: orig_nid}
    if not isinstance(orig_eid, dict):
        orig_eid = {g.etypes[0]: orig_eid}
    # print(orig_nid)

    total_num_servers = num_machines * num_servers
    # Each client process counts as a worker, plus its sampler subprocesses.
    total_num_workers = (num_workers + 1) * num_clients
    pserver_list = []
    ctx = mp.get_context('spawn')
    os.environ['DGL_DIST_MODE'] = 'distributed'
    os.environ['DGL_NUM_SAMPLER'] = str(num_workers)
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    for i in range(total_num_servers):
        p = ctx.Process(target=start_server_v1, args=(
            i, tmpdir, num_servers, total_num_workers, total_num_servers>1))
        p.start()
        time.sleep(1)
        pserver_list.append(p)

    ptrainer_list = []
    for i in range(num_clients):
        p = ctx.Process(target=start_node_dataloader_v1, args=(
            0, tmpdir, num_servers, num_workers, num_machines, orig_nid, orig_eid, g
        ))
        p.start()
        ptrainer_list.append(p)
    # if dataloader_type == 'node':
    #     p = ctx.Process(target=start_node_dataloader, args=(
    #         0, tmpdir, num_servers, num_workers, num_machines, orig_nid, orig_eid, g))
    #     p.start()
    #     ptrainer_list.append(p)
    # elif dataloader_type == 'edge':
    #     p = ctx.Process(target=start_edge_dataloader, args=(
    #         0, tmpdir, num_servers, num_workers, orig_nid, orig_eid, g))
    #     p.start()
    #     ptrainer_list.append(p)
    for p in pserver_list:
        p.join()
    for p in ptrainer_list:
        p.join()
    print(f"Loader Done!")

def start_server_v1(rank, tmpdir, num_servers, num_clients, multi_server):
    from dgl.distributed import DistGraphServer
    # Keep shared memory enabled when there are multiple servers, so that
    # backup servers can attach to the main server's graph partition.
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                        tmpdir + '/test_sampling.json', disable_shared_mem=not multi_server,
                        graph_format=['csc', 'coo'])
    g.start()

def start_node_dataloader_v1(rank, tmpdir, num_servers, num_workers, num_machines, orig_nid, orig_eid, groundtruth_g):
    import dgl
    import numpy as np
    import torch as th
    from dgl import backend as F
    from dgl.distributed import load_partition, DistGraph
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb = None
    total_num_servers = num_machines * num_servers
    disable_shared_mem = total_num_servers > 1
    if disable_shared_mem:
        # Without shared memory the client must load the partition book itself.
        _, _, _, gpb, _, _, _ = load_partition(tmpdir + '/test_sampling.json', rank)
    num_nodes_to_sample = 202
    batch_size = 32
    dist_graph = DistGraph("test_mp", gpb=gpb, part_config=tmpdir + '/test_sampling.json')
    assert len(dist_graph.ntypes) == len(groundtruth_g.ntypes)
    assert len(dist_graph.etypes) == len(groundtruth_g.etypes)
    if len(dist_graph.etypes) == 1:
        train_nid = th.arange(num_nodes_to_sample)
    else:
        train_nid = {'n3': th.arange(num_nodes_to_sample)}

    # for i in range(total_num_servers):
    #     part, _, _, _, _, _, _ = load_partition(tmpdir + '/test_sampling.json', i)

    # Create sampler
    sampler = dgl.dataloading.MultiLayerNeighborSampler([
        # First hop: a dict fanout exercises the hetero path, otherwise an int.
        {etype: 5 for etype in dist_graph.etypes} if len(dist_graph.etypes) > 1 else 5,
        # Second hop: plain int fanout.
        10])

    # We need to test creating DistDataLoader multiple times.
    for i in range(2):
        # Create DataLoader for constructing blocks
        dataloader = dgl.dataloading.NodeDataLoader(
            dist_graph,
            train_nid,
            sampler,
            batch_size=batch_size,
            shuffle=True,
            drop_last=False,
            num_workers=num_workers)

        for epoch in range(1):
            for idx, (_, _, blocks) in zip(range(0, num_nodes_to_sample, batch_size), dataloader):
                block = blocks[-1]
                for src_type, etype, dst_type in block.canonical_etypes:
                    o_src, o_dst = block.edges(etype=etype)
                    # Map block-local IDs back to the original graph's IDs
                    # via dgl.NID and the partition mapping.
                    src_nodes_id = block.srcnodes[src_type].data[dgl.NID][o_src]
                    dst_nodes_id = block.dstnodes[dst_type].data[dgl.NID][o_dst]
                    src_nodes_id = orig_nid[src_type][src_nodes_id]
                    dst_nodes_id = orig_nid[dst_type][dst_nodes_id]
                    # Every sampled edge must exist in the ground-truth graph.
                    has_edges = groundtruth_g.has_edges_between(src_nodes_id, dst_nodes_id, etype=etype)
                    assert np.all(F.asnumpy(has_edges))
                    # print(src_type, dst_type)
                    # print(block.srcnodes[src_type])
                # assert np.all(np.unique(np.sort(F.asnumpy(dst_nodes_id))) == np.arange(idx, batch_size))
    del dataloader
    dgl.distributed.exit_client()  # needed because this process runs the test twice
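
A typical invocation might look like this (a sketch; the parameter values are illustrative, and it assumes the utils helpers from the test directory are importable):

import tempfile

if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as tmpdir:
        # 1 machine running 2 servers (1 main + 1 backup), 2 clients,
        # each client with 1 sampler worker.
        example3_v1(tmpdir, num_servers=2, num_workers=1,
                    num_machines=1, num_clients=2, dataloader_type='node')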

Comparing test_sampling.py against the current code: the error says graph_services.py has no local_g, yet none of the backup servers' server_state has a local_g either, so why does test_sampling.py not raise this error now?
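
My working hypothesis (an assumption from reading DistGraphServer, not something the DGL docs state): a backup server never loads the partition from disk; it attaches to the main server's graph through shared memory, so its local_g is only non-None when shared memory is enabled. Under that hypothesis, starting servers like this (start_backup_friendly_server is a hypothetical name) should keep backups functional:

from dgl.distributed import DistGraphServer

def start_backup_friendly_server(rank, num_servers, num_clients, part_config):
    # disable_shared_mem=False lets backup servers (rank % num_servers != 0)
    # attach to the main server's shared-memory graph instead of holding None.
    g = DistGraphServer(rank, "kv_ip_config.txt", num_servers, num_clients,
                        part_config, disable_shared_mem=False,
                        graph_format=['csc', 'coo'])
    g.start()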

  • When running the multi-machine version, machine 2 always fails in load_subtensor while machine 1 never does, which is very strange:

Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_context.py", line 87, in init_process
    (dataloader_name, collate_fn_dict[dataloader_name]))
  File "/home/amax/gnn-tutorial/tests/distributed/test_machine_9.py", line 34, in sample_blocks
    batch_inputs, batch_labels = load_subtensor(self.g, seeds, input_nodes, "cpu")
  File "/home/amax/gnn-tutorial/tests/distributed/test_machine_9.py", line 41, in load_subtensor
    batch_inputs = g.ndata['feat'][input_nodes].to(device) if load_feat else None
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_tensor.py", line 170, in __getitem__
    return self.kvstore.pull(name=self._name, id_tensor=idx)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/kvstore.py", line 1292, in pull
    return rpc.fast_pull(name, id_tensor, part_id, KVSTORE_PULL,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc.py", line 975, in fast_pull
    res_tensor = _CAPI_DGLRPCFastPull(name,
  File "dgl/_ffi/_cython/./function.pxi", line 287, in dgl._ffi._cy3.core.FunctionBase.__call__
  File "dgl/_ffi/_cython/./function.pxi", line 232, in dgl._ffi._cy3.core.FuncCall
  File "dgl/_ffi/_cython/./base.pxi", line 155, in dgl._ffi._cy3.core.CALL
dgl._ffi.base.DGLError: [17:34:09] /opt/dgl/src/rpc/rpc.cc:422: Check failed: l_id < local_data_shape[0] (190590 vs. 119671) :
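
For context, load_subtensor here is roughly the following (reconstructed from the traceback above; the load_feat flag with a True default and the 'label' field name are assumptions):

def load_subtensor(g, seeds, input_nodes, device, load_feat=True):
    # Pulling features goes through DistTensor.__getitem__, which issues a
    # kvstore fast_pull -- the call that fails on machine 2 above.
    batch_inputs = g.ndata['feat'][input_nodes].to(device) if load_feat else None
    batch_labels = g.ndata['label'][seeds].to(device)  # field name assumed
    return batch_inputs, batch_labels

The failed check l_id < local_data_shape[0] (190590 vs. 119671) says a supposedly local row index exceeds the local shard's first dimension, i.e. machine 2 is asked for rows it does not own, which points at a partition-book or rank mismatch rather than at load_subtensor itself.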

  • Removing the load_subtensor call does make the error disappear, but I need this function.
  • Swapping the order of the IPs in ip_config.txt produces the error below, this time on both machines:

[18:05:52] /opt/dgl/src/rpc/network/tcp_socket.cc:76: Failed bind on 192.168.1.8:10000 , error: Cannot assign requested address
Process SpawnProcess-1:
Traceback (most recent call last):
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/amax/gnn-tutorial/tests/distributed/test_machine_9.py", line 94, in start_server
    g.start()
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/dist_graph.py", line 353, in start
    start_server(server_id=self.server_id,
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc_server.py", line 68, in start_server
    rpc.receiver_wait(ip_addr, port, num_clients)
  File "/home/amax/anaconda3/envs/torch/lib/python3.8/site-packages/dgl/distributed/rpc.py", line 152, in receiver_wait
    _CAPI_DGLRPCReceiverWait(ip_addr, int(port), int(num_senders))
  File "dgl/_ffi/_cython/./function.pxi", line 287, in dgl._ffi._cy3.core.FunctionBase.__call__
  File "dgl/_ffi/_cython/./function.pxi", line 222, in dgl._ffi._cy3.core.FuncCall
  File "dgl/_ffi/_cython/./function.pxi", line 211, in dgl._ffi._cy3.core.FuncCall3
  File "dgl/_ffi/_cython/./base.pxi", line 155, in dgl._ffi._cy3.core.CALL
dgl._ffi.base.DGLError: [18:05:52] /opt/dgl/src/rpc/network/socket_communicator.cc:198: Cannot bind to 192.168.1.8:10000

  • I tried a standalone bind test (sketched after the traceback):

Traceback (most recent call last):
  File "/home/amax/gnn-tutorial/tests/distributed/test_rpc_bind.py", line 11, in <module>
    s.bind((HOST, PORT))
OSError: [Errno 99] Cannot assign requested address
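
The bind test was essentially this (a sketch; the HOST and PORT values are taken from the failed bind in the log above):

import socket

HOST = "192.168.1.8"   # the address DGL failed to bind above
PORT = 10000

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))   # raises OSError(99) if HOST is not a local interface

Errno 99 means no local interface owns that IP. As far as I can tell, DGL assigns server ranks by line order in ip_config.txt and each server binds the address on its own line, so after swapping the lines each machine tries to bind the other machine's address; the file's line order has to match where the servers actually run.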
