📜  从 spark 中的 elasticsearch 读取 (1)

📅  最后修改于: 2023-12-03 14:49:19.313000             🧑  作者: Mango

从 Spark 中的 Elasticsearch 读取

Apache Spark 可以轻松地从 Elasticsearch 中读取数据,并利用 Elasticsearch 的强大搜索和聚合功能进行分析。本文将介绍如何使用 Spark 从 Elasticsearch 中加载数据。

步骤1:配置 Elasticsearch

首先需要安装 Elasticsearch,并创建一个索引和一些文档。可以使用curl命令来创建索引:

curl -H "Content-Type: application/json" -XPUT "http://localhost:9200/test" -d '
{
  "mappings": {
    "properties": {
      "name": {"type": "keyword"},
      "age": {"type": "integer"},
      "gender": {"type": "keyword"}
    }
  }
}'

然后使用以下命令向索引中添加一些文档:

curl -H "Content-Type: application/json" -XPOST "http://localhost:9200/test/_doc" -d '{"name": "Alice", "age": 25, "gender": "female"}'
curl -H "Content-Type: application/json" -XPOST "http://localhost:9200/test/_doc" -d '{"name": "Bob", "age": 30, "gender": "male"}'
curl -H "Content-Type: application/json" -XPOST "http://localhost:9200/test/_doc" -d '{"name": "Charlie", "age": 35, "gender": "male"}'
步骤2:创建 Spark 会话

在读取 Elasticsearch 数据之前,需要创建一个 Spark 会话。以下是创建 Spark 会话的代码:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("ElasticsearchReader")
  .master("local[*]")
  .getOrCreate()
步骤3:读取 Elasticsearch 数据

现在可以使用 Spark 中的 Elasticsearch 数据源 API 从 Elasticsearch 中读取数据。以下是代码:

val elasticOptions = Map(
  "es.nodes" -> "localhost",
  "es.port" -> "9200",
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.read.field.as.array.include" -> "tags"
)

val people = spark.read.format("org.elasticsearch.spark.sql")
  .options(elasticOptions)
  .load("test/_doc")

people.show()

以上代码将从名为 "test" 的 Elasticsearch 索引的 "_doc" 类型中读取所有文档,并将它们加载到 Spark DataFrame 中。可以使用 show() 方法查看 DataFrame 的内容。

步骤4:将数据保存到 Elasticsearch

可以将 Spark DataFrame 中的数据保存回 Elasticsearch 索引中。可以使用以下代码:

val elasticOptions = Map(
  "es.nodes" -> "localhost",
  "es.port" -> "9200",
  "es.index.auto.create" -> "true",
  "es.nodes.wan.only" -> "true",
  "es.write.operation" -> "index"
)

val people = Seq(("David", 40, "male"), ("Elsa", 35, "female"))
  .toDF("name", "age", "gender")

people.write.format("org.elasticsearch.spark.sql")
  .options(elasticOptions)
  .mode("Append")
  .save("test/_doc")

此代码将一个包含三列的 DataFrame 中的两行数据保存回名为 "test" 的 Elasticsearch 索引的 "_doc" 类型中。

以上是从 Spark 中的 Elasticsearch 读取数据的基本步骤。通过使用 Elasticsearch 的聚合和搜索功能,Spark 可以轻松地进行复杂的数据分析和处理。