Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!

python,最全,种类,数据库,操作方法,想到,类型,里面,甚至,还有 · 浏览次数 : 568

小编点评

```python import boto3dynamodb import azure.cosmos # 创建 DynamoDB 表格 table = dynamodb.create_table( TableName='Employees', KeySchema=[ { 'AttributeName': 'id', 'KeyType': 'HASH' }, ], AttributeDefinitions=[ { 'AttributeName': 'id', 'AttributeType': 'N' }, ], ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) # 添加 item 到 DynamoDB 表格 table.put_item( Item={ 'id': 1, 'name': 'John', 'age': 30, 'address': 'New York', 'salary': 1000.00 } ) # 获取 DynamoDB 表格中的 item response = table.get_item(Key={ 'id': 1, }) # 打印 item item = response['Item'] # 更新 DynamoDB 表格中的 item table.update_item( Key={ 'id': 1 }, UpdateExpression='SET salary = :val1', ExpressionAttributeValues={ ':val1': 25000.00 } ) # 删除 DynamoDB 表格中的 item table.delete_item(Key={ 'id': 1 }) # 连接 Azure Cosmos DB client = CosmosClient(url, credential=key) # 创建数据库 database = client.create_database_if_not_exists(id=database_name) # 创建容器 container = database.create_container_if_not_exists( id=container_name, partition_key=PartitionKey(path=\"/id\"), offer_throughput=400 ) # 插入 item 到 Cosmos DB 中 container.upsert_item({ 'id': '1', 'name': 'John', 'age': 30, 'address': 'New York', 'salary': 1000.00 }) # 获取 Cosmos DB 中的 item for item in container.read_all_items(): print(item) # 更新 item 的 salary for item in container.read_all_items(): if item['id'] == '1': item['salary'] = 25000.00 container.upsert_item(item) print("Item updated successfully") # 删除 Cosmos DB 中的 item for item in container.read_all_items(): if item['id'] == '1': container.delete_item(item, partition_key='1') print("Item deleted successfully") ```

正文

本文将详细探讨如何在Python中连接全种类数据库以及实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Redis,Cassandra,Microsoft Access,ElasticSearch,Neo4j,InfluxDB,Snowflake,Amazon DynamoDB,Microsoft Azure CosMos DB数据库的方法,并演示相应的CRUD操作。

MySQL

连接数据库

Python可以使用mysql-connector-python库连接MySQL数据库:

import mysql.connector

conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
print("Opened MySQL database successfully")
conn.close()

CRUD操作

接下来,我们将展示在MySQL中如何进行基本的CRUD操作。

创建(Create)

conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')

cursor = conn.cursor()
cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
print("Table created successfully")

conn.close()

读取(Retrieve)

conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')

cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("ADDRESS = ", row[2])
    print("SALARY = ", row[3])

conn.close()

更新(Update)

conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')

cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()

print("Total number of rows updated :", cursor.rowcount)

conn.close()

删除(Delete)

conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')

cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()

print("Total number of rows deleted :", cursor.rowcount)

conn.close()

SQL Server

连接数据库

Python可以使用pyodbc库连接SQL Server数据库:

import pyodbc

conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
print("Opened SQL Server database successfully")
conn.close()

CRUD操作

接下来,我们将展示在SQL Server中如何进行基本的CRUD操作。

创建(Create)

conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')

cursor = conn.cursor()
cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
conn.commit()
print("Table created successfully")

conn.close()

读取(Retrieve)

conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')

cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("ADDRESS = ", row[2])
    print("SALARY = ", row[3])

conn.close()

更新(Update)

conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')

cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()

print("Total number of rows updated :", cursor.rowcount)

conn.close()

删除(Delete)

conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')

cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()

print("Total number of rows deleted :", cursor.rowcount)

conn.close()

Oracle

连接数据库

Python可以使用cx_Oracle库连接Oracle数据库:

import cx_Oracle

dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
print("Opened Oracle database successfully")
conn.close()

CRUD操作

接下来,我们将展示在Oracle中如何进行基本的CRUD操作。

