SQLAlchemy ORM – 查询
在本文中,我们将看到如何在Python中使用 SQLAlchemy ORM 进行查询。
为了继续阅读本文,我们需要在我们的系统中安装 sqlalchemy 和任何数据库。为了理解本文,我们使用了 MySQL 数据库。
创建了一个 Profile 表和一个 Students 表:
在这里,我们将介绍以下方法:
- 添加列()
- 添加实体()
- 数数()
- 删除()
- 清楚的()
- 筛选()
- 得到()
- 第一的()
- 通过...分组()
- 加入()
- 一()
添加列()
add_columns() 方法有助于将任何其他列或实体添加到现有查询中。从输出中可以看出,初始查询仅包含 first_name 列,但后来当我们使用 add_columns() 方法时,我们也能够添加 last_name 和 course 列。请注意 add_column() 已被弃用,应该避免使用,相反,我们可以使用 add_columns() (如上所示)或 with_entities() 方法。
Syntax: sqlalchemy.orm.Query.add_columns(*column)
Where: Add one or more column expressions to the list of result columns to be returned.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50),
primary_key=True)
last_name = db.Column(db.String(50),
primary_key=True)
course = db.Column(db.String(50),
primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY
# IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT first_name FROM students
result = session.query(Students.first_name)
print("Query 1:", result)
# SELECT first_name, last_name, course
# FROM students
result = result.add_columns(Students.last_name,
Students.course)
print("Query 2:", result)
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.first_name, "|", r.last_name, "|", r.course)
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50), primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM PROFILE
result = session.query(Students).count()
# VIEW THE RESULT
print("Count:", result)
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50))
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT DISTINCT(first_name) FROM students;
result = session.query(Students) \
.with_entities(db.distinct(Students.first_name)).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r)
Python
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine("mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind = engine)
session = Session()
# DELETE FROM profile WHERE email = 'ravipandey@zmail.com'
result = session.query(Profile) \
.filter(Profile.email == 'ravipandey@zmail.com') \
.delete(synchronize_session=False)
print("Rows deleted:", result)
result = session.query(Profile).count()
print("Total records:", result)
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT email FROM student WHERE name LIKE 'Amit%';
result = session.query(Profile) \
.with_entities(Profile.email) \
.filter(Profile.name.like('Amit%')).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print("\n", r.email)
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50))
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT first_name, last_name, SUM(score)
# AS total FROM students GROUP BY first_name, last_name;
result = session.query(Students) \
.with_entities(
Students.first_name,
Students.last_name,
db.func.sum(Students.score).label('total')
).group_by(
Students.first_name,
Students.last_name
).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.first_name, r.last_name, "| Score =", r[2])
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50), primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM students ORDER BY score DESC, course;
result = session.query(Students) \
.order_by(
Students.score.desc(),
Students.course
).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.first_name, r.last_name, r.course, r.score)
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50), primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM profile LIMIT 1
result = session.query(Profile).first()
print(result.email, "|", result.name, "|", result.contact)
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.email, r.name, r.contact)
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# profile table should contain only one row, else error
result = session.query(Profile).one()
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
class Location(Base):
__tablename__ = 'location'
email = db.Column(db.String(50), primary_key=True)
location = db.Column(db.String(100))
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM PROFILE
result = session.query(
Profile.email,
Profile.name,
Profile.contact,
Location.location
).join(Location, Profile.email == Location.email)
print("Query:", result)
print()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.email, "|", r.name, "|", r.contact, "|", r.location)
输出:
数数()
count() 方法是我们在 SQL 查询中使用的 COUNT 的同义词。它返回表中存在的记录数。在我们的例子中,students 表由 12 条记录组成,同样可以从开头显示的 students 表截图中验证。
Syntax: sqlalchemy.orm.Query.count()
Return a count of rows this the SQL formed by this Query would return.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50), primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM PROFILE
result = session.query(Students).count()
# VIEW THE RESULT
print("Count:", result)
输出:
Count: 12
清楚的()
sqlalchemy 的 distinct() 方法是 SQL 中使用的 DISTINCT 的同义词。它将根据提供的列名返回不同的记录作为参考。在上面的示例中,我们采用了 first_name 字段中存在的不同记录。在 12 个条目中,我们得到 5 个唯一的名字。
Syntax: sqlalchemy.orm.Query.distinct(*expr)
Apply a DISTINCT to the query and return the newly resulting Query.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50))
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT DISTINCT(first_name) FROM students;
result = session.query(Students) \
.with_entities(db.distinct(Students.first_name)).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r)
输出:
删除()
delete() 方法将根据某些条件删除表中存在的记录。在我们的案例中,我们提供了电子邮件为 ravipandey@zmail.com 的条件。该方法返回受到影响的记录数的计数。删除后,使用 count() 方法进行验证。在删除任何条目之前,配置文件表包含 3 条记录(如开头所示)。由于一条记录受到影响,现在我们在表中还剩下 2 条记录。
Syntax: sqlalchemy.orm.Query.delete(synchronize_session=’evaluate’)
Perform a DELETE with an arbitrary WHERE clause. Deletes rows matched by this query from the database and return the count of rows matched as returned by the database’s “row count” feature.
Python
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine("mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind = engine)
session = Session()
# DELETE FROM profile WHERE email = 'ravipandey@zmail.com'
result = session.query(Profile) \
.filter(Profile.email == 'ravipandey@zmail.com') \
.delete(synchronize_session=False)
print("Rows deleted:", result)
result = session.query(Profile).count()
print("Total records:", result)
输出:
筛选()
filter() 方法的工作方式类似于 SQL 中的 WHERE 子句。它接受一个表达式并仅返回满足所提供表达式的那些记录。可以有一个或多个用“ & ”分隔的表达式。在示例中,我们为配置文件表中的名称列提供了 LIKE 条件。
Syntax: sqlalchemy.orm.Query.filter(*criterion)
Apply the given filtering criterion to a copy of this Query, using SQL expressions.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT email FROM student WHERE name LIKE 'Amit%';
result = session.query(Profile) \
.with_entities(Profile.email) \
.filter(Profile.name.like('Amit%')).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print("\n", r.email)
输出:
通过...分组()
SQL 中的 GROUP BY 子句可以使用 group_by() 方法在 SQLAlchemy 中编写。它以实体或列名作为参数,并根据这些列进行分组。我们在上面看到的结果是对学生表的 first_name 和 last_name 列执行的 group by 操作,其中分数由 SUM函数聚合。
Syntax: sqlalchemy.orm.Query.group_by(*clauses)
Apply one or more GROUP BY criterion to the query and return the newly resulting Query.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50))
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT first_name, last_name, SUM(score)
# AS total FROM students GROUP BY first_name, last_name;
result = session.query(Students) \
.with_entities(
Students.first_name,
Students.last_name,
db.func.sum(Students.score).label('total')
).group_by(
Students.first_name,
Students.last_name
).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.first_name, r.last_name, "| Score =", r[2])
输出:
order_by()
在上面的示例中,我们根据 score 列对学生表记录进行了排序。 order_by() 子句将列名作为参数。默认情况下,除非列对象通过 desc() 方法传递,否则假定按升序排序。
Syntax: sqlalchemy.orm.Query.order_by(*clauses)
Apply one or more ORDER BY criteria to the query and return the newly resulting Query.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50), primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM students ORDER BY score DESC, course;
result = session.query(Students) \
.order_by(
Students.score.desc(),
Students.course
).all()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.first_name, r.last_name, r.course, r.score)
输出:
第一的()
first() 方法返回查询的第一条记录。它类似于在 SQL 查询末尾应用 LIMIT 1。在输出中,我们可以看到我们能够从表中获取记录。但是,如果我们尝试迭代结果,我们会得到一个错误,即对象不可迭代,原因是结果中只有一条记录。
Syntax: sqlalchemy.engine.Result.first()
Fetch the first row or None if no row is present. Closes the result set and discards remaining rows.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
class Students(Base):
__tablename__ = 'students'
first_name = db.Column(db.String(50), primary_key=True)
last_name = db.Column(db.String(50), primary_key=True)
course = db.Column(db.String(50), primary_key=True)
score = db.Column(db.Float)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM profile LIMIT 1
result = session.query(Profile).first()
print(result.email, "|", result.name, "|", result.contact)
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.email, r.name, r.contact)
输出:
一()
方法 one() 和 first() 可能听起来很相似,但事实并非如此。在上面的示例中,我们看到了错误跟踪,因为 one() 方法试图从只有一个条目的表中获取记录。如果表有 0、2 或更多条目,则会引发异常。它的使用非常罕见。
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# profile table should contain only one row, else error
result = session.query(Profile).one()
输出:
加入()
为了理解 join() 方法,我们创建了一个名为 location 的附加表,它有两列,即 email 和 location。我们在此表中插入了一个条目,其中电子邮件为“amitpathak@zmail.com”,位置为“孟买”。我们已经在配置文件表中有一个具有此电子邮件 ID 的条目,因此让我们尝试在电子邮件列中连接这两个表。在 join() 方法中,第一个参数是我们需要加入的另一个表(配置文件),以及条件中的第二个参数(SQL 中的 ON)。
Syntax: sqlalchemy.orm.Query.join(target, *props, **kwargs)
Create a SQL JOIN against this Query object’s criterion and apply generatively, returning the newly resulting Query.
Python
from sqlalchemy.orm import sessionmaker
import sqlalchemy as db
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# DEFINE THE ENGINE (CONNECTION OBJECT)
engine = db.create_engine(
"mysql+pymysql://root:password@localhost/Geeks4Geeks")
# CREATE THE TABLE MODEL TO USE IT FOR QUERYING
class Profile(Base):
__tablename__ = 'profile'
email = db.Column(db.String(50), primary_key=True)
name = db.Column(db.String(100))
contact = db.Column(db.Integer)
class Location(Base):
__tablename__ = 'location'
email = db.Column(db.String(50), primary_key=True)
location = db.Column(db.String(100))
# CREATE A SESSION OBJECT TO INITIATE QUERY IN DATABASE
Session = sessionmaker(bind=engine)
session = Session()
# SELECT * FROM PROFILE
result = session.query(
Profile.email,
Profile.name,
Profile.contact,
Location.location
).join(Location, Profile.email == Location.email)
print("Query:", result)
print()
# VIEW THE ENTRIES IN THE RESULT
for r in result:
print(r.email, "|", r.name, "|", r.contact, "|", r.location)
输出: