在Python中执行 PostgreSQL 存储过程和函数
存储过程是一系列结构化的过程语言查询,作为数据字典存储在关系数据库管理系统中,可以多次共享和重用。所有的CRUD操作、查询操作都可以通过调用存储过程来进行。存储过程的使用减少了反复使用的代码的重复。在本文中,让我们讨论如何在Python中执行 PostgreSQL 存储过程和函数。
创建存储过程
- 第一步是编写一个存储过程,语法类似于常规的SQL语句。
- 下面的代码片段显示了一个存储过程,该过程提取书籍名称和书籍 ID,其销售额已超过某个基准量。
- 此存储过程名为get_book_sales(sale_amount),它将 sale_amount 作为必须传递基准销售额的参数。
- 该存储过程包含一个嵌套查询,该查询提取超过基准销售额的图书的图书名称和 ID。
- 此存储过程可以在需要时运行。
- 将以下代码段作为存储过程保存在您的 PostgreSQL 数据库中。
CREATE OR REPLACE FUNCTION get_book_sales(sale_amount integer)
RETURNS TABLE(book_name VARCHAR, book_id INTEGER, store VARCHAR) AS
$
BEGIN
RETURN QUERY
SELECT books.id, books.book_name,
FROM books where books.book_id IN
(SELECT book_id FROM book_sales where book_sales.sales > sale_amount)
END
$
LANGUAGE plpgsql
在Python中执行 PostgreSQL 存储过程和函数
- 创建存储过程后,我们可以定义一个函数,该函数在调用此函数时执行此存储过程。让我们将此函数命名为以销售额为基准的 get_sale_benchmark(sale_amount)。
- 通过将连接字符串传递给 psycopg2.connect()函数,从Python建立到 PostgreSQL 数据库的连接。
- 建立连接后,将 cursor() 对象实例化为名为 engine 的变量
- 然后调用 psycopg2 callproc()函数,它接受两个参数,存储过程的名称和存储过程的参数。
句法:
cursor.callproc(procname[, parameters])
- 此存储过程的结果存储在引擎对象中。这可以使用 fetchall()函数访问。
- 稍后使用这个 fetchall()函数遍历存储过程返回的所有行,如下所示。
- 为了在对数据库执行存储过程时捕获任何错误并在提交所有更改后顺利关闭连接,请使用 try、expect 和 finally 块,如下面的代码所示。
- 通过使用适当的 sales_amount 调用函数来测试 get_sales_benchmark()函数。
解释:
获取查询结果的所有行,将它们作为元组列表返回。如果没有记录,则返回一个空列表。
Python3
import psycopg2
def get_sale_benchmark(sale_amount):
connector = None
try:
conn_string = "host='host_name' dbname='database_name'\
user='user_name' password='your_password'"
connector = psycopg2.connect(conn_string)
engine = connector.cursor()
# call stored procedure
engine.callproc('get_book_sales', [sale_amount, ])
print("fechting Book list that has crosssed sales benchmark")
result = cursor.fetchall()
for row in result:
print("Book Id = ", row[0], )
print("Book Name = ", row[1])
except (Exception, psycopg2.DatabaseError) as error:
print("Error while connecting to PostgreSQL", error)
finally:
# closing database connection.
if connector:
engine.close()
connector.close()
print("PostgreSQL connection is closed")
get_sale_benchmark(500)
输出: