Java源码示例:com.google.api.gax.batching.BatchingSettings
示例1
@Bean
@ConditionalOnMissingBean(name = "publisherBatchSettings")
public BatchingSettings publisherBatchSettings() {
BatchingSettings.Builder builder = BatchingSettings.newBuilder();
GcpPubSubProperties.Batching batching = this.gcpPubSubProperties.getPublisher()
.getBatching();
FlowControlSettings flowControlSettings = buildFlowControlSettings(batching.getFlowControl());
if (flowControlSettings != null) {
builder.setFlowControlSettings(flowControlSettings);
}
return ifNotNull(batching.getDelayThresholdSeconds(),
(x) -> builder.setDelayThreshold(Duration.ofSeconds(x)))
.apply(ifNotNull(batching.getElementCountThreshold(), builder::setElementCountThreshold)
.apply(ifNotNull(batching.getEnabled(), builder::setIsEnabled)
.apply(ifNotNull(batching.getRequestByteThreshold(), builder::setRequestByteThreshold)
.apply(false)))) ? builder.build() : null;
}
示例2
@Bean
@ConditionalOnMissingBean
public PublisherFactory defaultPublisherFactory(
@Qualifier("publisherExecutorProvider") ExecutorProvider executorProvider,
@Qualifier("publisherBatchSettings") ObjectProvider<BatchingSettings> batchingSettings,
@Qualifier("publisherRetrySettings") ObjectProvider<RetrySettings> retrySettings,
TransportChannelProvider transportChannelProvider) {
DefaultPublisherFactory factory = new DefaultPublisherFactory(this.finalProjectIdProvider);
factory.setExecutorProvider(executorProvider);
factory.setCredentialsProvider(this.finalCredentialsProvider);
factory.setHeaderProvider(this.headerProvider);
factory.setChannelProvider(transportChannelProvider);
retrySettings.ifAvailable(factory::setRetrySettings);
batchingSettings.ifAvailable(factory::setBatchingSettings);
return factory;
}
示例3
CPSPublisherTask(StartRequest request, MetricsHandler metricsHandler, int workerCount) {
super(request, metricsHandler, workerCount);
log.warn("constructing CPS publisher");
this.payload = getPayload();
try {
this.publisher =
Publisher.newBuilder(ProjectTopicName.of(request.getProject(), request.getTopic()))
.setBatchingSettings(
BatchingSettings.newBuilder()
.setElementCountThreshold((long) request.getPublisherOptions().getBatchSize())
.setRequestByteThreshold(9500000L)
.setDelayThreshold(
Duration.ofMillis(
Durations.toMillis(request.getPublisherOptions().getBatchDuration())))
.setIsEnabled(true)
.build())
.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
示例4
private SinkFactory createFlushingPool(final RetrySettings retrySettings,
final BatchingSettings batchingSettings) {
return (vc, sinkName, registry) -> {
final String projectId = vc.configuration().global.gcps.projectId.orElseThrow(IllegalStateException::new);
final ProjectTopicName topicName = ProjectTopicName.of(projectId, topic);
final Publisher.Builder builder =
Publisher.newBuilder(topicName)
.setRetrySettings(retrySettings)
.setBatchingSettings(batchingSettings);
final Publisher publisher = IOExceptions.wrap(builder::build).get();
return new GoogleCloudPubSubFlushingPool(sinkName,
vc.configuration().global.gcps.threads,
vc.configuration().global.gcps.bufferSize,
publisher,
Optional.empty(),
registry.getSchemaBySinkName(sinkName));
};
}
示例5
private Publisher getPublisher() throws IOException {
return Publisher.newBuilder(topic)
.setCredentialsProvider(credentialsProvider)
.setChannelProvider(getChannelProvider())
// Batching settings borrowed from PubSub Load Test Framework
.setBatchingSettings(
BatchingSettings.newBuilder()
.setElementCountThreshold(950L)
.setRequestByteThreshold(9500000L)
.setDelayThreshold(Duration.ofMillis(10))
.build())
.build();
}
示例6
@Test
public void testPublisherBatchingSettings() {
this.contextRunner.run((context) -> {
BatchingSettings settings = context.getBean("publisherBatchSettings",
BatchingSettings.class);
assertThat(settings.getFlowControlSettings().getMaxOutstandingElementCount()).isEqualTo(19);
assertThat(settings.getFlowControlSettings().getMaxOutstandingRequestBytes()).isEqualTo(20);
assertThat(settings.getFlowControlSettings().getLimitExceededBehavior())
.isEqualTo(LimitExceededBehavior.Ignore);
assertThat(settings.getElementCountThreshold()).isEqualTo(21);
assertThat(settings.getRequestByteThreshold()).isEqualTo(22);
assertThat(settings.getDelayThreshold()).isEqualTo(Duration.ofSeconds(23));
assertThat(settings.getIsEnabled()).isTrue();
});
}
示例7
private void createPublisher() {
ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);
com.google.cloud.pubsub.v1.Publisher.Builder builder =
com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
.setCredentialsProvider(gcpCredentialsProvider)
.setBatchingSettings(
BatchingSettings.newBuilder()
.setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs))
.setElementCountThreshold(maxBufferSize)
.setRequestByteThreshold(maxBufferBytes)
.build())
.setRetrySettings(
RetrySettings.newBuilder()
// All values that are not configurable come from the defaults for the publisher
// client library.
.setTotalTimeout(Duration.ofMillis(maxTotalTimeoutMs))
.setMaxRpcTimeout(Duration.ofMillis(maxRequestTimeoutMs))
.setInitialRetryDelay(Duration.ofMillis(5))
.setRetryDelayMultiplier(2)
.setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setRpcTimeoutMultiplier(2)
.build());
try {
publisher = builder.build();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
示例8
public static GooglePubsubPublisher buildPublisher(
GooglePubsubPublisherConfig config, ObjectMapper mapper) {
GooglePubsubPublisher publisher = new GooglePubsubPublisher();
publisher.setName(config.getName());
ProjectTopicName fullName = ProjectTopicName.of(config.getProject(), config.getTopicName());
publisher.setTopicName(config.getTopicName());
publisher.setFullTopicName(fullName.toString());
publisher.setContent(config.getContent());
publisher.setMapper(mapper);
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(config.getBatchCountThreshold())
.setDelayThreshold(Duration.ofMillis(config.getDelayMillisecondsThreshold()))
.build();
try {
Publisher p =
Publisher.newBuilder(fullName)
.setCredentialsProvider(new GooglePubsubCredentialsProvider(config.getJsonPath()))
.setBatchingSettings(batchingSettings)
.build();
publisher.setPublisher(p);
} catch (IOException ioe) {
log.error("Could not create Google Pubsub Publishers", ioe);
}
return publisher;
}
示例9
@Override
public SinkFactory getFactory() {
final RetrySettings retrySettings = this.retrySettings.createRetrySettings();
final BatchingSettings batchingSettings = this.batchingSettings.createBatchingSettings();
final Optional<String> emulator = Optional.ofNullable(System.getenv("PUBSUB_EMULATOR_HOST"));
return emulator.map(hostport -> createFlushingPool(retrySettings, batchingSettings, hostport))
.orElseGet(() -> createFlushingPool(retrySettings, batchingSettings));
}
示例10
private SinkFactory createFlushingPool(final RetrySettings retrySettings,
final BatchingSettings batchingSettings,
final String hostPort) {
// Based on Google's PubSub documentation:
// https://cloud.google.com/pubsub/docs/emulator#pubsub-emulator-java
// What's going on here? Well…
// - Authentication is disabled; the emulator doesn't use or support it.
// - When Pub/Sub wants an I/O channel to talk to its Google Cloud endpoint, we're substituting our
// own endpoint instead. This channel also has TLS disabled, because the emulator doesn't need, use
// or support it.
//
return (vc, sinkName, registry) -> {
logger.info("Configuring sink to use Google Cloud Pub/Sub emulator: {}", sinkName, hostPort);
final String projectId = vc.configuration().global.gcps.projectId.orElseThrow(IllegalStateException::new);
final ProjectTopicName topicName = ProjectTopicName.of(projectId, topic);
final ManagedChannel channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
final TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
// There's no easy way to create topics for the emulator, so we create the topic ourselves.
createTopic(hostPort, channelProvider, topicName);
final Publisher.Builder builder =
Publisher.newBuilder(topicName)
.setRetrySettings(retrySettings)
.setBatchingSettings(batchingSettings)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create());
final Publisher publisher = IOExceptions.wrap(builder::build).get();
return new GoogleCloudPubSubFlushingPool(sinkName,
vc.configuration().global.gcps.threads,
vc.configuration().global.gcps.bufferSize,
publisher,
Optional.of(channel),
registry.getSchemaBySinkName(sinkName));
};
}
示例11
public BatchingSettings createBatchingSettings() {
return BatchingSettings.newBuilder()
.setElementCountThreshold(elementCountThreshold)
.setRequestByteThreshold(requestBytesThreshold)
.setDelayThreshold(to310bp(delayThreshold))
.build();
}
示例12
@Test
public void testDefaultBatchingConfigurationValid() {
// Check that we can generate settings from our defaults.
final BatchingSettings batchingSettings =
GoogleCloudPubSubSinkConfiguration.DEFAULT_BATCHING_SETTINGS.createBatchingSettings();
assertNotNull(batchingSettings);
}
示例13
private Publisher.Builder getPublisherBuilder(ProcessContext context) {
final Long batchSize = context.getProperty(BATCH_SIZE).asLong();
return Publisher.newBuilder(getTopicName(context))
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
.setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(batchSize)
.setIsEnabled(true)
.build());
}
示例14
/**
* Set the API call batching configuration.
* @param batchingSettings the batching settings to set
*/
public void setBatchingSettings(BatchingSettings batchingSettings) {
this.batchingSettings = batchingSettings;
}