import pymongo
import warnings
warnings.filterwarnings("ignore")
from sshtunnel import SSHTunnelForwarder
def connect_test(tabe_name,database_name):
test_database_name = database_name
test_table_name = tabe_name
url_test = '自己链接mongo的url地址'
client = pymongo.MongoClient(url_test)
client_db = client[test_database_name]
client_table = client_db[test_table_name]
return client_table
if __name__ == '__main__':
primary_key = ['PID', 'dateInclusion']
tabe_name = 'TEST'
database_name = 'enterprise'
client = connect_test(tabe_name,database_name)
all_data = client.find()
print(client.count())
for i in all_data:
print(i)
if __name__ == '__main__':
tabe_name = 'TEST'
database_name = 'enterprise'
client = connect_test(tabe_name,database_name)
all_data = client.find({},{"job_name":1,"_id":0})
for i in all_data:
print(i)
all_data = client.find({},{"pid":1}).sort('pid',-1)
for i in all_data:
print(i)
if __name__ == '__main__':
tabe_name = 'TEST'
database_name = 'enterprise'
client = connect_test(tabe_name,database_name)
all_data = client.find({},{"pid":1}).sort('pid',-1).skip(3)
for i in all_data:
print(i)
分页显示
if __name__ == '__main__':
tabe_name = 'TEST'
database_name = 'enterprise'
client = connect_test(tabe_name,database_name)
all_num = client.find().count()
skip_num = 0
limit_num = 2
while 1:
print(skip_num)
all_data2 = client.find({},{"pid":1,"_id":0}).skip(skip_num).limit(limit_num)
for i in all_data2:
print(i)
skip_num+=limit_num
if skip_num>=all_num:
break
正则匹配
if __name__ == '__main__':
tabe_name = 'TEST'
database_name = 'enterprise'
client = connect_test(tabe_name,database_name)
all_num = client.find().count()
skip_num = 0
limit_num = 2
while 1:
print(skip_num)
all_data2 = client.find({"company_name":{'$regex': '科技'}},{"company_name":1,"pid":1,"_id":-1}).skip(skip_num).limit(limit_num)
for i in all_data2:
print(i)
skip_num+=limit_num
if skip_num>=all_num:
break
|