其实,关于UDF这部分官方文档就写的挺好的,简单明了,而且配有DEMO,有兴趣的同学,可以到 参考文档 里去找到连接。
首先,如果想使用自定义函数,那么必须在之前来注册这个函数,使用TableEnvironment的registerFunction()方法来注册。注册之后自定义函数会被插入到TableEnvironment的函数目录中,以便API或SQL正确解析并执行它。在 Flink 中,UDF分为三类:标量函数(ScalarFunction)、表函数(``TableFunction) 、聚合函数(``AggregateFunction)。
标量函数(ScalarFunction)
简单的说,标量函数,就是你输入几个数(0个或几个都行),经过一系列的处理,再返回给你几个数,这个案例咱们还使用上一篇文章中使用的意甲射手榜的案例,一般来说,总进球数=主场进球数+客场进球数,但是今年的规则有变,客场进球按两个球计算(本文案例和前文有区别,使用scala,大家注意一下)。
import org.apache.flink.table.functions.ScalarFunction
class TotalScores extends ScalarFunction{
private var wight:Int = 1 ;
def this(wight:Int){
this()
this.wight = wight
}
def eval(home:Int,visit:Int): Int = home+visit*this.wight
}
首先,需要继承ScalarFunction该类,这里我们添加了一个构造器,传入的参数作为客场进球权重,然后实现eval方法,输入参数为主客场进球数,输出则为总进球数。
接下来,我们来写测试类:
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
object TestScalarFunction {
def main(args: Array[String]): Unit = {
val filePath = “E:\\devlop\\workspace\\streaming1\\src\\main\\resources\\testdata.csv”
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val csvtable = CsvTableSource
.builder
.path(filePath)
.ignoreFirstLine
.fieldDelimiter(“,”)
.field(“rank”, Types.INT)
.field(“player”, Types.STRING)
.field(“club”, Types.STRING)
.field(“matches”, Types.INT)
.field(“red_card”, Types.INT)
.field(“total_score”, Types.INT)
.field(“total_score_home”, Types.INT)
.field(“total_score_visit”, Types.INT)
.field(“pass”, Types.INT)
.field(“shot”, Types.INT)
.build
tableEnv.registerTableSource(“goals”, csvtable)
tableEnv.registerFunction(“ts”,new TotalScores(2))
val tableTest = tableEnv.sqlQuery(“select player,total_score_home,total_score_visit,ts(total_score_home,total_score_visit) from goals where total_score > 10”)//.scan(“test”).where(“id=‘5’”).select(“id,sources,targets”)
tableEnv.toDataSet[Row](tableTest).print()
}
}
首先别忘记引用
import org.apache.flink.api.scala._
否则会有奇怪事情发生。
然后,注册函数,默认构造客场进球权重为2
tableEnv.registerFunction(“ts”,new TotalScores(2))
“select player,total_score_home,total_score_visit,ts(total_score_home,total_score_visit) from goals where total_score > 10”
在SQL中使用函数 ts(total_score_home,total_score_visit) 就这么简单
我们来看下输出:
C-罗纳尔多,5,7,19
夸利亚雷拉,5,5,15
萨帕塔,1,4,9
米利克,0,1,2
皮亚特克,2,0,2
因莫比莱,3,3,9
卡普托,2,4,10
表函数(TableFunction)
简单的说,表函数,就是你输入几个数(0个或几个都行),经过一系列的处理,再返回给你行数,返回的行可以包含一列或是多列值。这里我们使用一套新的数据案例来做一个说明。
假设这是某年四个直辖市四个季度GDP的一张透视表(说到透视表,想了解的同学可以异步到我之前的 文章 去看看)
provice,s1,s2,s3,s4
天津,10,11,13,14
北京,13,16,17,18
重庆,14,12,13,14
上海,15,11,15,17
我们来将这张透视表,还原成一张列表,接下来,我们来看代码
import org.apache.flink.table.functions.TableFunction
class UnPivotFunction(separator: String) extends TableFunction[(String)] {
@scala.annotation.varargs
def eval(strs:String*): Unit = {
strs.foreach(x=>collect(x))
}
}
函数要继承TableFunction,后面泛型需要输入返回列的类型,这里为了方便,我们就使用了字符串。我们计划在查询里面把四个季度的值都输入进来,转换成列表。collect是TableFunction提供的函数,用于添加列,eval方法的参数,可以根据你的需要自行扩展,注意在使用不确定参数值的时候,加上注解@scala.annotation.varargs
接下来,我们来测试一下
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Row
import wang.datahub.udf.UnPivotFunction
object TestMyTableFunction2 {
def main(args: Array[String]): Unit = {
val filepath = “E:\\devlop\\workspace\\testsbtflink\\src\\main\\resources\\GDP.csv”
val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv.registerFunction(“mtf2”, new UnPivotFunction(“@”))
val cts = CsvTableSource.builder().ignoreFirstLine()
//provice,s1,s2,s3,s4
.field(“provice”,Types.STRING)
.field(“s1”,Types.STRING)
.field(“s2”,Types.STRING)
.field(“s3”,Types.STRING)
.field(“s4”,Types.STRING)
.path(filepath)
.build()
tableEnv.registerTableSource(“m”,cts)
val tableTest = tableEnv.sqlQuery(“select provice,word from m , LATERAL TABLE(mtf2(s1,s2,s3,s4)) as T(word)”)
val stream = tableEnv.toDataSet[Row](tableTest)
stream.print()
}
}
在SQL我使用了 JOIN LATERAL ,有兴趣了解的同学,可以看下云栖的文章,我放在参考文档里了。
我们来看下输出结果:
天津,10
天津,11
小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。
深知大多数初中级Java工程师,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!
因此收集整理了一份《2024年最新Java开发全套学习资料》送给大家,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。
由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频
如果你觉得这些内容对你有帮助,可以添加下面V无偿领取!(备注Java)
总结
上述知识点,囊括了目前互联网企业的主流应用技术以及能让你成为“香饽饽”的高级架构知识,每个笔记里面几乎都带有实战内容。
很多人担心学了容易忘,这里教你一个方法,那就是重复学习。
打个比方,假如你正在学习 spring 注解,突然发现了一个注解@Aspect,不知道干什么用的,你可能会去查看源码或者通过博客学习,花了半小时终于弄懂了,下次又看到@Aspect 了,你有点郁闷了,上次好像在哪哪哪学习,你快速打开网页花了五分钟又学会了。
从半小时和五分钟的对比中可以发现多学一次就离真正掌握知识又近了一步。
人的本性就是容易遗忘,只有不断加深印象、重复学习才能真正掌握,所以很多书我都是推荐大家多看几遍。哪有那么多天才,他只是比你多看了几遍书。
内容对你有帮助,可以添加下面V无偿领取!(备注Java)**
[外链图片转存中…(img-v7nIjDP5-1711198876541)]
总结
上述知识点,囊括了目前互联网企业的主流应用技术以及能让你成为“香饽饽”的高级架构知识,每个笔记里面几乎都带有实战内容。
很多人担心学了容易忘,这里教你一个方法,那就是重复学习。
打个比方,假如你正在学习 spring 注解,突然发现了一个注解@Aspect,不知道干什么用的,你可能会去查看源码或者通过博客学习,花了半小时终于弄懂了,下次又看到@Aspect 了,你有点郁闷了,上次好像在哪哪哪学习,你快速打开网页花了五分钟又学会了。
从半小时和五分钟的对比中可以发现多学一次就离真正掌握知识又近了一步。
[外链图片转存中…(img-V8iijqHY-1711198876542)]
人的本性就是容易遗忘,只有不断加深印象、重复学习才能真正掌握,所以很多书我都是推荐大家多看几遍。哪有那么多天才,他只是比你多看了几遍书。
本文已被CODING开源项目:【一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码】收录