• Importing data into Elasticsearch with Spark


    The previous post gave a brief introduction to Elasticsearch. Now let's put it to use: when facing massive amounts of data, importing documents into ES one at a time is obviously not an option, so I use Spark to bulk-load the data into ES. Third-party dependency package: (link placeholder in the original post)
    Download the matching dependency package from there and unpack it. PS: always check the version compatibility between Spark, Scala, and ES. I am on Spark 1.6.2 with Scala 2.10.4, and the Spark-ES jar is elasticsearch-spark_2.10-2.3.3, which was compiled against Scala 2.10.5. Not paying attention to this at first produced the following error: java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
    Now let's walk through how to use spark-es.
    First, add the dependency jar to the Spark environment (for example via spark-shell/spark-submit --jars, or the spark.driver/executor.extraClassPath settings).
    When Spark talks to ES, the connection parameters can be configured in two ways:
    1) Set the parameters on the SparkConf:
    val conf = new SparkConf().setAppName("loadToES")
    conf.set("es.nodes", "192.168.0.23")
    conf.set("es.port", "9200")
    conf.set("es.cluster.name", "elasticsearch")
    val sc = new SparkContext(conf)
    2) Pass them as a configuration Map on each call:
    val configuration = Map("es.nodes" -> "192.168.0.23", "es.port" -> "9200", "es.cluster.name" -> "elasticsearch")
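    A minimal sketch showing how each style is consumed (node address and index name are just the values used above; it assumes the connector jar is already on the classpath):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._   // brings saveToEs/esRDD onto SparkContext and RDD

    // way 1: settings on the SparkConf apply to every ES call made through this context
    val conf = new SparkConf().setAppName("loadToES")
      .set("es.nodes", "192.168.0.23")
      .set("es.port", "9200")
    val sc = new SparkContext(conf)

    // way 2: settings kept in a Map and passed explicitly on each call
    val configuration = Map("es.nodes" -> "192.168.0.23", "es.port" -> "9200")
    sc.makeRDD(Seq(Map("msg" -> "hello"))).saveToEs("es/info", configuration)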
    Using ES from Spark covers both writing and reading; the connector provides two entry points for this:
    EsSpark and EsSparkSQL, which read and write using Spark RDDs and Spark DataFrames respectively.
    EsSpark RDD reads:
    1)def esRDD(sc: SparkContext): RDD[(String, Map[String, AnyRef])]
    2)def esRDD(sc: SparkContext, resource: String, query: String, cfg: Map[String, String]): RDD[(String, Map[String, AnyRef])]
    3)def esJsonRDD(sc: SparkContext, resource: String, query: String, cfg: Map[String, String]): RDD[(String, String)]
    For example: sc.esRDD("radio/artists", "q=me*")
    This creates an RDD streaming all the documents matching me* from the index radio/artists.
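    A fuller read sketch built from the signatures above (node address reused from the configuration section; the implicit esRDD method comes from the package import):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.elasticsearch.spark._

    val conf = new SparkConf().setAppName("readFromES")
      .set("es.nodes", "192.168.0.23")
      .set("es.port", "9200")
    val sc = new SparkContext(conf)

    // each element is (documentId, Map of fields)
    val artists = sc.esRDD("radio/artists", "q=me*")
    artists.take(5).foreach { case (id, doc) => println(s"$id -> $doc") }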

    Reading through the Hadoop InputFormat (old mapred API):
    import org.apache.hadoop.io.{MapWritable, Text}
    import org.apache.hadoop.mapred.JobConf
    import org.elasticsearch.hadoop.mr.EsInputFormat

    val conf = new JobConf()
    conf.set("es.resource", "radio/artists")
    conf.set("es.query", "q=me*")
    val esRDD = sc.hadoopRDD(conf,
      classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
    val docCount = esRDD.count()

    Reading through the new mapreduce API:
    import org.apache.hadoop.conf.Configuration

    val conf = new Configuration()
    conf.set("es.resource", "radio/artists")
    conf.set("es.query", "q=me*")
    val esRDD = sc.newAPIHadoopRDD(conf,
      classOf[EsInputFormat[Text, MapWritable]],
      classOf[Text], classOf[MapWritable])
    val docCount = esRDD.count()

    EsSpark RDD bulk writes:
    1)def saveToEs(rdd: RDD[_], resource: String, cfg: Map[String, String])
    2)def saveToEsWithMeta[K,V](rdd: RDD[(K,V)], resource: String, cfg: Map[String, String])
    3)def saveJsonToEs(rdd: RDD[_], resource: String, cfg: Map[String, String])
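    A minimal write sketch using these methods (the index name spark/docs is just an example; configuration is the settings Map defined earlier):

    import org.elasticsearch.spark._
    import org.elasticsearch.spark.rdd.EsSpark

    val docs = sc.makeRDD(Seq(
      Map("title" -> "doc1", "views" -> 10),
      Map("title" -> "doc2", "views" -> 20)))

    // implicit method on the RDD ...
    docs.saveToEs("spark/docs", configuration)
    // ... or the explicit helper object matching signature 1) above
    EsSpark.saveToEs(docs, "spark/docs", configuration)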

    EsSparkSQL DataFrame reads and bulk writes:
    1)def esDF(sc: SQLContext, resource: String, query: String)
    val sql = new SQLContext…

    // Spark 1.3 style
    val df = sql.load("spark/index", "org.elasticsearch.spark.sql")

    // Spark 1.4 style
    val df = sql.read.format("org.elasticsearch.spark.sql").load("spark/index")
    // Spark 1.5 style
    val df = sql.read.format("es").load("spark/index")

    // options for Spark 1.3 need to include the target path/resource
    val options13 = Map("path" -> "spark/index",
      "pushdown" -> "true",
      "es.nodes" -> "someNode", "es.port" -> "9200")

    // Spark 1.3 style
    val spark13DF = sql.load("org.elasticsearch.spark.sql", options13)

    // options for Spark 1.4 - the path/resource is specified separately
    val options = Map("pushdown" -> "true", "es.nodes" -> "someNode", "es.port" -> "9200")

    // Spark 1.4 style
    val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
      .options(options).load("spark/index")
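    The esDF helper listed in 1) can be used instead of the generic data-source API; a minimal sketch (index and query reused from the RDD read example):

    import org.apache.spark.sql.SQLContext
    import org.elasticsearch.spark.sql.EsSparkSQL

    val sqlContext = new SQLContext(sc)
    val artistsDF = EsSparkSQL.esDF(sqlContext, "radio/artists", "q=me*")
    artistsDF.printSchema()
    artistsDF.show(5)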

    Note: the pushdown option is specific to the Spark data source; it pushes Spark SQL filters down to Elasticsearch as queries.

    sqlContext.sql(
      "CREATE TEMPORARY TABLE myIndex " +
      "USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'spark/index', nodes 'someNode')")

    Fields can be filtered while reading with the include/exclude settings:
    include: es.read.field.include = name, address
    exclude: es.read.field.exclude = *.created
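    A hedged sketch of passing these field filters when reading with the Spark 1.4+ style shown above (field names are simply the ones from the include/exclude lines):

    val filtered = sql.read.format("org.elasticsearch.spark.sql")
      .option("pushdown", "true")
      .option("es.read.field.include", "name, address")
      .option("es.read.field.exclude", "*.created")
      .load("spark/index")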
    2)def saveToEs(srdd: DataFrame, resource: String, cfg: Map[String, String])

    import org.apache.spark.sql.SQLContext
    import org.elasticsearch.spark.sql._

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._          // needed for .toDF()

    case class Person(name: String, surname: String, age: Int)
    val people = sc.textFile("people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1), p(2).trim.toInt))
      .toDF()
    people.saveToEs("spark/people")

    Usage example:

    Specifying a custom _id:
    Method 1:
    import com.google.common.base.Charsets
    import com.google.common.hash.Hashing
    import org.elasticsearch.spark.sql.EsSparkSQL

    val rdd = sc.textFile("hdfs://master:9000/es.nb")
      .map(x => (Hashing.md5().hashString(x, Charsets.UTF_8).asLong(), x))
    val relation = sqlContext.createDataFrame(rdd).toDF("key", "val")
    EsSparkSQL.saveToEs(relation, "es/info", configuration)
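    For the key column to actually become the document _id, elasticsearch-hadoop has to be told which field to use via the es.mapping.id setting; a minimal sketch, in case it is not already part of configuration:

    // map the "key" column to the Elasticsearch _id (assumes configuration does not set it yet)
    val cfgWithId = configuration + ("es.mapping.id" -> "key")
    EsSparkSQL.saveToEs(relation, "es/info", cfgWithId)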
    

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // instance of SparkContext
    val sc = …

    val airportsRDD = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo)))
    airportsRDD.saveToEsWithMeta("airports/2015")
    This approach uses the tuple key (1, 2, 3) as the document _id.

    Method 2:
    import org.elasticsearch.spark.rdd.Metadata._ (1)

    val otp = Map("iata" -> "OTP", "name" -> "Otopeni")
    val muc = Map("iata" -> "MUC", "name" -> "Munich")
    val sfo = Map("iata" -> "SFO", "name" -> "San Fran")

    // metadata for each document
    // note it's not required for them to have the same structure
    val otpMeta = Map(ID -> 1, TTL -> "3h") (2)
    val mucMeta = Map(ID -> 2, VERSION -> "23") (3)
    val sfoMeta = Map(ID -> 3) (4)

    // instance of SparkContext
    val sc = …

    val airportsRDD = sc.makeRDD(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) (5)
    airportsRDD.saveToEsWithMeta("airports/2015") (6)
    (1) Import this package, i.e. the metadata enum (ID, TTL, VERSION, …)
    (2)ID with a value of 1 and TTL with a value of 3h

    Feel free to try the other methods yourself. I (Xiao Yang) am also new to Elasticsearch and still finding my way, so any pointers are welcome. Feel free to get in touch, QQ: 791279468

  • Original article: https://blog.csdn.net/m0_67401134/article/details/126496571