今日指数项目之FlinkCEP入门案例

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

 public void setEventType(String eventType) {
        this.eventType = eventType;
    }
    public Long getEventTime() {
        return eventTime;
    }
    public void setEventTime(Long eventTime) {
        this.eventTime = eventTime;
    }
    public LoginEvent(int userId, String ip, String eventType, Long eventTime) {
        this.userId = userId;
        this.ip = ip;
        this.eventType = eventType;
        this.eventTime = eventTime;
    }
    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId=" + userId +
                ", ip='" + ip + '\'' +
                ", eventType='" + eventType + '\'' +
                ", eventTime=" + eventTime +
                '}';
    }
}

}

### 2.监控市场价格
需求:  
 物价局和工商局会监督市场上各种商品得销售价格,随着市场行情和商品供需得变化,商品价格会有一定程度得浮动,如果商品价格在指定得价格区间波动,政府部门是不会干预的额,如果商品价格在一定的时间范围内波动幅度超出了指定的区间范围,并且上行幅度过大,物价局会上报敏感数据信息,并规范市场价格。  
 在此,我们假定如果商品售价在1分钟之内有连续两次超过预定商品价格阀值就发送告警信息。
测试数据

{“goodsId”:100001,“goodsPrice”:6,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430843000}

{“goodsId”:100007,“goodsPrice”:0.5,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430844000}

{“goodsId”:100002,“goodsPrice”:2,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430845000}

{“goodsId”:100003,“goodsPrice”:2,“goodsName”:“flour”,“alias”:“面粉”,“orderTime”:1558430846000}

{“goodsId”:100004,“goodsPrice”:12,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430847000}

{“goodsId”:100005,“goodsPrice”:20,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430848000}

{“goodsId”:100006,“goodsPrice”:3,“goodsName”:“banana”,“alias”:“香蕉”,“orderTime”:1558430849000}

{“goodsId”:100007,“goodsPrice”:10,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430850000}

{“goodsId”:100001,“goodsPrice”:16,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430852000}

{“goodsId”:100007,“goodsPrice”:15,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430853000}

{“goodsId”:100002,“goodsPrice”:12,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430854000}

{“goodsId”:100003,“goodsPrice”:12,“goodsName”:“flour”,“alias”:“面粉”,“orderTime”:1558430855000}

{“goodsId”:100004,“goodsPrice”:12,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430856000}

{“goodsId”:100005,“goodsPrice”:20,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430857000}

{“goodsId”:100006,“goodsPrice”:13,“goodsName”:“banana”,“alias”:“香蕉”,“orderTime”:1558430858000}

{“goodsId”:100007,“goodsPrice”:10,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430859000}

创建kafka topic

./kafka-topics.sh --create --topic cep --zookeeper node01:2181 --partitions 1 --replication-factor 1

生产数据

./kafka-console-producer.sh --broker-list node01:9092 --topic cep

redis保存限制价格
jedisCluster.hset(“product”,“apple”,“10”);  
 jedisCluster.hset(“product”,“rice”,“6”);  
 jedisCluster.hset(“product”,“flour”,“6”);  
 jedisCluster.hset(“product”,“banana”,“8”);  
 jedisCluster.hset(“product”,“mask”,“5”);
开发步骤  
 在test源码目录下创建测试类:cn.itcast.CepMarkets  
 1.获取流处理执行环境  
 2.设置事件时间、并行度  
 整合kafka  
 4.数据转换  
 5.process获取bean,设置status,并设置事件时间  
 6.定义匹配模式,设置时间长度  
 7.匹配模式(分组)  
 8.查询告警数据
2.1.代码开发

