微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用pymongo加快MogoDB中有条件的插入过程

如何解决如何使用pymongo加快MogoDB中有条件的插入过程

我有一个数据框(在.csv中),其中包含有关工作合同的信息。每行代表一个特定的合同(最好称为“激活”),每列中都包含有关该合同的信息,例如工人ID,开始日期,雇主ID等。在此,我在以下几行中仅提供示例。原始样本由超过1700万行和每个合同的更多信息组成。通过 pymongo 将数据插入我的MongoDB集合中时,我想为每个人都有一个特定的文档( i.e 。,工作人员ID),并使用一个包含有关合同信息的数组。注意:某些数据在同一工作ID(例如,性别,出生)的条目之间不会发生变化。

我猜下面的例子可以阐明我的目标。

# Libraries

import datetime
import numpy as np
import pandas as pd
from pandas import DataFrame
from pymongo import MongoClient

# Create example DataFrame
data = { 'worker_id': [ 1234,4556,1234,5578,9987 ],'birth': [ '1990-02-27','1970-01-21','1990-02-27','1968-07-05','1990-05-05' ],'gender': [ 'F','M','F','X','F' ],'employer_id': [ 5567,7789,4321,2234 ],'start': [ '2012-09-14','2011-12-31','2010-12-31','2009-10-31','2015-04-28','2008-01-01' ],'end': [ '2012-10-14','2012-01-01','2011-05-25','2010-10-31','2018-01-01','NaT' ],'contract': [ 'A.01.00','A.02.00','A.04.02','A.01.00','A.05.00' ]
       }

df = pd.DataFrame(data,columns = [ 'worker_id','birth','gender','employer_id','start','end','contract' ])

# Sort by 'worker_id' and 'start'
df.sort_values(['worker_id','start'],ascending = [True,True],inplace = True)

# Change data type for 'worker_id' and 'employer_id'
df['worker_id'] = df['worker_id'].astype(np.int64)
df['employer_id'] = df['employer_id'].astype(np.int64)

# Connect to MongoDB
client = MongoClient('localhost',27017)
db = client['example_db']
col = db['example_collection']

for index,row in df.iterrows():
    
    dt_birth = None if str(row['birth']) == 'NaT' else datetime.datetime.strptime(str(row['birth']),'%Y-%m-%d')
    dt_start = None if str(row['start']) == 'NaT' else datetime.datetime.strptime(str(row['start']),'%Y-%m-%d') 
    dt_end = None if str(row['end']) == 'NaT' else datetime.datetime.strptime(str(row['end']),'%Y-%m-%d') 

    # Check whether a document with the 'worker_id' does already exist
    find = col.find_one( { "worker_id" : row['worker_id'] } )  
    
    if find == None: 
        
        # If 'worker_id' is not already in the collection,a new document is generated
        col.insert_one( {
            "worker_id" : row['worker_id'],"birth" : dt_birth,"gender" : str(row['gender']),"activations" : [ {
                "employer_id" : row['employer_id'],"start" : dt_start,"end" : dt_end,"contract" : str(row['contract']),"coef" : 6.5,# It will be the same for each object in 'activations'
                "ord_id" : np.float64(1) # The first one has value 1,then it will increment
                } ] 
            } )

    else:
        
        # Identify the last 'ord_id' in 'activations'
        pipeline = [
            { "$match": { 
                "worker_id": row['worker_id'] } },{ "$project": { 
                "_id": 0,"last_activation": { "$arrayElemAt": [ "$activations.ord_id",0 ] } 
                } } 
            ]
        
        last_ord_id = DataFrame(list(col.aggregate(pipeline)))
        
        # For each row with the 'worker_id' already in the db,# the function pushes the new information in the array 'activations'
        col.update_one(    
            { "worker_id" : row['worker_id'] },{ "$push": { "activations":
            {
            "$each": [ {
                "employer_id" : row['employer_id'],"ord_id" : last_ord_id.loc[0,'last_activation'] + 1 # Increment 'ord_id' by one
            } ],"$position": 0 # Place as first object in 'activations'
            } } } )
            
    col.create_index("worker_id") # Set 'worker_id' as index

最终,MongoDB中的数据库将如下所示:

{
    "_id" : ObjectId("5f690c7e6267ee26f8b84034"),"worker_id" : 9987,"birth" : ISODate("1990-05-05T02:00:00.000+02:00"),"gender" : "F","activations" : [
        {
            "employer_id" : 2234,"start" : ISODate("2008-01-01T01:00:00.000+01:00"),"end" : null,"contract" : "A.05.00","ord_id" : 1
        }
    ]
},{
    "_id" : ObjectId("5f690c7e6267ee26f8b84033"),"worker_id" : 5578,"birth" : ISODate("1968-07-05T02:00:00.000+02:00"),"gender" : "X","activations" : [
        {
            "employer_id" : 4321,"start" : ISODate("2015-04-28T02:00:00.000+02:00"),"end" : ISODate("2018-01-01T01:00:00.000+01:00"),"contract" : "A.02.00",{
    "_id" : ObjectId("5f690c7e6267ee26f8b84032"),"worker_id" : 4556,"birth" : ISODate("1970-01-21T01:00:00.000+01:00"),"gender" : "M","activations" : [
        {
            "employer_id" : 7789,"start" : ISODate("2011-12-31T01:00:00.000+01:00"),"end" : ISODate("2012-01-01T01:00:00.000+01:00"),"ord_id" : 2
        },{
            "employer_id" : 7789,"start" : ISODate("2010-12-31T01:00:00.000+01:00"),"end" : ISODate("2011-05-25T02:00:00.000+02:00"),"contract" : "A.04.02",{
    "_id" : ObjectId("5f690c7e6267ee26f8b84031"),"worker_id" : 1234,"birth" : ISODate("1990-02-27T01:00:00.000+01:00"),"activations" : [
        {
            "employer_id" : 5567,"start" : ISODate("2012-09-14T02:00:00.000+02:00"),"end" : ISODate("2012-10-14T02:00:00.000+02:00"),"contract" : "A.01.00","start" : ISODate("2009-10-31T01:00:00.000+01:00"),"end" : ISODate("2010-10-31T02:00:00.000+02:00"),"ord_id" : 1
        }
    ]
}

我想知道我写的过程是否可以某种方式得到完善和加快。不幸的是,我对Python和MongoDB的了解有限。预先感谢您提出任何建议。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。