本文共 6977 字,大约阅读时间需要 23 分钟。
场景是meta与index,为1对多,index与user为1对多,程序需要先判断meta表中是否存在逻辑进行表的处理,然后index根据meta_id来进行是否表存在逻辑,然后user表再根据index_id进行表存储;这样逻辑处理的需要串行,这样操作处理数据库效率会非常低。针对这种情况,可以采用三个固定队列的方式,meta队列,index队列,user队列,队列按照先进先出的策略执行,这样就会使一个串行线程,增加缓冲时间。例如meta线程处理完一个后,index线程进行处理,这时又有线程进入了meta队列进行处理了,增加队列错开时间。如下图:
线程图
线程执行流程,以及效率提示执行缩略图
中间index处理的线程,处理metaQueue,增加到indexQueue
public class IndexProducer implements Runnable { private BlockingQueueuser入库的线程,处理indexQueue队列metaqueue ; private BlockingQueue indexqueue ; private VbIndexRepository vbIndexRepository ; private int chat_time ; private int liveStartTime ; private int vacuateTime ; private int videoId ; private long time; public IndexProducer(BlockingQueue mq,BlockingQueue iq,VbIndexRepository vbIndexRepository){ this.metaqueue=mq; this.indexqueue=iq ; this.vbIndexRepository =vbIndexRepository ; } @Override public void run(){ while (true){ log.info("index producer ,metaqueue size:"+metaqueue.size()+";indexqueue size:"+indexqueue.size()); try{ IndexForQueue ifq = metaqueue.take() ; VbMeta meta = ifq.getVbMeta() ; chat_time = ifq.getChat_time() ; liveStartTime = ifq.getLiveStartTime() ; vacuateTime = ifq.getVacuateTime() ; videoId = ifq.getVideoId() ; time = ifq.getTime() ; long metaId = meta.getMetaId() ; int timestamp = chat_time - liveStartTime ; int offset = timestamp <0 ? 0:timestamp; int indextime = (offset/vacuateTime)*vacuateTime; VbIndex index = vbIndexRepository.queryIndexByVideoTimeMeta(videoId,Long.valueOf(String.valueOf(indextime)),metaId) ;// boolean idxexist = false ; if(null==index){ index = new VbIndex(); index.setCreateTime(time); index.setMetaId(metaId); index.setTimestamp(Long.valueOf(String.valueOf(indextime))); index.setVideoId(videoId); index = vbIndexRepository.save(index) ; } indexqueue.put(index); } catch (InterruptedException e) { log.error("indexproducer run error:"+e); e.printStackTrace(); } } }}
public class IndexConsumer implements Runnable { private BlockingQueueindexQueue ; private VbUserRepository vbUserRepository ; private VbConfigRepository vbConfigRepository ; private long time ; private int chat_time ; private int liveStartTime ; private int uid ; private int videoId ; @Value("${lexue.client}") private String client ; public IndexConsumer(BlockingQueue iq,VbUserRepository vbUserRepository,VbConfigRepository vbConfigRepository,long time,int chat_time,int liveStartTime ,int uid,int videoId){ this.indexQueue =iq ; this.vbUserRepository = vbUserRepository ; this.vbConfigRepository= vbConfigRepository ; this.time=time ; this.chat_time =chat_time ; this.liveStartTime = liveStartTime ; this.uid =uid ; this.videoId = videoId ; } @Override public void run(){ log.info("indexConsumer run,indexQueue size:"+indexQueue.size()); try{ int timestamp = (chat_time - liveStartTime) <0?0:(chat_time - liveStartTime) ; VbIndex index = indexQueue.take() ; VbUser user = new VbUser(); user.setCreateTime(time); user.setIndexId(index.getIndexId()); user.setTimestamp(timestamp); user.setUid(uid); user.setVideoId(videoId); vbUserRepository.save(user) ; VbConfig vc = vbConfigRepository.findOne(videoId) ; if(vc==null || "0".equals(vc.getUpdateStatus())){ updateConfig(videoId) ; } } catch (InterruptedException e) { log.error("indexConsumer error:"+e); e.printStackTrace(); } }
}
主流程处理:meta为主流程处理完后,放入metaQueue中,index线程是一直循环处理,user线程是固定线程池进行执行
private static Thread indexpro=null ; ExecutorService userservice = Executors.newFixedThreadPool(10); /** * 定时监控线程是否挂掉 */ @Scheduled(cron="0/30 * * * * ? " ) private void checkThread(){ if(indexpro ==null || !indexpro.isAlive()){ indexpro = new Thread(new IndexProducer(metaQueue,indexQueue,vbIndexRepository)) ; indexpro.start(); log.warn("indexThread new start"); } } /** * 直播弹幕入库 * @param uid * @param liveroom * @param content * @param chat_time * @param msg_type */ public void addLiveBullets(int uid , int liveroom , String content ,int chat_time ,int msg_type ){ log.info("addLiveBullets start:"+new Date()); if(!("".equals(content) || liveroom <1)){ try{ Live live = cacheService.queryLiveByRoom(liveroom) ; if(live !=null){ VbMeta meta = vbMetaRepository.queryByContent(content) ; long time = System.currentTimeMillis()/1000 ; if(null==meta){ meta = new VbMeta() ; meta.setContent(content); meta.setContentType(Short.valueOf(String.valueOf(msg_type))); meta.setCount(Long.valueOf("1")); meta.setCreateTime(time); meta.setDisplay(Short.valueOf("0")); }else{ meta.setCount(meta.getCount()+1); meta.setUpdateTime(time); } meta = vbMetaRepository.save(meta) ; IndexForQueue ifq = new IndexForQueue() ; ifq.setVbMeta(meta); ifq.setVideoId(live.getVideoId()); ifq.setChat_time(chat_time); ifq.setLiveStartTime(live.getStartTime()); ifq.setTime(time); ifq.setVacuateTime(vacuateTime); metaQueue.put(ifq); log.info("meta deal end:"+new Date()); Thread indexcons = new Thread(new IndexConsumer(indexQueue,vbUserRepository,vbConfigRepository,time,chat_time,live.getStartTime(),uid,live.getVideoId())) ; userservice.execute(indexcons); }else{ log.info("not query live by liveroom:"+liveroom); } }catch (Exception e){ log.error("addLiveBullets error:"+e); } }else{ log.info("信息不完整,忽略该信息content:"+content); } log.info("addLiveBullets end:"+new Date()); }
最后测试的结果,提高效率差不多是3倍的效果,类似于切分了三次操作库。每分钟插入数据3张表,每张表各2000多条数据。后续可能需要再设置结构,进行相对提升。
另一种方案:就是不进行通过db的方式,不进行数据逻辑校验,等数据全部入库后,再进行数据的合并逻辑处理。