public class CepMarkets {

public static void main(String[] args) throws Exception {
   
    //1.获取流处理执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    //2.设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    //3.整合kafka
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "node01:9092"); //broker地址
    properties.setProperty("group.id", "cep"); //消费组
    properties.setProperty("enable.auto.commit", "true");
    properties.setProperty("auto.commit.interval.ms", "5000");
    FlinkKafkaConsumer011 kafkaConsumer = new FlinkKafkaConsumer011<>("cep", new SimpleStringSchema(), properties);
    kafkaConsumer.setStartFromEarliest();
    DataStreamSource source = env.addSource(kafkaConsumer);
    //4.数据转换
    SingleOutputStreamOperator mapData = source.map(new MapFunction() {
        @Override
        public Product map(String value) throws Exception {
            JSONObject json = JSON.parseObject(value);
            Product product = new Product(
                    json.getLong("goodsId"),
                    json.getDouble("goodsPrice"),
                    json.getString("goodsName"),
                    json.getString("alias"),
                    json.getLong("orderTime"),
                    false
            );
            return product;
        }
    });
    //5.保留告警数据(设置时间)
    SingleOutputStreamOperator waterData = mapData.keyBy(Product::getGoodsId)
            .process(new KeyedProcessFunction() {
                Map map = null;
                @Override
                public void open(Configuration parameters) throws Exception {
                    JedisCluster jedisCluster = RedisUtil.getJedisCluster();
                    map = jedisCluster.hgetAll("product");
                }
                @Override
                public void processElement(Product value, Context ctx, Collector out) throws Exception {
                    long priceAlert = Long.parseLong(map.get(value.getGoodsName()));
                    if (value.getGoodsPrice() > priceAlert) {
                        value.setStatus(true);
                    }
                    out.collect(value);
                }
            })
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(0)) {
                @Override
                public long extractTimestamp(Product element) {
                    return element.getOrderTime();
                }
            })
            ;
     //6.定义匹配模式,设置时间长度
    Pattern pattern = Pattern.begin("begin")
            .where(new SimpleCondition() {
                @Override
                public boolean filter(Product value) throws Exception {
                    return value.getStatus() == true;
                }
            })
            .next("next")
            .where(new SimpleCondition() {
                @Override
                public boolean filter(Product value) throws Exception {
                    return value.getStatus() == true;
                }
            })
            .within(Time.seconds(60));
    //7.匹配模式(分组)
    PatternStream cep = CEP.pattern(waterData.keyBy(Product::getGoodsId), pattern);
    //8.查询告警数据
    cep.select(new PatternSelectFunction() {
        @Override
        public Object select(Map> pattern) throws Exception {
            List result = pattern.get("next");
            return result;
        }
    }).print("告警数据:");
    env.execute();
}

}

2.2.Bean对象

属性:goodsId、goodsPrice、goodsName、alias、orderTime、status

public class Product {

private Long goodsId;

private Double goodsPrice;

private String goodsName;

private String alias;

private Long orderTime;

private Boolean status;

public Product(Long goodsId, Double goodsPrice, String goodsName, String alias, Long orderTime, Boolean status) {
    this.goodsId = goodsId;
    this.goodsPrice = goodsPrice;
    this.goodsName = goodsName;
    this.alias = alias;
    this.orderTime = orderTime;
    this.status = status;
}
@Override
public String toString() {
    return "Product{" +
            "goodsId=" + goodsId +
            ", goodsPrice=" + goodsPrice +
            ", goodsName='" + goodsName + '\'' +
            ", alias='" + alias + '\'' +
            ", orderTime=" + orderTime +
            ", status=" + status +
            '}';
}
public Long getGoodsId() {
    return goodsId;
}
public void setGoodsId(Long goodsId) {
    this.goodsId = goodsId;
}
public Double getGoodsPrice() {
    return goodsPrice;
}
public void setGoodsPrice(Double goodsPrice) {
    this.goodsPrice = goodsPrice;
}
public String getGoodsName() {
    return goodsName;
}
public void setGoodsName(String goodsName) {
    this.goodsName = goodsName;
}
public String getAlias() {
    return alias;
}
public void setAlias(String alias) {
    this.alias = alias;
}
public Long getOrderTime() {
    return orderTime;

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取

[外链图片转存中…(img-LIZTQw3o-1715726852898)]

[外链图片转存中…(img-8utSNZ8Z-1715726852898)]

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

需要这份系统化资料的朋友,可以戳这里获取