大数据技术之Flink


    数技术Flink
    第章 Flink简介
    11 初识Flink
    Flink起源Stratosphere项目Stratosphere2010~2014年3处柏林学欧洲学进行研究项目2014年4月Stratosphere代码复制捐赠Apache软件基金会参加孵化项目初始成员Stratosphere系统核心开发员2014年12月Flink跃成Apache软件基金会顶级项目
    德语中Flink词表示快速灵巧项目采松鼠彩色图案作logo仅松鼠具快速灵巧特点柏林松鼠种迷红棕色Flink松鼠logo拥爱尾巴尾巴颜色Apache软件基金会logo颜色相呼应说Apache风格松鼠

    Flink Logo
    Flink项目理念:Apache Flink分布式高性时准确流处理应程序造开源流处理框架
    Apache Flink框架分布式处理引擎界界数流进行状态计算Flink设计常见集群环境中运行存执行速度意规模执行计算

    12 Flink重特点
    121 事件驱动型(Eventdriven)
    事件驱动型应类具状态应事件流提取数根事件触发计算状态更新外部动作较典型kafka代表消息队列事件驱动型应
    SparkStreaming微批次图:

    事件驱动型:

    122 流批世界观
    批处理特点界持久量非常适合需访问全套记录完成计算工作般离线统计
    流处理特点界实时 需针整数集执行操作通系统传输数项执行操作般实时统计
    spark世界观中切批次组成离线数批次实时数限批次组成
    flink世界观中切流组成离线数界限流实时数没界限流谓界流界流
    界数流:界数流开始没结束会生成时终止提供数必须连续处理界流说必须获取立处理event界数流法等数达输入界时间点会完成处理界数通常求特定序(例事件发生序)获取event便够推断结果完整性
    界数流:界数流明确定义开始结束执行计算前通获取数处理界流处理界流需序获取始终界数集进行排序界流处理称批处理

    种流世界观架构获处具极低延迟
    123 分层api

    底层级抽象仅仅提供状态流通程函数(Process Function)嵌入DataStream API中底层程函数(Process Function) DataStream API 相集成某特定操作进行底层抽象允许户处理数流事件致容错状态外户注册事件时间处理时间回调程序处理复杂计算
    实际数应需述底层抽象针核心API(Core APIs) 进行编程DataStream API(界界流数)DataSet API(界数集)API数处理提供通构建模块户定义种形式转换(transformations)连接(joins)聚合(aggregations)窗口操作(windows)等等DataSet API 界数集提供额外支持例循环迭代API处理数类型类(classes)形式编程语言表示
    Table API 表中心声明式编程中表会动态变化(表达流数时)Table API遵循(扩展)关系模型:表二维数结构(schema)(类似关系数库中表)时API提供较操作例selectprojectjoingroupbyaggregate等Table API程序声明式定义什逻辑操作应该执行准确确定操作代码
    Table API通种类型户定义函数(UDF)进行扩展核心API更具表达力起更加简洁(代码量更少)外Table API程序执行前会置优化器进行优化
    表 DataStreamDataSet 间缝切换允许程序 Table API DataStream DataSet 混合
    Flink提供高层级抽象 SQL 层抽象语法表达力 Table API 类似SQL查询表达式形式表现程序SQL抽象Table API交互密切时SQL查询直接Table API定义表执行
    目前Flink作批处理流Spark成熟DataSetFlink Table APIFlink SQL完善厂商定制学DataStream API实际Flink作接Google DataFlow模型实现流批统观点基DataStream
    Flink模块
    l Flink Table & SQL(没开发完)
    l Flink Gelly(图计算)
    l Flink CEP(复杂事件处理)

    第二章 快速手
    21 搭建maven工程 FlinkTutorial
    211 pom文件


    xmlnsxsihttpwwww3org2001XMLSchemainstance
    xsischemaLocationhttpmavenapacheorgPOM400 httpmavenapacheorgxsdmaven400xsd>
    400
    comatguiguflink
    FlinkTutorial
    10SNAPSHOT



    orgapacheflink
    flinkscala_211
    1100

    < httpsmvnrepositorycomartifactorgapacheflinkflinkstreamingscala >

    orgapacheflink
    flinkstreamingscala_211
    1100





    < 该插件Scala代码编译成class文件 >

    netalchim31maven
    scalamavenplugin
    346


    < 声明绑定mavencompile阶段 >

    compile





    orgapachemavenplugins
    mavenassemblyplugin
    300


    jarwithdependencies




    makeassembly
    package

    single








    212 添加scala框架 scala文件夹

    22 批处理wordcount
    srcmainscalacomatguiguwcWordCountscala
    object WordCount {
    def main(args Array[String]) Unit {
    创建执行环境
    val env ExecutionEnvironmentgetExecutionEnvironment
    文件中读取数
    val inputPath D\\Projects\\BigData\\TestWC1\\src\\main\\resources\\hellotxt
    val inputDS DataSet[String] envreadTextFile(inputPath)
    分词单词进行groupby分组然sum进行聚合
    val wordCountDS AggregateDataSet[(String Int)] inputDSflatMap(_split( ))map((_ 1))groupBy(0)sum(1)

    印输出
    wordCountDSprint()
    }
    }
    注意:Flink程序支持javascala两种语言课程中scala语言引入包中javascala两种包时注意scala包
    23 流处理 wordcount
    srcmainscalacomatguiguwcStreamWordCountscala
    object StreamWordCount {
    def main(args Array[String]) Unit {
    外部命令中获取参数
    val params ParameterTool ParameterToolfromArgs(args)
    val host String paramsget(host)
    val port Int paramsgetInt(port)

    创建流处理环境
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    接收socket文流
    val textDstream DataStream[String] envsocketTextStream(host port)

    flatMapMap需引隐式转换
    import orgapacheflinkapiscala_
    val dataStream DataStream[(String Int)] textDstreamflatMap(_split(\\s))filter(_nonEmpty)map((_ 1))keyBy(0)sum(1)

    dataStreamprint()setParallelism(1)

    启动executor执行务
    envexecute(Socket stream word count)
    }
    }
    测试——linux系统中netcat命令进行发送测试
    nc lk 7777

    第三章 Flink部署
    31 Standalone模式
    311 安装
    解压缩 flink1100binscala_211tgz进入conf目录中
    1)修改 flinkconfflinkconfyaml 文件:

    2)修改 confslaves文件:

    3)分发外两台机子:

    4)启动:

    访问httplocalhost8081flink集群务进行监控理



    312 提交务

    1) 准备数文件

    2) 含数文件文件夹分发taskmanage 机器中


    读取数磁盘读取实际务会分发taskmanage机器中目标文件分发

    3) 执行程序
    flink run c comatguiguwcStreamWordCount –p 2 FlinkTutorial10SNAPSHOTjarwithdependenciesjar host lcoalhost –port 7777


    4) 目标文件夹中查计算结果
    注意:计算结果根会保存taskmanage机器会jobmanage

    5) webui控制台查计算程



    32 Yarn模式
    Yarn模式部署Flink务时求FlinkHadoop支持版Hadoop环境需保证版22集群中安装HDFS服务
    321 Flink on Yarn
    Flink提供两种yarn运行模式分SessionClusterPerJobCluster模式
    1) Sessioncluster 模式:

    SessionCluster模式需先启动集群然提交作业接着会yarn申请块空间资源永远保持变果资源满作业法提交等yarn中中作业执行完成释放资源作业会正常提交作业享DispatcherResourceManager享资源适合规模执行时间短作业
    yarn中初始化flink集群开辟指定资源提交务里提交flink集群会常驻yarn集群中非手工停止
    2) PerJobCluster 模式:

    Job会应集群提交作业会根身情况会单独yarn申请资源直作业执行完成作业失败否会影响作业正常提交运行独享DispatcherResourceManager需接受资源申请适合规模长时间运行作业
    次提交会创建新flink集群务间互相独立互影响方便理务执行完成创建集群会消失
    322 Session Cluster
    1) 启动hadoop集群(略)
    2) 启动yarnsession

    yarnsessionsh n 2 s 2 jm 1024 tm 1024 nm test d
    中:
    n(container):TaskManager数量
    s(slots): TaskManagerslot数量默认slotcore默认taskmanagerslot数1时taskmanager做冗余
    jm:JobManager存(单位MB)
    tm:taskmanager存(单位MB)
    nm:yarn appName(现yarnui名字)
    d:台执行


    3) 执行务
    flink run c comatguiguwcStreamWordCount FlinkTutorial10SNAPSHOTjarwithdependenciesjar host lcoalhost –port 7777

    4) yarn控制台查务状态

    5) 取消yarnsession
    yarn application kill application_1577588252906_0001
    322 Per Job Cluster
    1) 启动hadoop集群(略)
    2) 启动yarnsession直接执行job
    flink run –m yarncluster c comatguiguwcStreamWordCount FlinkTutorial10SNAPSHOTjarwithdependenciesjar host lcoalhost –port 7777
    33 Kubernetes部署
    容器化部署时目前业界流行项技术基Docker镜运行够户更加方便应进行理运维容器理工具中流行Kubernetes(k8s)Flink版中支持k8s部署模式
    1) 搭建Kubernetes集群(略)
    2) 配置组件yaml文件
    k8s构建Flink Session Cluster需Flink集群组件应docker镜分k8s启动包括JobManagerTaskManagerJobManagerService三镜服务镜服务中央镜仓库中获取
    3)启动Flink Session Cluster
    启动jobmanagerservice 服务
    kubectl create f jobmanagerserviceyaml
    启动jobmanagerdeployment服务
    kubectl create f jobmanagerdeploymentyaml
    启动taskmanagerdeployment服务
    kubectl create f taskmanagerdeploymentyaml
    4)访问Flink UI页面
    集群启动通JobManagerServicers中配置WebUI端口浏览器输入url访问Flink UI页面:
    http{JobManagerHostPort}apiv1namespacesdefaultservicesflinkjobmanageruiproxy

    第四章 Flink运行架构
    41 Flink运行时组件
    Flink运行时架构包括四组件会运行流处理应程序时协工作:作业理器(JobManager)资源理器(ResourceManager)务理器(TaskManager)分发器(Dispatcher)FlinkJavaScala实现组件会运行Java虚拟机组件职责:
    l 作业理器(JobManager)
    控制应程序执行进程说应程序会JobManager控制执行JobManager会先接收执行应程序应程序会包括:作业图(JobGraph)逻辑数流图(logical dataflow graph)包类库资源JAR包JobManager会JobGraph转换成物理层面数流图图做执行图(ExecutionGraph)包含发执行务JobManager会资源理器(ResourceManager)请求执行务必资源务理器(TaskManager)插槽(slot)旦获取足够资源会执行图分发真正运行TaskManager运行程中JobManager会负责需中央协调操作说检查点(checkpoints)协调
    l 资源理器(ResourceManager)
    负责理务理器(TaskManager)插槽(slot)TaskManger插槽Flink中定义处理资源单元Flink环境资源理工具提供资源理器YARNMesosK8sstandalone部署JobManager申请插槽资源时ResourceManager会空闲插槽TaskManager分配JobManager果ResourceManager没足够插槽满足JobManager请求资源提供台发起会话提供启动TaskManager进程容器外ResourceManager负责终止空闲TaskManager释放计算资源
    l 务理器(TaskManager)
    Flink中工作进程通常Flink中会TaskManager运行TaskManager包含定数量插槽(slots)插槽数量限制TaskManager够执行务数量启动TaskManager会资源理器注册插槽收资源理器指令TaskManager会者插槽提供JobManager调JobManager插槽分配务(tasks)执行执行程中TaskManager运行应程序TaskManager交换数
    l 分发器(Dispatcher)
    跨作业运行应提交提供REST接口应提交执行时分发器会启动应移交JobManagerREST接口Dispatcher作集群HTTP接入点样够受防火墙阻挡Dispatcher会启动Web UI方便展示监控作业执行信息Dispatcher架构中必需取决应提交运行方式
    42 务提交流程
    应提交执行时Flink组件交互协作:


    图 务提交组件交互流程

    图较高层级视角应中组件交互协作果部署集群环境(例YARNMesosKubernetesstandalone等)中步骤省略组件会运行JVM进程中
    具体果Flink集群部署YARN会提交流程:

    图 Yarn模式务提交流程
    Flink务提交ClientHDFS传FlinkJar包配置Yarn ResourceManager提交务ResourceManager分配Container资源通知应NodeManager启动ApplicationMasterApplicationMaster启动加载FlinkJar包配置构建环境然启动JobManagerApplicationMasterResourceManager申请资源启动TaskManagerResourceManager分配Container资源ApplicationMaster通知资源节点NodeManager启动TaskManagerNodeManager加载FlinkJar包配置构建环境启动TaskManagerTaskManager启动JobManager发送心跳包等JobManager分配务
    43 务调度原理

    图 务调度原理
    客户端运行时程序执行部分准备发送dataflow(JobGraph)Master(JobManager)然客户端断开连接者维持连接等接收计算结果

    Flink 集群启动首先会启动 JobManger TaskManager Client 提交务 JobManagerJobManager 调度务 TaskManager 执行然 TaskManager 心跳统计信息汇报 JobManagerTaskManager 间流形式进行数传输述三者均独立 JVM 进程
    Client 提交 Job 客户端运行机器( JobManager 环境连通)提交 Job Client 结束进程(Streaming务)结束等结果返回
    JobManager 负责调度 Job 协调 Task 做 checkpoint职责 Storm Nimbus Client 处接收 Job JAR 包等资源会生成优化执行计划 Task 单元调度 TaskManager 执行
    TaskManager 启动时候设置槽位数(Slot) slot 启动 TaskTask 线程 JobManager 处接收需部署 Task部署启动游建立 Netty 连接接收数处理
    431 TaskMangerSlots
    Flink中worker(TaskManager)JVM进程会独立线程执行+subtask控制worker接收少taskworker通task slot进行控制(worker少task slot)
    task slot表示TaskManager拥资源固定子集假TaskManager三slot会理存分成三份slot资源slot化意味着subtask需jobsubtask竞争理存取代拥定数量存储备需注意里会涉CPU隔离slot目前仅仅隔离task受理存
    通调整task slot数量允许户定义subtask间互相隔离果TaskManagerslot意味着task group运行独立JVM中(该JVM通特定容器启动)TaskManagerslot意味着更subtask享JVMJVM进程中task享TCP连接(基路复)心跳消息享数集数结构减少task负载

    图 TaskManagerSlot

    图 子务享Slot

    默认情况Flink允许子务享slot务子务(前提job) 样结果slot保存作业整道
    Task Slot静态概念指TaskManager具发执行力通参数taskmanagernumberOfTaskSlots进行配置行度parallelism动态概念TaskManager运行程序时实际发力通参数parallelismdefault进行配置
    说假设3TaskManagerTaskManager中分配3TaskSlotTaskManager接收3task9TaskSlot果设置parallelismdefault1运行程序默认行度19TaskSlot18空闲设置合适行度提高效率



    432 程序数流(DataFlow)

    Flink程序三部分组成: Source TransformationSink
    Source负责读取数源Transformation利种算子进行处理加工Sink负责输出
    运行时Flink运行程序会映射成逻辑数流(dataflows)包含三部分dataflowsources开始sinks结束dataflow类似意环图(DAG)部分情况程序中转换运算(transformations)dataflow中算子(operator)应关系时候transformation应operator

    图 程序数流
    433 执行图(ExecutionGraph)
    Flink程序直接映射成数流图StreamGraph称逻辑流图表示计算逻辑高级视图执行流处理程序Flink需逻辑流图转换物理数流图(执行图)详细说明程序执行方式

    Flink 中执行图分成四层:StreamGraph > JobGraph > ExecutionGraph > 物理执行图
    StreamGraph:根户通 Stream API 编写代码生成初图表示程序拓扑结构
    JobGraph:StreamGraph优化生成 JobGraph提交 JobManager 数结构优化符合条件节点 chain 起作节点样减少数节点间流动需序列化反序列化传输消耗
    ExecutionGraph:JobManager 根 JobGraph 生成ExecutionGraphExecutionGraphJobGraph行化版调度层核心数结构
    物理执行图:JobManager 根 ExecutionGraph Job 进行调度TaskManager 部署 Task 形成图具体数结构

    434 行度(Parallelism)
    Flink程序执行具行分布式特性
    执行程中流(stream)包含分区(stream partition)算子(operator)包含子务(operator subtask)子务线程物理机容器中彼互赖执行
    特定算子子务(subtask)数称行度(parallelism)般情况流程序行度认算子中行度程序中算子具行度

    图 行数流
    Stream算子间传输数形式onetoone(forwarding)模式redistributing模式具体种形式取决算子种类
    Onetoone:stream(sourcemap operator间)维护着分区元素序意味着map 算子子务元素数序source 算子子务生产元素数序相mapfliterflatMap等算子onetoone应关系
    Ø 类似spark中窄赖
    Redistributing:stream(map()keyBywindow间者keyBywindowsink间)分区会发生改变算子子务选择transformation发送数目标务例keyBy() 基hashCode重分区broadcastrebalance会机重新分区算子会引起redistribute程redistribute程类似Spark中shuffle程
    Ø 类似spark中宽赖
    435 务链(Operator Chains)
    相行度one to one操作Flink样相连算子链接起形成task原算子成里面部分算子链接成task非常效优化:减少线程间切换基缓存区数交换减少时延时提升吞吐量链接行编程API中进行指定


    图 taskoperator chains












    第五章 Flink 流处理API

    51 Environment
    511 getExecutionEnvironment
    创建执行环境表示前执行程序文 果程序独立调方法返回执行环境果命令行客户端调程序提交集群方法返回集群执行环境说getExecutionEnvironment会根查询运行方式决定返回什样运行环境常种创建执行环境方式

    val env ExecutionEnvironment ExecutionEnvironmentgetExecutionEnvironment

    val env StreamExecutionEnvironmentgetExecutionEnvironment

    果没设置行度会flinkconfyaml中配置准默认1

    512 createLocalEnvironment
    返回执行环境需调时指定默认行度
    val env StreamExecutionEnvironmentcreateLocalEnvironment(1)

    513 createRemoteEnvironment
    返回集群执行环境Jar提交远程服务器需调时指定JobManagerIP端口号指定集群中运行Jar包
    val env ExecutionEnvironmentcreateRemoteEnvironment(jobmanagehostname 6123YOURPATHwordcountjar)

    52 Source
    521 集合读取数
    定义样例类传感器id时间戳温度
    case class SensorReading(id String timestamp Long temperature Double)

    object Sensor {
    def main(args Array[String]) Unit {
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    val stream1 env
    fromCollection(List(
    SensorReading(sensor_1 1547718199 358)
    SensorReading(sensor_6 1547718201 154)
    SensorReading(sensor_7 1547718202 67)
    SensorReading(sensor_10 1547718205 381)
    ))

    stream1print(stream1)setParallelism(1)

    envexecute()
    }
    }

    522 文件读取数
    val stream2 envreadTextFile(YOUR_FILE_PATH)

    523 kafka消息队列数作源
    需引入kafka连接器赖:
    pomxml
    < httpsmvnrepositorycomartifactorgapacheflinkflinkconnectorkafka011 >

    orgapacheflink
    flinkconnectorkafka011_211
    1100


    具体代码:
    val properties new Properties()
    propertiessetProperty(bootstrapservers localhost9092)
    propertiessetProperty(groupid consumergroup)
    propertiessetProperty(keydeserializer orgapachekafkacommonserializationStringDeserializer)
    propertiessetProperty(valuedeserializer orgapachekafkacommonserializationStringDeserializer)
    propertiessetProperty(autooffsetreset latest)

    val stream3 envaddSource(new FlinkKafkaConsumer011[String](sensor new SimpleStringSchema() properties))
    524 定义Source
    source数源定义source需做传入SourceFunction具体调:
    val stream4 envaddSource( new MySensorSource() )
    希机生成传感器数MySensorSource具体代码实现:
    class MySensorSource extends SourceFunction[SensorReading]{

    flag 表示数源否正常运行
    var running Boolean true

    override def cancel() Unit {
    running false
    }

    override def run(ctx SourceFunctionSourceContext[SensorReading]) Unit {
    初始化机数发生器
    val rand new Random()

    var curTemp 1to(10)map(
    i > ( sensor_ + i 65 + randnextGaussian() * 20 )
    )

    while(running){
    更新温度值
    curTemp curTempmap(
    t > (t_1 t_2 + randnextGaussian() )
    )
    获取前时间戳
    val curTime SystemcurrentTimeMillis()

    curTempforeach(
    t > ctxcollect(SensorReading(t_1 curTime t_2))
    )
    Threadsleep(100)
    }
    }
    }
    53 Transform
    转换算子

    531 map


    val streamMap streammap { x > x * 2 }

    532 flatMap
    flatMap函数签名:def flatMap[AB](as List[A])(f A ⇒ List[B]) List[B]
    例 flatMap(List(123))(i ⇒ List(ii))
    结果List(112233) 
    List(a b c d)flatMap(line ⇒ linesplit( ))
    结果List(a b c d)

    val streamFlatMap streamflatMap{
    x > xsplit( )
    }

    533 Filter



    val streamFilter streamfilter{
    x > x 1
    }

    534 KeyBy

    DataStream → KeyedStream:逻辑流拆分成相交分区分区包含具相key元素部hash形式实现
    535 滚动聚合算子(Rolling Aggregation)
    算子针KeyedStream支流做聚合
    l sum()
    l min()
    l max()
    l minBy()
    l maxBy()
    536 Reduce
    KeyedStream → DataStream:分组数流聚合操作合前元素次聚合结果产生新值返回流中包含次聚合结果返回次聚合终结果

    val stream2 envreadTextFile(YOUR_PATH\\sensortxt)
    map( data > {
    val dataArray datasplit()
    SensorReading(dataArray(0)trim dataArray(1)trimtoLong dataArray(2)trimtoDouble)
    })
    keyBy(id)
    reduce( (x y) > SensorReading(xid xtimestamp + 1 ytemperature) )



    537 Split Select
    Split

    图 Split
    DataStream → SplitStream:根某特征DataStream拆分成两者DataStream
    Select

    图 Select
    SplitStream→DataStream:SplitStream中获取者DataStream
    需求:传感器数温度高低(30度界)拆分成两流

    val splitStream stream2
    split( sensorData > {
    if (sensorDatatemperature > 30) Seq(high) else Seq(low)
    } )

    val high splitStreamselect(high)
    val low splitStreamselect(low)
    val all splitStreamselect(high low)


    538 Connect CoMap

    图 Connect算子
    DataStreamDataStream → ConnectedStreams:连接两保持类型数流两数流Connect放流中部然保持数形式发生变化两流相互独立
    CoMapCoFlatMap

    图 CoMapCoFlatMap
    ConnectedStreams → DataStream:作ConnectedStreams功mapflatMap样ConnectedStreams中Stream分进行mapflatMap处理

    val warning highmap( sensorData > (sensorDataid sensorDatatemperature) )
    val connected warningconnect(low)

    val coMap connectedmap(
    warningData > (warningData_1 warningData_2 warning)
    lowData > (lowDataid healthy)
    )

    539 Union

    图 Union
    DataStream → DataStream:两者两DataStream进行union操作产生包含DataStream元素新DataStream

    合印
    val unionStream DataStream[StartUpLog] appStoreStreamunion(otherStream)
    unionStreamprint(union)



    Connect Union 区:
    1. Union前两流类型必须样Connect样coMap中调整成样
    2 Connect操作两流Union操作

    54 支持数类型
    Flink流应程序处理数象表示事件流Flink部需够处理象需序列化反序列化便通网络传送者状态端检查点保存点读取效做点Flink需明确知道应程序处理数类型Flink类型信息概念表示数类型数类型生成特定序列化器反序列化器较器
    Flink具类型提取系统该系统分析函数输入返回类型动获取类型信息获序列化器反序列化器某情况例lambda函数泛型类型需显式提供类型信息应程序正常工作提高性
    Flink支持JavaScala中常见数类型广泛类型种
    541 基础数类型
    Flink支持JavaScala基础数类型Int Double Long String …​
    val numbers DataStream[Long] envfromElements(1L 2L 3L 4L)
    numbersmap( n > n + 1 )
    542 JavaScala元组(Tuples)
    val persons DataStream[(String Integer)] envfromElements(
    (Adam 17)
    (Sarah 23) )
    personsfilter(p > p_2 > 18)
    543 Scala样例类(case classes)
    case class Person(name String age Int)
    val persons DataStream[Person] envfromElements(
    Person(Adam 17)
    Person(Sarah 23) )
    personsfilter(p > page > 18)
    544 Java简单象(POJOs)
    public class Person {
    public String name
    public int age
    public Person() {}
    public Person(String name int age) {
    thisname name
    thisage age
    }
    }
    DataStream persons envfromElements(
    new Person(Alex 42)
    new Person(Wendy 23))
    545 (Arrays Lists Maps Enums 等等)
    FlinkJavaScala中特殊目类型支持JavaArrayListHashMapEnum等等
    55 实现UDF函数——更细粒度控制流
    551 函数类(Function Classes)
    Flink暴露udf函数接口(实现方式接口者抽象类)例MapFunction FilterFunction ProcessFunction等等
    面例子实现FilterFunction接口:
    class FilterFilter extends FilterFunction[String] {
    override def filter(value String) Boolean {
    valuecontains(flink)
    }
    }
    val flinkTweets tweetsfilter(new FlinkFilter)
    函数实现成匿名类
    val flinkTweets tweetsfilter(
    new RichFilterFunction[String] {
    override def filter(value String) Boolean {
    valuecontains(flink)
    }
    }
    )
    filter字符串flink作参数传进
    val tweets DataStream[String]
    val flinkTweets tweetsfilter(new KeywordFilter(flink))

    class KeywordFilter(keyWord String) extends FilterFunction[String] {
    override def filter(value String) Boolean {
    valuecontains(keyWord)
    }
    }
    552 匿名函数(Lambda Functions)
    val tweets DataStream[String]
    val flinkTweets tweetsfilter(_contains(flink))
    553 富函数(Rich Functions)
    富函数DataStream API提供函数类接口Flink函数类Rich版常规函数获取运行环境文拥生命周期方法实现更复杂功
    l RichMapFunction
    l RichFlatMapFunction
    l RichFilterFunction
    l …​
    Rich Function生命周期概念典型生命周期方法:
    l open()方法rich function初始化方法算子例map者filter调前open()会调
    l close()方法生命周期中调方法做清理工作
    l getRuntimeContext()方法提供函数RuntimeContext信息例函数执行行度务名字state状态
    class MyFlatMap extends RichFlatMapFunction[Int (Int Int)] {
    var subTaskIndex 0

    override def open(configuration Configuration) Unit {
    subTaskIndex getRuntimeContextgetIndexOfThisSubtask
    做初始化工作例建立HDFS连接
    }

    override def flatMap(in Int out Collector[(Int Int)]) Unit {
    if (in 2 subTaskIndex) {
    outcollect((subTaskIndex in))
    }
    }

    override def close() Unit {
    做清理工作例断开HDFS连接
    }
    }
    56 Sink
    Flink没类似spark中foreach方法户进行迭代操作外输出操作利Sink完成通类似方式完成整务终输出操作
    streamaddSink(new MySink(xxxx))
    官方提供部分框架sink外需户定义实现sink


    561 Kafka
    pomxml
    < httpsmvnrepositorycomartifactorgapacheflinkflinkconnectorkafka011 >

    orgapacheflink
    flinkconnectorkafka011_211
    1100



    函数中添加sink:
    val union highunion(low)map(_temperaturetoString)

    unionaddSink(new FlinkKafkaProducer011[String](localhost9092 test new SimpleStringSchema()))



    562 Redis
    pomxml
    < httpsmvnrepositorycomartifactorgapachebahirflinkconnectorredis >

    orgapachebahir
    flinkconnectorredis_211
    10



    定义redismapper类定义保存redis时调命令:
    class MyRedisMapper extends RedisMapper[SensorReading]{
    override def getCommandDescription RedisCommandDescription {
    new RedisCommandDescription(RedisCommandHSET sensor_temperature)
    }
    override def getValueFromData(t SensorReading) String ttemperaturetoString

    override def getKeyFromData(t SensorReading) String tid
    }

    函数中调:
    val conf new FlinkJedisPoolConfigBuilder()setHost(localhost)setPort(6379)build()
    dataStreamaddSink( new RedisSink[SensorReading](conf new MyRedisMapper) )

    563 Elasticsearch
    pomxml

    orgapacheflink
    flinkconnectorelasticsearch6_211
    1100



    函数中调:
    val httpHosts new utilArrayList[HttpHost]()
    httpHostsadd(new HttpHost(localhost 9200))

    val esSinkBuilder new ElasticsearchSinkBuilder[SensorReading]( httpHosts new ElasticsearchSinkFunction[SensorReading] {
    override def process(t SensorReading runtimeContext RuntimeContext requestIndexer RequestIndexer) Unit {
    println(saving data + t)
    val json new utilHashMap[String String]()
    jsonput(data ttoString)
    val indexRequest RequestsindexRequest()index(sensor)`type`(readingData)source(json)
    requestIndexeradd(indexRequest)
    println(saved successfully)
    }
    } )
    dataStreamaddSink( esSinkBuilderbuild() )


    564 JDBC 定义sink
    < httpsmvnrepositorycomartifactmysqlmysqlconnectorjava >

    mysql
    mysqlconnectorjava
    5144



    添加MyJdbcSink
    class MyJdbcSink() extends RichSinkFunction[SensorReading]{
    var conn Connection _
    var insertStmt PreparedStatement _
    var updateStmt PreparedStatement _

    open 创建连接
    override def open(parameters Configuration) Unit {
    superopen(parameters)

    conn DriverManagergetConnection(jdbcmysqllocalhost3306test root 123456)
    insertStmt connprepareStatement(INSERT INTO temperatures (sensor temp) VALUES ( ))
    updateStmt connprepareStatement(UPDATE temperatures SET temp WHERE sensor )
    }
    调连接执行sql
    override def invoke(value SensorReading context SinkFunctionContext[_]) Unit {

    updateStmtsetDouble(1 valuetemperature)
    updateStmtsetString(2 valueid)
    updateStmtexecute()

    if (updateStmtgetUpdateCount 0) {
    insertStmtsetString(1 valueid)
    insertStmtsetDouble(2 valuetemperature)
    insertStmtexecute()
    }
    }

    override def close() Unit {
    insertStmtclose()
    updateStmtclose()
    connclose()
    }
    }


    main方法中增加明细保存mysql中

    dataStreamaddSink(new MyJdbcSink())

    第六章 Flink中Window
    61 Window
    611 Window概述
    streaming流式计算种设计处理限数集数处理引擎限数集指种断增长质限数集window种切割限数限块进行处理手段
    Window限数流处理核心Window限stream拆分成限buckets桶桶做计算操作
    612 Window类型
    Window分成两类:
    Ø CountWindow:指定数条数生成Window时间关
    Ø TimeWindow:时间生成Window
    TimeWindow根窗口实现原理分成三类:滚动窗口(Tumbling Window)滑动窗口(Sliding Window)会话窗口(Session Window)
    1 滚动窗口(Tumbling Windows)
    数固定窗口长度数进行切片
    特点:时间齐窗口长度固定没重叠
    滚动窗口分配器元素分配指定窗口窗口中滚动窗口固定会出现重叠例:果指定5分钟滚动窗口窗口创建图示:

    图 滚动窗口
    适场景:适合做BI统计等(做时间段聚合计算)
    2 滑动窗口(Sliding Windows)
    滑动窗口固定窗口更广义种形式滑动窗口固定窗口长度滑动间隔组成
    特点:时间齐窗口长度固定重叠
    滑动窗口分配器元素分配固定长度窗口中滚动窗口类似窗口窗口参数配置窗口滑动参数控制滑动窗口开始频率滑动窗口果滑动参数窗口话窗口重叠种情况元素会分配窗口中
    例10分钟窗口5分钟滑动窗口中5分钟窗口里包含着10分钟产生数图示:

    图 滑动窗口
    适场景:时间段统计(求某接口5min失败率决定否报警)
    3 会话窗口(Session Windows)
    系列事件组合指定时间长度timeout间隙组成类似web应session段时间没接收新数会生成新窗口
    特点:时间齐
    session窗口分配器通session活动元素进行分组session窗口滚动窗口滑动窗口相会重叠固定开始时间结束时间情况相反固定时间周期收元素非活动间隔产生窗口会关闭session窗口通session间隔配置session间隔定义非活跃周期长度非活跃周期产生前session关闭续元素分配新session窗口中

    图 会话窗口
    62 Window API
    621 TimeWindow
    TimeWindow指定时间范围数组成window次window里面数进行计算
    1 滚动窗口
    Flink默认时间窗口根Processing Time 进行窗口划分Flink获取数根进入Flink时间划分窗口中

    val minTempPerWindow dataStream
    map(r > (rid rtemperature))
    keyBy(__1)
    timeWindow(Timeseconds(15))
    reduce((r1 r2) > (r1_1 r1_2min(r2_2)))

    时间间隔通Timemilliseconds(x)Timeseconds(x)Timeminutes(x)等中指定

    2 滑动窗口(SlidingEventTimeWindows)
    滑动窗口滚动窗口函数名完全致传参数时需传入两参数window_sizesliding_size
    面代码中sliding_size设置5s说5s计算输出结果次次计算window范围15s元素
    val minTempPerWindow DataStream[(String Double)] dataStream
    map(r > (rid rtemperature))
    keyBy(__1)
    timeWindow(Timeseconds(15) Timeseconds(5))
    reduce((r1 r2) > (r1_1 r1_2min(r2_2)))

    window(SlidingEventTimeWindowsof(Timeseconds(15)Timeseconds(5))


    时间间隔通Timemilliseconds(x)Timeseconds(x)Timeminutes(x)等中指定

    622 CountWindow
    CountWindow根窗口中相key元素数量触发执行执行时计算元素数量达窗口key应结果
    注意:CountWindowwindow_size指相Key元素数输入元素总数
    1 滚动窗口
    默认CountWindow滚动窗口需指定窗口元素数量达窗口时会触发窗口执行

    val minTempPerWindow DataStream[(String Double)] dataStream
    map(r > (rid rtemperature))
    keyBy(__1)
    countWindow(5)
    reduce((r1 r2) > (r1_1 r1_2max(r2_2)))

    2 滑动窗口
    滑动窗口滚动窗口函数名完全致传参数时需传入两参数window_sizesliding_size
    面代码中sliding_size设置2说收两相key数计算次次计算window范围10元素

    val keyedStream KeyedStream[(String Int) Tuple] dataStreammap(r > (rid rtemperature))keyBy(0)
    某key数达2时候触发计算计算该key10元素容
    val windowedStream WindowedStream[(String Int) Tuple GlobalWindow] keyedStreamcountWindow(102)
    val sumDstream DataStream[(String Int)] windowedStreamsum(1)
    623 window function
    window function 定义窗口中收集数做计算操作分两类:
    l 增量聚合函数(incremental aggregation functions)
    条数进行计算保持简单状态典型增量聚合函数ReduceFunction AggregateFunction
    l 全窗口函数(full window functions)
    先窗口数收集起等计算时候会遍历数ProcessWindowFunction全窗口函数
    624 选API
    l trigger() —— 触发器
    定义 window 什时候关闭触发计算输出结果
    l evitor() —— 移器
    定义移某数逻辑
    l allowedLateness() —— 允许处理迟数
    l sideOutputLateData() —— 迟数放入侧输出流
    l getSideOutput() —— 获取侧输出流

    第七章 时间语义Wartermark
    71 Flink中时间语义
    Flink流式处理中会涉时间概念图示:

    图 Flink时间概念
    Event Time:事件创建时间通常事件中时间戳描述例采集日志数中条日志会记录生成时间Flink通时间戳分配器访问事件时间戳
    Ingestion Time:数进入Flink时间
    Processing Time:执行基时间操作算子系统时间机器相关默认时间属性Processing Time
    例子——电影星球战:

    例条日志进入Flink时间20171112 100000123达Window系统时间20171112 100001234日志容:
    20171102 183715624 INFO Fail over to rm2
    业务说统计1min障日志数时间意义?—— eventTime根日志生成时间进行统计
    72 EventTime引入
    Flink流式处理中绝部分业务会eventTime般eventTime法时会迫ProcessingTime者IngestionTime
    果EventTime需引入EventTime时间属性引入方式示:
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    调时刻开始env创建stream追加时间特征
    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)
    73 Watermark
    731 基概念
    知道流处理事件产生流sourceoperator中间程时间然部分情况流operator数事件产生时间序排网络分布式等原导致乱序产生谓乱序指Flink接收事件先序严格事件Event Time序排列

    图 数乱序
    时出现问题旦出现乱序果根eventTime决定window运行明确数否全部位限期等时必须机制保证特定时间必须触发window进行计算特机制Watermark
    l Watermark种衡量Event Time进展机制
    l Watermark处理乱序事件正确处理乱序事件通常Watermark机制结合window实现
    l 数流中Watermark表示timestampWatermark数已达window执行Watermark触发
    l Watermark理解成延迟触发机制设置Watermark延时时长t次系统会校验已达数中maxEventTime然认定eventTimemaxEventTime t数已达果窗口停止时间等maxEventTime – t窗口触发执行
    序流Watermarker图示:(Watermark设置0)

    图 序数Watermark
    乱序流Watermarker图示:(Watermark设置2)

    图 序数Watermark
    Flink接收数时会定规生成Watermark条Watermark等前达数中maxEventTime 延迟时长说Watermark基数携带时间戳生成旦Watermark前未触发窗口停止时间晚会触发相应窗口执行event time数携带果运行程中法获取新数没触发窗口永远触发
    图中设置允许延迟达时间2s时间戳7s事件应Watermark5s时间戳12s事件Watermark10s果窗口11s~5s窗口26s~10s时间戳7s事件达时Watermarker恰触发窗口1时间戳12s事件达时Watermark恰触发窗口2

    Watermark 触发前窗口关窗时间旦触发关门前时刻准窗口范围数会收入窗中
    没达水位现实中时间推进久会触发关窗
    732 Watermark引入
    watermark引入简单乱序数常见引方式:
    dataStreamassignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Timemilliseconds(1000)) {
    override def extractTimestamp(element SensorReading) Long {
    elementtimestamp * 1000
    }
    } )

    Event Time定指定数源中时间戳否程序法知道事件事件时间什(数源里数没时间戳话Processing Time)
    面例子中创建起点复杂类类实现实分配时间戳接口Flink暴露TimestampAssigner接口供实现定义事件数中抽取时间戳
    val env StreamExecutionEnvironmentgetExecutionEnvironment

    调时刻开始env创建stream追加时间特性
    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)

    val readings DataStream[SensorReading] env
    addSource(new SensorSource)
    assignTimestampsAndWatermarks(new MyAssigner())
    MyAssigner两种类型
    l AssignerWithPeriodicWatermarks
    l AssignerWithPunctuatedWatermarks
    两接口继承TimestampAssigner
    Assigner with periodic watermarks
    周期性生成watermark:系统会周期性watermark插入流中(水位线种特殊事件)默认周期200毫秒ExecutionConfigsetAutoWatermarkInterval()方法进行设置
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)

    隔5秒产生watermark
    envgetConfigsetAutoWatermarkInterval(5000)
    产生watermark逻辑:隔5秒钟Flink会调AssignerWithPeriodicWatermarksgetCurrentWatermark()方法果方法返回时间戳前水位时间戳新watermark会插入流中检查保证水位线单调递增果方法返回时间戳等前水位时间戳会产生新watermark
    例子定义周期性时间戳抽取:
    class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
    val bound Long 60 * 1000 延时1分钟
    var maxTs Long LongMinValue 观察时间戳

    override def getCurrentWatermark Watermark {
    new Watermark(maxTs bound)
    }

    override def extractTimestamp(r SensorReading previousTS Long) {
    maxTs maxTsmax(rtimestamp)
    rtimestamp
    }
    }
    种简单特殊情况果事先知数流时间戳单调递增说没乱序assignAscendingTimestamps方法会直接数时间戳生成watermark
    val stream DataStream[SensorReading]
    val withTimestampsAndWatermarks stream
    assignAscendingTimestamps(e > etimestamp)

    >> result E(1) W(1) E(2) W(2)
    乱序数流果致估算出数流中事件延迟时间代码:
    val stream DataStream[SensorReading]
    val withTimestampsAndWatermarks streamassignTimestampsAndWatermarks(
    new SensorTimeAssigner
    )

    class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Timeseconds(5)) {
    抽取时间戳
    override def extractTimestamp(r SensorReading) Long rtimestamp
    }

    >> relust E(10) W(0) E(8) E(7) E(11) W(1)
    Assigner with punctuated watermarks
    间断式生成watermark周期性生成方式种方式固定时间根需条数进行筛选处理直接代码举例子sensor_1传感器数流插入watermark:
    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
    val bound Long 60 * 1000

    override def checkAndGetNextWatermark(r SensorReading extractedTS Long) Watermark {
    if (rid sensor_1) {
    new Watermark(extractedTS bound)
    } else {
    null
    }
    }
    override def extractTimestamp(r SensorReading previousTS Long) Long {
    rtimestamp
    }
    }

    74 EvnetTimewindow中
    741 滚动窗口(TumblingEventTimeWindows)
    def main(args Array[String]) Unit {
    环境
    val env StreamExecutionEnvironment StreamExecutionEnvironmentgetExecutionEnvironment

    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)
    envsetParallelism(1)

    val dstream DataStream[String] envsocketTextStream(localhost7777)

    val textWithTsDstream DataStream[(String Long Int)] dstreammap { text >
    val arr Array[String] textsplit( )
    (arr(0) arr(1)toLong 1)
    }
    val textWithEventTimeDstream DataStream[(String Long Int)] textWithTsDstreamassignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String Long Int)](Timemilliseconds(1000)) {
    override def extractTimestamp(element (String Long Int)) Long {

    return element_2
    }
    })

    val textKeyStream KeyedStream[(String Long Int) Tuple] textWithEventTimeDstreamkeyBy(0)
    textKeyStreamprint(textkey)

    val windowStream WindowedStream[(String Long Int) Tuple TimeWindow] textKeyStreamwindow(TumblingEventTimeWindowsof(Timeseconds(2)))

    val groupDstream DataStream[mutableHashSet[Long]] windowStreamfold(new mutableHashSet[Long]()) { case (set (key ts count)) >
    set + ts
    }

    groupDstreamprint(window)setParallelism(1)

    envexecute()
    }
    }

    结果Event Time时间窗口计算出关系统时间(包括输入快慢)
    742 滑动窗口(SlidingEventTimeWindows)

    def main(args Array[String]) Unit {
    环境
    val env StreamExecutionEnvironment StreamExecutionEnvironmentgetExecutionEnvironment

    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)
    envsetParallelism(1)

    val dstream DataStream[String] envsocketTextStream(localhost7777)

    val textWithTsDstream DataStream[(String Long Int)] dstreammap { text >
    val arr Array[String] textsplit( )
    (arr(0) arr(1)toLong 1)
    }
    val textWithEventTimeDstream DataStream[(String Long Int)] textWithTsDstreamassignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String Long Int)](Timemilliseconds(1000)) {
    override def extractTimestamp(element (String Long Int)) Long {

    return element_2
    }
    })

    val textKeyStream KeyedStream[(String Long Int) Tuple] textWithEventTimeDstreamkeyBy(0)
    textKeyStreamprint(textkey)

    val windowStream WindowedStream[(String Long Int) Tuple TimeWindow] textKeyStreamwindow(SlidingEventTimeWindowsof(Timeseconds(2)Timemilliseconds(500)))

    val groupDstream DataStream[mutableHashSet[Long]] windowStreamfold(new mutableHashSet[Long]()) { case (set (key ts count)) >
    set + ts
    }

    groupDstreamprint(window)setParallelism(1)

    envexecute()
    }


    743 会话窗口(EventTimeSessionWindows)
    相邻两次数EventTime时间差超指定时间间隔会触发执行果加入Watermark 会符合窗口触发情况进行延迟达延迟水位进行窗口触发
    def main(args Array[String]) Unit {
    环境
    val env StreamExecutionEnvironment StreamExecutionEnvironmentgetExecutionEnvironment

    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)
    envsetParallelism(1)

    val dstream DataStream[String] envsocketTextStream(localhost7777)

    val textWithTsDstream DataStream[(String Long Int)] dstreammap { text >
    val arr Array[String] textsplit( )
    (arr(0) arr(1)toLong 1)
    }
    val textWithEventTimeDstream DataStream[(String Long Int)] textWithTsDstreamassignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String Long Int)](Timemilliseconds(1000)) {
    override def extractTimestamp(element (String Long Int)) Long {

    return element_2
    }
    })

    val textKeyStream KeyedStream[(String Long Int) Tuple] textWithEventTimeDstreamkeyBy(0)
    textKeyStreamprint(textkey)

    val windowStream WindowedStream[(String Long Int) Tuple TimeWindow] textKeyStreamwindow(EventTimeSessionWindowswithGap(Timemilliseconds(500)) )

    windowStreamreduce((text1text2)>
    ( text1_10Ltext1_3+text2_3)
    ) map(__3)print(windows)setParallelism(1)

    envexecute()

    }

    第八章 ProcessFunction API(底层API)
    前学转换算子法访问事件时间戳信息水位线信息应场景极重例MapFunction样map转换算子法访问时间戳者前事件事件时间
    基DataStream API提供系列LowLevel转换算子访问时间戳watermark注册定时事件输出特定事件例超时事件等Process Function构建事件驱动应实现定义业务逻辑(前window函数转换算子法实现)例Flink SQLProcess Function实现
    Flink提供8Process Function:
    · ProcessFunction
    · KeyedProcessFunction
    · CoProcessFunction
    · ProcessJoinFunction
    · BroadcastProcessFunction
    · KeyedBroadcastProcessFunction
    · ProcessWindowFunction
    · ProcessAllWindowFunction
    81 KeyedProcessFunction
    里重点介绍KeyedProcessFunction
    KeyedProcessFunction操作KeyedStreamKeyedProcessFunction会处理流元素输出01者元素Process Function继承RichFunction接口open()close()getRuntimeContext()等方法KeyedProcessFunction[KEY IN OUT]额外提供两方法
    · processElement(v IN ctx Context out Collector[OUT]) 流中元素会调方法调结果会放Collector数类型中输出Context访问元素时间戳元素keyTimerService时间服务Context结果输出流(side outputs)
    · onTimer(timestamp Long ctx OnTimerContext out Collector[OUT])回调函数前注册定时器触发时调参数timestamp定时器设定触发时间戳Collector输出结果集合OnTimerContextprocessElementContext参数样提供文信息例定时器触发时间信息(事件时间者处理时间)
    82 TimerService 定时器(Timers)
    ContextOnTimerContext持TimerService象拥方法
    · currentProcessingTime() Long 返回前处理时间
    · currentWatermark() Long 返回前watermark时间戳
    · registerProcessingTimeTimer(timestamp Long) Unit 会注册前keyprocessing time定时器processing time达定时时间时触发timer
    · registerEventTimeTimer(timestamp Long) Unit 会注册前keyevent time 定时器水位线等定时器注册时间时触发定时器执行回调函数
    · deleteProcessingTimeTimer(timestamp Long) Unit 删前注册处理时间定时器果没时间戳定时器执行
    · deleteEventTimeTimer(timestamp Long) Unit 删前注册事件时间定时器果没时间戳定时器执行
    定时器timer触发时会执行回调函数onTimer()注意定时器timerkeyed streams面

    面举例子说明KeyedProcessFunction操作KeyedStream
    需求:监控温度传感器温度值果温度值秒钟(processing time)连续升报警
    val warnings readings
    keyBy(_id)
    process(new TempIncreaseAlertFunction)
    TempIncreaseAlertFunction实现 程序中ValueState样状态变量
    class TempIncreaseAlertFunction extends KeyedProcessFunction[String SensorReading String] {
    保存传感器温度值
    lazy val lastTemp ValueState[Double] getRuntimeContextgetState(
    new ValueStateDescriptor[Double](lastTemp Typesof[Double])
    )

    保存注册定时器时间戳
    lazy val currentTimer ValueState[Long] getRuntimeContextgetState(
    new ValueStateDescriptor[Long](timer Typesof[Long])
    )

    override def processElement(r SensorReading
    ctx KeyedProcessFunction[String SensorReading String]#Context
    out Collector[String]) Unit {
    取出次温度
    val prevTemp lastTempvalue()
    前温度更新次温度变量中
    lastTempupdate(rtemperature)

    val curTimerTimestamp currentTimervalue()
    if (prevTemp 00 || rtemperature < prevTemp) {
    温度降者第温度值删定时器
    ctxtimerService()deleteProcessingTimeTimer(curTimerTimestamp)
    清空状态变量
    currentTimerclear()
    } else if (rtemperature > prevTemp && curTimerTimestamp 0) {
    温度升没设置定时器
    val timerTs ctxtimerService()currentProcessingTime() + 1000
    ctxtimerService()registerProcessingTimeTimer(timerTs)

    currentTimerupdate(timerTs)
    }
    }

    override def onTimer(ts Long
    ctx KeyedProcessFunction[String SensorReading String]#OnTimerContext
    out Collector[String]) Unit {
    outcollect(传感器id + ctxgetCurrentKey + 传感器温度值已连续1s升)
    currentTimerclear()
    }
    }
    83 侧输出流(SideOutput)
    部分DataStream API算子输出单输出某种数类型流split算子条流分成条流流数类型相process functionside outputs功产生条流流数类型样side output定义OutputTag[X]象X输出流数类型process function通Context象发射事件者side outputs
    面示例程序:
    val monitoredReadings DataStream[SensorReading] readings
    process(new FreezingMonitor)

    monitoredReadings
    getSideOutput(new OutputTag[String](freezingalarms))
    print()

    readingsprint()
    接实现FreezingMonitor函数监控传感器温度值温度值低32F温度输出side output
    class FreezingMonitor extends ProcessFunction[SensorReading SensorReading] {
    定义侧输出标签
    lazy val freezingAlarmOutput OutputTag[String]
    new OutputTag[String](freezingalarms)

    override def processElement(r SensorReading
    ctx ProcessFunction[SensorReading SensorReading]#Context
    out Collector[SensorReading]) Unit {
    温度32F时输出警告信息
    if (rtemperature < 320) {
    ctxoutput(freezingAlarmOutput sFreezing Alarm for {rid})
    }
    数直接常规输出流
    outcollect(r)
    }
    }
    84 CoProcessFunction
    两条输入流DataStream API提供CoProcessFunction样lowlevel操作CoProcessFunction提供操作输入流方法 processElement1()processElement2()
    类似ProcessFunction两种方法通Context象调Context象访问事件数定时器时间戳TimerServiceside outputsCoProcessFunction提供onTimer()回调函数

    第九章 状态编程容错机制
    流式计算分状态状态两种情况状态计算观察独立事件根事件输出结果例流处理应程序传感器接收温度读数温度超90度时发出警告状态计算会基事件输出结果例子
    l 类型窗口例计算时均温度状态计算
    l 复杂事件处理状态机例分钟收两相差20度温度读数发出警告状态计算
    l 流流间关联操作流静态表动态表间关联操作状态计算
    图展示状态流处理状态流处理区状态流处理分接收条数记录(图中黑条)然根新输入数生成输出数(白条)状态流处理会维护状态(根条输入记录进行更新)基新输入记录前状态值生成输出记录(灰条)

    图 状态状态流处理
    图中输入数黑条表示状态流处理次转换条输入记录仅根新输入记录输出结果(白条)状态 流处理维护已处理记录状态值根条新输入记录更新状态输出记录(灰条)反映综合考虑事件结果
    状态计算重流处理状态计算更感兴趣事实正确实现状态计算实现状态计算难旧流处理系统支持状态计算新代流处理系统状态正确性视重中重
    91 状态算子应程序
    Flink置算子数源source数存储sink状态流中数buffer records会保存定元素者元数例 ProcessWindowFunction会缓存输入流数ProcessFunction会保存设置定时器信息等等
    Flink中状态始终特定算子相关联总说两种类型状态:
    l 算子状态(operator state)
    l 键控状态(keyed state)
    931 算子状态(operator state)
    算子状态作范围限定算子务意味着行务处理数访问相状态状态务言享算子状态相算子务访问

    图 具算子状态务
    Flink算子状态提供三种基数结构:
    l 列表状态(List state)
    状态表示组数列表
    l 联合列表状态(Union list state)
    状态表示数列表常规列表状态区发生障时者保存点(savepoint)启动应程序时恢复
    l 广播状态(Broadcast state)
    果算子项务项务状态相种特殊情况适合应广播状态
    932 键控状态(keyed state)
    键控状态根输入数流中定义键(key)维护访问Flink键值维护状态实例具相键数分区算子务中务会维护处理key应状态务处理条数时会动状态访问范围限定前数key具相key数会访问相状态Keyed State类似分布式keyvalue map数结构KeyedStream(keyBy算子处理)

    图 具键控状态务
    FlinkKeyed State支持数类型:
    l ValueState[T]保存单值值类型T
    o get操作 ValueStatevalue()
    o set操作 ValueStateupdate(value T)
    l ListState[T]保存列表列表里元素数类型T基操作:
    o ListStateadd(value T)
    o ListStateaddAll(values javautilList[T])
    o ListStateget()返回Iterable[T]
    o ListStateupdate(values javautilList[T])
    l MapState[K V]保存KeyValue
    o MapStateget(key K)
    o MapStateput(key K value V)
    o MapStatecontains(key K)
    o MapStateremove(key K)
    l ReducingState[T]
    l AggregatingState[I O]
    Stateclear()清空操作
    val sensorData DataStream[SensorReading]
    val keyedData KeyedStream[SensorReading String] sensorDatakeyBy(_id)

    val alerts DataStream[(String Double Double)] keyedData
    flatMap(new TemperatureAlertFunction(17))

    class TemperatureAlertFunction(val threshold Double) extends RichFlatMapFunction[SensorReading (String Double Double)] {
    private var lastTempState ValueState[Double] _

    override def open(parameters Configuration) Unit {
    val lastTempDescriptor new ValueStateDescriptor[Double](lastTemp classOf[Double])
    lastTempState getRuntimeContextgetState[Double](lastTempDescriptor)
    }

    override def flatMap(reading SensorReading
    out Collector[(String Double Double)]) Unit {
    val lastTemp lastTempStatevalue()
    val tempDiff (readingtemperature lastTemp)abs
    if (tempDiff > threshold) {
    outcollect((readingid readingtemperature tempDiff))
    }
    thislastTempStateupdate(readingtemperature)
    }
    }
    通RuntimeContext注册StateDescriptorStateDescriptor状态state名字存储数类型参数
    open()方法中创建state变量注意复前RichFunction相关知识
    接FlatMap with keyed ValueState快捷方式flatMapWithState实现需求
    val alerts DataStream[(String Double Double)] keyedSensorData
    flatMapWithState[(String Double Double) Double] {
    case (in SensorReading None) >
    (Listempty Some(intemperature))
    case (r SensorReading lastTemp Some[Double]) >
    val tempDiff (rtemperature lastTempget)abs
    if (tempDiff > 17) {
    (List((rid rtemperature tempDiff)) Some(rtemperature))
    } else {
    (Listempty Some(rtemperature))
    }
    }
    92 状态致性
    分布式系统中引入状态时然引入致性问题致性实际正确性级种说法说成功处理障恢复结果没发生障时结果相前者底正确?举例说假设时登录户计数系统历障计数结果少?果偏差漏掉计数重复计数?
    921 致性级
    流处理中致性分3级:
    l atmostonce 实没正确性保障委婉说法——障发生计数结果丢失样udp
    l atleastonce 表示计数结果正确值绝会正确值说计数程序发生障算绝会少算
    l exactlyonce 指系统保证发生障计数结果正确值致
    atleastonce非常流行第代流处理器(StormSamza)刚问世时保证atleastonce原二
    l 保证exactlyonce系统实现起更复杂基础架构层(决定什代表正确exactlyonce范围什)实现层挑战性
    l 流处理系统早期户愿意接受框架局限性应层想办法弥补(例应程序具幂等性者批量计算层做遍计算)
    先保证exactlyonce系统(Storm TridentSpark Streaming)性表现力两方面付出代价保证exactlyonce系统法单独条记录运应逻辑时处理条(批)记录保证批处理全部成功全部失败导致结果前必须等批记录处理结束户常两流处理框架(保证exactlyonce元素做低延迟处理)结果基础设施更加复杂户保证exactlyonce获低延迟效率间权衡利弊Flink避免种权衡
    Flink重价值保证exactlyonce具低延迟高吞吐处理力
    根说Flink通身满足需求避免权衡业界次意义重技术飞跃外行神奇旦解会恍然悟
    922端端(endtoend)状态致性
    目前致性保证流处理器实现说 Flink 流处理器部保证真实应中流处理应流处理器外包含数源(例 Kafka)输出持久化系统
    端端致性保证意味着结果正确性贯穿整流处理应始终组件保证致性整端端致性级取决组件中致性弱组件具体划分:
    l 部保证 —— 赖checkpoint
    l source 端 —— 需外部源重设数读取位置
    l sink 端 —— 需保证障恢复时数会重复写入外部系统
    sink端两种具体实现方式:幂等(Idempotent)写入事务性(Transactional)写入
    l 幂等写入
    谓幂等操作说操作重复执行次导致次结果更改说面重复执行起作
    l 事务写入
    需构建事务写入外部系统构建事务应着 checkpoint等 checkpoint 真正完成时候应结果写入 sink 系统中

    事务性写入具体两种实现方式:预写日志(WAL)两阶段提交(2PC)DataStream API 提供GenericWriteAheadSink模板类TwoPhaseCommitSinkFunction 接口方便实现两种方式事务性写入

    Source Sink 致性保证表说明:

    93 检查点(checkpoint)
    Flink具体保证exactlyonce呢 种称检查点(checkpoint)特性出现障时系统重置回正确状态面通简单类解释检查点作
    假设两位朋友正数项链少颗珠子图示捏住珠子边数边拨拨颗珠子总数加朋友样数手中珠子分神忘记数里时办呢 果项链珠子显然想头数遍尤三速度样试图合作时候更(想记录前分钟三数少颗珠子回想分钟滚动窗口)

    想更办法 项链隔段松松系根色皮筋珠子分隔开 珠子拨动时候皮筋拨动 然安排助手朋友拨皮筋时记录总数种方法数错时必头开始数相反发出错误警示然根皮筋处开始重数助手会告诉重数时起始数值例粉色皮筋处数值少
    Flink检查点作类似皮筋标记数珠子类关键点 指定皮筋言珠子相位置确定 皮筋成重新计数参考点总状态(珠子总数)颗珠子拨动更新次助手会保存根皮筋应检查点状态遇粉色皮筋时数少珠子遇橙色皮筋时少问题出现时种方法重新计数变简单
    931 Flink检查点算法
    Flink检查点核心作确保状态正确遇程序中断正确记住基点例子检查点运行Flink户提供定义状态工具例Scala程序输入记录第字段(字符串)进行分组维护第二字段计数状态
    val stream DataStream[(String Int)]
    val counts DataStream[(String Int)] stream
    keyBy(record > record_1)
    mapWithState( (in (String Int) state Option[Int]) >
    state match {
    case Some(c) > ( (in_1 c + in_2) Some(c + in_2) )
    case None > ( (in_1 in_2) Some(in_2) )
    })
    该程序两算子 keyBy算子记录第元素(字符串)进行分组根该key数进行重新分区然记录发送算子 状态map算子(mapWithState)map算子接收元素输入记录第二字段数加现总数中更新元素发射出图表示程序初始状态 输入流中6条记录检查点分割线(checkpoint barrier)隔开map算子状态均0(计数未开始)keya记录顶层map算子处理keyb记录中间层map算子处理keyc记录底层map算子处理

    图 key累加计数程序初始状态
    图程序初始状态注意abc三组初始计数状态0三圆柱值ckpt表示检查点分割线(checkpoint barriers)条记录处理序严格遵守检查点前规定例[b2]检查点前处理[a2]检查点处理
    该程序处理输入流中6条记录时涉操作遍布3行实例(节点CPU核等)检查点该保证exactlyonce呢
    检查点分割线普通数记录类似算子处理参计算会触发检查点相关行读取输入流数源(例中keyBy算子联)遇检查点屏障时输入流中位置保存持久化存储中果输入流消息传输系统(Kafka)位置偏移量Flink存储机制插件化持久化存储分布式文件系统HDFS图展示程

    图 遇checkpoint barrier时保存输入流中位置

    Flink数源(例中keyBy算子联)遇检查点分界线(barrier)时会输入流中位置保存持久化存储中 Flink根该位置重启
    检查点普通数记录样算子间流动map算子处理完前3条数收检查点分界线时会状态异步方式写入持久化存储图示

    图 保存map算子状态前key计数值

    位检查点前记录([b2][b3][c1])map算子处理情况时持久化存储已备份检查点分界线输入流中位置(备份操作发生barrier输入算子处理时候)map算子接着开始处理检查点分界线触发状态异步备份稳定存储中动作
    map算子状态备份检查点分界线位置备份确认该检查点操作标记完成图示须停止者阻断计算条件逻辑时间点(应检查点屏障输入流中位置)计算状态拍快通确保备份状态位置指逻辑时间点文解释基备份恢复计算保证exactlyonce值注意没出现障时Flink检查点开销极检查点操作速度持久化存储带宽决定回顾数珠子例子 数错需皮筋外皮筋会快拨

    图 检查点操作完成继续处理数

    检查点操作完成状态位置均已备份稳定存储中输入流中数记录已处理完成值注意备份状态值实际状态值备份反映检查点状态
    果检查点操作失败Flink丢弃该检查点继续正常执行某检查点会成功然恢复时间更长状态保证旧力系列连续检查点操作失败Flink会抛出错误通常预示着发生严重持久错误
    现图示情况 检查点操作已完成障紧

    图 障紧检查点导致底部实例丢失

    种情况Flink会重新拓扑(会获取新执行资源)输入流倒回检查点然恢复状态值该处开始继续计算例中[a2][a2][c2]条记录重播
    图展示重新处理程检查点开始重新计算保证剩记录处理map算子状态值没发生障时状态值致

    图 障时状态恢复

    Flink输入流倒回检查点屏障位置时恢复map算子状态值然Flink处开始重新处理样做保证记录处理map算子状态值没发生障时致
    Flink检查点算法正式名称异步分界线快(asynchronous barrier snapshotting)该算法致基ChandyLamport分布式快算法
    检查点Flink价值创新Flink保证exactlyonce需牺牲性
    932 Flink+Kafka实现端端exactlyonce语义
    知道端端状态致性实现需组件实现Flink + Kafka数道系统(Kafka进Kafka出)言组件样保证exactlyonce语义呢?
    l 部 —— 利checkpoint机制状态存盘发生障时候恢复保证部状态致性
    l source —— kafka consumer作source偏移量保存果续务出现障恢复时候连接器重置偏移量重新消费数保证致性
    l sink —— kafka producer作sink采两阶段提交 sink需实现 TwoPhaseCommitSinkFunction
    部checkpoint机制已解sourcesink具体样运行呢?接逐步做分析
    知道FlinkJobManager协调TaskManager进行checkpoint存储checkpoint保存 StateBackend中默认StateBackend存级改文件级进行持久化保存

    checkpoint 启动时JobManager 会检查点分界线(barrier)注入数流barrier会算子间传递

    算子会前状态做快保存状态端source务言会前offset作状态保存起次checkpoint恢复时source务重新提交偏移量次保存位置开始重新消费数

    部 transform 务遇 barrier 时会状态存 checkpoint 里
    sink 务首先数写入外部 kafka数属预提交事务(消费)遇 barrier 时状态保存状态端开启新预提交事务

    算子务快完成次 checkpoint 完成时JobManager 会务发通知确认次 checkpoint 完成
    sink 务收确认通知会正式提交前事务kafka 中未确认数改已确认数真正消费

    执行程实际两段式提交算子执行完成会进行预提交直执行完sink操作会发起确认提交果执行失败预提交会放弃掉
    具体两阶段提交步骤总结:
    l 第条数开启 kafka 事务(transaction)正常写入 kafka 分区日志标记未提交预提交
    l jobmanager 触发 checkpoint 操作barrier source 开始传递遇 barrier 算子状态存入状态端通知 jobmanager
    l sink 连接器收 barrier保存前状态存入 checkpoint通知 jobmanager开启阶段事务提交检查点数
    l jobmanager 收务通知发出确认信息表示 checkpoint 完成
    l sink 务收 jobmanager 确认信息正式提交段时间数
    l 外部kafka关闭事务提交数正常消费

    果宕机需通StateBackend进行恢复恢复确认提交操作
    94 选择状态端(state backend)
    · MemoryStateBackend
    存级状态端会键控状态作存中象进行理存储TaskManagerJVM堆checkpoint存储JobManager存中
    · FsStateBackend
    checkpoint存远程持久化文件系统(FileSystem)状态MemoryStateBackend样会存TaskManagerJVM堆
    · RocksDBStateBackend
    状态序列化存入RocksDB中存储
    注意:RocksDB支持直接包含flink中需引入赖:

    orgapacheflink
    flinkstatebackendrocksdb_211
    1100


    设置状态端FsStateBackend:
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    val checkpointPath String
    val backend new RocksDBStateBackend(checkpointPath)

    envsetStateBackend(backend)
    envsetStateBackend(new FsStateBackend(filetmpcheckpoints))
    envenableCheckpointing(1000)
    配置重启策略
    envsetRestartStrategy(RestartStrategiesfixedDelayRestart(60 Timeof(10 TimeUnitSECONDS)))
    第十章 Table API SQL
    Table API流处理批处理通关系型APITable API基流输入者批输入运行需进行修改Table APISQL语言超集专门Apache Flink设计Table APIScala Java语言集成式API常规SQL语言中查询指定字符串Table API查询JavaScala中语言嵌入样式定义具IDE支持动完成语法检测
    101 需引入pom赖

    orgapacheflink
    flinktableplanner_211
    1100


    orgapacheflink
    flinktableapiscalabridge_211
    1100


    102 简单解TableAPI

    def main(args Array[String]) Unit {
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    envsetParallelism(1)

    val inputStream envreadTextFile(\\sensortxt)
    val dataStream inputStream
    map( data > {
    val dataArray datasplit()
    SensorReading(dataArray(0)trim dataArray(1)trimtoLong dataArray(2)trimtoDouble)
    }
    )
    基env创建 tableEnv
    val settings EnvironmentSettings EnvironmentSettingsnewInstance()useOldPlanner()inStreamingMode()build()
    val tableEnv StreamTableEnvironment StreamTableEnvironmentcreate(env settings)

    条流创建张表
    val dataTable Table tableEnvfromDataStream(dataStream)

    表里选取特定数
    val selectedTable Table dataTableselect('id 'temperature)
    filter(id 'sensor_1')

    val selectedStream DataStream[(String Double)] selectedTable
    toAppendStream[(String Double)]

    selectedStreamprint()

    envexecute(table test)

    }


    1021 动态表
    果流中数类型case class直接根case class结构生成table
    tableEnvfromDataStream(dataStream)
    者根字段序单独命名
    tableEnvfromDataStream(dataStream’id’timestamp )

    动态表转换流进行输出
    tabletoAppendStream[(StringString)]

    1022 字段
    单引放字段前面标识字段名 name id ’amount 等

    103 TableAPI 窗口聚合操作
    1031 通例子解TableAPI
    统计10秒中传感器温度值数
    def main(args Array[String]) Unit {
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    envsetParallelism(1)
    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)

    val inputStream envreadTextFile(\\sensortxt)
    val dataStream inputStream
    map( data > {
    val dataArray datasplit()
    SensorReading(dataArray(0)trim dataArray(1)trimtoLong dataArray(2)trimtoDouble)
    }
    )
    assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Timeseconds(1)) {
    override def extractTimestamp(element SensorReading) Long elementtimestamp * 1000L
    })
    基env创建 tableEnv
    val settings EnvironmentSettings EnvironmentSettingsnewInstance()useOldPlanner()inStreamingMode()build()
    val tableEnv StreamTableEnvironment StreamTableEnvironmentcreate(env settings)

    条流创建张表字段定义指定事件时间时间字段
    val dataTable Table tableEnvfromDataStream(dataStream 'id 'temperature 'tsrowtime)

    时间开窗聚合统计
    val resultTable Table dataTable
    window( Tumble over 10seconds on 'ts as 'tw )
    groupBy('id 'tw)
    select('id 'idcount)

    val selectedStream DataStream[(Boolean (String Long))] resultTable
    toRetractStream[(String Long)]

    selectedStreamprint()

    envexecute(table window test)
    }



    1032 关group by
    1 果 groupbytable转换流时候toRetractDstream
    val dataStream DataStream[(Boolean (String Long))] table
    toRetractStream[(StringLong)]

    2 toRetractDstream 第boolean型字段标识 true新数(Insert)false表示期老数(Delete)

    val dataStream DataStream[(Boolean (String Long))] table
    toRetractStream[(StringLong)]
    dataStreamfilter(__1)print()

    3 果api包括时间窗口窗口字段必须出现groupBy中
    val resultTable Table dataTable
    window( Tumble over 10seconds on 'ts as 'tw )
    groupBy('id 'tw)
    select('id 'idcount)

    1033 关时间窗口
    1 时间窗口必须提前声明时间字段果processTime直接创建动态表时进行追加
    val dataTable Table tableEnvfromDataStream(dataStream 'id 'temperature 'psproctime)


    2 果EventTime创建动态表时声明
    val dataTable Table tableEnvfromDataStream(dataStream 'id 'temperature 'tsrowtime)

    3 滚动窗口Tumble over 10000millis on 表示
    val resultTable Table dataTable
    window( Tumble over 10seconds on 'ts as 'tw )
    groupBy('id 'tw)
    select('id 'idcount)

    104 SQL编写
    统计10秒中传感器温度值数
    def main(args Array[String]) Unit {
    val env StreamExecutionEnvironmentgetExecutionEnvironment
    envsetParallelism(1)
    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)

    val inputStream envreadTextFile(\\sensortxt)
    val dataStream inputStream
    map( data > {
    val dataArray datasplit()
    SensorReading(dataArray(0)trim dataArray(1)trimtoLong dataArray(2)trimtoDouble)
    }
    )
    assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Timeseconds(1)) {
    override def extractTimestamp(element SensorReading) Long elementtimestamp * 1000L
    })
    基env创建 tableEnv
    val settings EnvironmentSettings EnvironmentSettingsnewInstance()useOldPlanner()inStreamingMode()build()
    val tableEnv StreamTableEnvironment StreamTableEnvironmentcreate(env settings)

    条流创建张表字段定义指定事件时间时间字段
    val dataTable Table tableEnvfromDataStream(dataStream 'id 'temperature 'tsrowtime)

    直接写sql完成开窗统计
    val resultSqlTable Table tableEnvsqlQuery(select id count(id) from
    + dataTable + group by id tumble(ts interval '15' second))

    val selectedStream DataStream[(Boolean (String Long))] resultSqlTabletoRetractStream[(String Long)]

    selectedStreamprint()

    envexecute(table window test)
    }

    第十章 Flink CEP简介
    111 什复杂事件处理CEP
    简单事件构成事件流通定规匹配然输出户想数满足规复杂事件
    特征:
    Ø 目标:序简单事件流中发现高阶特征
    Ø 输入:简单事件构成事件流
    Ø 处理:识简单事件间联系符合定规简单事件构成复杂事件
    Ø 输出:满足规复杂事件

    CEP分析低延迟频繁产生源事件流CEP帮助复杂相关事件流中找出意义模式复杂关系接实时准实时获通知阻止行
    CEP支持流进行模式匹配根模式条件分连续条件连续条件模式条件允许时间限制条件范围没达满足条件时会导致模式匹配超时
    起简单功:
    Ø 输入流数快产生结果
    Ø 2event流基时间进行聚合类计算
    Ø 提供实时准实时警告通知
    Ø 样数源中产生关联分析模式
    Ø 高吞吐低延迟处理
    市场种CEP解决方案例SparkSamzaBeam等没提供专门library支持Flink提供专门CEP library
    112 Flink CEP
    FlinkCEP提供专门Flink CEP library包含组件:
    Ø Event Stream
    Ø pattern定义
    Ø pattern检测
    Ø 生成Alert

    首先开发员DataStream流定义出模式条件Flink CEP引擎进行模式检测必时生成告警
    Flink CEP需导入赖:

    orgapacheflink
    flinkcep_{scalabinaryversion}
    {flinkversion}

    Event Streams
    登陆事件流例:
    case class LoginEvent(userId String ip String eventType String eventTime String)

    val env StreamExecutionEnvironmentgetExecutionEnvironment
    envsetStreamTimeCharacteristic(TimeCharacteristicEventTime)
    envsetParallelism(1)

    val loginEventStream envfromCollection(List(
    LoginEvent(1 19216801 fail 1558430842)
    LoginEvent(1 19216802 fail 1558430843)
    LoginEvent(1 19216803 fail 1558430844)
    LoginEvent(2 1921681010 success 1558430845)
    ))assignAscendingTimestamps(_eventTimetoLong)
    Pattern API
    Pattern应该包含步骤者做statestatestate通常需定义条件例列代码:
    val loginFailPattern Patternbegin[LoginEvent](begin)
    where(_eventTypeequals(fail))
    next(next)
    where(_eventTypeequals(fail))
    within(Timeseconds(10)
    state应该标示:例begin[LoginEvent](begin)中begin
    state需唯名字需filter滤条件滤条件定义事件需符合条件例
    where(_eventTypeequals(fail))
    通subtype限制event子类型:
    startsubtype(SubEventclass)where()
    事实次调subtypewhere方法果where条件相关通or指定单独filter函数:
    patternwhere()or()
    条件基础通next者followedBy方法切换statenext意思说步符合条件元素紧挨着元素followedBy求定挨着元素两者分称严格邻非严格邻
    val strictNext startnext(middle)
    val nonStrictNext startfollowedBy(middle)
    Pattern条件限定定时间范围:
    nextwithin(Timeseconds(10))
    时间Processing TimeEvent Time
    Pattern 检测
    通input DataStream刚刚定义Pattern创建PatternStream:
    val input
    val pattern

    val patternStream CEPpattern(input pattern)
    val patternStream CEPpattern(loginEventStreamkeyBy(_userId) loginFailPattern)
    旦获PatternStream通selectflatSelectMap序列找需警告信息
    select
    select方法需实现PatternSelectFunction通select方法输出需警告接受Map包含stringevent中keystate名字event真实Event
    val loginFailDataStream patternStream
    select((pattern Map[String Iterable[LoginEvent]]) > {
    val first patterngetOrElse(begin null)iteratornext()
    val second patterngetOrElse(next null)iteratornext()

    Warning(firstuserId firsteventTime secondeventTime warning)
    })
    返回值仅1条记录
    flatSelect
    通实现PatternFlatSelectFunction实现select相似功唯区flatSelect方法返回条记录通Collector[OUT]类型参数输出数传递游
    超时事件处理
    通within方法parttern规匹配事件限定定窗口范围超窗口时间达event通selectflatSelect中实现PatternTimeoutFunctionPatternFlatTimeoutFunction处理种情况
    val patternStream PatternStream[Event] CEPpattern(input pattern)

    val outputTag OutputTag[String](sideoutput)

    val result SingleOutputStreamOperator[ComplexEvent] patternStreamselect(outputTag){
    (pattern Map[String Iterable[Event]] timestamp Long) > TimeoutEvent()
    } {
    pattern Map[String Iterable[Event]] > ComplexEvent()
    }

    val timeoutResult DataStream resultgetSideOutput(outputTag)
    附录 常见面试问题汇总
    1.面试题:应架构
    问题:公司提交实时务少 Job Manager?
    解答: 1 yarn session模式提交务种方式次提交会创建新 Flink 集群 job 提供资源务间互相独立互影响方便理务执行完成创建集群会消失线命令脚:
    binyarnsessionsh n 7 s 8 jm 3072 tm 32768 qu root** nm ** d
    中申请 7 taskManager 8 核 taskmanager 32768M 存
    2 集群默认 Job Manager防止单点障配置高standlone模式公司般配置 Job Manager两备 Job Manager然结合 ZooKeeper 达高yarn模式yarnJob Mananger障会动进行重启需配置重启次数10次
    2.面试题二:压测监控
    问题:做压力测试监控?
    解答:般碰压力方面:
    产生数流速度果快游算子消费话会产生背压背压监控 Flink Web UI(localhost8081) 视化监控旦报警知道般情况背压问题产生 sink 操作符没优化做优化果写入 ElasticSearch 改成批量写入调 ElasticSearch 队列等等策略
    二设置watermark延迟时间参数果设置会造成存压力设置延迟时间然迟元素发送侧输出流中晚点更新结果者类似 RocksDB 样状态端 RocksDB 会开辟堆外存储空间 IO 速度会变慢需权衡
    三滑动窗口长度果长滑动距离短话Flink 性会降厉害通时间分片方法元素存入重叠窗口样减少窗口处理中状态写入参见链接:httpswwwinfoqcnarticlesIhs_qY6HCpMQNblTI9M
    四状态端 RocksDB没碰撑爆问题
    3.面试题三:什Flink
    问题:什 Flink 代 Spark?
    解答:考虑flink低延迟高吞吐量流式数应场景更支持外flink处理乱序数保证exactlyonce状态致性详见文档第章 Flink Spark 详细
    4.面试题四:checkpoint存储
    问题:Flink checkpoint 存里?
    解答:存文件系统者 RocksDB详见文档94节
    5.面试题五:exactlyonce保证
    问题:果级存储支持事务Flink 保证 exactlyonce?
    解答:端端exactlyoncesink求较高具体实现幂等写入事务性写入两种方式幂等写入场景赖业务逻辑更常见事务性写入事务性写入预写日志(WAL)两阶段提交(2PC)两种方式
    果外部系统支持事务预写日志方式结果数先成状态保存然收 checkpoint 完成通知时次性写入 sink 系统
    参见文档9293节课件Flink状态致性
    6.面试题六:状态机制
    问题:说 Flink 状态机制?
    解答:Flink置算子包括源source数存储sink状态Flink中状态始终特定算子相关联Flink会checkpoint形式务状态进行快保证障恢复时状态致性Flink通状态端理状态checkpoint存储状态端配置选择详见文档第九章
    7.面试题七:海量key重
    问题:重?考虑实时场景:双十场景滑动窗口长度 1 时滑动距离 10 秒钟亿级户样计算 UV?
    解答:类似 scala set 数结构者 redis set 显然行亿 Key存放考虑布隆滤器(Bloom Filter)重
    8.面试题八:checkpointspark较
    问题:Flink checkpoint 机制 spark 什优势?
    解答:spark streaming checkpoint 仅仅针 driver 障恢复做数元数 checkpoint flink checkpoint 机制 复杂采轻量级分布式快实现算子快流动中数快参见文档93节文章链接: httpscloudtencentcomdeveloperarticle1189624
    9.面试题九:watermark机制
    问题:请详细解释FlinkWatermark机制
    解答:Watermark质Flink中衡量EventTime进展机制处理乱序数详见文档73节
    10.面试题十:exactlyonce实现
    问题:Flink中exactlyonce语义实现状态存储?
    解答:Flinkcheckpoint机制实现exactlyonce语义果实现端端exactlyonce需外部sourcesink满足定条件状态存储通状态端理Flink中配置状态端详见文档929394节
    11.面试题十:CEP
    问题:Flink CEP 编程中状态没达时候会数保存里?
    解答:流式处理中CEP 然支持 EventTime 相应支持数迟现象watermark处理逻辑CEP未匹配成功事件序列处理迟数类似 Flink CEP处理逻辑中状态没满足迟数会存储Map数结构中说果限定判断事件序列时长5分钟存中会存储5分钟数存极损伤
    12.面试题十二:三种时间语义
    问题:Flink 三种时间语义什分说出应场景?
    解答:
    1 Event Time:实际应常见时间语义具体见文档第七章
    2 Processing Time:没事件时间情况者实时性求超高情况
    3 Ingestion Time:存Source Operator情况Source Operator系统时钟指派 Ingestion Time续基时间相关种操作会数记录中 Ingestion Time
    13.面试题十三:数高峰处理
    问题:Flink程序面数高峰期时处理?
    解答:容量 Kafka 数先放消息队列里面作数源 Flink 进行消费样会影响点实时性

    文档香网(httpswwwxiangdangnet)户传

    《香当网》用户分享的内容,不代表《香当网》观点或立场,请自行判断内容的真实性和可靠性!
    该内容是文档的文本内容,更好的格式请下载文档

    下载文档到电脑,查找使用更方便

    文档的实际排版效果,会与网站的显示效果略有不同!!

    需要 6 香币 [ 分享文档获得香币 ]

    下载文档

    相关文档

    大数据技术之大数据概论

    大数据(big data),指无法在一定时间范围内用常规软件工具进行捕捉、管理和处理的数据集合,是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力的海量、高增长率和多样化的信息资产。

    5年前   
    1327    0

    数据加密技术

    数据加密技术数据加密技术 发布时间: 2003-11-3 作者:秩名 我们经常需要一种措施来保护我们的数据,防止被一些怀有不良用心的人所看到或者破坏。        在信息时代,信息可以帮助团...

    8年前   
    519    0

    感受数据管理技术的应用

    本节课是《数据管理技术》课的开篇,是在《信息技术基础》课的基础上对数据管理知识的进一步认识、拓展与加深。共有两方面的主要内容,一是体验数据管理技术,二是数据管理技术的应用。这节课既要让学生了解认...

    5年前   
    1051    0

    《数据库技术及应用》知识点总结

    《数据库技术与应用》知识点总结第一章 数据库基础1. 基本概念: 数据:数据泛指对客观事物的数量、属性、位置及其相互关系的抽象表示,以适合于用人工或自然的方式进行保存、传递和处理。...

    3年前   
    918    0

    《数据库应用技术》大作业

    《数据库应用技术》大作业题 目: 数 据 库 应 用 技 术 专 业: 网 络 工 程 学 ...

    3年前   
    652    0

    Server数据完整性约束的实现技术

    基于Client/Server数据完整性约束的实现技术  摘 要:本论文主要讨论基于Client/Server数据完整性约束及其如何实施企业业务规则,并以SQLServer和PowerBuil...

    9年前   
    398    0

    《VMware 虚拟化数据中心技术方案》

    VMware虚拟化数据中心解决方案 目 录一、VMWARE公司简介及解决方案综述 11.1. VMware公司简介 11.2. 虚拟化架构的优势 11.3. VM...

    2年前   
    395    0

    大数据技术在广电领域的应用探索

    大数据又称为海量数据、巨量数据,其中所包含的数据量规模巨大到无法通过人工在一定的时间内达到管理、截取、处理并整理为人类能够解读的信息。

    6年前   
    2086    0

    课程数据信息表(XX工程技术大学)

    课程数据信息表课程平台单位(公章):基本信息课程名称 学校名称 课程负责人 单期课程开设周数 课程上线平台名称:课程开设情况开设学期起止时间选课人数课程链接1   2     …   第( )...

    2年前   
    527    0

    **大酒店数据接入技术方案d座

    根据**大酒店的相关需求,并结合到**大酒店d座建筑物平面布置及预埋线路,现将**酒店数据网络建设所需的设备设施、施工费用及宽带接入费用预算如下:

    3年前   
    377    0

    技术合同:数据保密协议

    技术合同:数据保密协议  甲方:_________________  乙方:_________________  双方经平等协商同意,自愿签订本协议,共同遵守本协议所列条款。  1.保密的内容...

    9年前   
    425    0

    10大高手之唐万新

    10大高手之唐万新   代表作品:整合新疆屯河   什么赚钱做什么   唐氏四兄弟:唐万里、唐万平、唐万川、唐万新,“万里平川一片新”中,小老弟唐万新是四兄弟之核心,也是新疆德隆集团...

    9年前   
    18366    0

    数据库课程设计之房屋中介管理系统

    《数据库系统原理》课程设计报告项目名称: 房屋中介管理系统 专 业: 网络工程 年 级: 20XX级 ...

    1年前   
    528    0

    银行创新之金融大数据时代

    银行创新之金融大数据时代 近日和我的会计主管闲聊时候谈及余额宝,它也成为街头巷尾热议的话题,并持续占据着诸多媒体重要版面,金融已经完全从一个高贵、专业、远离大众的行业,随着互联网金融的迅猛发...

    9年前   
    7327    0

    毕业论文之SMT技术的简述

     毕业论文之SMT技术的简述      smt技术的简述  这学期我们学习了电子工艺技术基础,电子工艺包括很多种技术。其中表面安装技术smt就是一种非常有用的电子工艺技术。下面我对smt技术简...

    10年前   
    476    0

    技能培训之配电设备作业安全技术

    配电设备作业安全技术1 配电设备上工作的一般规定1.1 配电设备 [包括:高压配电室、箱式变电站、配电变压器台架、低压配电室 (箱)、环网柜、电缆分支箱]停电检修时,应使用电力线路第一种工作票...

    1年前   
    391    0

    绩效管理的十大困扰之八

    绩效管理的十大困扰 作者:行天人力资源管理 曹子祥 第八大困扰,考评结果运用不良;  考评结果运用不良,体现在两个方面,一是压根不运用,考评完了就完了,好象考评本身是一个任务一样;第二是运用单...

    10年前   
    534    0

    绩效管理的十大困扰之六

    绩效管理的十大困扰 作者:行天人力资源管理 曹子祥 第六大困扰,绩效管理的考评标准难制定;  绩效管理的考评标准难制定,这也是我们企业一大困扰。给大家讲一些案例:  一个是我的一个客户,做清洁...

    10年前   
    525    0

    个人清洁之八大戒条

    个人清洁之八大戒条  个人卫生别忘清洁牙齿  本文作者经过调查总结的个人清洁之八大戒条,如果你不小心犯了下面“八戒”中的其中一条,那就需要抓紧时间“自我改造”了。  蘸着唾液点钱  一次去银行...

    9年前   
    482    0

    绩效管理的十大困扰之五

    绩效管理的十大困扰 作者:行天人力资源管理 曹子祥第五大困扰,指标过多。  指标设得过多有时似乎是没办法的事,有时像我们顾问公司给企业做方案,一开始订指标时,进行各种各样的分析之后,好不容易提...

    9年前   
    518    0