既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
public void setEventType(String eventType) { this.eventType = eventType; } public Long getEventTime() { return eventTime; } public void setEventTime(Long eventTime) { this.eventTime = eventTime; } public LoginEvent(int userId, String ip, String eventType, Long eventTime) { this.userId = userId; this.ip = ip; this.eventType = eventType; this.eventTime = eventTime; } @Override public String toString() { return "LoginEvent{" + "userId=" + userId + ", ip='" + ip + '\'' + ", eventType='" + eventType + '\'' + ", eventTime=" + eventTime + '}'; } }
}
### 2.监控市场价格 需求: 物价局和工商局会监督市场上各种商品得销售价格,随着市场行情和商品供需得变化,商品价格会有一定程度得浮动,如果商品价格在指定得价格区间波动,政府部门是不会干预的额,如果商品价格在一定的时间范围内波动幅度超出了指定的区间范围,并且上行幅度过大,物价局会上报敏感数据信息,并规范市场价格。 在此,我们假定如果商品售价在1分钟之内有连续两次超过预定商品价格阀值就发送告警信息。 测试数据
{“goodsId”:100001,“goodsPrice”:6,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430843000}
{“goodsId”:100007,“goodsPrice”:0.5,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430844000}
{“goodsId”:100002,“goodsPrice”:2,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430845000}
{“goodsId”:100003,“goodsPrice”:2,“goodsName”:“flour”,“alias”:“面粉”,“orderTime”:1558430846000}
{“goodsId”:100004,“goodsPrice”:12,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430847000}
{“goodsId”:100005,“goodsPrice”:20,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430848000}
{“goodsId”:100006,“goodsPrice”:3,“goodsName”:“banana”,“alias”:“香蕉”,“orderTime”:1558430849000}
{“goodsId”:100007,“goodsPrice”:10,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430850000}
{“goodsId”:100001,“goodsPrice”:16,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430852000}
{“goodsId”:100007,“goodsPrice”:15,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430853000}
{“goodsId”:100002,“goodsPrice”:12,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430854000}
{“goodsId”:100003,“goodsPrice”:12,“goodsName”:“flour”,“alias”:“面粉”,“orderTime”:1558430855000}
{“goodsId”:100004,“goodsPrice”:12,“goodsName”:“rice”,“alias”:“大米”,“orderTime”:1558430856000}
{“goodsId”:100005,“goodsPrice”:20,“goodsName”:“apple”,“alias”:“苹果”,“orderTime”:1558430857000}
{“goodsId”:100006,“goodsPrice”:13,“goodsName”:“banana”,“alias”:“香蕉”,“orderTime”:1558430858000}
{“goodsId”:100007,“goodsPrice”:10,“goodsName”:“mask”,“alias”:“口罩”,“orderTime”:1558430859000}
创建kafka topic
./kafka-topics.sh --create --topic cep --zookeeper node01:2181 --partitions 1 --replication-factor 1
生产数据
./kafka-console-producer.sh --broker-list node01:9092 --topic cep
redis保存限制价格 jedisCluster.hset(“product”,“apple”,“10”); jedisCluster.hset(“product”,“rice”,“6”); jedisCluster.hset(“product”,“flour”,“6”); jedisCluster.hset(“product”,“banana”,“8”); jedisCluster.hset(“product”,“mask”,“5”); 开发步骤 在test源码目录下创建测试类:cn.itcast.CepMarkets 1.获取流处理执行环境 2.设置事件时间、并行度 整合kafka 4.数据转换 5.process获取bean,设置status,并设置事件时间 6.定义匹配模式,设置时间长度 7.匹配模式(分组) 8.查询告警数据 2.1.代码开发
public class CepMarkets {
public static void main(String[] args) throws Exception { //1.获取流处理执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.设置事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //3.整合kafka Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "node01:9092"); //broker地址 properties.setProperty("group.id", "cep"); //消费组 properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "5000"); FlinkKafkaConsumer011kafkaConsumer = new FlinkKafkaConsumer011<>("cep", new SimpleStringSchema(), properties); kafkaConsumer.setStartFromEarliest(); DataStreamSource source = env.addSource(kafkaConsumer); //4.数据转换 SingleOutputStreamOperator mapData = source.map(new MapFunction () { @Override public Product map(String value) throws Exception { JSONObject json = JSON.parseObject(value); Product product = new Product( json.getLong("goodsId"), json.getDouble("goodsPrice"), json.getString("goodsName"), json.getString("alias"), json.getLong("orderTime"), false ); return product; } }); //5.保留告警数据(设置时间) SingleOutputStreamOperator waterData = mapData.keyBy(Product::getGoodsId) .process(new KeyedProcessFunction () { Map map = null; @Override public void open(Configuration parameters) throws Exception { JedisCluster jedisCluster = RedisUtil.getJedisCluster(); map = jedisCluster.hgetAll("product"); } @Override public void processElement(Product value, Context ctx, Collector out) throws Exception { long priceAlert = Long.parseLong(map.get(value.getGoodsName())); if (value.getGoodsPrice() > priceAlert) { value.setStatus(true); } out.collect(value); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor (Time.seconds(0)) { @Override public long extractTimestamp(Product element) { return element.getOrderTime(); } }) ; //6.定义匹配模式,设置时间长度 Pattern pattern = Pattern. begin("begin") .where(new SimpleCondition () { @Override public boolean filter(Product value) throws Exception { return value.getStatus() == true; } }) .next("next") .where(new SimpleCondition () { @Override public boolean filter(Product value) throws Exception { return value.getStatus() == true; } }) .within(Time.seconds(60)); //7.匹配模式(分组) PatternStream cep = CEP.pattern(waterData.keyBy(Product::getGoodsId), pattern); //8.查询告警数据 cep.select(new PatternSelectFunction () { @Override public Object select(Map > pattern) throws Exception { List result = pattern.get("next"); return result; } }).print("告警数据:"); env.execute(); }
}
2.2.Bean对象
属性:goodsId、goodsPrice、goodsName、alias、orderTime、status
public class Product {
private Long goodsId;
private Double goodsPrice;
private String goodsName;
private String alias;
private Long orderTime;
private Boolean status;
public Product(Long goodsId, Double goodsPrice, String goodsName, String alias, Long orderTime, Boolean status) { this.goodsId = goodsId; this.goodsPrice = goodsPrice; this.goodsName = goodsName; this.alias = alias; this.orderTime = orderTime; this.status = status; } @Override public String toString() { return "Product{" + "goodsId=" + goodsId + ", goodsPrice=" + goodsPrice + ", goodsName='" + goodsName + '\'' + ", alias='" + alias + '\'' + ", orderTime=" + orderTime + ", status=" + status + '}'; } public Long getGoodsId() { return goodsId; } public void setGoodsId(Long goodsId) { this.goodsId = goodsId; } public Double getGoodsPrice() { return goodsPrice; } public void setGoodsPrice(Double goodsPrice) { this.goodsPrice = goodsPrice; } public String getGoodsName() { return goodsName; } public void setGoodsName(String goodsName) { this.goodsName = goodsName; } public String getAlias() { return alias; } public void setAlias(String alias) { this.alias = alias; } public Long getOrderTime() { return orderTime;
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取
[外链图片转存中…(img-LIZTQw3o-1715726852898)]
[外链图片转存中…(img-8utSNZ8Z-1715726852898)]
既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!
由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新
需要这份系统化资料的朋友,可以戳这里获取