副标题[/!--empirenews.page--]
                         前言
 
近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。 
关键技术栈
    - Elasticsearch
 
    - jdbc
 
    - ExecutorServiceThread
 
    - sql
 
 
工具说明
maven依赖 
- <dependency> 
 -  <groupId>mysql</groupId> 
 -  <artifactId>mysql-connector-java</artifactId> 
 -  <version>${mysql.version}</version> 
 - </dependency> 
 - <dependency> 
 -  <groupId>org.elasticsearch</groupId> 
 -  <artifactId>elasticsearch</artifactId> 
 -  <version>${elasticsearch.version}</version> 
 - </dependency> 
 - <dependency> 
 -  <groupId>org.elasticsearch.client</groupId> 
 -  <artifactId>transport</artifactId> 
 -  <version>${elasticsearch.version}</version> 
 - </dependency> 
 - <dependency> 
 -  <groupId>org.projectlombok</groupId> 
 -  <artifactId>lombok</artifactId> 
 -  <version>${lombok.version}</version> 
 - </dependency> 
 - <dependency> 
 -  <groupId>com.alibaba</groupId> 
 -  <artifactId>fastjson</artifactId> 
 -  <version>${fastjson.version}</version> 
 - </dependency> 
 
  
java线程池设置 
默认线程池大小为21个,可调整。其中POR为处理流程已办数据线程池,ROR为处理流程已阅数据线程池。 
- private static int THREADS = 21; 
 - public static ExecutorService POR = Executors.newFixedThreadPool(THREADS); 
 - public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS); 
 
  
定义已办生产者线程/已阅生产者线程:ZlPendProducer/ZlReadProducer 
- public class ZlPendProducer implements Runnable { 
 -  ... 
 -  @Override 
 -  public void run() { 
 -  System.out.println(threadName + "::启动..."); 
 -  for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++) 
 -  try { 
 -  .... 
 -  int size = 1000; 
 -  for (int i = 0; i < count; i += size) { 
 -  if (i + size > count) { 
 -  //作用为size最后没有100条数据则剩余几条newList中就装几条 
 -  size = count - i; 
 -  } 
 -  String sql = "select * from " + tableName + " limit " + i + ", " + size; 
 -  System.out.println(tableName + "::sql::" + sql); 
 -  rs = statement.executeQuery(sql); 
 -  List<HistPendingEntity> lst = new ArrayList<>(); 
 -  while (rs.next()) { 
 -  HistPendingEntity p = PendUtils.getHistPendingEntity(rs); 
 -  lst.add(p); 
 -  } 
 -  MteExecutor.POR.submit(new ZlPendConsumer(lst)); 
 -  Thread.sleep(2000); 
 -  } 
 -  .... 
 -  } catch (Exception e) { 
 -  e.printStackTrace(); 
 -  } 
 -  } 
 - } 
 - public class ZlReadProducer implements Runnable { 
 -  ...已阅生产者处理逻辑同已办生产者 
 - } 
 
  
定义已办消费者线程/已阅生产者线程:ZlPendConsumer/ZlReadConsumer 
- public class ZlPendConsumer implements Runnable { 
 -  private String threadName; 
 -  private List<HistPendingEntity> lst; 
 -  public ZlPendConsumer(List<HistPendingEntity> lst) { 
 -  this.lst = lst; 
 -  } 
 -  @Override 
 -  public void run() { 
 -  ... 
 -  lst.forEach(v -> { 
 -  try { 
 -  String json = new Gson().toJson(v); 
 -  EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null); 
 -  Const.COUNTER.LD_P.incrementAndGet(); 
 -  } catch (Exception e) { 
 -  e.printStackTrace(); 
 -  System.out.println("err::PendingId::" + v.getPendingId()); 
 -  } 
 -  }); 
 -  ... 
 -  } 
 - } 
 - public class ZlReadConsumer implements Runnable { 
 -  //已阅消费者处理逻辑同已办消费者 
 - } 
 
  
定义导入Elasticsearch数据监控线程:Monitor 
                                                (编辑:我爱故事小小网_铜陵站长网) 
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! 
                     |