1.EnvironmentSettings类分析 

   这个类是初始化表环境参数的,这些参数仅在创建的时候设置,实例化之后无法改变  

成员变量
    //流模型
    public static final String STREAMING_MODE = "streaming-mode";
    //类的名称
	public static final String CLASS_NAME = "class-name";
    //默认的目录
	public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
    //默认的数据库
	public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
    //计划类
    private final String plannerClass;
    //执行类
    private final String executorClass;
    //构建的目录名称
    private final String builtInCatalogName;
    //构建的数据库名称
    private final String builtInDatabaseName;
    //是流的模式
    private final boolean isStreamingMode;
EnvironmentSettings 的方法
public static Builder newInstance() {return new Builder();}
public String getBuiltInCatalogName() {return builtInCatalogName;}
public String getBuiltInDatabaseName() {return builtInDatabaseName;}
public boolean isStreamingMode() {return isStreamingMode;}
//HashMap 存放是流模型的话讲类名和计划类放入kv 中
public Map<String, String> toPlannerProperties() {
		Map<String, String> properties = new HashMap<>(toCommonProperties());
		if (plannerClass != null) {
			properties.put(CLASS_NAME, plannerClass);
		}
		return properties;
	}
//HashMap 存放是流模型的话讲类名和执行类放入kv 中
public Map<String, String> toExecutorProperties() {
		Map<String, String> properties = new HashMap<>(toCommonProperties());
		if (executorClass != null) {
			properties.put(CLASS_NAME, executorClass);
		}
		return properties;
	}
//放回存放的Map 是否是流处理模式
private Map<String, String> toCommonProperties() {
		Map<String, String> properties = new HashMap<>();
		properties.put(STREAMING_MODE, Boolean.toString(isStreamingMode));
		return properties;
	}
EnvironmentSettings 类的成员方法是私有化的,因此无法通过new一个EnvironmentSettings 来创建对象,
但是flink提供了一个newInstance方法可以创建一个Builder的对象  

2.Builder 类

成员变量
//存放着 计划和执行类的全类名    默认是流处理模式
        private static final String OLD_PLANNER_FACTORY ="org.apache.flink.table.planner.StreamPlannerFactory";
		private static final String OLD_EXECUTOR_FACTORY = "org.apache.flink.table.executor.StreamExecutorFactory";
		private static final String BLINK_PLANNER_FACTORY = "org.apache.flink.table.planner.delegation.BlinkPlannerFactory";
		private static final String BLINK_EXECUTOR_FACTORY = "org.apache.flink.table.planner.delegation.BlinkExecutorFactory";

		private String plannerClass = OLD_PLANNER_FACTORY;
		private String executorClass = OLD_EXECUTOR_FACTORY;
		private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
		private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
		private boolean isStreamingMode = true;
方法 
// 使用旧的执行计划 默认的
        public Builder useOldPlanner() {
			this.plannerClass = OLD_PLANNER_FACTORY;
			this.executorClass = OLD_EXECUTOR_FACTORY;
			return this;
		}

//使用blink计划   可设置
		public Builder useBlinkPlanner() {
			this.plannerClass = BLINK_PLANNER_FACTORY;
			this.executorClass = BLINK_EXECUTOR_FACTORY;
			return this;
		}

//不设置 计划类    和执行类 都等于null 
		public Builder useAnyPlanner() {
			this.plannerClass = null;
			this.executorClass = null;
			return this;
		}

//设置为批处理模式
		public Builder inBatchMode() {
			this.isStreamingMode = false;
			return this;
		}

//设置为流处理模式  默认的
		public Builder inStreamingMode() {
			this.isStreamingMode = true;
			return this;
		}

//设置目录名称
		public Builder withBuiltInCatalogName(String builtInCatalogName) {
			this.builtInCatalogName = builtInCatalogName;
			return this;
		}

//设置数据库的名称
		public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
			this.builtInDatabaseName = builtInDatabaseName;
			return this;
		}

//创建  一个 EnvironmentSettings 类的实例化
		public EnvironmentSettings build() {
			return new EnvironmentSettings(
				plannerClass,
				executorClass,
				builtInCatalogName,
				builtInDatabaseName,
				isStreamingMode);
		}
	}

 

Logo

欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!

更多推荐