Java源码示例:org.apache.flink.api.java.io.jdbc.split.ParameterValuesProvider
示例1
/**
 * Builds a JDBC input format driven by a {@link GenericParameterValuesProvider} with two
 * parameter rows and checks that one input split is created per row.
 */
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
    // One row of query parameters per expected split.
    final Serializable[][] queryParameters = new String[][] {
        {TEST_DATA[3].author},
        {TEST_DATA[0].author}
    };
    final ParameterValuesProvider authorProvider = new GenericParameterValuesProvider(queryParameters);

    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(authorProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    jdbcInputFormat.openInputFormat();
    final InputSplit[] inputSplits = jdbcInputFormat.createInputSplits(1);

    // The query is expected to be split once per row of queryParameters.
    Assert.assertEquals(queryParameters.length, inputSplits.length);

    // NOTE(review): the second argument looks like a sum of the ids read by the split;
    // confirm against verifySplit.
    verifySplit(inputSplits[0], TEST_DATA[3].id);
    verifySplit(inputSplits[1], TEST_DATA[0].id + TEST_DATA[1].id);

    jdbcInputFormat.closeInputFormat();
}
示例2
/**
 * Verifies generic (value-list) splitting: two rows of author parameters handed to a
 * {@link GenericParameterValuesProvider} must yield exactly two input splits.
 */
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
    // Each inner array is the parameter list for one split of the query.
    final Serializable[][] queryParameters = new String[][] {
        new String[] {TEST_DATA[3].author},
        new String[] {TEST_DATA[0].author}
    };
    final ParameterValuesProvider provider = new GenericParameterValuesProvider(queryParameters);

    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(provider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    jdbcInputFormat.openInputFormat();
    final InputSplit[] createdSplits = jdbcInputFormat.createInputSplits(1);

    // One split per parameter row is expected.
    Assert.assertEquals(queryParameters.length, createdSplits.length);

    // NOTE(review): expected values are passed straight to verifySplit; their exact
    // meaning (presumably an id sum) should be confirmed there.
    verifySplit(createdSplits[0], TEST_DATA[3].id);
    verifySplit(createdSplits[1], TEST_DATA[0].id + TEST_DATA[1].id);

    jdbcInputFormat.closeInputFormat();
}
示例3
/**
 * Verifies generic splitting against the Derby e-bookshop database: two author parameter
 * rows supplied through a {@link GenericParameterValuesProvider} must produce two splits.
 */
