Java - flume-ng - Avro event delivery reliability
I am using flume-ng to aggregate data produced by our applications in a central location for processing. The producers publish events to a local HTTP source, and the events are forwarded to the collectors via an Avro sink. The collectors receive events from that Avro sink via an Avro source. The producers replicate events to both collectors.
I have noticed a few nuances in the wild that concern me, and I have not been able to understand the full ramifications of my configuration from the docs and the surrounding information on flume-ng. An apples-to-apples comparison of event publication at the producer indicates that not all events arrive at the collector. The latency of event delivery is also higher than anticipated: we see events landing at the front door of the collector hours behind schedule. There are intermittent exceptions in both the downstream and the upstream flume-ng logs.
Using file channels on the producer leads me to believe that delivery is still durable and that flume-ng will eventually deliver the events upstream (correct?). With the collectors leveraging memory channels, are we exposing ourselves to data loss? If so, are there techniques for determining how many events were lost, or at the very least which ones were lost? With the collector's transaction capacity lower than the producer's, will issues arise? What are sane checkpoint intervals for file channels that require guaranteed delivery? I assume the interval is how flume-ng ensures events are on local disk (e.g. based on this configuration I would expect a maximum of 1s of data loss in a power outage, etc.).
Below are the relevant snippets of each stage's flume-ng configuration and stack trace snippets. I am wondering if anyone can shed light on discrepancies in the configuration based on the aforementioned questions (assuming there are any).
Upstream exceptions:
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) - Connection to /<ip address>:<port> disconnected.
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) - Unexpected exception from downstream.
Downstream exceptions:
(org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:679)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: <producer ip>, port: <producer port> }: RPC connection error
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
    at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
    at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
    at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
    at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
    at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:269)
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:339)
    ... 3 more
Caused by: java.io.IOException: Error connecting to /<producer ip>:<producer port>
    at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
    at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
    at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
    ... 10 more
Caused by: java.net.NoRouteToHostException: No route to host
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:592)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.connect(NioClientSocketPipelineSink.java:396)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.processSelectedKeys(NioClientSocketPipelineSink.java:358)
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink$Boss.run(NioClientSocketPipelineSink.java:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    ... 1 more
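The innermost cause in the downstream trace is java.net.NoRouteToHostException raised while the Avro sink's client was still connecting, i.e. no TCP connection to the other agent could be opened at all, which points at the network path rather than at the channel settings. A minimal sketch for testing that connect step independently of Flume, assuming plain TCP reachability is what you want to check (CollectorReachability and canConnect are hypothetical names, not Flume APIs):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper (not part of Flume): can a TCP connection to host:port
// be opened within the timeout? This is the step that failed with NoRouteToHostException.
class CollectorReachability {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // NoRouteToHostException, ConnectException, SocketTimeoutException and
            // UnknownHostException are all IOException subclasses.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java CollectorReachability <collector ip> <collector port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        System.out.println(host + ":" + port + " reachable=" + canConnect(host, port, 2000));
    }
}
```

Running it periodically from the producer host against the collector's Avro source address would show whether the "No route to host" errors line up with the periods in which events arrive late.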
Producer configuration:
agent.sources.t1d.type = http
agent.sources.t1d.bind = <source ip>
agent.sources.t1d.port = <source port>
agent.sources.t1d.channels = c01 c02 c03
agent.sources.t1d.selector.type = replicating
agent.channels.c01.type = file
agent.channels.c01.checkpointDir = /path/to/checkpoint1
agent.channels.c01.dataDirs = /path/to/data1
agent.channels.c01.capacity = 4000000
agent.channels.c01.transactionCapacity = 10000
agent.channels.c01.checkpointInterval = 1000
agent.channels.c01.keep-alive = 3
agent.channels.c02.type = file
agent.channels.c02.checkpointDir = /path/to/checkpoint2
agent.channels.c02.dataDirs = /path/to/data2
agent.channels.c02.capacity = 4000000
agent.channels.c02.transactionCapacity = 10000
agent.channels.c02.checkpointInterval = 1000
agent.channels.c02.keep-alive = 3
agent.channels.c03.type = file
agent.channels.c03.checkpointDir = /path/to/checkpoint3
agent.channels.c03.dataDirs = /path/to/data3
agent.channels.c03.capacity = 4000000
agent.channels.c03.transactionCapacity = 10000
agent.channels.c03.checkpointInterval = 1000
agent.channels.c03.keep-alive = 3
agent.sinks.s01.type = avro
agent.sinks.s01.hostname = <collector ip>
agent.sinks.s01.port = <collector port>
agent.sinks.s01.channel = c01
agent.sinks.s01.batch-size = 1000
agent.sinks.s01.compression-type = deflate
agent.sinks.s01.compression-level = 5
agent.sinks.s02.type = avro
agent.sinks.s02.hostname = <collector ip>
agent.sinks.s02.port = <collector port>
agent.sinks.s02.channel = c02
agent.sinks.s02.batch-size = 1000
agent.sinks.s02.compression-type = deflate
agent.sinks.s02.compression-level = 5
agent.sinks.s03.type = file_roll
agent.sinks.s03.channel = c03
agent.sinks.s03.sink.directory = /path/to/producer/reconciliation
agent.sinks.s03.sink.rollInterval = 1200
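Whether the producer's file channels really hold events until a collector is reachable again comes down to the transactional hand-off between each Avro sink and its channel: the sink takes a batch, sends it, and only commits the take once the downstream hop acknowledges it. The toy model below (plain Java, not Flume's API; ToyChannelHandoff and processBatch are hypothetical names) sketches that loop, assuming a failed send is rolled back so the events stay in the channel:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Toy model (not Flume's API) of the take -> send -> commit/rollback loop an Avro sink
// runs against its channel. Events only leave the channel once a send succeeds.
class ToyChannelHandoff {

    // Stand-in for the producer's channel: events wait here until delivered.
    private final Deque<String> channel = new ArrayDeque<>();

    void put(String event) {
        channel.addLast(event);
    }

    int size() {
        return channel.size();
    }

    /**
     * Takes up to batchSize events and offers them to the sender. On success the take is
     * "committed" (events are gone from the channel); on failure it is "rolled back" and the
     * events are restored at the head in their original order. Returns the committed count.
     */
    int processBatch(int batchSize, Predicate<List<String>> sender) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < batchSize && !channel.isEmpty()) {
            batch.add(channel.pollFirst());
        }
        if (batch.isEmpty()) {
            return 0;
        }
        if (sender.test(batch)) {
            return batch.size(); // commit: downstream acknowledged the batch
        }
        for (int i = batch.size() - 1; i >= 0; i--) {
            channel.addFirst(batch.get(i)); // rollback: retry these events first next time
        }
        return 0;
    }

    public static void main(String[] args) {
        ToyChannelHandoff handoff = new ToyChannelHandoff();
        for (int i = 0; i < 5; i++) {
            handoff.put("event-" + i);
        }
        // Collector unreachable: nothing is committed, the backlog stays at 5.
        int sent = handoff.processBatch(3, batch -> false);
        System.out.println("sent=" + sent + " backlog=" + handoff.size());
        // Collector reachable again: the backlog starts draining.
        sent = handoff.processBatch(3, batch -> true);
        System.out.println("sent=" + sent + " backlog=" + handoff.size());
    }
}
```

In this model an outage shows up as a growing backlog rather than as lost events, which would be consistent with events arriving hours late; a real file channel additionally keeps that backlog on disk, bounded by its capacity (4000000 here).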
Collector A configuration:
agent.sources.a.type = avro
agent.sources.a.bind = <source ip>
agent.sources.a.port = <source port>
agent.sources.a.channels = c01 c03
agent.sources.a.compression-type = deflate
agent.channels.c01.type = memory
agent.channels.c01.capacity = 1000000
agent.channels.c01.transactionCapacity = 1000
agent.channels.c03.type = memory
agent.channels.c03.capacity = 1000000
agent.channels.c03.transactionCapacity = 1000
agent.sinks.s01.type = file_roll
agent.sinks.s01.channel = c01
agent.sinks.s01.sink.directory = /path/to/storage/s01
agent.sinks.s01.sink.rollInterval = 300
agent.sinks.s03.type = file_roll
agent.sinks.s03.channel = c03
agent.sinks.s03.sink.directory = /path/to/storage/s03
agent.sinks.s03.sink.rollInterval = 300
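On the question of the collector's transactionCapacity (1000) being lower than the producer's (10000): what is assumed to matter at the collector is the size of each incoming batch, since the Avro source is assumed to hand every batch it receives to its channels in a single put transaction, so a producer batch-size larger than a collector memory channel's transactionCapacity would make those puts fail. A minimal sketch of that check with java.util.Properties, assuming the property names used in these configurations (BatchCapacityCheck and findViolations are hypothetical names):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sanity check (not part of Flume): an upstream Avro sink's batch-size should
// not exceed the transactionCapacity of any channel fed by the downstream Avro source.
class BatchCapacityCheck {

    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** Returns one message per problem found; an empty list means the pairing looks safe. */
    static List<String> findViolations(Properties producer, String sinkName,
                                       Properties collector, String sourceName) {
        List<String> problems = new ArrayList<>();
        String batch = producer.getProperty("agent.sinks." + sinkName + ".batch-size");
        if (batch == null) {
            problems.add(sinkName + ": batch-size not set explicitly");
            return problems;
        }
        int batchSize = Integer.parseInt(batch.trim());
        String channels = collector.getProperty("agent.sources." + sourceName + ".channels", "");
        for (String channel : channels.trim().split("\\s+")) {
            if (channel.isEmpty()) {
                continue; // source lists no channels
            }
            String cap = collector.getProperty("agent.channels." + channel + ".transactionCapacity");
            if (cap == null) {
                problems.add(channel + ": transactionCapacity not set explicitly");
            } else if (batchSize > Integer.parseInt(cap.trim())) {
                problems.add(sinkName + " batch-size " + batchSize + " > " + channel
                        + " transactionCapacity " + cap.trim());
            }
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Values taken from the producer (s01) and collector A configurations.
        Properties producer = load("agent.sinks.s01.batch-size = 1000\n");
        Properties collector = load(String.join("\n",
                "agent.sources.a.channels = c01 c03",
                "agent.channels.c01.transactionCapacity = 1000",
                "agent.channels.c03.transactionCapacity = 1000"));
        System.out.println(findViolations(producer, "s01", collector, "a"));
    }
}
```

With the values shown (batch-size 1000 against transactionCapacity 1000) the check reports no problems, so under this assumption the lower collector transactionCapacity is tight but not violated.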
Collector B configuration:
agent.sources.b.type = avro
agent.sources.b.bind = <source ip>
agent.sources.b.port = <source port>
agent.sources.b.channels = c11 c13
agent.sources.b.compression-type = deflate
agent.channels.c11.type = memory
agent.channels.c11.capacity = 1000000
agent.channels.c11.transactionCapacity = 1000
agent.channels.c13.type = memory
agent.channels.c13.capacity = 1000000
agent.channels.c13.transactionCapacity = 1000
agent.sinks.s11.type = file_roll
agent.sinks.s11.channel = c11
agent.sinks.s11.sink.directory = /path/to/storage/s11
agent.sinks.s11.sink.rollInterval = 300
agent.sinks.s13.type = file_roll
agent.sinks.s13.channel = c13
agent.sinks.s13.sink.directory = /path/to/storage/s13
agent.sinks.s13.sink.rollInterval = 300
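On estimating how many events were lost, and which ones: the producer already replicates every event into a local file_roll sink (s03, writing to /path/to/producer/reconciliation), so its output can be diffed against a collector's file_roll output. A minimal sketch, assuming the file_roll default text serializer writes one UTF-8 event body per line, that bodies contain no embedded newlines, and that both directories cover the same time window (RollReconciliation is a hypothetical name):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical reconciliation helper: diffs event lines between a producer's file_roll
// directory and a collector's, assuming one event body per line.
class RollReconciliation {

    /** Counts how often each line occurs across all regular files under dir. */
    static Map<String, Integer> countLines(Path dir) throws IOException {
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        Map<String, Integer> counts = new HashMap<>();
        for (Path file : files) {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                counts.merge(line, 1, Integer::sum);
            }
        }
        return counts;
    }

    /** Events seen more often at the producer than at the collector, with the shortfall. */
    static Map<String, Integer> missing(Map<String, Integer> producer,
                                        Map<String, Integer> collector) {
        Map<String, Integer> result = new LinkedHashMap<>();
        producer.forEach((event, n) -> {
            int shortfall = n - collector.getOrDefault(event, 0);
            if (shortfall > 0) {
                result.put(event, shortfall);
            }
        });
        return result;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: java RollReconciliation <producer dir> <collector dir>");
            return;
        }
        Map<String, Integer> lost =
                missing(countLines(Paths.get(args[0])), countLines(Paths.get(args[1])));
        int total = lost.values().stream().mapToInt(Integer::intValue).sum();
        System.out.println("events missing at collector: " + total);
        lost.forEach((event, n) -> System.out.println(n + "\t" + event));
    }
}
```

Counting occurrences rather than using a set means duplicates, which at-least-once delivery can produce on retries, cannot mask a genuinely missing copy; extra copies on the collector side are simply ignored.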