用Python脚本迁移MongoDB数据到金仓-kingbase数据库

python,mongodb,kingbase · 浏览次数 : 6

小编点评

**脚本运行结果:** * 脚本成功将MongoDB里的数据插入到了KingBase数据库中。 * 脚本使用分页和排序将数据分成多个批次进行插入,以提高效率。 * 脚本记录了每个批次的偏移量,以便下次可以从offset.txt中获取当前的偏移量。 * 脚本使用MD5值进行数据校验,确保数据完整性。 **注意:** * 需要在运行脚本之前设置MongoDB的用户名、密码、主机和端口等信息。 * 可以根据需要调整脚本中的一些参数,例如batch_size,偏移量和MD5值算法。

正文

1、首先需要明确MongoDB与kingbase的对应关系,collection相当于table,filed相当于字段,根据这个对应关系创建表;

此次迁移的MongoDB里的数据字段是:_id(自动生成的objectid),image(转成二进制存储的文档)

所以在金仓里创建表 create table admin(id varchar,image bytea);

2、安装Python环境,由于是内网环境,没有yum源,需要从能连接互联网的环境下载好相应的安装包

Python:3.9.0版本

用到以下这些包

import pymongo
import ksycopg2
import concurrent.futures
from ksycopg2 import pool
import logging
from urllib.parse import quote_plus

------------------------------------------------------------------------------------

pip download pymongo -d pymongo_packages --下载pymongo库

pip3 install --no-index --find-links=. pymongo --安装pymongo库

金仓的Python驱动可以到金仓官网下载,需要找和Python对应的版本

以下是Python脚本内容:

import pymongo
import psycopg2
import concurrent.futures
from psycopg2 import pool
import logging
from urllib.parse import quote_plus
import os

# 初始化日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')

# MongoDB设置
username='admin'
password='SCJGscjg@123'
host='10.253.228.41'
port='27017'
encoded_username = quote_plus(username)
encoded_password = quote_plus(password)
uri = f"mongodb://{encoded_username}:{encoded_password}@{host}:{port}/"
mongo_client = pymongo.MongoClient(uri)
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['admin']

# 连接池设置
kb_pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=1,
    maxconn=20,
    host="10.253.228.110",
    database="mongo",
    user="system",
    password="1",
    port="54322"
)

# 偏移量存储文件
OFFSET_FILE = 'offset.txt'

def read_offset():
    if os.path.exists(OFFSET_FILE):
        with open(OFFSET_FILE, 'r') as f:
            return int(f.read().strip())
    return 0

def write_offset(offset):
    with open(OFFSET_FILE, 'w') as f:
        f.write(str(offset))

def batch_insert(mongo_data):
    kb_conn = None
    try:
        kb_conn = kb_pool.getconn()
        with kb_conn.cursor() as kb_cursor:
            for data in mongo_data:
                id_value = data['_id']
                image_data = data['image']
                insert_query = "INSERT INTO dzzzwj(id, image) VALUES (%s, %s)"
                kb_cursor.execute(insert_query, (id_value, image_data))
            kb_conn.commit()
        return True
    except Exception as e:
        logging.error(f"批量插入错误: {e}")
        return False
    finally:
        if kb_conn:
            kb_pool.putconn(kb_conn)

def main():
    batch_size = 80
    offset = read_offset()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)
    
    try:
        while True:
            mongo_data = list(mongo_collection.find().skip(offset).limit(batch_size))
            if not mongo_data:
                break

            future = executor.submit(batch_insert, mongo_data)
            future.add_done_callback(lambda f, offset=offset: (
                logging.info(f"Batch completed with offset {offset}") if f.result() else logging.error(f"Batch failed with offset {offset}"),
                write_offset(offset + batch_size) if f.result() else None
            ))
            offset += batch_size if future.result() else 0
    except Exception as e:
        logging.error(f"主循环错误: {e}")
    finally:
        executor.shutdown(wait=True)
        mongo_client.close()
        kb_pool.closeall()
        logging.info("资源已清理完毕。")

if __name__ == "__main__":
    main()

这段代码思路:

(1)连接MongoDB和kingbase数据;

(2)因为MongoDB数据量比较大,并且需要断点续传,索引用了分页和排序;

(3)数据成功插入金仓数据库后,增加偏移量,并且将当前偏移量记录在offset.txt里面,以便脚本停了,可以再重启接着迁数据;

因为二进制数据从MongoDB和金仓数据查询出来的内容看着不一样,所以下面的代码是计算两边数据md5值对比的简单代码

import pymongo
import ksycopg2
import base64
import hashlib

def compute_hash(data):
    return hashlib.md5(data).hexdigest()

mongo_client = pymongo.MongoClient('mongodb://127.0.0.1:27017/')
mongo_db = mongo_client['admin']
mongo_collection = mongo_db['mongodb']

database = "test"
user = "system"
password = "1"
host = "127.0.0.1"
port = "54322"