@Test
public void testJDBCInputFormatWithParallelismAndGenericSplitting() throws IOException {
    // One parameter row per split the query should be divided into.
    final Serializable[][] queryParameters = new String[][] {
        {TEST_DATA[3].author},
        {TEST_DATA[0].author}
    };
    final ParameterValuesProvider authorParams = new GenericParameterValuesProvider(queryParameters);

    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
        .setQuery(SELECT_ALL_BOOKS_SPLIT_BY_AUTHOR)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(authorParams)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    jdbcInputFormat.openInputFormat();
    final InputSplit[] inputSplits = jdbcInputFormat.createInputSplits(1);

    // The number of splits must match the number of parameter rows.
    Assert.assertEquals(queryParameters.length, inputSplits.length);

    // NOTE(review): second argument is presumably the expected id sum for the split;
    // confirm against verifySplit.
    verifySplit(inputSplits[0], TEST_DATA[3].id);
    verifySplit(inputSplits[1], TEST_DATA[0].id + TEST_DATA[1].id);

    jdbcInputFormat.closeInputFormat();
}
示例4
/**
 * Splits the id range of {@code TEST_DATA} into batches of size 1 and checks that one split
 * per test record is created and that the rows read back match {@code TEST_DATA} in order.
 *
 * <p>NOTE(review): the split-count assertion presumably relies on the ids in
 * {@code TEST_DATA} being contiguous; confirm against the fixture.
 *
 * @throws IOException if the input format fails to open, read, or close
 */
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
    final int fetchSize = 1;
    final long min = TEST_DATA[0].id;
    // Last element of the fixture; kept independent of fetchSize so that changing the
    // batch size cannot silently change the upper bound of the range.
    final long max = TEST_DATA[TEST_DATA.length - 1].id;
    ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(fetchSize, min, max);
    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(paramProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    int recordCount = 0;
    jdbcInputFormat.openInputFormat();
    try {
        InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
        // The query is expected to be split once per id.
        Assert.assertEquals(TEST_DATA.length, splits.length);
        Row row = new Row(5);
        for (InputSplit split : splits) {
            jdbcInputFormat.open(split);
            try {
                while (!jdbcInputFormat.reachedEnd()) {
                    Row next = jdbcInputFormat.nextRecord(row);
                    // Fail with a readable message instead of an index error on extra rows.
                    Assert.assertTrue("more rows returned than test records", recordCount < TEST_DATA.length);
                    assertEquals(TEST_DATA[recordCount], next);
                    recordCount++;
                }
            } finally {
                // Release the per-split resources even when an assertion fails.
                jdbcInputFormat.close();
            }
        }
    } finally {
        jdbcInputFormat.closeInputFormat();
    }
    Assert.assertEquals(TEST_DATA.length, recordCount);
}
示例5
/**
 * Uses a batch size larger than the whole id range so that only one split is generated,
 * then reads all rows through that split and compares them with {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
    final long min = TEST_DATA[0].id;
    final long max = TEST_DATA[TEST_DATA.length - 1].id;
    // A batch wider than the id range forces a single split.
    final long fetchSize = max + 1;
    final ParameterValuesProvider singleBatchProvider =
        new NumericBetweenParametersProvider(fetchSize, min, max);

    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(singleBatchProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    jdbcInputFormat.openInputFormat();
    final InputSplit[] inputSplits = jdbcInputFormat.createInputSplits(1);

    // Exactly one split must have been generated.
    Assert.assertEquals(1, inputSplits.length);

    final Row reusedRow = new Row(5);
    int recordCount = 0;
    for (int i = 0; i < inputSplits.length; i++) {
        jdbcInputFormat.open(inputSplits[i]);
        while (!jdbcInputFormat.reachedEnd()) {
            final Row fetched = jdbcInputFormat.nextRecord(reusedRow);
            assertEquals(TEST_DATA[recordCount], fetched);
            recordCount += 1;
        }
        jdbcInputFormat.close();
    }
    jdbcInputFormat.closeInputFormat();

    Assert.assertEquals(TEST_DATA.length, recordCount);
}
示例6
/**
 * Splits the id range of {@code TEST_DATA} with {@code ofBatchSize(1)} and checks that one
 * split per test record is created and that the rows read back match {@code TEST_DATA} in
 * order.
 *
 * <p>NOTE(review): the split-count assertion presumably relies on the ids in
 * {@code TEST_DATA} being contiguous; confirm against the fixture.
 *
 * @throws IOException if the input format fails to open, read, or close
 */
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
    final int fetchSize = 1;
    final long min = TEST_DATA[0].id;
    // Last element of the fixture; kept independent of fetchSize so that changing the
    // batch size cannot silently change the upper bound of the range.
    final long max = TEST_DATA[TEST_DATA.length - 1].id;
    ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(paramProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    int recordCount = 0;
    jdbcInputFormat.openInputFormat();
    try {
        InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
        // The query is expected to be split once per id.
        Assert.assertEquals(TEST_DATA.length, splits.length);
        Row row = new Row(5);
        for (InputSplit split : splits) {
            jdbcInputFormat.open(split);
            try {
                while (!jdbcInputFormat.reachedEnd()) {
                    Row next = jdbcInputFormat.nextRecord(row);
                    // Fail with a readable message instead of an index error on extra rows.
                    Assert.assertTrue("more rows returned than test records", recordCount < TEST_DATA.length);
                    assertEquals(TEST_DATA[recordCount], next);
                    recordCount++;
                }
            } finally {
                // Release the per-split resources even when an assertion fails.
                jdbcInputFormat.close();
            }
        }
    } finally {
        jdbcInputFormat.closeInputFormat();
    }
    Assert.assertEquals(TEST_DATA.length, recordCount);
}
示例7
/**
 * Configures {@code ofBatchSize} with a value larger than the whole id range so that only
 * one split is generated, then reads all rows through it and compares them with
 * {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
    final long min = TEST_DATA[0].id;
    final long max = TEST_DATA[TEST_DATA.length - 1].id;
    // A batch wider than the id range forces a single split.
    final long fetchSize = max + 1;
    final ParameterValuesProvider singleBatchProvider =
        new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);

    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DRIVER_CLASS)
        .setDBUrl(DB_URL)
        .setQuery(JDBCTestBase.SELECT_ALL_BOOKS_SPLIT_BY_ID)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(singleBatchProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    jdbcInputFormat.openInputFormat();
    final InputSplit[] inputSplits = jdbcInputFormat.createInputSplits(1);

    // Exactly one split must have been generated.
    Assert.assertEquals(1, inputSplits.length);

    final Row reusedRow = new Row(5);
    int recordCount = 0;
    for (int i = 0; i < inputSplits.length; i++) {
        jdbcInputFormat.open(inputSplits[i]);
        while (!jdbcInputFormat.reachedEnd()) {
            final Row fetched = jdbcInputFormat.nextRecord(reusedRow);
            assertEquals(TEST_DATA[recordCount], fetched);
            recordCount += 1;
        }
        jdbcInputFormat.close();
    }
    jdbcInputFormat.closeInputFormat();

    Assert.assertEquals(TEST_DATA.length, recordCount);
}
示例8
/**
 * Splits the id range of {@code TEST_DATA} with {@code ofBatchSize(1)} against the Derby
 * e-bookshop database and checks that one split per test record is created and that the
 * rows read back match {@code TEST_DATA} in order.
 *
 * <p>NOTE(review): the split-count assertion presumably relies on the ids in
 * {@code TEST_DATA} being contiguous; confirm against the fixture.
 *
 * @throws IOException if the input format fails to open, read, or close
 */
