📜  SQLAlchemy ORM – 查询

📅  最后修改于: 2022-05-13 01:55:29.631000             🧑  作者: Mango

SQLAlchemy ORM – 查询

在本文中,我们将看到如何在Python中使用 SQLAlchemy ORM 进行查询。

为了继续阅读本文,我们需要在我们的系统中安装 sqlalchemy 和任何数据库。为了理解本文,我们使用了 MySQL 数据库。

创建了一个 Profile 表和一个 Students 表:

在这里,我们将介绍以下方法:

  • add_columns()
  • add_entity()
  • count()
  • delete()
  • distinct()
  • filter()
  • get()
  • first()
  • group_by()
  • order_by()
  • join()
  • one()

add_columns()

add_columns() 方法用于向现有查询中添加一个或多个额外的列或实体。从输出中可以看出,初始查询仅包含 first_name 列,之后通过 add_columns() 方法又添加了 last_name 和 course 列。请注意,add_column() 已被弃用,应避免使用;可以改用 add_columns()(如示例所示)或 with_entities() 方法。

Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name, course) as a composite
    primary key; ``score`` is a plain float column.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# Step 1: SELECT first_name FROM students
names_only = session.query(Students.first_name)
print("Query 1:", names_only)

# Step 2: extend the same query with two more columns:
# SELECT first_name, last_name, course FROM students
wider = names_only.add_columns(Students.last_name, Students.course)
print("Query 2:", wider)

# Iterating the Query executes it; each row exposes the selected columns
# as attributes.
for row in wider:
    print(row.first_name, "|", row.last_name, "|", row.course)


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name, course) as a composite
    primary key; ``score`` is a plain float column.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# Query.count() returns how many rows the query would yield, which here
# is the number of rows in students
# (roughly: SELECT COUNT(*) FROM students).
all_students = session.query(Students)
total_students = all_students.count()

print("Count:", total_students)


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name) as a composite primary
    key; ``course`` and ``score`` are ordinary columns.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50))
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT DISTINCT first_name FROM students
unique_names = (
    session.query(Students)
    .with_entities(db.distinct(Students.first_name))
    .all()
)

# Each entry is a one-column row holding a distinct first name.
for name_row in unique_names:
    print(name_row)


Python
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine("mysql+pymysql://root:password@localhost/Geeks4Geeks")


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# DELETE FROM profile WHERE email = 'ravipandey@zmail.com'
# Query.delete() returns the number of rows matched by the DELETE.
deleted = (
    session.query(Profile)
    .filter(Profile.email == 'ravipandey@zmail.com')
    .delete(synchronize_session=False)
)

# The DELETE runs inside the session's open transaction. Without an
# explicit commit it is rolled back when the session is discarded, and the
# row would still be in the table after the script exits.
session.commit()
print("Rows deleted:", deleted)

# Verify the deletion by counting the remaining rows.
remaining = session.query(Profile).count()
print("Total records:", remaining)

# Release the connection held by the session.
session.close()


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT email FROM profile WHERE name LIKE 'Amit%'
matching_emails = (
    session.query(Profile)
    .with_entities(Profile.email)
    .filter(Profile.name.like('Amit%'))
    .all()
)

# Print every matching e-mail address.
for match in matching_emails:
    print("\n", match.email)


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name) as a composite primary
    key; ``course`` and ``score`` are ordinary columns.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50))
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT first_name, last_name, SUM(score) AS total
# FROM students GROUP BY first_name, last_name
score_sum = db.func.sum(Students.score).label('total')
totals = (
    session.query(Students)
    .with_entities(Students.first_name, Students.last_name, score_sum)
    .group_by(Students.first_name, Students.last_name)
    .all()
)

# Each row is (first_name, last_name, total); unpack it positionally.
for first_name, last_name, total in totals:
    print(first_name, last_name, "| Score =", total)


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name, course) as a composite
    primary key; ``score`` is a plain float column.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT * FROM students ORDER BY score DESC, course
# Highest score first; ties are broken by course in ascending order.
sort_keys = (Students.score.desc(), Students.course)
ranked = session.query(Students).order_by(*sort_keys).all()

# Print the students in sorted order.
for student in ranked:
    print(student.first_name, student.last_name, student.course, student.score)


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


class Students(Base):
    """ORM model mapped to the ``students`` table (not queried in this snippet)."""

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT * FROM profile LIMIT 1
# first() returns a single Profile instance (or None when there are no
# rows), not a list of results.
first_profile = session.query(Profile).first()

print(first_profile.email, "|", first_profile.name, "|", first_profile.contact)

