既然有了elasticsearch hadoop为什么还要用hadoop和spark

4551人阅读
这里有一篇很好的文章,很不错,翻译和整理了一下,英文不错的,建议直接看原文:
elasticsearch里面有BOOL&、AND、OR、NOT&,这几个看起来很相似,都有什么区别呢?什么时候用bool?什么时候用AND
filter呢?
事实上,bool filter和AND 、OR、NOT filter 是完全不同,在查询性能上面的影响是非常大的。
首先咱们需要了解的是filter里面都是怎么工作的,其中核心的一个东西叫,可以理解为一个很大的bit数组,数组里面的每个元素有2个状态:0和1(bloom filter知道么?),而filter大家都知道,只处理文档是否匹配与否,不涉及文档评分操作。如果一个文档和filter查询匹配,那么其对应的bit位就设置为1,匹配不上则设置为0。
es在执行filter查询过滤的时候,会打开lucene的每个segment段文件,然后去判断里面的文档符合该filter与否,这个匹配的结果我们就可以用bitset来存储起来,下次同样的filter查询过来,我们就直接使用内存里面的bitset来进行判断就行了,而不需要再打开lucene的segment文件了,避免了io的操作,这样就可以大大提高查询处理的速度,这也是为什么filter这么高效的原因。
因为lucene的segment段文件是不变的,lucene会产生新段,但是旧段是不变的,所以bitset是重复利用的,根据不同的filter条件和不同的段,会产生相应的bitset,另外不同的查询可能会涉及到多个bitset的做交集,计算机对这种bit位处理过程是非常拿手的,速度很快。
另外,如果filter的结果如果是空的,那么里面的bitset位都是0,es以后在处理该filter的时候,会把该bitset整个忽略掉,提高性能。
前面说完了基础内容,咱们再看看bool filter和AND filter这些的区别吧
bool filter会使用到前面提到过的bitset数据结构(bitset派),而AND \OR\ NOTfilter则不能利用到bitset(non-bitset派),为什么呢?
AND、OR、NOT filter是doc by doc的逐个文档的处理,es逐个加载文档里面的字段内容,然后检查字段的内容是否满足查询条件,不满足的文档就排除在结果集之外,依次迭代进行,直到过完一遍所有的文档,这中间的过程用不到前面提到过的bitset,也就不能重复利用缓存资源
如果你有多个filter条件,即一个AND、OR、NOT里面包含多个filter过滤条件(支持数组的方式),那么处理的逻辑就是每个filter会将依次将生成的结果集传到下一个filter,理论上处理的文档数会越来越少,因为只会过滤减少,不会增加,这样依次过滤,所以一般限制条件比较苛刻的可以放前面执行,这样后面的filter需要处理的文档数就会很小,这样可以大大提高整体处理的速度,另外除了数量上的考虑外,还需要考虑filter的效率问题,一些filter执行效率很低,如Geo filter(大量计算)或者script
based filter(动态脚本),建议将这些性能开销比较大的查询放最后执行来提高整体的处理速度。
好了,现在应该有这么一个概念了,AND、OR、NOT是文档by文档,依次处理,如果你的结果集很大,即一个很宽松的查询,命中很多,那么你使用AND、OR、NOT filter是不合适的,但是有些filter是必须文档by文档处理的,如下面的这几个filter:
Geo* filtersScriptsNumeric_range
所以除了上面那几个没有办法的,其它的filter应该一律使用bool filter来提高查询性能。
如果你的查询里面需要同时使用到bitset和non-bitset类型的filter,则可以组合起来使用bool filter和AND\OR\NOT filter,
前面说了,AND 是结果集依次向后传递,所以我们把性能比较好的放前面,non-bitset放AND的filter的后面,如下面一个包含多个filter类型的复杂的filter
&bool& : {
&must& : [
{ &term& : {} },
{ &range& : {} },
{ &term& : {} }
{ &custom_script& : {} },
{ &geo_distance& : {} }
and 在最外层做wrapper,第一个filter是一个bool filter,里面有3个must的子filter,处理完了之后,得到文档结果集,然后再执行一个or的子filter,OR里面两个查询会分别进行,最终的文档结果集就是我们的搜索结果了。
总之,filter使用的时候,一定要优先使用bitset流,然后还要考虑filter顺序和组合的问题
Geo, Script&or&Numeric_range&filter:
使用 And/Or/Not Filters所有其它的: 使用 Bool Filter
掌握了以上这些,就不难写出高性能的查询了。
相关的3个链接:
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:188513次
积分:2283
积分:2283
排名:第11969名
原创:52篇
转载:55篇
评论:18条Spark on elasticsearch-hadoop トライアル - Qiita
この投稿にどのような問題がありますか? スパムです
攻撃的または迷惑な内容を含んでいます
不適切な内容を含んでいます
送信ストックストック済み解除からリンク
送信いただいたご意見への返信は行っておりません。返信の必要な内容については、 からお問い合わせください。Hadoop&Spark解决二次排序问题(Hadoop篇) – 过往记忆
欢迎关注Hadoop、Hive、Hbase、Flume等微信公共账号:iteblog_hadoop。
文章总数:685
浏览总数:7,286,727
评论:3920
分类目录:80 个
注册用户数:1567
最后更新:日
欢迎关注微信公共帐号:iteblog_hadoop
  二次排序就是对每一个key对应的value进行排序,也就是对MapReduce的输出(KEY, Value(v1,v2,v3,......,vn))中的Value(v1,v2,v3,......,vn)值进行排序(升序或者降序),使得Value(s1,s2,s3,......,sn),si ∈ (v1,v2,v3,......,vn)且s1 & s2 & s3 & ...... & sn。假设我们有以下输入文件(逗号分割的分别是年,月,总数):
[ /tmp]# vim data.txt
我们期望的输出结果是
  但是默认的输出结果只能对Key进行排序,其中Value中的值次序是不定的;也就是说,默认的输出可能如下:
  针对这个问题我们有两种方法来解决:(1)、将每个Key对应的Value全部存储到内存(这个只会存储到单台机器),然后对这些Value进行相应的排序。但是如果Value的数据量非常大,导致单台内存无法存储这些数据,这将会导致程序出现java.lang.OutOfMemoryError,所以这个方法不是很通用。(2)、这种方法将Value中的值和旧的Key组成一个新的Key,这样我们就可以利用Reduce来排序这个Key,其生成的结果就是我们需要的。过程如下:
  1、原始的键值对是(k,v)。这里的k就是就的key,也可以 称为natural key;
  2、我们可以将k和v组合成新的key(可以称为composite key),也就是((k,v), v)
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中;
  4、自定义分组函数,将k相同的键值对当作一个分组。
  文字比较枯燥,我们来看看下面实例:
  1、原始数据是
[ /tmp]# vim data.txt
我们将年、月组成key(natural key),总数作为value,结果变成:
  2、将value和key(natural key)组成新的key(composite key),如下:
  3、自定义分区函数,将k相同的键值对发送到同一个Reduce中,结果如下:
[((),24),((),3),((),4),((),21)]
[((),-43),((),0),((),35)]
[((),56),((),46)]
  4、自定义组排序函数,结果如下:
[((),3),((),4),((),21),((),24)]
[((),-43),((),0),((),35)]
[((),46),((),56)]
  5、自定义分组函数,结果如下:
((),(3,4,21,24))
((),(-43,0,35))
((),(46,56))
  6、最后输出的结果就是我们要的:
  下面将贴出使用MapReduce解决这个问题的代码:
package com.
import org.apache.hadoop.io.WritableC
import java.io.DataI
import java.io.DataO
import java.io.IOE
* User: 过往记忆
* Time: 下午23:49
* 本文地址:/archives/1415
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
public class Entry implements WritableComparable&Entry& {
private String yearM
public Entry() {
public int compareTo(Entry entry) {
int result = pareTo(entry.getYearMonth());
if (result == 0) {
result = compare(count, entry.getCount());
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(yearMonth);
dataOutput.writeInt(count);
public void readFields(DataInput dataInput) throws IOException {
this.yearMonth = dataInput.readUTF();
this.count = dataInput.readInt();
public String getYearMonth() {
return yearM
public void setYearMonth(String yearMonth) {
this.yearMonth = yearM
public int getCount() {
public void setCount(int count) {
this.count =
public static int compare(int a, int b) {
return a & b ? -1 : (a & b ? 1 : 0);
public String toString() {
return yearM
  上面就是将旧的Key(natural key)和Value组合成新的Key(composite key)的代码,接下来看下自定义的分区类:
package com.
import org.apache.hadoop.mapreduce.P
public class EntryPartitioner extends Partitioner&Entry, Integer& {
public int getPartition(Entry entry, Integer integer, int numberPartitions) {
return Math.abs((entry.getYearMonth().hashCode() % numberPartitions));
  这个类使得natural key相同的数据分派到同一个Reduce中。然后看下自定义分组类:
package com.
import org.apache.hadoop.io.WritableC
import org.apache.hadoop.io.WritableC
* User: 过往记忆
* Time: 下午23:49
* 本文地址:/archives/1415
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
* 过往记忆博客微信公共帐号:iteblog_hadoop
public class EntryGroupingComparator extends WritableComparator {
public EntryGroupingComparator() {
super(Entry.class, true);
public int compare(WritableComparable a, WritableComparable b) {
Entry a1 = (Entry)
Entry b1 = (Entry)
return a1.getYearMonth().compareTo(b1.getYearMonth());
  只要是natural key相同,我们就认为是同一个分组,这样Reduce内部才可以对Value中的值进行排序。接下来看下Map类
public class SecondarySortMapper extends Mapper&LongWritable, Text, Entry, Text& {
private Entry entry = new Entry();
private Text value = new Text();
protected void map(LongWritable key, Text lines, Context context)
throws IOException, InterruptedException {
String line = lines.toString();
String[] tokens = line.split(&,&);
// YYYY = tokens[0]
// MM = tokens[1]
// count = tokens[2]
String yearMonth = tokens[0] + &-& + tokens[1];
int count = Integer.parseInt(tokens[2]);
entry.setYearMonth(yearMonth);
entry.setCount(count);
value.set(tokens[2]);
context.write(entry, value);
  其实就是解析每一行的数据,然后将旧的Key(natural key)和Value组合成新的Key(composite key)。接下来看下Reduce类实现
public class SecondarySortReducer extends Reducer&Entry, Text, Entry, Text& {
protected void reduce(Entry key, Iterable&Text& values, Context context)
throws IOException, InterruptedException {
StringBuilder builder = new StringBuilder();
for (Text value : values) {
builder.append(value.toString());
builder.append(&,&);
context.write(key, new Text(builder.toString()));
builder存储的就是排序好的Value序列,最后来看看启动程序的使用:
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Iteblog.class);
job.setJobName(&SecondarySort&);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputKeyClass(Entry.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setPartitionerClass(EntryPartitioner.class);
job.setGroupingComparatorClass(EntryGroupingComparator.class);
关键看上面第12-15行的代码。下面是运行这个程序的方法和结果:
[ /hadoop]# bin/hadoop jar /tmp/iteblog-1.0-SNAPSHOT.jar
com.iteblog.Main
/iteblog/data.txt /iteblog/output
[ /hadoop]# bin/hadoop fs -cat /iteblog/output/pa*
  明天我将使用来解决这个问题,敬请关注本博客。
本博客文章除特别声明,全部都是原创!
禁止个人和公司转载本文、谢谢理解:
下面文章您可能感兴趣}

我要回帖

更多关于 elasticsearch 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信