创建(Create)

dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)

cursor = conn.cursor()
cursor.execute("CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))")
conn.commit()
print("Table created successfully")

conn.close()

读取(Retrieve)

dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)

cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("ADDRESS = ", row[2])
    print("SALARY = ", row[3])

conn.close()

更新(Update)

dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)

cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()

print("Total number of rows updated :", cursor.rowcount)

conn.close()

删除(Delete)

dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') 
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)

cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()

print("Total number of rows deleted :", cursor.rowcount)

conn.close()

PostgreSQL

连接数据库

Python可以使用psycopg2库连接PostgreSQL数据库:

import psycopg2

conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
print("Opened PostgreSQL database successfully")
conn.close()

CRUD操作

接下来,我们将展示在PostgreSQL中如何进行基本的CRUD操作。

创建(Create)

conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")

cursor = conn.cursor()
cursor.execute('''CREATE TABLE Employees
      (ID INT PRIMARY KEY     NOT NULL,
      NAME           TEXT    NOT NULL,
      AGE            INT     NOT NULL,
      ADDRESS        CHAR(50),
      SALARY         REAL);''')
conn.commit()
print("Table created successfully")

conn.close()

读取(Retrieve)

conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")

cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("ADDRESS = ", row[2])
    print("SALARY = ", row[3])

conn.close()

更新(Update)

conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")

cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()

print("Total number of rows updated :", cursor.rowcount)

conn.close()

删除(Delete)

conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")

cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()

print("Total number of rows deleted :", cursor.rowcount)

conn.close()

MongoDB

连接数据库

Python可以使用pymongo库连接MongoDB数据库:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]
print("Opened MongoDB database successfully")
client.close()

CRUD操作

接下来,我们将展示在MongoDB中如何进行基本的CRUD操作。

创建(Create)

在MongoDB中,文档的创建操作通常包含在插入操作中:

client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]

employees = db["Employees"]
employee = {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}

employees.insert_one(employee)
print("Document inserted successfully")

client.close()

读取(Retrieve)

client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]

employees = db["Employees"]
cursor = employees.find()
for document in cursor:
    print(document)

client.close()

更新(Update)

client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]

employees = db["Employees"]
query = { "id": "1" }
new_values = { "$set": { "salary": "25000.00" } }

employees.update_one(query, new_values)

print("Document updated successfully")

client.close()

删除(Delete)

client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]

employees = db["Employees"]
query = { "id": "1" }

employees.delete_one(query)

print("Document deleted successfully")

client.close()

SQLite

连接数据库

Python使用sqlite3库连接SQLite数据库:

import sqlite3

conn = sqlite3.connect('my_database.db')
print("Opened SQLite database successfully")
conn.close()

CRUD操作

接下来,我们将展示在SQLite中如何进行基本的CRUD操作。

创建(Create)

conn = sqlite3.connect('my_database.db')

cursor = conn.cursor()
cursor.execute('''CREATE TABLE Employees
      (ID INT PRIMARY KEY     NOT NULL,
      NAME           TEXT    NOT NULL,
      AGE            INT     NOT NULL,
      ADDRESS        CHAR(50),
      SALARY         REAL);''')
conn.commit()
print("Table created successfully")

conn.close()

读取(Retrieve)

conn = sqlite3.connect('my_database.db')

cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("ADDRESS = ", row[2])
    print("SALARY = ", row[3])

conn.close()

更新(Update)

conn = sqlite3.connect('my_database.db')

cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()

print("Total number of rows updated :", cursor.rowcount)

conn.close()

删除(Delete)

conn = sqlite3.connect('my_database.db')

cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()

print("Total number of rows deleted :", cursor.rowcount)

conn.close()

DB2

连接数据库

Python可以使用ibm_db库连接DB2数据库:

import ibm_db

dsn = (
    "DRIVER={{IBM DB2 ODBC DRIVER}};"
    "DATABASE=my_database;"
    "HOSTNAME=127.0.0.1;"
    "PORT=50000;"
    "PROTOCOL=TCPIP;"
    "UID=username;"
    "PWD=password;"
)
conn = ibm_db.connect(dsn, "", "")
print("Opened DB2 database successfully")
ibm_db.close(conn)

