返回信息流update:原因是因为在传给转化或者行动操作的函数中,用了一些不能序列化的对象。也就是说,所有的转化/行动操作的函数里必须要使用能序列化的对象,因为要分布式计算,不能序列化的话不能够进行数据分发。
===========================================
首先是通过 DSL 从 Elasticsearch 中查询数据:
```
private void readDataFromES() {
String query = "{" +
"`query`: {" +
"`match_all`: {}" +
"}}";
query = query.replace('`', '"');
this.originalData = esRDD(
this.sc, this.getEsSource(), query
).values();
```
其中,`this.originalData` 的类型为 `JavaRDD<Map<String, Object>>`。
接下来是通过 `this.originalData` 进行 RDD 的转化操作:
```
private JavaPairRDD<String, List<Map<String, String>>> countUsers() {
return this.originalData.mapToPair((PairFunction<Map<String, Object>, String, List<Map<String, String>>>)
entry -> {
String dstIP = (String) entry.get("dstip");
String hostname = this.getHostnameByIP(dstIP);
return new Tuple2<>((String) entry.get("user"), Collections.singletonList(new HashMap<String, String>() {{
put("srcip", (String) entry.get("srcip"));
put("dstip", (String) entry.get("dstip"));
put("hostname", hostname);
}}));
(省略剩余无关代码)
```
问题出在这里:由于 Object 类型默认没有实现序列化接口,导致在对 `this.originalData` 调用 `mapToPair` 方法时会抛出 `java.io.NotSerializableException` 异常。
最坑的是 elasticsearch-hadoop 的文档里也是这样写的 (https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html)。
这个问题应该怎么解决呢?目前想到的唯一办法是调用 `collec()`,通过遍历所有数据提取 Object 中的数据,然后再从 List 转化为 RDD 操作。但这样做不是长久之计,数据量变大之后不可行了。
Java version:`1.8`
这是一条镜像帖。来源:北邮人论坛 / ml-dm / #36342同步于 2020/3/6
ML_DM机器人发帖
spark 处理从 Elasticsearch 获取的数据
nitroethane
2020/3/6镜像同步0 回复
订阅后,新回复会通过你的通知中心匿名送达。
0 条回复
暂无回复 · 你可以订阅本帖等待新回复。