IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> coding相关 -> 正文阅读

[大数据]coding相关


并行任务进度监控

import psycopg2 as pg
def doDB(sql,flag):  # flag=1是操作增删改,2是查
    conn = pg.connect(database="i_twin", user="postgres", password="123456", host="192.168.1.96",
                      port="5432")
    cur = conn.cursor()
    cur.execute(sql)
    if flag==1:conn.commit()
    if flag==2:res = cur.fetchall()  # 或fetchone()
    cur.close()
    conn.close()
    if flag==2:return res

import socket
# 获取本机计算机名称
hostname = socket.gethostname()
print(hostname)  # LAPTOP-12FVS7AP
# 获取本机ip
ip = socket.gethostbyname(hostname)
print(ip)  # 192.168.150.30

import random
import time
import threading
import os

# 并行任务进度监控,最后启个接口通过id查询进度状态
def st(x):
    print('do',x)
    c=random.random()*30
    time.sleep(c)
    print('done',x)
    s="select * from z_sch where id='%s' "%(id)
    info=doDB(s,2)[0]
    s="update z_sch  set susnum=%s,sch=%s,updatetime=%s where id='%s' "\
      %(info[4]+1,(info[4]+1)/info[3],round(time.time()),id)

    doDB(s,1)


# 开始创建一条数据
id='3'
num=4  # 任务数
t=round(time.time())
# id pid ip num susnum sch state ctime utime
# 唯一id,进程号int,服务器ip,需计算的点数,完成数,sch进度float,state状态,创建时间戳time.time,更新时间戳
# [('1dri32itn2j3r', 234, '12.12.12.12', 5, 4, 44.0, '执行中', 1649322201, 1649322222)]
s="insert into z_sch values ('%s',%s,'%s',%s,%s,%s,'%s',%s,%s) "%(id,os.getpid(),ip,num,0,0,'执行中',t,t)
doDB(s,1)
# exit()

# 主程序 并行
threads = []
for x in range(num):
  thread = threading.Thread(target=st, args=(x,))
  thread.start()
  threads.append(thread)
for thread in threads:
  thread.join()

# 结束把状态改为完成
s="update z_sch  set state='完成' where id='%s' "%(id)
doDB(s,1)

print('Main end')

在这里插入图片描述

python接口服务

可以用自带的http.server,nohup python -m http.server 8001 &,http://192.168.6.103:8001/,不过不方便,支持不够(Unsupported method),更好的有wsgi server和wsgi application
fastapi+uvicorn,wsgiref(麻烦)
可以用django、flask

通信数据包的传输、协议的设计,socket、restful
restful规范:get请求数据、post新增数据、put更新数据、delete删除数据
接口没什么特别的规范,能交互、稳定就是好接口

import os
import flask, json
from flask import request

# flask: web框架,通过flask提供的装饰器@server.route()将普通函数转换为服务
server = flask.Flask(__name__)  # 创建一个服务,把当前这个python文件当做一个服务

# server.config['JSON_AS_ASCII'] = False
# @server.route()可以将普通函数转变为服务 登录接口的路径、请求方式
@server.route('/get', methods=['get'])
def doget():
    try:
        with open(os.path.dirname(__file__)+'a.json') as f:
            data=json.load(f)
            print('get data',data) 
            return data
    except FileNotFoundError:
        return {'data':None}


@server.route('/post1', methods=[ 'post']) 
def dopost1():
    # 获取通过url请求传参的json数据
    a=request.data
    if a==b'':return {'no data': 200}
    a=json.loads(a)
    print('asd',a,type(a))
    if a == dict(): return {'data': None}  # 空数据或不传数据

    id=a['id']
    print(id)
    # a1 = request.values.keys() # 传dict而不是json
    # if a1==set():return {'data':None} # 空数据或不传数据
    # ee=dict()
    # for i in a1:
    #     print(i,request.values.getlist(i))  # a ['2', '3']   12 ['zz']  ,get()不全
    #     ee[i]=request.values.getlist(i)
    # print(list(a1),type(a1))

    with open(os.path.dirname(__file__)+'\\'+id+'.json','w') as f:  # 写数据
        json.dump(a,f)  # 传参dict,file
        print(os.path.dirname(__file__))
        print('write sus')
    os.system('python cc.py '+id) # 写入结果文件
    with open(os.path.dirname(__file__)+'\\'+id+'end.json') as f:  # 读取结果文件,同步返回
        data=json.load(f)
        print(data) 
        return json.dumps(data)

@server.route('/post2', methods=[ 'post']) 
def dopost2():
    # 获取通过url请求传参的json数据
    a=request.data
    if a==b'':return {'no data': 200}
    a=json.loads(a)
    print('asd',a,type(a))
    if a == dict(): return {'data': None}  # 空数据或不传数据

    id=a['id']
    print(id)

    with open(os.path.dirname(__file__)+'\\'+id+'.json','w') as f:  # 写数据
        json.dump(a,f)  # 传参dict,file
        print(os.path.dirname(__file__))
        print('write sus')
        
    os.system('python cc.py '+id)  # 阻塞

    # # 子线程,不阻塞
    # import threading
    # def do():
    #     os.system('python cc.py '+id)
    # a = threading.Thread(target=do)
    # a.start()

    return {'suc sent data':200}


if __name__ == '__main__':
    server.run(debug=True, port=8888, host='0.0.0.0')
    # 指定端口、host,0.0.0.0代表不管几个网卡,任何ip都可以访问
    

在这里插入图片描述

import requests
import json
url1='http://192.168.150.30:8888/get'
url2='http://192.168.150.30:8888/post2'

ddd={'a':[2,3],12:'zz',22:[3,2,1,2],'id':'asd2'}
# ddd={'a':'zz'}
# ddd={}

a=requests.post(url2,json.dumps(ddd)) # post给数据
# a=requests.post(url2) 
print(a)  # <Response [200]>
print(a.text)  #  {"suc sent data": 200}
# exit()
b=requests.get(url1) # get拿数据
bv=b.json()# 获取数据
print(b)  # <Response [200]>
print(bv,type(bv)) # {'data': None} <class 'dict'>
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-09 18:28:13  更:2022-04-09 18:29:55 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 13:23:10-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码