其实最大的坑就是storm官方提供的Bolt已经没法用了,就是这玩意儿https://github.com/apache/storm/tree/master/external/storm-elasticsearch。里面的RestClient已经和最新的ES6不兼容了,所以不用去折腾这个库了。
Storm的不用,就用ElasticSearch提供的好了。 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/storm.html。
如何输出到ElasticSearch里面写的已经比较清楚了。关键在于几个关键配置
配置项
es.storm.bolt.write.ack
写到es以后,ack给上游。默认是false,esbolt会在接收了data之后就返回ack,置为true后,esblot需要确定写入es以后才返回,esbolt需要一直保存数据,因此会增加内存使用量。
es.storm.bolt.flush.entries.size
每次一个document不会马上写入index,而是存在本地的一个bulk里面,等到一定数量后批量写入。这里就是配置批量写入的触发数值,当设置为1时,等于每个Tuple都被实时写入es。
es.storm.bolt.tick.tuple.flush
接收一个特定的tick bolt后进行flush,可以从外部触发flush。
es.mapping.id
这个比较有意思,它涉及到一个问题,就是EsBolt是如何处理Tuple到Document的。就正常情况来说,作为Bolt,收到的Tuple结构会定义在Field声明里,EsBolt通过配置这个id的field名称,会将Tuple中匹配的field作为ElasticSearch里面的id。举例来说,一个Tuple定义了两个字段,id和name,并使用es.mapping.id=id来指定id的mapping,当一个tuple的值为{“id”: 1, “name”: “test”}时,输出到ElasticSearch里面就会是一个id为1的文档,文档字段为{“id”:1, “name”:”test”}。如果不指定映射,则文档的值不变,但是id会是ElasticSearch随机生成的值。
es.input.json
继续上面的问题,从上游发布到ESBolt的Tuple需要指定好文档的结构,但是各种类型的数据都可能会有,每种类型都写一个ESBolt的上游会很难看,过于hard code。
很多场景下面,比如日志分析系统,数据最原始的形态是string,也有很大的可能是json字符串,作为分拣的上游bolt,是无法根据每个字符串里面json字段不同,而动态的声明field,所以只能整个输出到下游。也就是说输出的Tuple字段是类似{“doc”: “{\“id\“:1, \“name\”: \“test\”}”},它只有一个字段,但是字段的key是无关紧要的,重要的是值是一个json,我们想保存到ElasticSearch里面的是值本事,而不是key加上值。
默认情况下,ESBolt当然会把这个所谓的”doc”:”xxx”当成文档的属性存起来,文档看起来会是这个样子:
{
"doc": {
"id": 1,
"name" : "test"
}
}
想要访问id的话,就是doc.id。等于被多嵌套了一层。使用es.input.json=true,ESBolt就会把field的value作为输入,而忽略key,从而减少这层嵌套。当然上游的数据源也必须只输出一个field。
es.nodes
ElasticSearch服务的地址。
es.port
ElasticSearch服务的端口。
es.index.auto.create
当目标index不存在时,创建index。
动态输出数据到不同index
当数据内部包含了自己要存到哪个index时,需要解决的问题就是如何输出的问题,有两个途径,一种稍严格但是更强大,另一种简单灵活一些。
使用streamId指定ESBolt
整体思路是,预先定义好所有的index,每个index建立一个ESBolt并声明上游的streamid为index。
数据经过一个RoutingBolt的时候,该Bolt解析数据的目标Index,将数据放置在以index为命名的steram里发送到下游,下游对应的ESBlot则会负责将其输出到对应的index里面。
这种做法利用了一个隐含的约定,即每个ESBolt输出的index与它所接收的数据stream名称之间有一对一的关联,RoutingBolt则利用该关联,解析出数据的目标:index后,将其发送向正确的stream。
这种写法最主要的繁琐在于需要事先确定所有的index和type。但是好处是数据已经根据不同的streamId分流了,针对不同index数据的分析和统计可以很顺利的接进来,不需要自己再分辨。
public enum INDEX {
SHOP_REGISTRY("shop", "registry"),
SHOP_BINDCARD("shop", "bindcard"),
SHOP_CANCEL("shop", "cancel"),
SHOP_ORDER("shop", "order"),
SHOP_WITHDRA("shop", "withdraw");
private String app;
private String business;
private String index;
private String type;
INDEX(String app, String business) {
this.app = app;
this.business = business;
this.index = app + "-" + business;
this.type = business;
}
public String getApp() {
return app;
}
public void setApp(String app) {
this.app = app;
}
public String getBusiness() {
return business;
}
public void setBusiness(String business) {
this.business = business;
}
public String getIndex() {
return index;
}
public void setIndex(String index) {
this.index = index;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
public class FilterBolt implements IRichBolt{
private OutputCollector collector;
public void execute(Tuple input){
String log = input.getString(0);
System.out.println("LOG IS: " + filebeatLog);
try {
Gson gson = new Gson();
Type mapType = new TypeToken<HashMap<String, Object>>() {}.getType();
HashMap<String, Object> message = gson.fromJson(log, mapType);
String app = message.get("app").toString().toLowerCase();
String business = message.get("business").toString().toLowerCase();
LinkedTreeMap source = (LinkedTreeMap)message.get("source");
String streamId = app+"-"+business;
collector.emit(streamId, new Values(gson.toJson(source)));
} catch (Exception e) { e.printStackTrace();}
collector.ack(input);
}
public void prepare(Map StormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void cleanup() {}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source"));
for (INDEX index : INDEX.values()) {
declarer.declareStream(index.getIndex().toLowerCase(), new Fields("source"));
}
}
public Map<String, Object> getComponentConfiguration() {return null;}
}
public class ESTopology {
public static void main(String[] args) {
BrokerHosts hosts = new ZkHosts("127.0.0.1:2181");
SpoutConfig config = new SpoutConfig(hosts, "es", "/es", UUID.randomUUID().toString());
config.maxOffsetBehind = 1L;
config.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(config);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", kafkaSpout);
builder.setBolt("filter", new FilterBoltBasedType()).shuffleGrouping("kafka-reader");
INDEX[] indexs = INDEX.values();
Map esConf = new HashMap();
esConf.put("es.input.json", "true");
esConf.put("es.mapping.id", "id");
for (INDEX index : indexs) {
EsBolt esBolt = new EsBolt(index.getIndex() + "/" +index.getType(), esConf);
builder.setBolt("indexer-"+index.getIndex(), esBolt,10).shuffleGrouping("filter", index.getIndex().toLowerCase() );
}
Config conf = new Config();
conf.setDebug(true);
conf.put("es.nodes", "127.0.0.1");
conf.put("es.port", 9200);
conf.put("es.index.auto.create", "true");
conf.put("es.storm.bolt.write.ack", "true");
conf.put("es.storm.bolt.flush.entries.size", 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("ESIndexer", conf,builder.createTopology());
}
}
使用ESBolt的index字段功能
ESBolt还提供了一种方式,可以允许一个ESBlot实例动态的向不同的index输出数据。需要在创建ESBolt的时候,使用index的field代替原本的值,ESBolt就会根据传进来的Tuple上该field的具体值来指定输出的ElasticSearch Index。
在这种情况下,上面例子里的RoutingBolt,就只需要将解析出来的index和type值作为新的字段放在tuple里就行了,而不需要区分stream,下游也只有一个ESBolt。
EsBolt esBolt = new EsBolt( "{index}/{type}", esConf);
builder.setBolt("indexer", esBolt,10).shuffleGrouping("filter" );
String app = message.get("app").toString().toLowerCase();
String business = message.get("business").toString().toLowerCase();
LinkedTreeMap source = (LinkedTreeMap)message.get("source");
source.put("index", streamId);
source.put("type", business);
collector.emit(new Values(gson.toJson(source)));