conn = ksycopg2.connect(database=database, user=user, password=password, host=host, port=port)

cursor = conn.cursor()


mongo_data = mongo_collection.find()
print(mongo_data)    

    # 插入到 kingbase
for data in mongo_data:
   id_value = data['_id']
   image_data = data['image']

   #image_data = base64.b64encode(base64_data).decode('utf-8')

   image_data_byte = image_data 
   if isinstance(image_data, bytes):
       mongo_hash = compute_hash(image_data_byte)
       print(mongo_hash)

   #image_data = base64.b64encode(base64_data).decode('utf-8')
   if id_value and image_data:
      insert_query = "INSERT INTO zzwj(_id, image) VALUES (%s, %s)"
      cursor.execute(insert_query, (id_value, image_data))

    # 提交事务
conn.commit()

cursor.execute("select _id, image from zzwj")
rows = cursor.fetchall()

for row in rows:
    _id = row[0]
    image_byte = row[1]
    
    pg_hash = compute_hash(image_byte)
    print(pg_hash)


# 关闭连接
cursor.close()
conn.close()
mongo_client.close()

 

与用Python脚本迁移MongoDB数据到金仓-kingbase数据库相似的内容:

用Python脚本迁移MongoDB数据到金仓-kingbase数据库

1、首先需要明确MongoDB与kingbase的对应关系,collection相当于table,filed相当于字段,根据这个对应关系创建表; 此次迁移的MongoDB里的数据字段是:_id(自动生成的objectid),image(转成二进制存储的文档) 所以在金仓里创建表 create tab

用 Python 脚本实现电脑唤醒后自动拍照 截屏并发邮件通知

背景 背景是这样的, 我的家里台式机常年 休眠, 并配置了 Wake On Lan (WOL) 方便远程唤醒并使用. 但是我发现, 偶尔台式机会被其他情况唤醒, 这时候我并不知道, 结果白白运行了好几天, 浪费了很多电. 所以我的需求是这样的: 🤔 电脑唤醒后(可能是开机, 有可能是从休眠状态唤醒

为ssh服务器添加2fa认证,一个python脚本全搞定

服务器ssh如果被别人登陆就是一场灾难,所以我研究了ssh认证,我发现Google Authenticator PAM可以实现ssh的2fa认证,但是安装和配置比较麻烦。因此我用python实现了ssh的2fa认证。考虑到很多Linux服务器默认安装python,所以我用py脚本,并只使用标准库,不

一个用来画拉氏图的简单Python脚本

这里我提供了一个用于画拉氏图的Python脚本源代码,供大家免费使用。虽然现在也有很多免费的平台和工具可以用,但很多都是黑箱,有需要的开发者可以直接在这个脚本基础上二次开发,定制自己的拉氏图绘制方法。

Python 搭建 FastAPI 项目

一般网上的文章都是以脚本的方式写Demor的,没找到自己想要的那种项目结构型的示例(类似Java SpringBoot 创建 Model,通过 pom 进行关联配置的那种) 看了一些源码,再结合自己的想法,建了一个简单的示例, 用 Python 做接口服务的项目搭建,仅供参考 代码结构说明 VipQ

[转帖]Jmeter笔记:使用Jmeter向kafka发送消息

https://www.cnblogs.com/daydayup-lin/p/14124816.html 日常工作中有时候需要向kafka中发送消息来测试功能或者性能,这时候我们怎么办呢?我之前是自己写个简单的python脚本来模拟发送消息的,其实用Jmeter来实现也比较简单方便。 1、我们必须有

python独立脚本应用Django项目的环境

一、需求说明 一直用 Django 在开发一个网站项目,其中的注册用户和登录,都是使用Django自带的认证系统。主要是对密码的加密,在注册或者登录的时候,前端传递过来的密码,我会使用Django的set_password()方法再加密一次 经过加密后的数据库中的数据样子如下: 现在我有另外一个需求

限速上传文件到腾讯对象存储cos的脚本

官网:https://cloud.tencent.com/document/product/436/12269 安装包,这里用的python2.7 # pip install -U cos-python-sdk-v5 -i https://mirrors.tencent.com/pypi/simpl

用python字典统计CSV数据

1.用python字典统计CSV数据的步骤和代码示例 为了使用Python字典来统计CSV数据,我们可以使用内置的csv模块来读取CSV文件,并使用字典来存储统计信息。以下是一个详细的步骤和完整的代码示例: 1.1步骤 (1)导入csv模块。 (2)打开CSV文件并读取数据。 (3)初始化一个空字典

用python用户注册和短信验证码逻辑实现案例

一.写代码前分析(逻辑分析OK了才可以顺利成章的敲代码): A、用户发送请求 1、注册账号(用户名不能重复)--按照需求进行判断 2、短信验证码(有效期5分钟)--对短信验证码进行保存 B、用户注册、短信验证用不同得函数封装实现 d_user={} #存放用户名和密码的数据字典 verificati