# Deliberate failure kept from the tutorial: a single Profile instance is
# not iterable, so this loop raises TypeError.
for item in first_profile:
    print(item.email, item.name, item.contact)


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# one() requires the query to produce exactly one row. It raises
# NoResultFound for zero rows and MultipleResultsFound for two or more.
profile_query = session.query(Profile)
sole_profile = profile_query.one()


Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


class Location(Base):
    """ORM model mapped to the ``location`` table; ``email`` is the primary key."""

    __tablename__ = 'location'

    email = db.Column(db.String(50), primary_key=True)
    location = db.Column(db.String(100))


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT profile.email, profile.name, profile.contact, location.location
# FROM profile JOIN location ON profile.email = location.email
selected_columns = (
    Profile.email,
    Profile.name,
    Profile.contact,
    Location.location,
)
joined = session.query(*selected_columns).join(
    Location, Profile.email == Location.email
)

print("Query:", joined)
print()

# Iterating the Query executes it and yields one row per matched pair.
for row in joined:
    print(row.email, "|", row.name, "|", row.contact, "|", row.location)


输出:

count()

count() 方法是我们在 SQL 查询中使用的 COUNT 的同义词。它返回表中存在的记录数。在我们的例子中,students 表由 12 条记录组成,同样可以从开头显示的 students 表截图中验证。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name, course) as a composite
    primary key; ``score`` is a plain float column.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# Query.count() returns how many rows the query would yield, which here
# is the number of rows in students
# (roughly: SELECT COUNT(*) FROM students).
all_students = session.query(Students)
total_students = all_students.count()

print("Count:", total_students)

输出:

Count: 12

distinct()

sqlalchemy 的 distinct() 方法是 SQL 中使用的 DISTINCT 的同义词。它将根据提供的列名返回不同的记录作为参考。在上面的示例中,我们采用了 first_name 字段中存在的不同记录。在 12 个条目中,我们得到 5 个唯一的名字。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name) as a composite primary
    key; ``course`` and ``score`` are ordinary columns.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50))
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT DISTINCT first_name FROM students
unique_names = (
    session.query(Students)
    .with_entities(db.distinct(Students.first_name))
    .all()
)

# Each entry is a one-column row holding a distinct first name.
for name_row in unique_names:
    print(name_row)

输出:

delete()

delete() 方法会根据给定条件删除表中的记录。在我们的示例中,条件是电子邮件为 ravipandey@zmail.com。该方法返回受影响的记录数。删除后,我们使用 count() 方法进行验证。在删除任何条目之前,profile 表包含 3 条记录(如开头所示)。由于有一条记录被删除,现在表中还剩下 2 条记录。

Python

import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine("mysql+pymysql://root:password@localhost/Geeks4Geeks")


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# DELETE FROM profile WHERE email = 'ravipandey@zmail.com'
# Query.delete() returns the number of rows matched by the DELETE.
deleted = (
    session.query(Profile)
    .filter(Profile.email == 'ravipandey@zmail.com')
    .delete(synchronize_session=False)
)

# The DELETE runs inside the session's open transaction. Without an
# explicit commit it is rolled back when the session is discarded, and the
# row would still be in the table after the script exits.
session.commit()
print("Rows deleted:", deleted)

# Verify the deletion by counting the remaining rows.
remaining = session.query(Profile).count()
print("Total records:", remaining)

# Release the connection held by the session.
session.close()

输出:

filter()

filter() 方法的工作方式类似于 SQL 中的 WHERE 子句。它接受一个表达式,并仅返回满足该表达式的记录。可以提供一个或多个用“ & ”连接的表达式。在示例中,我们对 profile 表的 name 列使用了 LIKE 条件。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT email FROM profile WHERE name LIKE 'Amit%'
matching_emails = (
    session.query(Profile)
    .with_entities(Profile.email)
    .filter(Profile.name.like('Amit%'))
    .all()
)

# Print every matching e-mail address.
for match in matching_emails:
    print("\n", match.email)

输出:

group_by()

SQL 中的 GROUP BY 子句可以在 SQLAlchemy 中通过 group_by() 方法实现。它以实体或列名作为参数,并根据这些列进行分组。我们在上面看到的结果是对 students 表的 first_name 和 last_name 列执行 GROUP BY 操作得到的,其中分数由 SUM 函数聚合。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name) as a composite primary
    key; ``course`` and ``score`` are ordinary columns.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50))
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT first_name, last_name, SUM(score) AS total
# FROM students GROUP BY first_name, last_name
score_sum = db.func.sum(Students.score).label('total')
totals = (
    session.query(Students)
    .with_entities(Students.first_name, Students.last_name, score_sum)
    .group_by(Students.first_name, Students.last_name)
    .all()
)