@Test
public void testJDBCInputFormatWithParallelismAndNumericColumnSplitting() throws IOException {
    final int fetchSize = 1;
    final long min = TEST_DATA[0].id;
    // Last element of the fixture; kept independent of fetchSize so that changing the
    // batch size cannot silently change the upper bound of the range.
    final long max = TEST_DATA[TEST_DATA.length - 1].id;
    ParameterValuesProvider paramProvider = new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);
    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
        .setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(paramProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    int recordCount = 0;
    jdbcInputFormat.openInputFormat();
    try {
        InputSplit[] splits = jdbcInputFormat.createInputSplits(1);
        // The query is expected to be split once per id.
        Assert.assertEquals(TEST_DATA.length, splits.length);
        Row row = new Row(5);
        for (InputSplit split : splits) {
            jdbcInputFormat.open(split);
            try {
                while (!jdbcInputFormat.reachedEnd()) {
                    Row next = jdbcInputFormat.nextRecord(row);
                    // Fail with a readable message instead of an index error on extra rows.
                    Assert.assertTrue("more rows returned than test records", recordCount < TEST_DATA.length);
                    assertEquals(TEST_DATA[recordCount], next);
                    recordCount++;
                }
            } finally {
                // Release the per-split resources even when an assertion fails.
                jdbcInputFormat.close();
            }
        }
    } finally {
        jdbcInputFormat.closeInputFormat();
    }
    Assert.assertEquals(TEST_DATA.length, recordCount);
}
示例9
/**
 * Against the Derby e-bookshop database, configures {@code ofBatchSize} with a value larger
 * than the whole id range so that only one split is generated, then reads all rows through
 * it and compares them with {@code TEST_DATA} in order.
 */
@Test
public void testJDBCInputFormatWithoutParallelismAndNumericColumnSplitting() throws IOException {
    final long min = TEST_DATA[0].id;
    final long max = TEST_DATA[TEST_DATA.length - 1].id;
    // A batch wider than the id range forces a single split.
    final long fetchSize = max + 1;
    final ParameterValuesProvider singleBatchProvider =
        new NumericBetweenParametersProvider(min, max).ofBatchSize(fetchSize);

    jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
        .setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
        .setQuery(SELECT_ALL_BOOKS_SPLIT_BY_ID)
        .setRowTypeInfo(ROW_TYPE_INFO)
        .setParametersProvider(singleBatchProvider)
        .setResultSetType(ResultSet.TYPE_SCROLL_INSENSITIVE)
        .finish();

    jdbcInputFormat.openInputFormat();
    final InputSplit[] inputSplits = jdbcInputFormat.createInputSplits(1);

    // Exactly one split must have been generated.
    Assert.assertEquals(1, inputSplits.length);

    final Row reusedRow = new Row(5);
    int recordCount = 0;
    for (int i = 0; i < inputSplits.length; i++) {
        jdbcInputFormat.open(inputSplits[i]);
        while (!jdbcInputFormat.reachedEnd()) {
            final Row fetched = jdbcInputFormat.nextRecord(reusedRow);
            assertEquals(TEST_DATA[recordCount], fetched);
            recordCount += 1;
        }
        jdbcInputFormat.close();
    }
    jdbcInputFormat.closeInputFormat();

    Assert.assertEquals(TEST_DATA.length, recordCount);
}
示例10
/**
 * Reads the parameter values from the given provider and stores them on the format
 * being built.
 *
 * <p>The provider is queried once, during this call; only the returned values are kept,
 * not the provider itself.
 *
 * @param parameterValuesProvider source of the parameter values; must not be {@code null},
 *     since it is dereferenced immediately
 * @return this builder, to allow call chaining
 */
public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
format.parameterValues = parameterValuesProvider.getParameterValues();
return this;
}
示例11
/**
 * Copies the parameter values exposed by the given provider onto the format under
 * construction.
 *
 * <p>{@code getParameterValues()} is invoked exactly once here; the provider reference
 * is not retained afterwards.
 *
 * @param parameterValuesProvider provider whose parameter values are stored; must not be
 *     {@code null}, since it is dereferenced immediately
 * @return this builder, to allow call chaining
 */
public JDBCInputFormatBuilder setParametersProvider(ParameterValuesProvider parameterValuesProvider) {
format.parameterValues = parameterValuesProvider.getParameterValues();
return this;
}