admin 管理员组

文章数量: 1184232

感慨:玩大数据java必须要精通,不然遇到眼花缭乱的框架以及插件拓展的时候就会一下子傻眼了。各种框架之间版本不同有现成的插件或者方案到还可以但是没有就需要自己扩展。目前我使用的是CDH6.3.2,flink使用的是1.6,Phoenix版本的是5.0.0这有在我的博客中提到过,hbase使用的是自带的2.x。这就遇到问题了目前有支持的比较好的是dlinky这个里面的插件,我现在需要做的是将dlink-connector-phoenix这个插件编译打包上传到flink的lib目录中使用sql-client进行测试。

问题:目前dlinky支持flink1.14不支持flink1.16所以需要扩展。经过我比对flink源码中的flink-connector-jdbc的写法。结合
dlinky中dlink-connector-phoenix-1.14的版本进行扩展

如果想要已经打包好的

1、拉取dlinky的源码,

并且切换到0.7.3分支

2、按照官网来基本环境要求来

特殊说明:
mvn的仓库配置的是

<mirrors><mirror><id>alimaven</id><mirrorOf>central</mirrorOf><name>nexus</name><url></url></mirror></mirrors>

3、将dlink-connectors中的dlink-connector-phoenix-1.14拷贝一份到同级目录下面。修改名称为dlink-connector-phoenix-1.16

5、编译过程中出现兼容问题,需要修改源码PhoenixDynamicTableSource.java这个类中由于dlinky0.7.3使用了
TableSchemaUtils这个工具类中的projectSchema方法.但是这个方法在flink1.16已经给删除了。于是可以将flink1.14中TableSchemaUtils的projectSchema方法写到PhoenixDynamicTableSource.java这个类中稍后贴出源码直接覆盖类就行

6、打包

mvn clean install --settings /Users/admin/Documents/softwares/repository-zi/settings-aliyun.xml  -DskipTests=true -P aliyun,prod,scala-2.12,web,fast,flink-1.16

8、编译完成后如果是想只使用flink只需要到入一下包到flink的lib目录下
dlink-connector-phoenix-1.16-0.7.3.jar,phoenix-5.0.0-cdh6.2.0-client.jar,phoenix-core-5.0.0-cdh6.2.0.jar

注意:一定不要与flink自带的hbase-connector包放一起,会冲突

9、如果将Phoenix添加到dlinky web中运行需要将phoenix-5.0.0-cdh6.2.0-client-dlinky.jar包中的servlet包删除
路径:javax/servlet。在重新打开

-- 解压包
jar xvf xxx.jar
-- 打包
jar cvf xxx.jar <打包路径>
/*
 *
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *     
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 *
 */
package org.apache.flink.connector.phoenix.table;import org.apache.flink.connector.phoenix.dialect.JdbcDialect;import org.apache.flink.connector.phoenix.internal.options.JdbcLookupOptions;import org.apache.flink.connector.phoenix.internal.options.JdbcReadOptions;import org.apache.flink.connector.phoenix.internal.options.PhoenixJdbcOptions;import org.apache.flink.connector.phoenix.split.JdbcNumericBetweenParametersProvider;import org.apache.flink.table.api.TableSchema;import org.apache.flink.table.connector.ChangelogMode;import org.apache.flink.table.connector.source.DynamicTableSource;import org.apache.flink.table.connector.source.InputFormatProvider;import org.apache.flink.table.connector.source.LookupTableSource;import org.apache.flink.table.connector.source.ScanTableSource;import org.apache.flink.table.connector.source.TableFunctionProvider;import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;import org.apache.flink.table.types.logical.RowType;import org.apache.flink.table.utils.TableSchemaUtils;import org.apache.flink.util.Preconditions;import org.apache.flink.table.types.FieldsDataType;import org.apache.flink.table.types.logical.RowType;import java.util.Objects;import org.apache.flink.table.api.TableColumn;import org.apache.flink.table.types.utils.DataTypeUtils;
/**
 * PhoenixDynamicTableSource
 *
 * @author gy
 * @since 2022/3/17 10:40
 **/
