1,官方文档
使用自定义函数及Python第三方库 (aliyun.com)https://help.aliyun.com/document_detail/90716.html
2,查看本地python3版本和MaxCompute集群的版本
本地版本
MaxCompute集群内的python版本可以在mapper或者reducer方法里print(sys.version)看到
这里可以看到本地的 python 是 3.6,MaxCompute 集群内的是 3.7
3,下载需要引入的包
MaxCompute集群内,只能通过官方文档的方法引入包,所以要先在外部下载好
pip3 download pandas -d /home/admin/workspace/zl/37/
因为本地 python3 是 3.6,下载下来的包里有两个(numpy 和 pandas)是 3.6 (cp36) 的版本,而 MaxCompute 集群内的 python 是 3.7,所以要把这两个包换成 3.7 (cp37) 的版本。可以到下面的镜像地址下载,把链接里的包名替换掉,就能看到对应包的各个版本:
https://mirrors.aliyun.com/pypi/simple/pandas/?spm=a2c4g.11186623.0.0.4bb97eebvJmf3S
引入pandas-ta 时,报错了
需要引入 setuptools 包;引入之后,还有报错
这一步我是直接修改源码,重新打包解决的
4,创建资源
# Create the resources.
# NOTE: each resource name must carry the same extension as the package file
# it is created from (this is stated in the official documentation).
# Entries that are commented out are alternatives that were tried but are not
# uploaded.
resource_files = [
    ('numpy_zl.whl', '../37/numpy-1.19.4-cp37-cp37m-manylinux1_x86_64.whl'),
    ('pandas_zl.whl', '../37/pandas-0.23.2-cp37-cp37m-manylinux1_x86_64.whl'),
    ('python_dateutil_zl.whl', '../38/python_dateutil-2.8.2-py2.py3-none-any.whl'),
    ('pytz_zl.whl', '../38/pytz-2022.1-py2.py3-none-any.whl'),
    # The resource holding `six` is named 'siz_zl.whl'; the name is kept as-is
    # because the rest of the script refers to it by exactly this name.
    ('siz_zl.whl', '../38/six-1.16.0-py2.py3-none-any.whl'),
    # ('pandas_ta.tar.gz', '../37/pandas_ta-0.3.14b.tar.gz'),
    ('pandas_ta.zip', '../37/pandas_ta-0.3.14b.zip'),
    # ('distribute.zip', '../37/distribute-0.7.3.zip'),
    ('setuptools_zl.whl', '../37/setuptools-40.8.0-py2.py3-none-any.whl'),
]
for resource_name, package_path in resource_files:
    # Open each package in a `with` block so the file handle is closed after
    # the upload call instead of being leaked.
    # NOTE(review): assumes create_resource finishes reading the file before
    # it returns - confirm against the PyODPS resource documentation.
    with open(package_path, 'rb') as package_file:
        o.create_resource(resource_name, 'archive', file_obj=package_file)
5,引用后,就可以在MaxCompute集群内使用pandas和pandas_ta了,附上源码
# %run g.ipynb
%load_ext dswmagic
%matplotlib inline
import pandas as pd
import numpy as np
from odps.df import DataFrame
from odps.df import output
from odps import ODPS
from odps import options
from odps.df import DataFrame
from odps.models import Schema, Column, Partition
# Open the MaxCompute entry object. All four arguments are placeholders that
# must be replaced with real values before running.
o = ODPS('密码', '秘钥', project='自己的项目', endpoint='odps地址')

# Delete first, then re-create, so that the uploaded resources are always the
# latest ones. Every name is checked independently; names that do not exist
# in the project are simply skipped.
for stale_resource in (
    'numpy_zl.whl',
    'pandas_zl.whl',
    'python_dateutil_zl.whl',
    'pytz_zl.whl',
    'siz_zl.whl',
    'pandas_ta.tar.gz',
    'pandas_ta.zip',
    'distribute.zip',
    'setuptools_zl.whl',
):
    if o.exist_resource(stale_resource):
        o.delete_resource(stale_resource)
# Wheel sets that were tried for other interpreters. They are kept for
# reference only and are NOT uploaded; the resource names are the same as in
# the active table below.
#
#   local machine, Python 3.6.9 (directory ../zl/):
#     numpy-1.19.5-cp36-cp36m-manylinux2010_x86_64.whl
#     pandas-1.1.5-cp36-cp36m-manylinux1_x86_64.whl
#     python_dateutil-2.8.2-py2.py3-none-any.whl
#     pytz-2022.1-py2.py3-none-any.whl
#     six-1.16.0-py2.py3-none-any.whl
#
#   macOS, Python 3.7.3 (directory ../):
#     numpy-1.21.6-cp37-cp37m-macosx_10_9_x86_64.whl
#     pandas-1.3.5-cp37-cp37m-macosx_10_9_x86_64.whl
#     python_dateutil-2.8.2-py2.py3-none-any.whl
#     pytz-2022.1-py2.py3-none-any.whl
#     six-1.16.0-py2.py3-none-any.whl
#
#   data00, Python 3.8.5 (directory ../38/):
#     numpy-1.22.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
#     pandas-1.3.5-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
#     python_dateutil-2.8.2-py2.py3-none-any.whl
#     pytz-2022.1-py2.py3-none-any.whl
#     six-1.16.0-py2.py3-none-any.whl

