Preface

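At work I use PySpark to process data stored in Elasticsearch. When connecting to Elasticsearch, though, the job kept failing with an error no matter what I tried: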

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /Users/thsheep/Downloads/elasticsearch-hadoop-5.3.2/dist/elasticsearch-spark-20_2.10-5.3.2.jar pyspark-shell'


if __name__ == '__main__':
    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    query = """
    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "type.keyword": "搜索引擎"
              }
            }
          ],
          "must_not": [],
          "should": []
        }
      }
    }
    """
    es_read_conf = {
        "es.nodes": "http://elasticsearch.web.zz",
        "es.port": "9200",
        "es.resource": "eduaio/text",
        "es.input.json": "yes",
        "es.query": query,
    }
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass='org.elasticsearch.hadoop.mr.EsInputFormat',
        keyClass='org.apache.hadoop.io.NullWritable',
        valueClass='org.elasticsearch.hadoop.mr.LinkedMapWritable',
        conf=es_read_conf
    )
    sqlContext.createDataFrame(es_rdd).collect()
: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: No data nodes with HTTP-enabled available

After digging through a pile of GitHub issues and Google results, I found this passage in the official documentation:


In an ideal setup, elasticsearch-hadoop achieves best performance when Elasticsearch and Hadoop are fully accessible from every other, that is each node on the Hadoop side can access every node inside the Elasticsearch cluster. This allows maximum parallelism between the two system and thus, as the clusters scale out so does the communication between them.

However not all environments are setup like that, in particular cloud platforms such as Amazon Web Services, Microsoft Azure or Google Compute Engine or dedicated Elasticsearch services like Cloud that allow computing resources to be rented when needed. The typical setup here is for the spawned nodes to be started in the cloud, within a dedicated private network and be made available over the Internet at a dedicated address. This effectively means the two systems, Elasticsearch and Hadoop/Spark, are running on two separate networks that do not fully see each other (if at all); rather all access to it goes through a publicly exposed gateway.

Running elasticsearch-hadoop against such an Elasticsearch instance will quickly run into issues simply because the connector once connected, will discover the cluster nodes, their IPs and try to connect to them to read and/or write. However as the Elasticsearch nodes are using non-routeable, private IPs and are not accessible from outside the cloud infrastructure, the connection to the nodes will fail.

There are several possible workarounds for this problem:


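The gist: elasticsearch-hadoop performs best when Hadoop and Elasticsearch can fully reach each other, because every Hadoop node can then talk directly to every Elasticsearch node, which maximizes parallelism. This is the connector's default mode. In many real deployments, however, the Hadoop and Elasticsearch nodes cannot reach each other directly. Once connected, the connector still discovers the cluster nodes' IPs and tries to read from and write to those nodes directly, so the connection fails.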
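To make the failure mode concrete, here is a minimal sketch of how one might check which addresses the cluster advertises. It assumes the shape of a `GET /_nodes/http` response (a `nodes` map whose entries carry `http.publish_address`); the sample response, node names and addresses below are made up for illustration, and `published_hosts` and `is_private` are hypothetical helpers, not part of elasticsearch-hadoop.

```python
import ipaddress

# Hypothetical, trimmed-down response of `GET /_nodes/http`; the node IDs,
# names and addresses are invented for this example.
sample_nodes_response = {
    "nodes": {
        "node-a": {"name": "es-data-1", "http": {"publish_address": "10.0.3.17:9200"}},
        "node-b": {"name": "es-data-2", "http": {"publish_address": "10.0.3.18:9200"}},
    }
}


def published_hosts(nodes_response):
    """Return (node name, host) pairs for every node that publishes an HTTP address."""
    pairs = []
    for info in nodes_response.get("nodes", {}).values():
        address = info.get("http", {}).get("publish_address")
        if not address:
            continue
        # Some versions publish "hostname/ip:port"; keep the part after "/".
        address = address.split("/")[-1]
        # Drop the trailing ":port".
        host = address.rsplit(":", 1)[0]
        pairs.append((info.get("name", ""), host))
    return pairs


def is_private(host):
    """True if host is a private (non-routable) IP literal, False otherwise."""
    try:
        return ipaddress.ip_address(host.strip("[]")).is_private
    except ValueError:
        # Not an IP literal (for example a hostname); cannot tell from the string alone.
        return False


private_nodes = [
    (name, host)
    for name, host in published_hosts(sample_nodes_response)
    if is_private(host)
]
print(private_nodes)
```

If every node shows up in `private_nodes` while the Spark workers sit on a different network, the connector's direct connections to those addresses are unlikely to succeed, which matches the situation the documentation describes.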

Solution:

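The official documentation lists several workarounds. I am skipping the first two because they require changes to the infrastructure; the third one does not: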


Introduced in 2.2, elasticsearch-hadoop can be configured to run in WAN mode that is to restrict or completely reduce its parallelism when connecting to Elasticsearch. By setting es.nodes.wan.only, the connector will limit its network usage and instead of connecting directly to the target resource shards, it will make connections to the Elasticsearch cluster only through the nodes declared in es.nodes settings. It will not perform any discovery, ignore data or client nodes and simply make network call through the aforementioned nodes. This effectively ensures that network access happens only through the declared network nodes.

Last but not least, the further the clusters are and the more data needs to go between them, the lower the performance will be since each network call is quite expensive.


The gist: by setting es.nodes.wan.only we tell the connector to skip node discovery and to talk only to the nodes listed in es.nodes.

Let's look at the description of the es.nodes.wan.only configuration parameter:


es.nodes.wan.only (default false) Whether the connector is used against an Elasticsearch instance in a cloud/restricted environment over the WAN, such as Amazon Web Services. In this mode, the connector disables discovery and only connects through the declared es.nodes during all operations, including reads and writes. Note that in this mode, performance is highly affected.


It defaults to false, so we only need to set it to true.

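Note: the documentation explicitly warns that this mode has a significant impact on performance. If performance matters, the first or second workaround is the better choice.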

    es_read_conf = {
        "es.nodes": "http://elasticsearch.web.zz",
        "es.port": "9200",
        "es.resource": "eduaio/text",
        "es.input.json": "yes",
        "es.query": query,
        "es.nodes.wan.only": "true"
    }

And that's all it takes.
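If the same job has to run both inside and outside the Elasticsearch network, it may help to make the flag switchable. The sketch below is one possible way to do that; `build_es_read_conf` is a hypothetical helper of my own, not part of PySpark or elasticsearch-hadoop, and it builds the query from a Python dict with `json.dumps` instead of a hand-written string.

```python
import json


def build_es_read_conf(nodes, port, resource, query, wan_only=True):
    """Build the conf dict handed to sc.newAPIHadoopRDD (hypothetical helper)."""
    return {
        "es.nodes": nodes,
        "es.port": str(port),
        "es.resource": resource,
        "es.input.json": "yes",  # kept only to mirror the dict used in this post
        # Serialize the query dict; ensure_ascii=False keeps non-ASCII terms readable.
        "es.query": json.dumps(query, ensure_ascii=False),
        # The values in this dict are strings, so the flag becomes "true" or "false".
        "es.nodes.wan.only": "true" if wan_only else "false",
    }


query = {"query": {"bool": {"must": [{"term": {"type.keyword": "搜索引擎"}}]}}}
es_read_conf = build_es_read_conf(
    "http://elasticsearch.web.zz", 9200, "eduaio/text", query
)
print(es_read_conf["es.nodes.wan.only"])
```

The resulting dict can be passed as `conf=es_read_conf` exactly as in the script above; passing `wan_only=False` restores the default discovery behaviour for environments where every node is reachable.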