CRUD操作

接下来,我们将展示在DB2中如何进行基本的CRUD操作。

创建(Create)

conn = ibm_db.connect(dsn, "", "")

sql = '''CREATE TABLE Employees
      (ID INT PRIMARY KEY     NOT NULL,
      NAME           VARCHAR(20)    NOT NULL,
      AGE            INT     NOT NULL,
      ADDRESS        CHAR(50),
      SALARY         DECIMAL(9, 2));'''
stmt = ibm_db.exec_immediate(conn, sql)
print("Table created successfully")

ibm_db.close(conn)

读取(Retrieve)

conn = ibm_db.connect(dsn, "", "")

sql = "SELECT id, name, address, salary from Employees"
stmt = ibm_db.exec_immediate(conn, sql)
while ibm_db.fetch_row(stmt):
    print("ID = ", ibm_db.result(stmt, "ID"))
    print("NAME = ", ibm_db.result(stmt, "NAME"))
    print("ADDRESS = ", ibm_db.result(stmt, "ADDRESS"))
    print("SALARY = ", ibm_db.result(stmt, "SALARY"))

ibm_db.close(conn)

更新(Update)

conn = ibm_db.connect(dsn, "", "")

sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
stmt = ibm_db.exec_immediate(conn, sql)
ibm_db.commit(conn)

print("Total number of rows updated :", ibm_db.num_rows(stmt))

ibm_db.close(conn)

删除(Delete)

conn = ibm_db.connect(dsn, "", "")

sql = "DELETE from Employees where ID = 1"
stmt = ibm_db.exec_immediate(conn, sql)
ibm_db.commit(conn)

print("Total number of rows deleted :", ibm_db.num_rows(stmt))

ibm_db.close(conn)

Microsoft Access

连接数据库

Python可以使用pyodbc库连接Microsoft Access数据库:

import pyodbc

conn_str = (
    r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
    r'DBQ=path_to_your_access_file.accdb;'
)
conn = pyodbc.connect(conn_str)
print("Opened Access database successfully")
conn.close()

CRUD操作

接下来,我们将展示在Access中如何进行基本的CRUD操作。

创建(Create)

conn = pyodbc.connect(conn_str)

cursor = conn.cursor()
cursor.execute('''CREATE TABLE Employees
      (ID INT PRIMARY KEY     NOT NULL,
      NAME           TEXT    NOT NULL,
      AGE            INT     NOT NULL,
      ADDRESS        CHAR(50),
      SALARY         DECIMAL(9, 2));''')
conn.commit()
print("Table created successfully")

conn.close()

读取(Retrieve)

conn = pyodbc.connect(conn_str)

cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("ADDRESS = ", row[2])
    print("SALARY = ", row[3])

conn.close()

更新(Update)

conn = pyodbc.connect(conn_str)

cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()

print("Total number of rows updated :", cursor.rowcount)

conn.close()

删除(Delete)

conn = pyodbc.connect(conn_str)

cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()

print("Total number of rows deleted :", cursor.rowcount)

conn.close()

Cassandra

连接数据库

Python可以使用cassandra-driver库连接Cassandra数据库:

from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
print("Opened Cassandra database successfully")
cluster.shutdown()

CRUD操作

接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。

创建(Create)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')

session.execute("""
    CREATE TABLE Employees (
        id int PRIMARY KEY,
        name text,
        age int,
        address text,
        salary decimal
    )
""")
print("Table created successfully")

cluster.shutdown()

读取(Retrieve)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')

rows = session.execute('SELECT id, name, address, salary FROM Employees')
for row in rows:
    print("ID = ", row.id)
    print("NAME = ", row.name)
    print("ADDRESS = ", row.address)
    print("SALARY = ", row.salary)

cluster.shutdown()

更新(Update)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')

session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
print("Row updated successfully")

cluster.shutdown()

删除(Delete)

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')

