pymongo常见操作

cooolr 于 2022-06-23 发布

建立连接

import pymongo

def get_mongo_connection(db_name, collection_name):
    """Return the collection ``collection_name`` of database ``db_name``.

    A new ``pymongo.MongoClient`` is created on every call from the
    module-level name ``mongoclient``, which this snippet does not define
    (presumably a connection URI or host string -- confirm before use).
    """
    return pymongo.MongoClient(mongoclient)[db_name][collection_name]

# Module-level collection handle used by the snippets that follow.
col = get_mongo_connection(
    db_name="riskManager",
    collection_name="memberApiVisitLog",
)

建立连接2(用户名密码认证)

import pymongo

def get_mongo_connection(db_name, collection_name):
    """Return an authenticated handle to ``db_name.collection_name``.

    Credentials are handed to ``MongoClient`` directly, because
    ``Database.authenticate()`` was deprecated in PyMongo 3.x and removed
    in PyMongo 4.0. ``authSource=db_name`` keeps the original behaviour of
    authenticating against the target database itself.

    Relies on the module-level ``MONGO_ADDRESS``, ``MONGO_USERNAME`` and
    ``MONGO_PASSWORD`` values, which must be defined by the caller's module.
    """
    client = pymongo.MongoClient(
        'mongodb://' + MONGO_ADDRESS,
        username=MONGO_USERNAME,
        password=MONGO_PASSWORD,
        authSource=db_name,
    )
    return client[db_name][collection_name]

# Module-level collection handle used by the snippets that follow.
col = get_mongo_connection(
    db_name="riskManager",
    collection_name="memberApiVisitLog",
)

查询数据

# datetime is needed for the range bounds below and was not imported before.
from datetime import datetime

# Filter: documents for one URI whose timestamp lies strictly between
# 2021-03-28 and 2021-03-31 ($gt / $lt are both exclusive).
query = {
    "uri": "/home/get_verify_code",
    "timestamp": {"$gt": datetime(2021, 3, 28), "$lt": datetime(2021, 3, 31)},
}
# Passed as the second positional argument of find()/find_one(), i.e. the
# projection: only the "content" field (plus _id) is returned.
condition = {"content": True}

# Fetch a single matching document
col.find_one(query, condition)

# Fetch all matching documents (returns a cursor)
col.find(query, condition)

覆盖数据

# Collection.save() was deprecated in PyMongo 3.0 and removed in 4.0.
# Equivalent behaviour: overwrite (or create) the document with the same
# _id, or do a plain insert when the document has no _id yet.
if "_id" in document:
    col.replace_one({"_id": document["_id"]}, document, upsert=True)
else:
    col.insert_one(document)

更新数据

# Select the document by its _id, then overwrite only the fields present in
# new_data via $set.
query = {
    "_id": "xsa8x9699020378023e030",
}
update = {'$set': new_data}
col.update_one(query, update)

插入数据

# Insert the single document `data` into the collection.
col.insert_one(data)

批量插入

# Insert all documents in `items` with one call. ordered=False lets the
# server continue with the remaining documents when one insert fails.
# (Was `coll`, a typo for the `col` handle used everywhere else.)
col.insert_many(items, ordered=False)

批量操作

import pymongo

# MongoClient() is called without arguments, so the driver's default
# host/port is used.
handler = pymongo.MongoClient()["test_db"]["test_col"]

# Two multi-document updates, sent to the server in one bulk_write call.
mark_unqualified = pymongo.UpdateMany(
    {'sex': '男', 'result': {'$lt': 90}},
    {'$set': {'is_qualified': False}},
)
mark_qualified = pymongo.UpdateMany(
    {'sex': '女', 'result': {'$gte': 60}},
    {'$set': {'is_qualified': True}},
)
handler.bulk_write([mark_unqualified, mark_qualified])

删除数据

# 删除单个
# Delete one document: the first one whose name is "Taobao".
query = dict(name="Taobao")
col.delete_one(query)

# Delete many documents: every one created before `now`.
created_before_now = {"$lt": now}
query = {"createTime": created_before_now}
col.delete_many(query)

