Java source code examples: org.apache.flume.conf.ConfigurationException
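ConfigurationException lives in the org.apache.flume.conf package and extends FlumeException, so it is an unchecked exception. Flume components typically throw it from configure(Context) when a mandatory property is missing or invalid, which is the pattern running through the examples below. A minimal sketch of that pattern (the component and property names here are hypothetical):

import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;

// Hypothetical component illustrating the common pattern in the examples
// below: read properties from the Context in configure(), then fail fast
// with a ConfigurationException when a mandatory value is absent.
public class ExampleComponent implements Configurable {

  private String topic;

  @Override
  public void configure(Context context) {
    topic = context.getString("topic");
    if (topic == null || topic.isEmpty()) {
      // ConfigurationException is unchecked, so no throws clause is needed.
      throw new ConfigurationException("topic must be specified");
    }
  }
}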
Example 1
@Override
protected void doConfigure(Context context) throws FlumeException {
  LOGGER.trace("configure...");
  canalConf.setServerUrl(context.getString(CanalSourceConstants.SERVER_URL));
  canalConf.setServerUrls(context.getString(CanalSourceConstants.SERVER_URLS));
  canalConf.setZkServers(context.getString(CanalSourceConstants.ZOOKEEPER_SERVERS));
  canalConf.setDestination(context.getString(CanalSourceConstants.DESTINATION));
  canalConf.setUsername(context.getString(CanalSourceConstants.USERNAME, CanalSourceConstants.DEFAULT_USERNAME));
  canalConf.setPassword(context.getString(CanalSourceConstants.PASSWORD, CanalSourceConstants.DEFAULT_PASSWORD));
  canalConf.setFilter(context.getString(CanalSourceConstants.FILTER));
  canalConf.setBatchSize(context.getInteger(CanalSourceConstants.BATCH_SIZE, CanalSourceConstants.DEFAULT_BATCH_SIZE));
  canalConf.setOldDataRequired(context.getBoolean(CanalSourceConstants.OLD_DATA_REQUIRED, CanalSourceConstants.DEFAULT_OLD_DATA_REQUIRED));
  if (!canalConf.isConnectionUrlValid()) {
    throw new ConfigurationException(String.format("\"%s\",\"%s\" AND \"%s\" at least one must be specified!",
        CanalSourceConstants.ZOOKEEPER_SERVERS,
        CanalSourceConstants.SERVER_URL,
        CanalSourceConstants.SERVER_URLS));
  }
}
Example 2
/**
 * Some of the producer properties are especially important.
 * We documented them and gave them a camel-case name to match the Flume config.
 * If users set these, we will override any existing parameters with these
 * settings.
 * Knowledge of which properties are documented is maintained here for now.
 * If this becomes a maintenance issue, we'll set up a proper data structure.
 */
private static void addDocumentedKafkaProps(Context context,
                                            Properties kafkaProps)
    throws ConfigurationException {
  String zookeeperConnect = context.getString(
      KafkaSourceConstants.ZOOKEEPER_CONNECT_FLUME);
  if (zookeeperConnect == null) {
    throw new ConfigurationException("ZookeeperConnect must contain " +
        "at least one ZooKeeper server");
  }
  kafkaProps.put(KafkaSourceConstants.ZOOKEEPER_CONNECT, zookeeperConnect);
  String groupID = context.getString(KafkaSourceConstants.GROUP_ID_FLUME);
  if (groupID != null) {
    kafkaProps.put(KafkaSourceConstants.GROUP_ID, groupID);
  }
}
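The override described in the Javadoc can be seen with a small driver. A hedged sketch, assuming it sits in the same class as addDocumentedKafkaProps above, that Context.getSubProperties strips the "kafka." prefix, and that KafkaSourceConstants.ZOOKEEPER_CONNECT is the raw key "zookeeper.connect":

// Hypothetical driver: a documented camel-case Flume property
// (zookeeperConnect) wins over the equivalent raw kafka.* property.
private static void overrideDemo() throws ConfigurationException {
  Context context = new Context();
  context.put("kafka.zookeeper.connect", "ignored:2181"); // raw kafka.* property
  context.put("zookeeperConnect", "zk1:2181");            // documented camel-case name

  Properties kafkaProps = new Properties();
  kafkaProps.putAll(context.getSubProperties("kafka.")); // pass-through first
  addDocumentedKafkaProps(context, kafkaProps);          // documented props override
  // kafkaProps now maps "zookeeper.connect" to "zk1:2181".
}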
Example 3
/**
 * We configure the source and generate properties for the Kafka Consumer.
 *
 * Kafka Consumer properties are generated as follows:
 * 1. Generate a properties object with some static defaults that can be
 *    overridden by Source configuration.
 * 2. Add the configuration users added for Kafka (parameters starting with
 *    kafka., which must be valid Kafka Consumer properties).
 * 3. Add the source's documented parameters, which can override other
 *    properties.
 *
 * @param context
 */
public void configure(Context context) {
  this.context = context;
  batchUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_SIZE,
      KafkaSourceConstants.DEFAULT_BATCH_SIZE);
  timeUpperLimit = context.getInteger(KafkaSourceConstants.BATCH_DURATION_MS,
      KafkaSourceConstants.DEFAULT_BATCH_DURATION);
  topic = context.getString(KafkaSourceConstants.TOPIC);
  if (topic == null) {
    throw new ConfigurationException("Kafka topic must be specified.");
  }
  kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
  consumerTimeout = Integer.parseInt(kafkaProps.getProperty(
      KafkaSourceConstants.CONSUMER_TIMEOUT));
  kafkaAutoCommitEnabled = Boolean.parseBoolean(kafkaProps.getProperty(
      KafkaSourceConstants.AUTO_COMMIT_ENABLED));
  if (counter == null) {
    counter = new KafkaSourceCounter(getName());
  }
}
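Put together, the three steps in the Javadoc mean a Context populated as below ends up with defaults, pass-through values, and documented overrides merged. A hypothetical sketch; KafkaSourceUtil.getKafkaProperties is the helper already used in this example, and the property names follow the camel-case convention from Example 2:

// Hypothetical Context mirroring an agent configuration.
Context context = new Context();
context.put("topic", "events");                     // required, validated above
context.put("kafka.consumer.timeout.ms", "100");    // step 2: kafka.* pass-through
context.put("zookeeperConnect", "localhost:2181");  // step 3: documented override
Properties kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
// Result: the static defaults (step 1), plus consumer.timeout.ms=100 (step 2)
// and the ZooKeeper connect string (step 3).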
Example 4
public void configure(Context context) {
  this.context = context;
  this.batchUpperLimit = context.getInteger("batchSize", Integer.valueOf(1000)).intValue();
  this.timeUpperLimit = context.getInteger("batchDurationMillis", Integer.valueOf(1000)).intValue();
  this.topic = context.getString("topic");
  if (this.topic == null) {
    throw new ConfigurationException("Kafka topic must be specified.");
  } else {
    this.kafkaProps = KafkaSourceUtil.getKafkaProperties(context);
    this.consumerTimeout = Integer.parseInt(this.kafkaProps.getProperty("consumer.timeout.ms"));
    this.kafkaAutoCommitEnabled = Boolean.parseBoolean(this.kafkaProps.getProperty("auto.commit.enable"));
    if (this.counter == null) {
      this.counter = new KafkaSourceCounter(this.getName());
    }
  }
}
Example 5
@Override
public synchronized void start() {
  LOGGER.info("Starting Morphline Sink {} ...", this);
  sinkCounter.start();
  if (handler == null) {
    MorphlineHandler tmpHandler;
    try {
      tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
    } catch (Exception e) {
      throw new ConfigurationException(e);
    }
    tmpHandler.configure(context);
    handler = tmpHandler;
  }
  super.start();
  LOGGER.info("Morphline Sink {} started.", getName());
}
Example 6
@Override
public void configure(Context context) throws ConfigurationException {
  super.configure(context);
  sinks = Arrays.asList(context.getString(
      BasicConfigurationConstants.CONFIG_SINKS).split("\\s+"));
  Map<String, String> params = context.getSubProperties(
      BasicConfigurationConstants.CONFIG_SINK_PROCESSOR_PREFIX);
  processorContext = new Context();
  processorContext.putAll(params);
  SinkProcessorType spType = getKnownSinkProcessor(processorContext.getString(
      BasicConfigurationConstants.CONFIG_TYPE));
  if (spType != null) {
    processorConf =
        (SinkProcessorConfiguration) ComponentConfigurationFactory.create(
            this.getComponentName() + "-processor",
            spType.toString(),
            ComponentType.SINK_PROCESSOR);
    if (processorConf != null) {
      processorConf.setSinks(new HashSet<String>(sinks));
      processorConf.configure(processorContext);
    }
  }
  setConfigured();
}
Example 7
@Override
public void configure(Context context) {
  topic = context.getString(CONF_TOPIC);
  if (topic == null) {
    throw new ConfigurationException("Kafka topic must be specified.");
  }
  writeBody = context.getBoolean(CONF_WRITE_BODY, DEFAULT_WRITE_BODY);
  ImmutableMap<String, String> subProperties = context.getSubProperties(CONF_KAFKA);
  Properties properties = new Properties();
  properties.putAll(subProperties);
  producer = new Producer<String, String>(new ProducerConfig(properties));
  mapper = new ObjectMapper();
}
Example 8
@Override
public void configure(Context context) {
  conf = new SinkGroupConfiguration("sinkgrp");
  try {
    conf.configure(context);
  } catch (ConfigurationException e) {
    throw new FlumeException("Invalid Configuration!", e);
  }
  configure(conf);
}
Example 9
@Override
public void configure(Context context) {
  // TODO Auto-generated method stub
  this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, DEFAULT_POLL_FREQUENCY);
  // Zabbix hosts
  String hosts = context.getString(this.CONF_HOSTS);
  if (hosts == null || hosts.isEmpty()) {
    throw new ConfigurationException("Hosts list cannot be empty.");
  }
  parseHostsFromString(hosts);
}
Example 10
@Override
public void configure(Context context) {
  this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY, 60);
  String localHosts = context.getString(this.CONF_HOSTS);
  if (localHosts == null || localHosts.isEmpty()) {
    throw new ConfigurationException("Hosts list cannot be empty.");
  }
  this.hosts = this.getHostsFromString(localHosts);
  this.isGanglia3 = context.getBoolean(this.CONF_ISGANGLIA3, false);
}
Example 11
@Override
public void configure(Context context) {
  this.maxBlobLength = context.getInteger(MAX_BLOB_LENGTH_KEY, MAX_BLOB_LENGTH_DEFAULT);
  if (this.maxBlobLength <= 0) {
    throw new ConfigurationException("Configuration parameter " + MAX_BLOB_LENGTH_KEY
        + " must be greater than zero: " + maxBlobLength);
  }
}
Example 12
protected BlobDeserializer(Context context, ResettableInputStream in) {
  this.in = in;
  this.maxBlobLength = context.getInteger(MAX_BLOB_LENGTH_KEY, MAX_BLOB_LENGTH_DEFAULT);
  if (this.maxBlobLength <= 0) {
    throw new ConfigurationException("Configuration parameter " + MAX_BLOB_LENGTH_KEY
        + " must be greater than zero: " + maxBlobLength);
  }
  this.isOpen = true;
}
Example 13
public void configure(Context context) throws ConfigurationException {
  super.configure(context);
  this.channel = context.getString("channel");
  if (this.channel == null || this.channel.isEmpty()) {
    errors.add(new FlumeConfigurationError(componentName, "channel",
        FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
        ErrorOrWarning.ERROR));
    throw new ConfigurationException("No channel configured for sink: "
        + this.getComponentName());
  }
}
Example 14
/**
 * Test fails without FLUME-1847
 */
@Test(expected = ConfigurationException.class)
public void testFLUME1847() throws Exception {
  Context context = new Context();
  context.put("type", "something");
  SourceConfiguration sourceConfig = new SourceConfiguration("src");
  sourceConfig.configure(context);
}
Example 15
@Test
public void confWithMissingTablesFails() {
  final CassandraSink sink = new CassandraSink();
  final Context context = new Context();
  thrown.expect(ConfigurationException.class);
  thrown.expectMessage("tables is mandatory");
  sink.configure(context);
}
Example 16
@Test
public void confWithBadHostsFails() {
  final CassandraSink sink = new CassandraSink();
  final Context context = new Context();
  context.put("tables", "keyspace.table");
  context.put("hosts", "localhost:9badport");
  thrown.expect(ConfigurationException.class);
  thrown.expectMessage("Could not parse host: localhost:9badport");
  thrown.expectCause(new CauseMatcher(IllegalArgumentException.class));
  sink.configure(context);
}
Example 17
@Test
public void confMissingCqlFileFails() {
  final CassandraSink sink = new CassandraSink();
  final Context context = new Context();
  context.put("tables", "keyspace.table");
  context.put("cqlFile", "/NOT/FOUND/MY.CQL");
  thrown.expect(ConfigurationException.class);
  thrown.expectMessage("Cannot read CQL file: /NOT/FOUND/MY.CQL");
  thrown.expectCause(new CauseMatcher(FileNotFoundException.class));
  sink.configure(context);
}
Example 18
public void configure(Context context) {
  topic = context.getString("topic");
  if (topic == null) {
    throw new ConfigurationException("Kafka topic must be specified.");
  }
  producer = KafkaSinkUtil.getProducer(context);
}
Example 19
@Override
public void configure(Context ctx) {
  String topicStr = ctx.getString(TOPIC);
  if (topicStr == null || topicStr.isEmpty()) {
    topicStr = DEFAULT_TOPIC;
    LOGGER.info("Topic was not specified. Using " + topicStr + " as the topic.");
  }
  topic.set(topicStr);
  String groupId = ctx.getString(GROUP_ID_FLUME);
  if (groupId == null || groupId.isEmpty()) {
    groupId = DEFAULT_GROUP_ID;
    LOGGER.info("Group ID was not specified. Using " + groupId + " as the group id.");
  }
  String brokerList = ctx.getString(BROKER_LIST_FLUME_KEY);
  if (brokerList == null || brokerList.isEmpty()) {
    throw new ConfigurationException("Broker List must be specified");
  }
  String zkConnect = ctx.getString(ZOOKEEPER_CONNECT_FLUME_KEY);
  if (zkConnect == null || zkConnect.isEmpty()) {
    throw new ConfigurationException("Zookeeper Connection must be specified");
  }
  Long timeout = ctx.getLong(TIMEOUT, Long.valueOf(DEFAULT_TIMEOUT));
  kafkaConf.putAll(ctx.getSubProperties(KAFKA_PREFIX));
  kafkaConf.put(GROUP_ID, groupId);
  kafkaConf.put(BROKER_LIST_KEY, brokerList);
  kafkaConf.put(ZOOKEEPER_CONNECT, zkConnect);
  kafkaConf.put(AUTO_COMMIT_ENABLED, String.valueOf(false));
  kafkaConf.put(CONSUMER_TIMEOUT, String.valueOf(timeout));
  kafkaConf.put(REQUIRED_ACKS_KEY, "-1");
  LOGGER.info(kafkaConf.toString());
  parseAsFlumeEvent = ctx.getBoolean(PARSE_AS_FLUME_EVENT, DEFAULT_PARSE_AS_FLUME_EVENT);
  boolean readSmallest = ctx.getBoolean(READ_SMALLEST_OFFSET, DEFAULT_READ_SMALLEST_OFFSET);
  // If the data is to be parsed as Flume events, we always read the smallest.
  // Else, we read the configuration, which by default reads the largest.
  if (parseAsFlumeEvent || readSmallest) {
    // readSmallest is eval-ed only if parseAsFlumeEvent is false.
    // The default is largest, so we don't need to set it explicitly.
    kafkaConf.put("auto.offset.reset", "smallest");
  }
  if (counter == null) {
    counter = new KafkaChannelCounter(getName());
  }
}
Example 20
public void configure(Context context) throws ConfigurationException {
  super.configure(context);
}
Example 21
@Override
public void configure(Context context) throws ConfigurationException {
  super.configure(context);
}
Example 22
public void configure(Context context) throws ConfigurationException {
  super.configure(context);
  try {
    String channelList = context.getString(
        BasicConfigurationConstants.CONFIG_CHANNELS);
    if (channelList != null) {
      this.channels =
          new HashSet<String>(Arrays.asList(channelList.split("\\s+")));
    }
    if (channels.isEmpty()) {
      errors.add(new FlumeConfigurationError(componentName,
          ComponentType.CHANNEL.getComponentType(),
          FlumeConfigurationErrorType.PROPERTY_VALUE_NULL,
          ErrorOrWarning.ERROR));
      throw new ConfigurationException("No channels set for "
          + this.getComponentName());
    }
    Map<String, String> selectorParams = context.getSubProperties(
        BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
    String selType;
    if (selectorParams != null && !selectorParams.isEmpty()) {
      selType = selectorParams.get(BasicConfigurationConstants.CONFIG_TYPE);
    } else {
      selType = ChannelSelectorConfigurationType.REPLICATING.toString();
    }
    if (selType == null || selType.isEmpty()) {
      selType = ChannelSelectorConfigurationType.REPLICATING.toString();
    }
    ChannelSelectorType selectorType = this.getKnownChannelSelector(selType);
    Context selectorContext = new Context();
    selectorContext.putAll(selectorParams);
    String config = null;
    if (selectorType == null) {
      config = selectorContext.getString(
          BasicConfigurationConstants.CONFIG_CONFIG);
      if (config == null || config.isEmpty()) {
        config = "OTHER";
      }
    } else {
      config = selectorType.toString().toUpperCase();
    }
    this.selectorConf =
        (ChannelSelectorConfiguration) ComponentConfigurationFactory
            .create(ComponentType.CHANNELSELECTOR.getComponentType(), config,
                ComponentType.CHANNELSELECTOR);
    selectorConf.setChannels(channels);
    selectorConf.configure(selectorContext);
  } catch (Exception e) {
    errors.add(new FlumeConfigurationError(componentName,
        ComponentType.CHANNELSELECTOR.getComponentType(),
        FlumeConfigurationErrorType.CONFIG_ERROR,
        ErrorOrWarning.ERROR));
    throw new ConfigurationException("Failed to configure component!", e);
  }
}
Example 23
public void configure(Context context) throws ConfigurationException {
}