Java源码示例:com.google.api.gax.batching.BatchingSettings

示例1
@Bean
@ConditionalOnMissingBean(name = "publisherBatchSettings")
public BatchingSettings publisherBatchSettings() {
	BatchingSettings.Builder builder = BatchingSettings.newBuilder();

	GcpPubSubProperties.Batching batching = this.gcpPubSubProperties.getPublisher()
			.getBatching();

	FlowControlSettings flowControlSettings = buildFlowControlSettings(batching.getFlowControl());
	if (flowControlSettings != null) {
		builder.setFlowControlSettings(flowControlSettings);
	}

	return ifNotNull(batching.getDelayThresholdSeconds(),
				(x) -> builder.setDelayThreshold(Duration.ofSeconds(x)))
			.apply(ifNotNull(batching.getElementCountThreshold(), builder::setElementCountThreshold)
			.apply(ifNotNull(batching.getEnabled(), builder::setIsEnabled)
			.apply(ifNotNull(batching.getRequestByteThreshold(), builder::setRequestByteThreshold)
			.apply(false)))) ? builder.build() : null;
}
 
示例2
@Bean
@ConditionalOnMissingBean
public PublisherFactory defaultPublisherFactory(
		@Qualifier("publisherExecutorProvider") ExecutorProvider executorProvider,
		@Qualifier("publisherBatchSettings") ObjectProvider<BatchingSettings> batchingSettings,
		@Qualifier("publisherRetrySettings") ObjectProvider<RetrySettings> retrySettings,
		TransportChannelProvider transportChannelProvider) {
	DefaultPublisherFactory factory = new DefaultPublisherFactory(this.finalProjectIdProvider);
	factory.setExecutorProvider(executorProvider);
	factory.setCredentialsProvider(this.finalCredentialsProvider);
	factory.setHeaderProvider(this.headerProvider);
	factory.setChannelProvider(transportChannelProvider);
	retrySettings.ifAvailable(factory::setRetrySettings);
	batchingSettings.ifAvailable(factory::setBatchingSettings);
	return factory;
}
 