写入 Decimal 报错(改用 Decimal128)

from bson import Decimal128 as Decimal

# `Decimal` here is bson.Decimal128 (see the aliased import above), not
# decimal.Decimal: the amount is wrapped in the BSON decimal type before it
# is written.
data = {"cash":Decimal(total2)}

时区问题

import pytz

# datetime is needed below and was not imported before.
from datetime import datetime

zone7 = pytz.timezone('Asia/Phnom_Penh')
# Ask for the current time directly in the target zone. The previous
# zone7.localize(datetime.now()) took the host's local wall-clock time and
# merely labelled it as Phnom Penh time, which is wrong on any machine that
# is not itself running in that timezone.
now = datetime.now(zone7)

aggregate group by

# $match stage: createTime in the half-open range [begin_time, end_time).
query = {"createTime": {"$gte": begin_time, "$lt": end_time}}

# $group stage: one bucket per (first 10 characters of createTime, lang,
# platform, appVersion, memberId); count the documents in each bucket.
group_key = [
    {"$substr": ["$createTime", 0, 10]},
    "$lang",
    "$platform",
    "$appVersion",
    "$memberId",
]
group = {
    "_id": {"key": group_key},
    "count": {"$sum": 1},
}

pipeline = [
    {"$match": query},
    {"$group": group},
]
result = col.aggregate(pipeline, allowDiskUse=True)

排序

# 简单排序
col.find(query, condition).sort("field", -1)

# 多个条件排序
col.find(query, condition).sort([("field1",1), ("field2",-1)])

# sort作为入参
col.find(query, condition, sort=[("field1",1), ("field2",-1)])

query判断是否存在

# Match documents in which the openAppCount field is present.
query = {
    "openAppCount": {
        "$exists": True,
    },
}

解决游标超时

# Open the cursor with the idle timeout disabled, and iterate it inside a
# with-block so the cursor is closed when the loop is done.
cursor = col.find(query, no_cursor_timeout=True)
with cursor as news_list:
    for news in news_list:
        pass

分页查询

# Page through the whole collection, ordered by lastUpdateTime ascending,
# 10000 documents per page.
query = {}
page_size = 10000

# First page: skip nothing
news_list = (
    col.find(query)
    .sort("lastUpdateTime", 1)
    .skip(0 * page_size)
    .limit(page_size)
)

# Second page: skip one full page
news_list = (
    col.find(query)
    .sort("lastUpdateTime", 1)
    .skip(1 * page_size)
    .limit(page_size)
)

$in包含

from bson import ObjectId

# Match documents whose _id is one of the listed ObjectIds. The ids must be
# wrapped in ObjectId("..."): a bare hex literal is a Python syntax error,
# and a plain string would not equal an ObjectId-typed _id.
query = {
    "_id": {
        "$in": [
            ObjectId("605b31d8ba8e2de5acf0cb87"),
            ObjectId("605b31d8ba8e2de5acf0cb85"),
        ]
    }
}

数组查询

# Match documents whose typeTags array has at least one element with a
# tagId contained in the given list.
query = {
    "typeTags": {
        "$elemMatch": {
            "tagId": {"$in": [314013]},
        },
    },
}

索引迁移

# Recreate every index of collection1 on collection2 under the same name.
for name, index_info in collection1.index_information().items():
    # Directions can come back as floats such as -1.0, which create_index
    # rejects, so those are cast to int. Non-numeric index types ("text",
    # "hashed", "2dsphere", ...) are passed through untouched instead of
    # crashing in int(). Each key spec is rebuilt as a tuple, as required.
    keys = [
        (field, int(direction) if isinstance(direction, float) else direction)
        for field, direction in index_info["key"]
    ]
    # Carry over the remaining index options (unique, sparse,
    # expireAfterSeconds, ...) so the copy behaves like the original.
    # "v" and "ns" describe the source index itself and are not re-sent.
    options = {
        option: value
        for option, value in index_info.items()
        if option not in ("key", "v", "ns")
    }
    collection2.create_index(keys, name=name, **options)