📅  最后修改于: 2023-12-03 14:49:19.313000             🧑  作者: Mango
Apache Spark 可以轻松地从 Elasticsearch 中读取数据,并利用 Elasticsearch 的强大搜索和聚合功能进行分析。本文将介绍如何使用 Spark 从 Elasticsearch 中加载数据。
首先需要安装 Elasticsearch,并创建一个索引和一些文档。可以使用curl命令来创建索引:
curl -H "Content-Type: application/json" -XPUT "http://localhost:9200/test" -d '
{
"mappings": {
"properties": {
"name": {"type": "keyword"},
"age": {"type": "integer"},
"gender": {"type": "keyword"}
}
}
}'
然后使用以下命令向索引中添加一些文档:
curl -H "Content-Type: application/json" -XPOST "http://localhost:9200/test/_doc" -d '{"name": "Alice", "age": 25, "gender": "female"}'
curl -H "Content-Type: application/json" -XPOST "http://localhost:9200/test/_doc" -d '{"name": "Bob", "age": 30, "gender": "male"}'
curl -H "Content-Type: application/json" -XPOST "http://localhost:9200/test/_doc" -d '{"name": "Charlie", "age": 35, "gender": "male"}'
在读取 Elasticsearch 数据之前,需要创建一个 Spark 会话。以下是创建 Spark 会话的代码:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.appName("ElasticsearchReader")
.master("local[*]")
.getOrCreate()
现在可以使用 Spark 中的 Elasticsearch 数据源 API 从 Elasticsearch 中读取数据。以下是代码:
val elasticOptions = Map(
"es.nodes" -> "localhost",
"es.port" -> "9200",
"es.index.auto.create" -> "true",
"es.nodes.wan.only" -> "true",
"es.read.field.as.array.include" -> "tags"
)
val people = spark.read.format("org.elasticsearch.spark.sql")
.options(elasticOptions)
.load("test/_doc")
people.show()
以上代码将从名为 "test" 的 Elasticsearch 索引的 "_doc" 类型中读取所有文档,并将它们加载到 Spark DataFrame 中。可以使用 show()
方法查看 DataFrame 的内容。
可以将 Spark DataFrame 中的数据保存回 Elasticsearch 索引中。可以使用以下代码:
val elasticOptions = Map(
"es.nodes" -> "localhost",
"es.port" -> "9200",
"es.index.auto.create" -> "true",
"es.nodes.wan.only" -> "true",
"es.write.operation" -> "index"
)
val people = Seq(("David", 40, "male"), ("Elsa", 35, "female"))
.toDF("name", "age", "gender")
people.write.format("org.elasticsearch.spark.sql")
.options(elasticOptions)
.mode("Append")
.save("test/_doc")
此代码将一个包含三列的 DataFrame 中的两行数据保存回名为 "test" 的 Elasticsearch 索引的 "_doc" 类型中。
以上是从 Spark 中的 Elasticsearch 读取数据的基本步骤。通过使用 Elasticsearch 的聚合和搜索功能,Spark 可以轻松地进行复杂的数据分析和处理。