# Active set: packages for Python 3.7. Each pair is (resource name, local
# package file); the resource name keeps the same extension as the file.
resource_files = [
    ('numpy_zl.whl', '../37/numpy-1.19.4-cp37-cp37m-manylinux1_x86_64.whl'),
    ('pandas_zl.whl', '../37/pandas-0.23.2-cp37-cp37m-manylinux1_x86_64.whl'),
    ('python_dateutil_zl.whl', '../38/python_dateutil-2.8.2-py2.py3-none-any.whl'),
    ('pytz_zl.whl', '../38/pytz-2022.1-py2.py3-none-any.whl'),
    # The resource holding `six` is named 'siz_zl.whl'; the name is kept as-is
    # because the rest of the script refers to it by exactly this name.
    ('siz_zl.whl', '../38/six-1.16.0-py2.py3-none-any.whl'),
    # ('pandas_ta.tar.gz', '../37/pandas_ta-0.3.14b.tar.gz'),
    ('pandas_ta.zip', '../37/pandas_ta-0.3.14b.zip'),
    # ('distribute.zip', '../37/distribute-0.7.3.zip'),
    ('setuptools_zl.whl', '../37/setuptools-40.8.0-py2.py3-none-any.whl'),
]
for resource_name, package_path in resource_files:
    # Open each package in a `with` block so the file handle is closed after
    # the upload call instead of being leaked.
    # NOTE(review): assumes create_resource finishes reading the file before
    # it returns - confirm against the PyODPS resource documentation.
    with open(package_path, 'rb') as package_file:
        o.create_resource(resource_name, 'archive', file_obj=package_file)
@output(['a', 'b', 'c'], ['int', 'int', 'int'])
def mapper(row):
    """Print the interpreter version, then emit the row's a, b and c unchanged."""
    # Imported inside the function body, exactly where the original did it.
    from sys import version as interpreter_version

    print(interpreter_version)
    yield (row.a, row.b, row.c)
@output(['a', 'b', 'c'], ['int', 'int', 'int'])
def reducer(keys):
    """Return the per-row reduce handler, which smoke-tests pandas and pandas_ta.

    Args:
        keys: Supplied by the map_reduce call; not used by this reducer.

    Returns:
        The handler ``h(row, done)``: a generator that prints a small pandas /
        pandas_ta computation and yields the row's ``a``, ``b`` and ``c``
        values unchanged.
    """
    import sys
    print(sys.version)
    # Bind the `pd` alias here, inside the UDF, so the handler below does not
    # depend on a module-level `import pandas as pd` in the calling notebook.
    import pandas as pd
    # Imported for its side effect only.
    # NOTE(review): presumably this is what provides the `.ta` accessor used
    # in the handler - confirm against the pandas_ta documentation.
    import pandas_ta  # noqa: F401

    def h(row, done):
        # Fixed sample data; only the first column is used further down.
        ss = [
            [1, 2, 3],
            [5, 2, 3],
            [2, 2, 3],
            [4, 2, 3],
            [9, 2, 3],
        ]
        pddata = pd.DataFrame(ss)
        pddata.columns = ['a', 'b', 'c']
        print(pddata)

        # Build an Open/High/Low/Close frame, each column a copy of column 'a'.
        rows = pd.DataFrame()
        rows['Open'] = pddata['a']
        rows['High'] = pddata['a']
        rows['Low'] = pddata['a']
        rows['Close'] = pddata['a']
        print(rows)

        # Compute the indicator through the `.ta` accessor and print it.
        wma = rows.ta.wma(length=3, append=True)
        print(wma)

        # The input row is passed through untouched; `done` is not consulted.
        yield row.a, row.b, row.c

    return h
# Reference the resources created above so the UDFs can import them.
# (The 'pandas_ta.tar.gz' and 'distribute.zip' variants are not used.)
options.df.libraries = [
    'numpy_zl.whl',
    'pandas_zl.whl',
    'python_dateutil_zl.whl',
    'pytz_zl.whl',
    'siz_zl.whl',
    'pandas_ta.zip',
    'setuptools_zl.whl',
]

# The official documentation says this setting has to be added.
options.sql.settings = {'odps.isolation.session.enable': True}
options.verbose = True

# Run mapper/reducer over the source table and store the result in a table.
source_table = o.get_table('newland.test')
df = DataFrame(source_table)
res = df.map_reduce(mapper, reducer)
res.persist('newland.test_zl')
|