1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
| class MongoDBClient(object): """定义MongoDB操作类""" def __init__(self, mongo_conf): """ 初始化参数 :param mongo_conf: mongo配置 """ self.client = pymongo.MongoClient( "mongodb://{0}:{1}@{2}:{3}".format( mongo_conf.get("username"), parse.quote(mongo_conf.get("password")), mongo_conf.get("host"), mongo_conf.get("port") ) )
self.db = self.client[mongo_conf.get("db_name")]
def insert_one(self, table_name, insert_data, data_field=None): """ 插入单条数据 :param table_name: 数据表名 :param insert_data: 插入的数据 :param data_field: 数据筛选 :return: """ try: self.db[table_name].insert_one(insert_data, data_field)
except: raise Exception("插入错误")
def insert_many(self, table_name, insert_data): """ 插入多条数据 :param table_name: 数据表名 :param insert_data: 插入数据 :return: """ try: self.db[table_name].insert_many(insert_data)
except: raise Exception("插入错误")
def find_one(self, table_name: str, data_field: dict = None, target_field: dict = None) -> dict: """ 查找单条数据有 :param table_name: 数据表名 :param data_field: 查找数据字段 :param target_field: 指定返回的字段 :return: """ try: result_data = self.db[table_name].find_one(data_field, target_field) return result_data
except: raise Exception("查找错误")
def find_many(self, table_name: str, data_field: dict = None, target_field: dict = None) -> list: """ 查找多条数据 :param table_name: 数据表名 :param data_field: 查找数据字段 :param target_field: 指定返回的字段 :return: """ try: result_data = self.db[table_name].find(data_field, target_field) return result_data
except: raise Exception("查找错误")
def update_one(self, table_name, data_condition, data_set): """ 更新单条数据 :param table_name: 数据表名 :param data_condition: 原数据 :param data_set: 新数据 :return: """ try: self.db[table_name].update_one(data_condition, {"$set": data_set})
except: raise Exception("更新错误")
def update_many(self, table_name, data_condition, data_set): """ 更新多条数据 :param table_name: 数据表名 :param data_condition: 原数据 :param data_set: 新数据 :return: """ try: self.db[table_name].update_many(data_condition, {"$set": data_set})
except: raise Exception("更新错误")
def replace_one(self, table_name, data_condition, data_set): """ 替换单条数据 :param table_name: 数据表名 :param data_condition: 原数据 :param data_set: 新数据 :return: """ try: self.db[table_name].replace_one(data_condition, data_set)
except: raise Exception("替换错误")
def delete_many(self, table_name, delete_data): """ 删除多条数据 :param table_name: 数据表名 :param delete_data: 删除的数据 :return: """ try: self.db[table_name].delete_many(delete_data)
except: raise Exception("删除错误")
def delete_one(self, table_name, delete_data): """ 删除单条数据 :param table_name: 数据表名 :param delete_data: 删除的数据 :return: """ try: self.db[table_name].delete_one(delete_data)
except: raise Exception("删除错误")
def sum_data(self, table_name: str, match_obj: dict, group_obj: dict) -> list: """ 查询数据计算 :param table_name: 数据表名 :param match_obj: 过滤条件 :param group_obj: 计算条件 :return: """ try: data_obj = self.db[table_name].aggregate([ {"$match": match_obj}, {"$group": group_obj} ]) return list(data_obj)
except: raise Exception("查询失败")
def aggregate_query(self, table_name: str, pipeline: list) -> list: """ 查询数据计算 :param table_name: 数据表名 :param pipeline: 聚合过滤条件 :return: """ try: data_obj = self.db[table_name].aggregate(pipeline) return list(data_obj)
except: raise Exception("查询失败")
def close_db(self): """ 关闭数据库 :return: """ self.client.close()
|