session.execute("DELETE FROM Employees WHERE id = 1")
print("Row deleted successfully")

cluster.shutdown()

Redis

连接数据库

Python可以使用redis-py库连接Redis数据库:

import redis

r = redis.Redis(host='localhost', port=6379, db=0)
print("Opened Redis database successfully")

CRUD操作

接下来,我们将展示在Redis中如何进行基本的CRUD操作。

创建(Create)

r = redis.Redis(host='localhost', port=6379, db=0)

r.set('employee:1:name', 'John')
r.set('employee:1:age', '30')
r.set('employee:1:address', 'New York')
r.set('employee:1:salary', '1000.00')

print("Keys created successfully")

读取(Retrieve)

r = redis.Redis(host='localhost', port=6379, db=0)

print("NAME = ", r.get('employee:1:name').decode('utf-8'))
print("AGE = ", r.get('employee:1:age').decode('utf-8'))
print("ADDRESS = ", r.get('employee:1:address').decode('utf-8'))
print("SALARY = ", r.get('employee:1:salary').decode('utf-8'))

更新(Update)

r = redis.Redis(host='localhost', port=6379, db=0)

r.set('employee:1:salary', '25000.00')

print("Key updated successfully")

删除(Delete)

r = redis.Redis(host='localhost', port=6379, db=0)

r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')

print("Keys deleted successfully")

ElasticSearch

连接数据库

Python可以使用elasticsearch库连接ElasticSearch数据库:

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print("Opened ElasticSearch database successfully")

CRUD操作

接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。

创建(Create)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

employee = {
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
}
res = es.index(index='employees', doc_type='employee', id=1, body=employee)

print("Document created successfully")

读取(Retrieve)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

res = es.get(index='employees', doc_type='employee', id=1)
print("Document details:")
for field, details in res['_source'].items():
    print(f"{field.upper()} = ", details)

更新(Update)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

res = es.update(index='employees', doc_type='employee', id=1, body={
    'doc': {
        'salary': 25000.00
    }
})

print("Document updated successfully")

删除(Delete)

es = Elasticsearch([{'host': 'localhost', 'port': 9200}])

res = es.delete(index='employees', doc_type='employee', id=1)

print("Document deleted successfully")

Neo4j

连接数据库

Python可以使用neo4j库连接Neo4j数据库:

from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
print("Opened Neo4j database successfully")
driver.close()

CRUD操作

接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。

创建(Create)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")

print("Node created successfully")

driver.close()

读取(Retrieve)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
    for record in result:
        print("ID = ", record["n"]["id"])
        print("NAME = ", record["n"]["name"])
        print("ADDRESS = ", record["n"]["address"])
        print("SALARY = ", record["n"]["salary"])

driver.close()

更新(Update)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")

print("Node updated successfully")

driver.close()

删除(Delete)

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")

print("Node deleted successfully")

driver.close()

InfluxDB

连接数据库

Python可以使用InfluxDB-Python库连接InfluxDB数据库:

from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086)
print("Opened InfluxDB database successfully")
client.close()

CRUD操作

接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。

创建(Create)

client = InfluxDBClient(host='localhost', port=8086)

json_body = [
    {
        "measurement": "employees",
        "tags": {
            "id": "1"
        },
        "fields": {
            "name": "John",
            "age": 30,
            "address": "New York",
            "salary": 1000.00
        }
    }
]

client.write_points(json_body)

print("Point created successfully")

client.close()

读取(Retrieve)

client = InfluxDBClient(host='localhost', port=8086)

result = client.query('SELECT "name", "age", "address", "salary" FROM "employees"')

for point in result.get_points():
    print("ID = ", point['id'])
    print("NAME = ", point['name'])
    print("AGE = ", point['age'])
    print("ADDRESS = ", point['address'])
    print("SALARY = ", point['salary'])

client.close()

更新(Update)

InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。

删除(Delete)

同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。

client = InfluxDBClient(host='localhost', port=8086)

# 删除整个系列
client.query('DROP SERIES FROM "employees"')

