Blog

从零开始构建一个任务流水线:Java 版

Cover Image for 从零开始构建一个任务流水线:Java 版
ZD
ZD

背景

任务流水线在开发者日常的工作中算是比较常见的任务。比如读 MySQL,然后往 Redis 或者 ElasticSearch (后文称为 ES)中灌数。

一般来讲,应付一下需求,迅速弄出一个快糙猛的方案,活干完就得了。但是,一而再再而三的接到这样的类似的需求,不管是提高效率,还是节省自己体力,最后总会想想怎么能偷点懒,更快更省事的弄出来。

本文探讨一种通用的简易方案。

需求

业务系统使用 ES 作为 DB 查询的加速器,ES 是很久很久的一个版本,需要升级到较新版本,因此,需要将全量数据(约上亿量级)从 MySQL 索引至新升级的 ES。

约束条件:

  • 总量 1 亿
  • 控制跑数机器的资源使用量
  • 跑一次全量索引用时尽量短

Approach I

本着先易后难,先具体后抽象的原则,将 Pipeline 总体框架构思出来。

API 设计

Endpoint

POST /api/backdoor/db-to-es

请求参数

名称 说明
offset 查询数据库起始偏移量
pageSize 查询数据库分页大小,简化起见,同时也是 ES 批量索引大小
maxId 最大查询到 ID 为止
esIndex ES 索引名称
esIndexType ES Type 名称
esEndpoint ES API 地址

请求举例

{
	"offset": 0,
	"pageSize": 1000,
	"maxId": 10000000,
	"esIndex": "fake_fake3",
	"esIndexType": "fake_person",
	"esEndpoint": "http://localhost:9200"
}

响应

原则上,API 提交跑数任务后,立即返回。

举例:

Submit OK

实现

逻辑上很简单,取数据,拆分任务提交线程池,执行任务写 ES。流程上,类似上图。

按页查询数据

按照分页的参数,使用 jdbc-template 查询出来对象列表

@Autowired
private JdbcTemplate jdbcTemplate;

private List<FakePerson> findPersonPage(final QueryPageParams pageParams) {
    final long offset = pageParams.offset;
    final long maxId = pageParams.maxId;
    final int pageSize = pageParams.pageSize;

    StringBuilder sb = new StringBuilder("(");
    long ceil = Math.min(offset+pageSize, maxId);
    for (long id = offset; id < ceil; id++) {
        sb.append("'")
                .append(id+1)
                .append("'");
        if (id+1 < ceil) {
            sb.append(",");
        }
    }
    sb.append(")");
    String sql = "select id,first_name,last_name,full_name,email from fake_person where id in " + sb.toString();

    List<FakePerson> personList = jdbcTemplate.query(
            sql, (rs, rowNum) -> {
        FakePerson p = new FakePerson();
        p.setId(rs.getLong("id"));
        p.setEmail(rs.getString("email"));
        p.setFirstName(rs.getString("first_name"));
        p.setLastName(rs.getString("last_name"));
        p.setFullName(rs.getString("full_name"));
        p.setTimestamp(new Date());
        return p;
    });
    return personList;
}

独立提出来的分页逻辑,通过迭代器,将分页查询和分页逻辑组合在一起。

@Data
@AllArgsConstructor
@Builder
static class QueryPageParams {
    long offset;
    int pageSize;
    long maxId;
}

private Iterator<List<FakePerson>> pageIterator(
        final SyncRequest syncRequest,
        final Function<QueryPageParams, List<FakePerson>> fn) {

    return new Iterator<List<FakePerson>>() {
        long offset = syncRequest.offset;
        long max = syncRequest.maxId;

        @Override
        public boolean hasNext() {
            return this.offset < this.max;
        }

        @Override
        public List<FakePerson> next() {
            List<FakePerson> page = fn.apply(QueryPageParams.builder()
                    .offset(offset)
                    .pageSize(syncRequest.pageSize)
                    .maxId(syncRequest.maxId)
                    .build());
            this.offset += syncRequest.pageSize;
            return page;
        }
    };
}

写数至 ES

由于 ES 本身版本演进较快,官方的 Client 和社区的 Client 都较为混乱,考虑到这里对 ES 的操作需求很明确,就是 批量索引 Doc. 这里采用了 okhttp3 这个库,如果你是基于 Spring Boot 开发应用,okhttp3 是 Spring Boot 包含的依赖,简单添加就好。

修改 pom.xml

<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
</dependency>

ES 批量索引 API 的格式如下,基于文本的协议。

POST <ES_HOST>/_bulk

{"index": {"_index": "index-name", "_type": "type-name", "_id": "ID"}}\n
{"id": <ID>, "field1": "value1", ...}

