博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
必须串行执行程序如何提高执行效率之队列方法
阅读量:4041 次
发布时间:2019-05-24

本文共 6977 字,大约阅读时间需要 23 分钟。

场景是meta与index,为1对多,index与user为1对多,程序需要先判断meta表中是否存在逻辑进行表的处理,然后index根据meta_id来进行是否表存在逻辑,然后user表再根据index_id进行表存储;这样逻辑处理的需要串行,这样操作处理数据库效率会非常低。针对这种情况,可以采用三个固定队列的方式,meta队列,index队列,user队列,队列按照先进先出的策略执行,这样就会使一个串行线程,增加缓冲时间。例如meta线程处理完一个后,index线程进行处理,这时又有线程进入了meta队列进行处理了,增加队列错开时间。如下图:

线程图

线程执行流程,以及效率提示执行缩略图

中间index处理的线程,处理metaQueue,增加到indexQueue

public class IndexProducer implements Runnable {    private BlockingQueue
metaqueue ; private BlockingQueue
indexqueue ; private VbIndexRepository vbIndexRepository ; private int chat_time ; private int liveStartTime ; private int vacuateTime ; private int videoId ; private long time; public IndexProducer(BlockingQueue
mq,BlockingQueue
iq,VbIndexRepository vbIndexRepository){ this.metaqueue=mq; this.indexqueue=iq ; this.vbIndexRepository =vbIndexRepository ; } @Override public void run(){ while (true){ log.info("index producer ,metaqueue size:"+metaqueue.size()+";indexqueue size:"+indexqueue.size()); try{ IndexForQueue ifq = metaqueue.take() ; VbMeta meta = ifq.getVbMeta() ; chat_time = ifq.getChat_time() ; liveStartTime = ifq.getLiveStartTime() ; vacuateTime = ifq.getVacuateTime() ; videoId = ifq.getVideoId() ; time = ifq.getTime() ; long metaId = meta.getMetaId() ; int timestamp = chat_time - liveStartTime ; int offset = timestamp <0 ? 0:timestamp; int indextime = (offset/vacuateTime)*vacuateTime; VbIndex index = vbIndexRepository.queryIndexByVideoTimeMeta(videoId,Long.valueOf(String.valueOf(indextime)),metaId) ;// boolean idxexist = false ; if(null==index){ index = new VbIndex(); index.setCreateTime(time); index.setMetaId(metaId); index.setTimestamp(Long.valueOf(String.valueOf(indextime))); index.setVideoId(videoId); index = vbIndexRepository.save(index) ; } indexqueue.put(index); } catch (InterruptedException e) { log.error("indexproducer run error:"+e); e.printStackTrace(); } } }}
user入库的线程,处理indexQueue队列

public class IndexConsumer implements Runnable {    private BlockingQueue
indexQueue ; private VbUserRepository vbUserRepository ; private VbConfigRepository vbConfigRepository ; private long time ; private int chat_time ; private int liveStartTime ; private int uid ; private int videoId ; @Value("${lexue.client}") private String client ; public IndexConsumer(BlockingQueue
iq,VbUserRepository vbUserRepository,VbConfigRepository vbConfigRepository,long time,int chat_time,int liveStartTime ,int uid,int videoId){ this.indexQueue =iq ; this.vbUserRepository = vbUserRepository ; this.vbConfigRepository= vbConfigRepository ; this.time=time ; this.chat_time =chat_time ; this.liveStartTime = liveStartTime ; this.uid =uid ; this.videoId = videoId ; } @Override public void run(){ log.info("indexConsumer run,indexQueue size:"+indexQueue.size()); try{ int timestamp = (chat_time - liveStartTime) <0?0:(chat_time - liveStartTime) ; VbIndex index = indexQueue.take() ; VbUser user = new VbUser(); user.setCreateTime(time); user.setIndexId(index.getIndexId()); user.setTimestamp(timestamp); user.setUid(uid); user.setVideoId(videoId); vbUserRepository.save(user) ; VbConfig vc = vbConfigRepository.findOne(videoId) ; if(vc==null || "0".equals(vc.getUpdateStatus())){ updateConfig(videoId) ; } } catch (InterruptedException e) { log.error("indexConsumer error:"+e); e.printStackTrace(); } }
}

主流程处理:meta为主流程处理完后,放入metaQueue中,index线程是一直循环处理,user线程是固定线程池进行执行

private static Thread indexpro=null ;    ExecutorService userservice = Executors.newFixedThreadPool(10);    /**     * 定时监控线程是否挂掉     */    @Scheduled(cron="0/30 * *  * * ? " )    private void checkThread(){        if(indexpro ==null || !indexpro.isAlive()){            indexpro = new Thread(new IndexProducer(metaQueue,indexQueue,vbIndexRepository)) ;            indexpro.start();            log.warn("indexThread new start");        }    }    /**     * 直播弹幕入库     * @param uid     * @param liveroom     * @param content     * @param chat_time     * @param msg_type     */    public void addLiveBullets(int uid , int liveroom , String content ,int chat_time ,int msg_type ){        log.info("addLiveBullets start:"+new Date());        if(!("".equals(content) || liveroom <1)){            try{                Live live = cacheService.queryLiveByRoom(liveroom) ;                if(live !=null){                    VbMeta meta = vbMetaRepository.queryByContent(content) ;                    long time = System.currentTimeMillis()/1000 ;                    if(null==meta){                        meta = new VbMeta() ;                        meta.setContent(content);                        meta.setContentType(Short.valueOf(String.valueOf(msg_type)));                        meta.setCount(Long.valueOf("1"));                        meta.setCreateTime(time);                        meta.setDisplay(Short.valueOf("0"));                    }else{                        meta.setCount(meta.getCount()+1);                        meta.setUpdateTime(time);                    }                    meta = vbMetaRepository.save(meta) ;                    IndexForQueue ifq = new IndexForQueue() ;                    ifq.setVbMeta(meta);                    ifq.setVideoId(live.getVideoId());                    ifq.setChat_time(chat_time);                    ifq.setLiveStartTime(live.getStartTime());                    ifq.setTime(time);                    ifq.setVacuateTime(vacuateTime);                    metaQueue.put(ifq);                    log.info("meta deal end:"+new Date());                    Thread indexcons = new Thread(new IndexConsumer(indexQueue,vbUserRepository,vbConfigRepository,time,chat_time,live.getStartTime(),uid,live.getVideoId())) ;                    userservice.execute(indexcons);                }else{                    log.info("not query live by liveroom:"+liveroom);                }            }catch (Exception e){                log.error("addLiveBullets error:"+e);            }        }else{            log.info("信息不完整,忽略该信息content:"+content);        }        log.info("addLiveBullets end:"+new Date());    }

最后测试的结果,提高效率差不多是3倍的效果,类似于切分了三次操作库。每分钟插入数据3张表,每张表各2000多条数据。后续可能需要再设置结构,进行相对提升。

另一种方案:就是不进行通过db的方式,不进行数据逻辑校验,等数据全部入库后,再进行数据的合并逻辑处理。

你可能感兴趣的文章
01Java基础语法-19. 循环跳转控制语句
查看>>
Django框架全面讲解 -- Form
查看>>
socket,accept函数解析
查看>>
今日互联网关注(写在清明节后):每天都有值得关注的大变化
查看>>
”舍得“大法:把自己的优点当缺点倒出去
查看>>
[今日关注]鼓吹“互联网泡沫,到底为了什么”
查看>>
[互联网学习]如何提高网站的GooglePR值
查看>>
[关注大学生]求职不可不知——怎样的大学生不受欢迎
查看>>
[关注大学生]读“贫困大学生的自白”
查看>>
[互联网关注]李开复教大学生回答如何学好编程
查看>>
[关注大学生]李开复给中国计算机系大学生的7点建议
查看>>
[关注大学生]大学毕业生择业:是当"鸡头"还是"凤尾"?
查看>>
[茶余饭后]10大毕业生必听得歌曲
查看>>
gdb调试命令的三种调试方式和简单命令介绍
查看>>
C++程序员的几种境界
查看>>
VC++ MFC SQL ADO数据库访问技术使用的基本步骤及方法
查看>>
VUE-Vue.js之$refs,父组件访问、修改子组件中 的数据
查看>>
Vue-子组件改变父级组件的信息
查看>>
Python自动化之pytest常用插件
查看>>
Python自动化之pytest框架使用详解
查看>>