# 删除某个时间段的数据
# client.query('DELETE FROM "employees" WHERE time < now() - 1d')

print("Series deleted successfully")

client.close()

Snowflake

连接数据库

Python可以使用snowflake-connector-python库连接Snowflake数据库:

from snowflake.connector import connect

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)
print("Opened Snowflake database successfully")
con.close()

CRUD操作

接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。

创建(Create)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)

cur = con.cursor()
cur.execute("""
CREATE TABLE EMPLOYEES (
    ID INT,
    NAME STRING,
    AGE INT,
    ADDRESS STRING,
    SALARY FLOAT
)
""")

cur.execute("""
INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
(1, 'John', 30, 'New York', 1000.00)
""")

print("Table created and row inserted successfully")

con.close()

读取(Retrieve)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)

cur = con.cursor()
cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")

rows = cur.fetchall()

for row in rows:
    print("ID = ", row[0])
    print("NAME = ", row[1])
    print("AGE = ", row[2])
    print("ADDRESS = ", row[3])
    print("SALARY = ", row[4])

con.close()

更新(Update)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)

cur = con.cursor()
cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")

print("Row updated successfully")

con.close()

删除(Delete)

con = connect(
    user='username',
    password='password',
    account='account_url',
    warehouse='warehouse',
    database='database',
    schema='schema'
)

cur = con.cursor()
cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")

print("Row deleted successfully")

con.close()

Amazon DynamoDB

连接数据库

Python可以使用boto3库连接Amazon DynamoDB:

import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
                          aws_access_key_id='Your AWS Access Key',
                          aws_secret_access_key='Your AWS Secret Key')

print("Opened DynamoDB successfully")

CRUD操作

接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。

创建(Create)

table = dynamodb.create_table(
    TableName='Employees',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'
        },
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'N'
        },
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

table.put_item(
   Item={
        'id': 1,
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': 1000.00
    }
)

print("Table created and item inserted successfully")

读取(Retrieve)

table = dynamodb.Table('Employees')

response = table.get_item(
   Key={
        'id': 1,
    }
)

item = response['Item']
print(item)

更新(Update)

table = dynamodb.Table('Employees')

table.update_item(
    Key={
        'id': 1,
    },
    UpdateExpression='SET salary = :val1',
    ExpressionAttributeValues={
        ':val1': 25000.00
    }
)

print("Item updated successfully")

删除(Delete)

table = dynamodb.Table('Employees')

table.delete_item(
    Key={
        'id': 1,
    }
)

print("Item deleted successfully")

Microsoft Azure CosMos DB

连接数据库

Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB:

from azure.cosmos import CosmosClient, PartitionKey, exceptions

url = 'Cosmos DB Account URL'
key = 'Cosmos DB Account Key'
client = CosmosClient(url, credential=key)

database_name = 'testDB'
database = client.get_database_client(database_name)

container_name = 'Employees'
container = database.get_container_client(container_name)

print("Opened CosMos DB successfully")

CRUD操作

接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。

创建(Create)

database = client.create_database_if_not_exists(id=database_name)

container = database.create_container_if_not_exists(
    id=container_name, 
    partition_key=PartitionKey(path="/id"),
    offer_throughput=400
)

container.upsert_item({
    'id': '1',
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00
})

print("Container created and item upserted successfully")

读取(Retrieve)

for item in container.read_all_items():
    print(item)

更新(Update)

for item in container.read_all_items():
    if item['id'] == '1':
        item['salary'] = 25000.00
        container.upsert_item(item)
        
print("Item updated successfully")

删除(Delete)

for item in container.read_all_items():
    if item['id'] == '1':
        container.delete_item(item, partition_key='1')
        
print("Item deleted successfully")

如有帮助,请多关注
个人微信公众号:【Python全视角】
TeahLead_KrisChang,10+年的互联网和人工智能从业经验,10年+技术和业务团队管理经验,同济软件工程本科,复旦工程管理硕士,阿里云认证云服务资深架构师,上亿营收AI产品业务负责人。

与Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!相似的内容:

Python史上最全种类数据库操作方法,你能想到的数据库类型都在里面!甚至还有云数据库!

