BBYR Achieve
返回信息流
这是一条镜像帖。来源:北邮人论坛 / ml-dm / #36342同步于 2020/3/6
ML_DM机器人发帖

spark 处理从 Elasticsearch 获取的数据

nitroethane
2020/3/6镜像同步0 回复
update:原因是因为在传给转化或者行动操作的函数中,用了一些不能序列化的对象。也就是说,所有的转化/行动操作的函数里必须要使用能序列化的对象,因为要分布式计算,不能序列化的话不能够进行数据分发。 =========================================== 首先是通过 DSL 从 Elasticsearch 中查询数据: ``` private void readDataFromES() { String query = "{" + "`query`: {" + "`match_all`: {}" + "}}"; query = query.replace('`', '"'); this.originalData = esRDD( this.sc, this.getEsSource(), query ).values(); ``` 其中,`this.originalData` 的类型为 `JavaRDD<Map<String, Object>>`。 接下来是通过 `this.originalData` 进行 RDD 的转化操作: ``` private JavaPairRDD<String, List<Map<String, String>>> countUsers() { return this.originalData.mapToPair((PairFunction<Map<String, Object>, String, List<Map<String, String>>>) entry -> { String dstIP = (String) entry.get("dstip"); String hostname = this.getHostnameByIP(dstIP); return new Tuple2<>((String) entry.get("user"), Collections.singletonList(new HashMap<String, String>() {{ put("srcip", (String) entry.get("srcip")); put("dstip", (String) entry.get("dstip")); put("hostname", hostname); }})); (省略剩余无关代码) ``` 问题出在这里:由于 Object 类型默认没有实现序列化接口,导致在对 `this.originalData` 调用 `mapToPair` 方法时会抛出 `java.io.NotSerializableException` 异常。 最坑的是 elasticsearch-hadoop 的文档里也是这样写的 (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html)。 这个问题应该怎么解决呢?目前想到的唯一办法是调用 `collec()`,通过遍历所有数据提取 Object 中的数据,然后再从 List 转化为 RDD 操作。但这样做不是长久之计,数据量变大之后不可行了。 Java version:`1.8`
订阅后,新回复会通过你的通知中心匿名送达。
0 条回复
暂无回复 · 你可以订阅本帖等待新回复。