public class PhoenixDynamicTableSource implements ScanTableSource, LookupTableSource, SupportsProjectionPushDown,
        SupportsLimitPushDown {
    private final PhoenixJdbcOptions options;
    private final JdbcReadOptions readOptions;
    private final JdbcLookupOptions lookupOptions;
    private TableSchema physicalSchema;
    private final String dialectName;
    private long limit = -1L;
    public PhoenixDynamicTableSource(PhoenixJdbcOptions options, JdbcReadOptions readOptions, JdbcLookupOptions lookupOptions, TableSchema physicalSchema){
        this.options = options;
        this.readOptions = readOptions;
        this.lookupOptions = lookupOptions;
        this.physicalSchema = physicalSchema;
        this.dialectName = options.getDialect().dialectName();}
    @Override
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context){
        // JDBC only support non-nested look up keys
        String[] keyNames = new String[context.getKeys().length];for(int i =0; i < keyNames.length; i++){
            int[] innerKeyArr = context.getKeys()[i];
            Preconditions.checkArgument(
                    innerKeyArr.length ==1, "JDBC only support non-nested look up keys");
            keyNames[i]= physicalSchema.getFieldNames()[innerKeyArr[0]];}
        final RowType rowType =(RowType) physicalSchema.toRowDataType().getLogicalType();return TableFunctionProvider.of(
                new PhoenixRowDataLookupFunction(
                        options,
                        lookupOptions,
                        physicalSchema.getFieldNames(),
                        physicalSchema.getFieldDataTypes(),
                        keyNames,
                        rowType));}
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext){
        PhoenixJdbcRowDataInputFormat.Builder builder = PhoenixJdbcRowDataInputFormat.builder()
                .setDrivername(this.options.getDriverName())
                .setDBUrl(this.options.getDbURL())
                .setUsername((String)this.options.getUsername().orElse((String) null))
                .setPassword((String)this.options.getPassword().orElse((String) null))
                .setAutoCommit(this.readOptions.getAutoCommit())
                //setting phoenix schema
                .setNamespaceMappingEnabled(this.options.getNamespaceMappingEnabled())
                .setMapSystemTablesToNamespace(this.options.getMapSystemTablesToNamespace());if(this.readOptions.getFetchSize()!=0){
            builder.setFetchSize(this.readOptions.getFetchSize());}
        JdbcDialect dialect = this.options.getDialect();
        String query = dialect.getSelectFromStatement(this.options.getTableName(), this.physicalSchema.getFieldNames(), new String[0]);if(this.readOptions.getPartitionColumnName().isPresent()){
            long lowerBound =(Long)this.readOptions.getPartitionLowerBound().get();
            long upperBound =(Long)this.readOptions.getPartitionUpperBound().get();
            int numPartitions =(Integer)this.readOptions.getNumPartitions().get();
            builder.setParametersProvider((new JdbcNumericBetweenParametersProvider(lowerBound, upperBound)).ofBatchNum(numPartitions));
            query = query + " WHERE " + dialect.quoteIdentifier((String)this.readOptions.getPartitionColumnName().get()) + " BETWEEN ? AND ?";}if(this.limit >= 0L){
            query = String.format("%s %s", query, dialect.getLimitClause(this.limit));}
        builder.setQuery(query);
        RowType rowType =(RowType)this.physicalSchema.toRowDataType().getLogicalType();
        builder.setRowConverter(dialect.getRowConverter(rowType));
        builder.setRowDataTypeInfo(runtimeProviderContext.createTypeInformation(this.physicalSchema.toRowDataType()));return InputFormatProvider.of(builder.build());}
    @Override
    public ChangelogMode getChangelogMode(){return ChangelogMode.insertOnly();}
    @Override
    public boolean supportsNestedProjection(){returnfalse;}
    @Override
    public void applyProjection(int[][] projectedFields){
        this.physicalSchema = projectSchema(this.physicalSchema, projectedFields);}
    private boolean containsPhysicalColumnsOnly(TableSchema schema){
        Preconditions.checkNotNull(schema);return schema.getTableColumns().stream().allMatch(TableColumn::isPhysical);}
    private TableSchema projectSchema(TableSchema tableSchema, int[][] projectedFields){
        Preconditions.checkArgument(
                containsPhysicalColumnsOnly(tableSchema),
                "Projection is only supported for physical columns.");
        TableSchema.Builder builder = TableSchema.builder();
        FieldsDataType fields =(FieldsDataType)
                        DataTypeUtils.projectRow(tableSchema.toRowDataType(), projectedFields);
        RowType topFields =(RowType) fields.getLogicalType();for(int i =0; i < topFields.getFieldCount(); i++){
            builder.field(topFields.getFieldNames().get(i), fields.getChildren().get(i));}return builder.build();}
    public DynamicTableSource copy(){return new PhoenixDynamicTableSource(this.options, this.readOptions, this.lookupOptions, this.physicalSchema);}
    public String asSummaryString(){return"JDBC:" + this.dialectName;}
    public boolean equals(Object o){if(this == o){returntrue;}elseif(!(o instanceof PhoenixDynamicTableSource)){returnfalse;}else{
            PhoenixDynamicTableSource that =(PhoenixDynamicTableSource)o;return Objects.equals(this.options, that.options)&& Objects.equals(this.physicalSchema, that.physicalSchema)&& Objects.equals(this.dialectName, that.dialectName)&& Objects.equals(this.limit, that.limit);}}
    public int hashCode(){return Objects.hash(new Object[]{this.options, this.readOptions, this.lookupOptions, this.physicalSchema, this.dialectName, this.limit});}
    public void applyLimit(long limit){
        this.limit = limit;}}

本文标签: 使用的是 系统 编程