请求体的第一行称为指令行,本例为索引指令。第二行是序列化为 JSON 字符串的 Doc。再有第二个 Doc, 重复此格式。

private OkHttpClient httpClient = new OkHttpClient();  // 直接使用 HTTP Client 访问 ES
private ObjectMapper objectMapper = new ObjectMapper(); // JSON 序列化
private static final String NEW_LINE = "\n";

// 将 Page 写入到 ES
private void writePageToES(
        final Collection<FakePerson> page,
        String esIndex, String esIndexType, String esEndpoint) throws IOException {
    StringBuilder bulkRequest = new StringBuilder();

    for (FakePerson p : page) {
        String action = "{\"index\":{\"_index\":\""+esIndex+"\",\"_type\":\""+esIndexType+"\",\"_id\":\""+p.getId()+"\"}}";
        String json = objectMapper.writeValueAsString(p); // 将 Page 序列化为 JSON
        bulkRequest.append(action).append(NEW_LINE).append(json).append(NEW_LINE);
    }

    Request request = new Request.Builder()
            .url(esEndpoint + "/_bulk")
            .post(okhttp3.RequestBody
                    .create(MediaType.parse("application/json"), bulkRequest.toString()))
            .build();
    Response response = httpClient.newCall(request).execute();
    if (response != null) {
        response.close();
    }
    if (response.code() > 201) {
        throw new IOException("write to ES got error, response code = " + response.code());
    }
}

任务调度

任务执行请求定义

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class SyncRequest {
    long offset;
    int pageSize;
    long maxId;

    String esIndex;
    String esIndexType; // ES7 不再支持 Type
    String esEndpoint; // ES HTTP 接口地址,例如 http://localhost:9200
}

最后,任务调度逻辑,任务提交线程池执行。

private final ExecutorService executorService = Executors.newFixedThreadPool(8);

private volatile boolean db2esRunning = false;

@PostMapping("/backdoor/db2es-v1")
public Object syncDBToES(@RequestBody SyncRequest syncRequest) {
    if (db2esRunning) {
        return "Can't, db2esRunning is true";
    }

    if (syncRequest.offset < 0) {
        return "ERROR: offset must >= 0";
    }
    if (syncRequest.pageSize < 1) {
        return "ERROR: pageSize must > 0";
    }
    if (syncRequest.maxId <= 0) {
        return "ERROR: maxId must > 0";
    }
    if (syncRequest.maxId < syncRequest.offset) {
        return "ERROR: maxId must > offset";
    }
    if (StringUtils.isEmpty(syncRequest.esIndex)) {
        return "ERROR: esIndex required";
    }
    if (StringUtils.isEmpty(syncRequest.esIndexType)) {
        return "ERROR: esIndexType required when ES version before 7";
    }
    if (StringUtils.isEmpty(syncRequest.esEndpoint)) {
        return "ERROR: esEndpoint required, format <http://localhost:9200>";
    }

    final Iterator<List<FakePerson>> pageSource = pageIterator(syncRequest, this::findPersonPage);
    final Semaphore maxQueuedTask = new Semaphore(20);

    new Thread(() -> {
        long startAt = System.currentTimeMillis();
        try {
            BackdoorControllerV1.this.db2esRunning = true;
            while (pageSource.hasNext()) {
                try {
                    maxQueuedTask.acquire(); // (1) 取 token
                    final List<FakePerson> page = pageSource.next();
                    executorService.submit(() -> {
                        try {
                            BackdoorControllerV1.this.writePageToES(
                                    Collections.unmodifiableCollection(page),
                                    syncRequest.esIndex, syncRequest.esIndexType, syncRequest.esEndpoint);
                        } catch (JsonProcessingException e) {
                            // TODO: 处理异常
                            e.printStackTrace();
                        } catch (IOException e) {
                            // TODO: 处理异常
                            e.printStackTrace();
                        } finally {
                            maxQueuedTask.release(); // (2) 还 token
                        }
                    });
                } catch (InterruptedException e) {
                    e.printStackTrace(); // TODO: 处理异常
                }
            }
        } finally {
            LOGGER.info("all sub-task was done, time duration is {}", System.currentTimeMillis() - startAt);
            BackdoorControllerV1.this.db2esRunning = false; // 重置标志位
        }
    }).start();

    return "Submit OK";
}

限制线程池任务排队数量:

  • (1) 取 Token,如果取不到,则阻塞等待,直到有任务执行完成,归还 Token
  • (2) 归还 Token。任务执行完毕,或者抛异常结束,都务必要通过 finally 归还 Token

封面来自 Unsplash