示例3
CPSPublisherTask(StartRequest request, MetricsHandler metricsHandler, int workerCount) {
  super(request, metricsHandler, workerCount);
  log.warn("constructing CPS publisher");
  this.payload = getPayload();
  try {
    this.publisher =
        Publisher.newBuilder(ProjectTopicName.of(request.getProject(), request.getTopic()))
            .setBatchingSettings(
                BatchingSettings.newBuilder()
                    .setElementCountThreshold((long) request.getPublisherOptions().getBatchSize())
                    .setRequestByteThreshold(9500000L)
                    .setDelayThreshold(
                        Duration.ofMillis(
                            Durations.toMillis(request.getPublisherOptions().getBatchDuration())))
                    .setIsEnabled(true)
                    .build())
            .build();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
 
示例4
private SinkFactory createFlushingPool(final RetrySettings retrySettings,
                                       final BatchingSettings batchingSettings) {
    return (vc, sinkName, registry) -> {
        final String projectId = vc.configuration().global.gcps.projectId.orElseThrow(IllegalStateException::new);
        final ProjectTopicName topicName = ProjectTopicName.of(projectId, topic);
        final Publisher.Builder builder =
            Publisher.newBuilder(topicName)
                     .setRetrySettings(retrySettings)
                     .setBatchingSettings(batchingSettings);
        final Publisher publisher = IOExceptions.wrap(builder::build).get();
        return new GoogleCloudPubSubFlushingPool(sinkName,
                                                 vc.configuration().global.gcps.threads,
                                                 vc.configuration().global.gcps.bufferSize,
                                                 publisher,
                                                 Optional.empty(),
                                                 registry.getSchemaBySinkName(sinkName));
    };
}
 
示例5
private Publisher getPublisher() throws IOException {
  return Publisher.newBuilder(topic)
      .setCredentialsProvider(credentialsProvider)
      .setChannelProvider(getChannelProvider())
      // Batching settings borrowed from PubSub Load Test Framework
      .setBatchingSettings(
          BatchingSettings.newBuilder()
              .setElementCountThreshold(950L)
              .setRequestByteThreshold(9500000L)
              .setDelayThreshold(Duration.ofMillis(10))
              .build())
      .build();
}
 
示例6
@Test
public void testPublisherBatchingSettings() {
	this.contextRunner.run((context) -> {
		BatchingSettings settings = context.getBean("publisherBatchSettings",
				BatchingSettings.class);
		assertThat(settings.getFlowControlSettings().getMaxOutstandingElementCount()).isEqualTo(19);
		assertThat(settings.getFlowControlSettings().getMaxOutstandingRequestBytes()).isEqualTo(20);
		assertThat(settings.getFlowControlSettings().getLimitExceededBehavior())
				.isEqualTo(LimitExceededBehavior.Ignore);
		assertThat(settings.getElementCountThreshold()).isEqualTo(21);
		assertThat(settings.getRequestByteThreshold()).isEqualTo(22);
		assertThat(settings.getDelayThreshold()).isEqualTo(Duration.ofSeconds(23));
		assertThat(settings.getIsEnabled()).isTrue();
	});
}
 
示例7
private void createPublisher() {
  ProjectTopicName fullTopic = ProjectTopicName.of(cpsProject, cpsTopic);
  com.google.cloud.pubsub.v1.Publisher.Builder builder =
      com.google.cloud.pubsub.v1.Publisher.newBuilder(fullTopic)
          .setCredentialsProvider(gcpCredentialsProvider)
          .setBatchingSettings(
              BatchingSettings.newBuilder()
                  .setDelayThreshold(Duration.ofMillis(maxDelayThresholdMs))
                  .setElementCountThreshold(maxBufferSize)
                  .setRequestByteThreshold(maxBufferBytes)
                  .build())
          .setRetrySettings(
              RetrySettings.newBuilder()
                  // All values that are not configurable come from the defaults for the publisher
                  // client library.
                  .setTotalTimeout(Duration.ofMillis(maxTotalTimeoutMs))
                  .setMaxRpcTimeout(Duration.ofMillis(maxRequestTimeoutMs))
                  .setInitialRetryDelay(Duration.ofMillis(5))
                  .setRetryDelayMultiplier(2)
                  .setMaxRetryDelay(Duration.ofMillis(Long.MAX_VALUE))
                  .setInitialRpcTimeout(Duration.ofSeconds(10))
                  .setRpcTimeoutMultiplier(2)
                  .build());
  try {
    publisher = builder.build();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
 
示例8
public static GooglePubsubPublisher buildPublisher(
    GooglePubsubPublisherConfig config, ObjectMapper mapper) {
  GooglePubsubPublisher publisher = new GooglePubsubPublisher();
  publisher.setName(config.getName());
  ProjectTopicName fullName = ProjectTopicName.of(config.getProject(), config.getTopicName());
  publisher.setTopicName(config.getTopicName());
  publisher.setFullTopicName(fullName.toString());
  publisher.setContent(config.getContent());
  publisher.setMapper(mapper);

  BatchingSettings batchingSettings =
      BatchingSettings.newBuilder()
          .setElementCountThreshold(config.getBatchCountThreshold())
          .setDelayThreshold(Duration.ofMillis(config.getDelayMillisecondsThreshold()))
          .build();

  try {
    Publisher p =
        Publisher.newBuilder(fullName)
            .setCredentialsProvider(new GooglePubsubCredentialsProvider(config.getJsonPath()))
            .setBatchingSettings(batchingSettings)
            .build();
    publisher.setPublisher(p);
  } catch (IOException ioe) {
    log.error("Could not create Google Pubsub Publishers", ioe);
  }

  return publisher;
}
 
示例9
@Override
public SinkFactory getFactory() {
    final RetrySettings retrySettings = this.retrySettings.createRetrySettings();
    final BatchingSettings batchingSettings = this.batchingSettings.createBatchingSettings();
    final Optional<String> emulator = Optional.ofNullable(System.getenv("PUBSUB_EMULATOR_HOST"));
    return emulator.map(hostport -> createFlushingPool(retrySettings, batchingSettings, hostport))
                   .orElseGet(() -> createFlushingPool(retrySettings, batchingSettings));
}
 
示例10
private SinkFactory createFlushingPool(final RetrySettings retrySettings,
                                       final BatchingSettings batchingSettings,
                                       final String hostPort) {
    // Based on Google's PubSub documentation:
    //   https://cloud.google.com/pubsub/docs/emulator#pubsub-emulator-java
    // What's going on here? Well…
    //  - Authentication is disabled; the emulator doesn't use or support it.
    //  - When Pub/Sub wants an I/O channel to talk to its Google Cloud endpoint, we're substituting our
    //    own endpoint instead. This channel also has TLS disabled, because the emulator doesn't need, use
    //    or support it.
    //
    return (vc, sinkName, registry) -> {
        logger.info("Configuring sink to use Google Cloud Pub/Sub emulator: {}", sinkName, hostPort);
        final String projectId = vc.configuration().global.gcps.projectId.orElseThrow(IllegalStateException::new);
        final ProjectTopicName topicName = ProjectTopicName.of(projectId, topic);
        final ManagedChannel channel = ManagedChannelBuilder.forTarget(hostPort).usePlaintext().build();
        final TransportChannelProvider channelProvider =
            FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
        // There's no easy way to create topics for the emulator, so we create the topic ourselves.
        createTopic(hostPort, channelProvider, topicName);
        final Publisher.Builder builder =
            Publisher.newBuilder(topicName)
                     .setRetrySettings(retrySettings)
                     .setBatchingSettings(batchingSettings)
                     .setChannelProvider(channelProvider)
                     .setCredentialsProvider(NoCredentialsProvider.create());
        final Publisher publisher = IOExceptions.wrap(builder::build).get();
        return new GoogleCloudPubSubFlushingPool(sinkName,
                                                 vc.configuration().global.gcps.threads,
                                                 vc.configuration().global.gcps.bufferSize,
                                                 publisher,
                                                 Optional.of(channel),
                                                 registry.getSchemaBySinkName(sinkName));
    };
}
 
示例11
public BatchingSettings createBatchingSettings() {
    return BatchingSettings.newBuilder()
                           .setElementCountThreshold(elementCountThreshold)
                           .setRequestByteThreshold(requestBytesThreshold)
                           .setDelayThreshold(to310bp(delayThreshold))
                           .build();
}
 
示例12
@Test
public void testDefaultBatchingConfigurationValid() {
    // Check that we can generate settings from our defaults.
    final BatchingSettings batchingSettings =
        GoogleCloudPubSubSinkConfiguration.DEFAULT_BATCHING_SETTINGS.createBatchingSettings();
    assertNotNull(batchingSettings);
}
 
示例13
private Publisher.Builder getPublisherBuilder(ProcessContext context) {
    final Long batchSize = context.getProperty(BATCH_SIZE).asLong();

    return Publisher.newBuilder(getTopicName(context))
            .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
            .setBatchingSettings(BatchingSettings.newBuilder()
            .setElementCountThreshold(batchSize)
            .setIsEnabled(true)
            .build());
}
 
示例14
/**
 * Set the API call batching configuration.
 * @param batchingSettings the batching settings to set
 */
public void setBatchingSettings(BatchingSettings batchingSettings) {
	this.batchingSettings = batchingSettings;
}