# Each row is (first_name, last_name, total); unpack it positionally.
for first_name, last_name, total in totals:
    print(first_name, last_name, "| Score =", total)

输出:

order_by()

在上面的示例中,我们根据 score 列对学生表记录进行了排序。 order_by() 子句将列名作为参数。默认情况下,除非列对象通过 desc() 方法传递,否则假定按升序排序。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Students(Base):
    """ORM model mapped to the ``students`` table.

    The model declares (first_name, last_name, course) as a composite
    primary key; ``score`` is a plain float column.
    """

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT * FROM students ORDER BY score DESC, course
# Highest score first; ties are broken by course in ascending order.
sort_keys = (Students.score.desc(), Students.course)
ranked = session.query(Students).order_by(*sort_keys).all()

# Print the students in sorted order.
for student in ranked:
    print(student.first_name, student.last_name, student.course, student.score)

输出:

first()

first() 方法返回查询的第一条记录,类似于在 SQL 查询末尾加上 LIMIT 1。在输出中,我们可以看到已成功从表中获取到一条记录。但是,如果我们尝试迭代该结果,就会得到“对象不可迭代”的错误,原因是 first() 返回的是单个对象(没有记录时返回 None),而不是可迭代的结果集。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


class Students(Base):
    """ORM model mapped to the ``students`` table (not queried in this snippet)."""

    __tablename__ = 'students'

    first_name = db.Column(db.String(50), primary_key=True)
    last_name = db.Column(db.String(50), primary_key=True)
    course = db.Column(db.String(50), primary_key=True)
    score = db.Column(db.Float)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT * FROM profile LIMIT 1
# first() returns a single Profile instance (or None when there are no
# rows), not a list of results.
first_profile = session.query(Profile).first()

print(first_profile.email, "|", first_profile.name, "|", first_profile.contact)

# Deliberate failure kept from the tutorial: a single Profile instance is
# not iterable, so this loop raises TypeError.
for item in first_profile:
    print(item.email, item.name, item.contact)

输出:

one()

方法 one() 和 first() 可能听起来很相似,但事实并非如此。one() 要求查询结果恰好只有一条记录:如果结果为 0 条,会引发 NoResultFound;如果有 2 条或更多,则会引发 MultipleResultsFound。在上面的示例中,由于 profile 表中的记录并非恰好一条,因此我们看到了错误跟踪。它的使用非常罕见。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# one() requires the query to produce exactly one row. It raises
# NoResultFound for zero rows and MultipleResultsFound for two or more.
profile_query = session.query(Profile)
sole_profile = profile_query.one()

输出:

join()

为了理解 join() 方法,我们创建了一个名为 location 的附加表,它有两列,即 email 和 location。我们在此表中插入了一个条目,其中电子邮件为“amitpathak@zmail.com”,位置为“孟买”。profile 表中已经有一个具有此电子邮件的条目,因此让我们尝试在 email 列上连接这两个表。在 join() 方法中,第一个参数是要连接的另一个表(Location),第二个参数是连接条件(相当于 SQL 中的 ON)。

Python

from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
 
Base = declarative_base()

# Engine: the connection object for the MySQL database used in this example.
engine = db.create_engine(
    "mysql+pymysql://root:password@localhost/Geeks4Geeks"
)


class Profile(Base):
    """ORM model mapped to the ``profile`` table; ``email`` is the primary key."""

    __tablename__ = 'profile'

    email = db.Column(db.String(50), primary_key=True)
    name = db.Column(db.String(100))
    contact = db.Column(db.Integer)


class Location(Base):
    """ORM model mapped to the ``location`` table; ``email`` is the primary key."""

    __tablename__ = 'location'

    email = db.Column(db.String(50), primary_key=True)
    location = db.Column(db.String(100))


# Session factory bound to the engine, plus one working session.
Session = sessionmaker(bind=engine)
session = Session()

# SELECT profile.email, profile.name, profile.contact, location.location
# FROM profile JOIN location ON profile.email = location.email
selected_columns = (
    Profile.email,
    Profile.name,
    Profile.contact,
    Location.location,
)
joined = session.query(*selected_columns).join(
    Location, Profile.email == Location.email
)

print("Query:", joined)
print()

# Iterating the Query executes it and yields one row per matched pair.
for row in joined:
    print(row.email, "|", row.name, "|", row.contact, "|", row.location)

输出: