专栏名称: 程序员小灰
一群喜爱编程技术和算法的小仓鼠。
目录
相关文章推荐
程序员小灰  ·  疯了!下载 DeepSeek 最高判20年! ·  4 天前  
OSC开源社区  ·  DeepSeek-V3满血版在国产沐曦GPU ... ·  3 天前  
程序员的那些事  ·  热搜第一!DeepSeek百万年薪招AI人才 ... ·  3 天前  
51好读  ›  专栏  ›  程序员小灰

10分钟教你写一个数据库

程序员小灰  · 公众号  · 程序员  · 2022-10-14 17:26

正文

今天教大家借助一款框架快速实现一个数据库,这个框架就是 Calcite ,下面会带大家通过两个例子快速教会大家怎么实现,一个是可以通过 SQL 语句的方式可以直接查询文件内容,第二个是模拟 Mysql 查询功能,以及最后告诉大家怎么实现 SQL 查询 Kafka 数据。

Calcite

Calcite 是一个用于优化异构数据源的查询处理的可插拔基础框架(他是一个框架),可以将任意数据(Any data, Anywhere)DML 转换成基于 SQL 的 DML 引擎,并且我们可以选择性的使用它的部分功能。

Calcite能干什么

  1. 使用 SQL 访问内存中某个数据

  2. 使用 SQL 访问某个文件的数据

  3. 跨数据源的数据访问、聚合、排序等(例如 Mysql 和 Redis 数据源中的数据进行join)

当我们需要自建一个数据库的时候,数据可以为任何格式的,比如text、word、xml、mysql、es、csv、第三方接口数据等等,我们只有数据,我们想让这些数据支持 SQL 形式动态增删改查。

另外,像Hive、Drill、Flink、Phoenix 和 Storm 等项目中,数据处理系统都是使用 Calcite 来做 SQL 解析和查询优化,当然,还有部分用来构建自己的 JDBC driver。

名词解释

Token

就是将标准 SQL(可以理解为Mysql)关键词以及关键词之间的字符串截取出来,每一个 token ,会被封装为一个 SqlNode SqlNode 会衍生很多子类,比如 Select 会被封装为 SqlSelect ,当前 SqlNode 也能反解析为 SQL 文本。

RelDataTypeField

某个字段的名称和类型信息

RelDataType

多个 RelDataTypeField 组成了 RelDataType,可以理解为数据行

Table

一个完整的表的信息

Schema

所有元数据的组合,可以理解为一组 Table 或者库的概念

开始使用

1. 引入包


    org.apache.calcite
    calcite-core
    
    1.32.0

2. 创建model.json文件和表结构csv

model.json 里面主要描述或者说告诉 Calcite 如何创建 Schema ,也就是告诉框架怎么创建出库。

{
"version""1.0",//忽略
"defaultSchema""CSV",//设置默认的schema
"schemas": [//可定义多个schema
        {
          "name""CSV",//相当于namespace和上面的defaultSchema的值对应
          "type""custom",//写死
          "factory""csv.CsvSchemaFactory",//factory的类名必须是你自己实现的factory的包的全路径
          "operand": { //这里可以传递自定义参数,最终会以map的形式传递给factory的operand参数
          "directory""csv"//directory代表calcite会在resources下面的csv目录下面读取所有的csv文件,factory创建的Schema会吧这些文件全部构建成Table,可以理解为读取数据文件的根目录,当然key的名称也不一定非得用directory,你可以随意指定
                }
        }
      ]
}

接下来还需要定义一个 csv 文件,用来定义表结构。

NAME:string,MONEY:string
aixiaoxian,10000万
xiaobai,10000万
adong,10000万
maomao,10000万
xixi,10000万
zizi,10000万
wuwu,10000万
kuku,10000万

整个项目的结构大概就是这样:

3. 实现Schema的工厂类

在上述文件中指定的包路径下去编写 CsvSchemaFactory 类,实现 SchemaFactory 接口,并且实现里面唯一的方法 create 方法,创建 Schema (库)。

public class CsvSchemaFactory implements SchemaFactory {
    /**
     * parentSchema 父节点,一般为root
     * name 为model.json中定义的名字
     * operand 为model.json中定于的数据,这里可以传递自定义参数
     *
     * @param parentSchema Parent schema
     * @param name         Name of this schema
     * @param operand      The "operand" JSON property
     * @return
     */

    @Override
    public Schema create(SchemaPlus parentSchema, String name,
                         Map operand)
 
{
        final String directory = (String) operand.get("directory");
        File directoryFile = new File(directory);
        return new CsvSchema(directoryFile, "scannable" );
    }
}

4. 自定义Schma类

有了 SchemaFactory ,接下来需要自定义 Schema 类。

自定义的 Schema 需要实现 Schema 接口,但是直接实现要实现的方法太多,我们去实现官方的 AbstractSchema 类,这样就只需要实现一个方法就行(如果有其他定制化需求可以实现原生接口)。

核心的逻辑就是 createTableMap 方法,用于创建出 Table 表。

他会扫描指定的 Resource 下面的所有 csv 文件,将每个文件映射成 Table 对象,最终以 map 形式返回, Schema 接口的其他几个方法会用到这个对象。

  //实现这一个方法就行了
    @Override
    protected Map getTableMap() {
        if (tableMap == null) {
            tableMap = createTableMap();
        }
        return tableMap;
    }
  private Map createTableMap() {
        // Look for files in the directory ending in ".csv"
        final Source baseSource = Sources.of(directoryFile);
        //会自动过滤掉非指定文件后缀的文件,我这里写的csv
        File[] files = directoryFile.listFiles((dir, name) -> {
            final String nameSansGz = trim(name, ".gz");
            return nameSansGz.endsWith(".csv");
        });
        if (files == null) {
            System.out.println("directory " + directoryFile + " not found");
            files = new File[0];
        }
        // Build a map from table name to table; each file becomes a table.
        final ImmutableMap.Builder builder = ImmutableMap.builder();
        for (File file : files) {
            Source source = Sources.of(file);
            final Source sourceSansCsv = source.trimOrNull(".csv");
            if (sourceSansCsv != null) {
                final Table table = createTable(source);
                builder.put(sourceSansCsv.relative(baseSource).path(), table);
            }
        }
        return builder.build();
    }

5. 自定义 Table

Schema 有了,并且数据文件 csv 也映射成 Table 了,一个 csv 文件对应一个 Table

接下来我们去自定义 Table ,自定义 Table 的核心是我们要定义字段的类型和名称,以及如何读取 csv 文件。

  1. 先获取数据类型和名称,即单表结构,从 csv 文件头中获取(当前文件头需要我们自己定义,包括规则我们也可以定制化)。
/**
 * Base class for table that reads CSV files.
 */

public abstract class CsvTable extends AbstractTable {
    protected final Source source;
    protected final @Nullable RelProtoDataType protoRowType;
    private @Nullable RelDataType rowType;
    private @Nullable List fieldTypes;

    /**
     * Creates a CsvTable.
     */

    CsvTable(Source source, @Nullable RelProtoDataType protoRowType) {
        this.source = source;
        this.protoRowType = protoRowType;
    }
  /**
  * 创建一个CsvTable,继承AbstractTable,需要实现里面的getRowType方法,此方法就是获取当前的表结构。
   Table的类型有很多种,比如还有视图类型,AbstractTable类中帮我们默认实现了Table接口的一些方法,比如getJdbcTableType   方法,默认为Table类型,如果有其他定制化需求可直接实现Table接口。
   和AbstractSchema很像
  */

    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (protoRowType != null) {
            return protoRowType.apply(typeFactory);
        }
        if (rowType == null) {
            rowType = CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    null);
        }
        return rowType;
    }

    /**
     * Returns the field types of this CSV table.
     */

    public List getFieldTypes(RelDataTypeFactory typeFactory) {
        if (fieldTypes == null) {
            fieldTypes = new ArrayList<>();
            CsvEnumerator.deduceRowType((JavaTypeFactory) typeFactory, source,
                    fieldTypes);
        }
        return fieldTypes;
    }
  
   public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                            Source source, @Nullable List fieldTypes)
 
{
        final List types = new ArrayList<>();
        final List names = new ArrayList<>();
        try (CSVReader reader = openCsv(source)) {
            String[] strings = reader.readNext();
            if (strings == null) {
                strings = new String[]{"EmptyFileHasNoColumns:boolean"};
            }
            for (String string : strings) {
                final String name;
                final RelDataType fieldType;
                //就是简单的读取字符串冒号前面是名称,冒号后面是类型
                final int colon = string.indexOf(':');
                if (colon >= 0) {
                    name = string.substring(0 , colon);
                    String typeString = string.substring(colon + 1);
                    Matcher decimalMatcher = DECIMAL_TYPE_PATTERN.matcher(typeString);
                    if (decimalMatcher.matches()) {
                        int precision = Integer.parseInt(decimalMatcher.group(1));
                        int scale = Integer.parseInt(decimalMatcher.group(2));
                        fieldType = parseDecimalSqlType(typeFactory, precision, scale);
                    } else {
                        switch (typeString) {
                            case "string":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                            case "boolean":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BOOLEAN);
                                break;
                            case "byte":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TINYINT);
                                break;
                            case "char":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.CHAR);
                                break;
                            case "short":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.SMALLINT);
                                break;
                            case "int":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.INTEGER);
                                break;
                            case "long":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.BIGINT);
                                break;
                            case "float":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.REAL);
                                break;
                            case "double":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DOUBLE);
                                break;
                            case "date":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.DATE);
                                break;
                            case "timestamp":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIMESTAMP);
                                break;
                            case "time":
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.TIME);
                                break;
                            default:
                                LOGGER.warn(
                                        "Found unknown type: {} in file: {} for column: {}. Will assume the type of "
                                                + "column is string.",
                                        typeString, source.path(), name);
                                fieldType = toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR);
                                break;
                        }
                    }
                } else {
                    //  如果没定义,默认都是String类型,字段名称也是string
                    name = string;
                    fieldType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
                }
                names.add(name);
                types.add(fieldType);
                if (fieldTypes != null) {
                    fieldTypes.add(fieldType);
                }
            }
        } catch (IOException e) {
            // ignore
        }
        if (names.isEmpty()) {
            names.add("line");
            types.add(typeFactory.createSqlType(SqlTypeName.VARCHAR));
        }
        return typeFactory.createStructType(Pair.zip(names, types));
    }
}
  1. 获取文件中的数据,上面把 Table 的表结构字段名称和类型都获取到了以后,就剩最后一步了,获取文件中的数据。我们需要自定义一个类,实现 ScannableTable 接口,并且实现里面唯一的方法 scan 方法,其实本质上就是读文件,然后把文件的每一行的数据和上述获取的 fileType 进行匹配。
@Override
    public Enumerable scan(DataContext root) {
        JavaTypeFactory typeFactory = root.getTypeFactory();
        final List fieldTypes = getFieldTypes(typeFactory);
        final List fields = ImmutableIntList.identity(fieldTypes.size());
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<@Nullable Object[]>() {
            @Override
            public Enumerator<@Nullable Object[]> enumerator() {
                //返回我们自定义的读取数据的类
                return new CsvEnumerator<>(source, cancelFlag, falsenull,
                        CsvEnumerator.arrayConverter(fieldTypes, fields, false));
            }
        };
    }
 
 
 public CsvEnumerator(Source source, AtomicBoolean cancelFlag, boolean stream,
                         @Nullable String @Nullable [] filterValues, RowConverter rowConverter)
 
{
        this.cancelFlag = cancelFlag;
        this.rowConverter = rowConverter;
        this.filterValues = filterValues == null ? null
                : ImmutableNullableList.copyOf(filterValues);
        try {
 
            this.reader = openCsv(source);
            //跳过第一行,因为第一行是定义类型和名称的
            this.reader.readNext(); // skip header row
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
//CsvEnumerator必须实现calcit自己的迭代器,里面有current、moveNext方法,current是返回当前游标所在的数据记录,moveNext是将游标指向下一个记录,官网中自己定义了一个类型转换器,是将csv文件中的数据转换成文件头指定的类型,这个需要我们自己来实现
     @Override
    public E current() {
        return castNonNull(current);
    }
 
    @Override
    public boolean moveNext() {
        try {
            outer:
            for (; ; ) {
                if (cancelFlag.get()) {
                    return false;
                }
                final String[] strings = reader.readNext();
                if (strings == null) {
                    current = null;
                    reader.close();
                    return false;
                }
                if (filterValues != null) {
                    for (int i = 0; i                         String filterValue = filterValues.get(i);
                        if (filterValue != null) {
                            if (!filterValue.equals(strings[i])) {
                                continue outer;
                            }
                        }
                    }
                }
                current = rowConverter.convertRow(strings);
                return true;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
 
        protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
            if (fieldType == null || string == null) {
                return string;
            }
            switch (fieldType.getSqlTypeName()) {
                case BOOLEAN:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Boolean.parseBoolean(string);
                case TINYINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Byte.parseByte(string);
                case SMALLINT:
                    if (string.length() == 0) {
                        return null ;
                    }
                    return Short.parseShort(string);
                case INTEGER:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Integer.parseInt(string);
                case BIGINT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Long.parseLong(string);
                case FLOAT:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Float.parseFloat(string);
                case DOUBLE:
                    if (string.length() == 0) {
                        return null;
                    }
                    return Double.parseDouble(string);
                case DECIMAL:
                    if (string.length() == 0) {
                        return null;
                    }
                    return parseDecimal(fieldType.getPrecision(), fieldType.getScale(), string);
                case DATE:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_DATE.parse(string);
                        return (int) (date.getTime() / DateTimeUtils.MILLIS_PER_DAY);
                    } catch (ParseException e) {
                        return null;
                    }
                case TIME:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIME.parse(string);
                        return (int) date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case TIMESTAMP:
                    if (string.length() == 0) {
                        return null;
                    }
                    try {
                        Date date = TIME_FORMAT_TIMESTAMP.parse(string);
                        return date.getTime();
                    } catch (ParseException e) {
                        return null;
                    }
                case VARCHAR:
                default:
                    return string;
            }
        }

6. 最后

至此我们需要准备的东西:库、表名称、字段名称、字段类型都有了,接下来我们去写我们的 SQL 语句查询我们的数据文件。

创建好几个测试的数据文件,例如上面项目结构中我创建 2 个 csv 文件 USERINFO.csv ASSET.csv ,然后创建测试类。

这样跑起来,就可以通过 SQL 语句的方式直接查询数据了。

public class Test {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(Test.class.getResource("/model.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo "));
 
            print(statement.executeQuery(" select age from userinfo where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo where name like 'a%' "));
        } finally {
            connection.close();
        }
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i                     System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查询结果:

这里在测试的时候踩到2个坑,大家如果自己实验的时候可以避免下。

  1. Calcite 默认会把你的 SQL 语句中的表名和类名全部转换为大写,因为默认的 csv(其他文件也一样)文件的名称就是表名,除非你自定义规则,所以你的文件名要写成大写。

  2. Calcite 有一些默认的关键字不能用作表名,不然会查询失败,比如我刚开始定的 user.csv 就一直查不出来,改成 USERINFO 就可以了,这点和 Mysql 的内置关键字差不多,也可以通过个性化配置去改。

演示Mysql

  1. 首先,还是先准备 Calcite 需要的东西:库、表名称、字段名称、字段类型。

如果数据源使用 Mysql 的话,这些都不用我们去 JAVA 服务中去定义,直接在 Mysql 客户端创建好,这里直接创建两张表用于测试,就和我们的 csv 文件一样。

CREATE TABLE `USERINFO1` (
  `NAME` varchar(255CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `AGE` int DEFAULT  NULL
ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

CREATE TABLE `ASSET` (
  `NAME` varchar(255CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL,
  `MONEY` varchar(255CHARACTER SET utf8mb3 COLLATE utf8_general_ci DEFAULT NULL
ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
  1. 上述 csv 案例中的 SchemaFactory 以及 Schema 这些都不需要创建,因为 Calcite 默认提供了 Mysql 的 Adapter适配器。

  2. 其实,上述两步都不需要做,我们真正要做的是,告诉 Calcite 你的 JDBC 的连接信息就行了,也是在 model.json 文件中定义。

{
  "version""1.0",
  "defaultSchema""Demo",
  "schemas": [
    {
      "name""Demo",
      "type""custom",
    //  这里是calcite默认的SchemaFactory,里面的流程和我们上述自己定义的相同,下面会简单看看源码。
      "factory""org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
      "operand": {
        //  我用的是mysql8以上版本,所以这里注意包的名称
        "jdbcDriver""com.mysql.cj.jdbc.Driver",
        "jdbcUrl""jdbc:mysql://localhost:3306/irving",
        "jdbcUser""root",
        "jdbcPassword""123456"
      }
    }
  ]
}
  1. 在项目中引入 Mysql 的驱动包

  mysql
  mysql-connector-java
  8.0.30

  1. 写好测试类,这样直接就相当于完成了所有的功能了。
public class TestMysql {
    public static void main(String[] args) throws SQLException {
        Connection connection = null;
        Statement statement = null;
        try {
            Properties info = new Properties();
            info.put("model", Sources.of(TestMysql.class.getResource("/mysqlmodel.json")).file().getAbsolutePath());
            connection = DriverManager.getConnection("jdbc:calcite:", info);
            statement = connection.createStatement();
            statement.executeUpdate(" insert into  userinfo1 values ('xxx',12) ");
            print(statement.executeQuery("select * from asset "));
 
            print(statement.executeQuery(" select * from userinfo1 "));
 
            print(statement.executeQuery(" select age from userinfo1 where name ='aixiaoxian' "));
 
            print(statement.executeQuery(" select * from userinfo1 where age >60 "));
 
            print(statement.executeQuery(" select * from userinfo1 where name like 'a%' "));
        } finally {
            connection.close();
        }
 
    }
 
    private static void print(ResultSet resultSet) throws SQLException {
        final ResultSetMetaData metaData = resultSet.getMetaData();
        final int columnCount = metaData.getColumnCount();
        while (resultSet.next()) {
            for (int i = 1; ; i++) {
                System.out.print(resultSet.getString(i));
                if (i                     System.out.print(", ");
                } else {
                    System.out.println();
                    break;
                }
            }
        }
    }
}

查询结果:

Mysql实现原理

上述我们在 model.json 文件中指定了 org.apache.calcite.adapter.jdbc.JdbcSchema$Factory 类,可以看下这个类的代码。

这个类是把 Factory Schema 写在了一起,其实就是调用 schemafactory 类的 create 方法创建一个 schema 出来,和我们上面自定义的流程是一样的。

其中 JdbcSchema 类也是 Schema 的子类,所以也会实现 getTable 方法(这个我们上述也实现了,我们当时是获取表结构和表的字段类型以及名称,是从csv文件头中读文件的), JdbcSchema 的实现是通过连接 Mysql 服务端查询元数据信息,再将这些信息封装成 Calcite 需要的对象格式。

这里同样要注意 csv 方式的2个注意点,大小写和关键字问题。

public static JdbcSchema create(
      SchemaPlus parentSchema,
      String name,
      Map operand)
 
{
    DataSource dataSource;
    try {
      final String dataSourceName = (String) operand.get("dataSource");
      if (dataSourceName != null) {
        dataSource =
            AvaticaUtils.instantiatePlugin(DataSource.classdataSourceName);
      } else {
        //会走在这里来,这里就是我们在model.json中指定的jdbc的连接信息,最终会创建一个datasource
        final String jdbcUrl = (String) requireNonNull(operand.get("jdbcUrl"), "jdbcUrl");
        final String jdbcDriver = (String) operand.get("jdbcDriver");
        final String jdbcUser = (String) operand.get("jdbcUser");
        final String jdbcPassword = (String) operand.get("jdbcPassword");
        dataSource = dataSource(jdbcUrl, jdbcDriver, jdbcUser, jdbcPassword);
      }
    } catch (Exception e) {
      throw new RuntimeException("Error while reading dataSource", e);
    }
    String jdbcCatalog = (String) operand.get("jdbcCatalog");
    String jdbcSchema = (String) operand.get("jdbcSchema");
    String sqlDialectFactory = (String) operand.get("sqlDialectFactory");
 
    if (sqlDialectFactory == null || sqlDialectFactory.isEmpty()) {
      return JdbcSchema.create(
          parentSchema, name, dataSource, jdbcCatalog, jdbcSchema);
    } else {
      SqlDialectFactory factory = AvaticaUtils.instantiatePlugin(
          SqlDialectFactory.classsqlDialectFactory);
      return JdbcSchema.create(
          parentSchema, name, dataSource, factory, jdbcCatalog, jdbcSchema);
    }
  }
 
  @Override public @Nullable Table getTable(String name) {
    return getTableMap(false).get(name);
  }
 
  private synchronized ImmutableMap getTableMap(
      boolean force)
 
{
    if (force || tableMap == null) {
      tableMap = computeTables();
    }
    return tableMap;
  }
 
  private ImmutableMap computeTables() {
    Connection connection = null;
    ResultSet resultSet = null;
    try {
      connection = dataSource.getConnection();
      final Pair<@Nullable String, @Nullable String> catalogSchema = getCatalogSchema(connection);
      final String catalog = catalogSchema.left;
      final String schema = catalogSchema.right;
      final Iterable tableDefs;
      Foo threadMetadata = THREAD_METADATA.get();
      if (threadMetadata != null) {
        tableDefs = threadMetadata.apply(catalog, schema);
      } else {
        final List tableDefList = new ArrayList<>();
        //  获取元数据
        final DatabaseMetaData metaData = connection.getMetaData();
        resultSet = metaData.getTables(catalog, schema, nullnull);
        while (resultSet.next()) {
        //获取库名,表明等信息
          final String catalogName = resultSet.getString(1);
          final String schemaName = resultSet.getString(2);
          final String tableName = resultSet.getString(3);
          final String tableTypeName = resultSet.getString(4);
          tableDefList.add(
              new MetaImpl.MetaTable(catalogName, schemaName, tableName,
                  tableTypeName));
        }
        tableDefs = tableDefList;
      }
 
      final ImmutableMap.Builder builder =
          ImmutableMap.builder();
      for (MetaImpl.MetaTable tableDef : tableDefs) {
        final String tableTypeName2 =
            tableDef.tableType == null
            ? null
            : tableDef.tableType.toUpperCase(Locale.ROOT).replace(' ''_');
        final TableType tableType =
            Util.enumVal(TableType.OTHER, tableTypeName2);
        if (tableType == TableType.OTHER  && tableTypeName2 != null) {
          System.out.println("Unknown table type: " + tableTypeName2);
        }
        //  最终封装成JdbcTable对象
        final






请到「今天看啥」查看全文