本文将详细探讨如何在Python中连接全种类数据库以及实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Redis,Cassandra,Microsoft Access,El

< Python全景系列-4 > 史上最全文件类型读写库大盘点!什么?还包括音频、视频?

介绍史上最全PYTHON文件类型读写库大盘点!包含常用和不常用的大量文件格式!文本、音频、视频应有尽有!废话不多说!走起来!

含辞未吐,声若幽兰,史上最强免费人工智能AI语音合成TTS服务微软Azure(Python3.10接入)

所谓文无第一,武无第二,云原生人工智能技术目前呈现三足鼎立的态势,微软,谷歌以及亚马逊三大巨头各擅胜场,不分伯仲,但目前微软Azure平台不仅仅只是一个PaaS平台,相比AWS,以及GAE,它应该是目前提供云计算人工智能服务最全面的一个平台,尤其是语音合成领域,论AI语音的平顺、自然以及拟真性,无平

闻其声而知雅意,基于Pytorch(mps/cpu/cuda)的人工智能AI本地语音识别库Whisper(Python3.10)

前文回溯,之前一篇:含辞未吐,声若幽兰,史上最强免费人工智能AI语音合成TTS服务微软Azure(Python3.10接入),利用AI技术将文本合成语音,现在反过来,利用开源库Whisper再将语音转回文字,所谓闻其声而知雅意。 Whisper 是一个开源的语音识别库,它是由Facebook AI

吾剑未尝不利,国内Azure平替,科大讯飞人工智能免费AI语音合成(TTS)服务Python3.10接入

微软Azure平台的语音合成(TTS)技术确实神乎其技,这一点在之前的一篇:含辞未吐,声若幽兰,史上最强免费人工智能AI语音合成TTS服务微软Azure(Python3.10接入),已经做过详细介绍,然则Azure平台需要信用卡验证,有一定门槛,对国内用户不太友好,放眼神州,科大讯飞的讯飞开放平台也

《最新出炉》系列初窥篇-Python+Playwright自动化测试-13-playwright操作iframe-下篇

1.简介 通过前边两篇的学习,想必大家已经对iframe有了一定的认识和了解,今天这一篇主要是对iframe做一个总结,主要从iframe的操作(输入框、点击等等)和定位两个方面进行总结。 2.iframe是什么? iframe 简单来说就是一个 html 嵌套了另外一个 html。在页面元素上最简

Python学习之三: 编译二进制

Python学习之三: 编译二进制 摘要 每次使用python 执行py文件其实是比较麻烦的 主要是还得安装python的虚拟机,以及安装对应的pip包. 感觉比较繁杂 理论上最快捷的方式是编译成 二进制直接运行. 所以这里主要是说一下通过pycharm和linux机器进行二进制编译的过程 Pych

快速上手python的简单web框架flask

简介 python可以做很多事情,虽然它的强项在于进行向量运算和机器学习、深度学习等方面。但是在某些时候,我们仍然需要使用python对外提供web服务。 比如我们现在有一个用python写好的模型算法,这个模型算法需要接收前端的输入,然后进行模拟运算,最终得到最后的输出。这个流程是一个典型的web

Python装饰器实例讲解(三)

Python装饰器实例讲解(三) 本文多参考《流畅的python》,在此基础上增加了一些实例便于理解 姊妹篇 Python装饰器实例讲解(一),让你简单的会用 Python装饰器实例讲解(二),主要讲了一个万能公式(原理) 本文其实反而是最最基础的部分,当然也回答了好几个关键的问题,也有一些是重复的

<Python全景系列-1> Hello World,1分钟配置好你的python环境

欢迎来到我们的系列博客《Python360全景》!在这个系列中,我们将带领你从Python的基础知识开始,一步步深入到高级话题,帮助你掌握这门强大而灵活的编程语法。无论你是编程新手,还是有一定基础的开发者,这个系列都将提供你需要的知识和技能。这是我们的第一篇文章,让我们从最基础的开始:如何在你的电脑上配置Python环境。