任务流水线在开发者日常的工作中算是比较常见的任务。比如读 MySQL,然后往 Redis 或者 ElasticSearch (后文称为 ES
)中灌数。
一般来讲,应付一下需求,迅速弄出一个快糙猛的方案,活干完就得了。但是,一而再再而三的接到这样的类似的需求,不管是提高效率,还是节省自己体力,最后总会想想怎么能偷点懒,更快更省事的弄出来。
本文探讨一种通用的简易方案。
业务系统使用 ES 作为 DB 查询的加速器,ES 是很久很久的一个版本,需要升级到较新版本,因此,需要将全量数据(约上亿量级)从 MySQL 索引至新升级的 ES。
约束条件:
本着先易后难,先具体后抽象的原则,将 Pipeline 总体框架构思出来。
Endpoint
POST /api/backdoor/db-to-es
请求参数
名称 | 说明 |
---|---|
offset |
查询数据库起始偏移量 |
pageSize |
查询数据库分页大小,简化起见,同时也是 ES 批量索引大小 |
maxId |
最大查询到 ID 为止 |
esIndex |
ES 索引名称 |
esIndexType |
ES Type 名称 |
esEndpoint |
ES API 地址 |
请求举例
{
"offset": 0,
"pageSize": 1000,
"maxId": 10000000,
"esIndex": "fake_fake3",
"esIndexType": "fake_person",
"esEndpoint": "http://localhost:9200"
}
响应
原则上,API 提交跑数任务后,立即返回。
举例:
Submit OK
逻辑上很简单,取数据,拆分任务提交线程池,执行任务写 ES。流程上,类似上图。
按页查询数据
按照分页的参数,使用 jdbc-template 查询出来对象列表
@Autowired
private JdbcTemplate jdbcTemplate;
private List<FakePerson> findPersonPage(final QueryPageParams pageParams) {
final long offset = pageParams.offset;
final long maxId = pageParams.maxId;
final int pageSize = pageParams.pageSize;
StringBuilder sb = new StringBuilder("(");
long ceil = Math.min(offset+pageSize, maxId);
for (long id = offset; id < ceil; id++) {
sb.append("'")
.append(id+1)
.append("'");
if (id+1 < ceil) {
sb.append(",");
}
}
sb.append(")");
String sql = "select id,first_name,last_name,full_name,email from fake_person where id in " + sb.toString();
List<FakePerson> personList = jdbcTemplate.query(
sql, (rs, rowNum) -> {
FakePerson p = new FakePerson();
p.setId(rs.getLong("id"));
p.setEmail(rs.getString("email"));
p.setFirstName(rs.getString("first_name"));
p.setLastName(rs.getString("last_name"));
p.setFullName(rs.getString("full_name"));
p.setTimestamp(new Date());
return p;
});
return personList;
}
独立提出来的分页逻辑,通过迭代器,将分页查询和分页逻辑组合在一起。
@Data
@AllArgsConstructor
@Builder
static class QueryPageParams {
long offset;
int pageSize;
long maxId;
}
private Iterator<List<FakePerson>> pageIterator(
final SyncRequest syncRequest,
final Function<QueryPageParams, List<FakePerson>> fn) {
return new Iterator<List<FakePerson>>() {
long offset = syncRequest.offset;
long max = syncRequest.maxId;
@Override
public boolean hasNext() {
return this.offset < this.max;
}
@Override
public List<FakePerson> next() {
List<FakePerson> page = fn.apply(QueryPageParams.builder()
.offset(offset)
.pageSize(syncRequest.pageSize)
.maxId(syncRequest.maxId)
.build());
this.offset += syncRequest.pageSize;
return page;
}
};
}
写数至 ES
由于 ES 本身版本演进较快,官方的 Client 和社区的 Client 都较为混乱,考虑到这里对 ES 的操作需求很明确,就是
批量索引 Doc. 这里采用了 okhttp3
这个库,如果你是基于 Spring Boot 开发应用,okhttp3 是 Spring Boot 包含的依赖,简单添加就好。
修改 pom.xml
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
</dependency>
ES 批量索引 API 的格式如下,基于文本的协议。
POST <ES_HOST>/_bulk
{"index": {"_index": "index-name", "_type": "type-name", "_id": "ID"}}\n
{"id": <ID>, "field1": "value1", ...}
请求体的第一行称为指令行,本例为索引指令。第二行是序列化为 JSON 字符串的 Doc。再有第二个 Doc, 重复此格式。
private OkHttpClient httpClient = new OkHttpClient(); // 直接使用 HTTP Client 访问 ES
private ObjectMapper objectMapper = new ObjectMapper(); // JSON 序列化
private static final String NEW_LINE = "\n";
// 将 Page 写入到 ES
private void writePageToES(
final Collection<FakePerson> page,
String esIndex, String esIndexType, String esEndpoint) throws IOException {
StringBuilder bulkRequest = new StringBuilder();
for (FakePerson p : page) {
String action = "{\"index\":{\"_index\":\""+esIndex+"\",\"_type\":\""+esIndexType+"\",\"_id\":\""+p.getId()+"\"}}";
String json = objectMapper.writeValueAsString(p); // 将 Page 序列化为 JSON
bulkRequest.append(action).append(NEW_LINE).append(json).append(NEW_LINE);
}
Request request = new Request.Builder()
.url(esEndpoint + "/_bulk")
.post(okhttp3.RequestBody
.create(MediaType.parse("application/json"), bulkRequest.toString()))
.build();
Response response = httpClient.newCall(request).execute();
if (response != null) {
response.close();
}
if (response.code() > 201) {
throw new IOException("write to ES got error, response code = " + response.code());
}
}
任务调度
任务执行请求定义
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class SyncRequest {
long offset;
int pageSize;
long maxId;
String esIndex;
String esIndexType; // ES7 不再支持 Type
String esEndpoint; // ES HTTP 接口地址,例如 http://localhost:9200
}
最后,任务调度逻辑,任务提交线程池执行。
private final ExecutorService executorService = Executors.newFixedThreadPool(8);
private volatile boolean db2esRunning = false;
@PostMapping("/backdoor/db2es-v1")
public Object syncDBToES(@RequestBody SyncRequest syncRequest) {
if (db2esRunning) {
return "Can't, db2esRunning is true";
}
if (syncRequest.offset < 0) {
return "ERROR: offset must >= 0";
}
if (syncRequest.pageSize < 1) {
return "ERROR: pageSize must > 0";
}
if (syncRequest.maxId <= 0) {
return "ERROR: maxId must > 0";
}
if (syncRequest.maxId < syncRequest.offset) {
return "ERROR: maxId must > offset";
}
if (StringUtils.isEmpty(syncRequest.esIndex)) {
return "ERROR: esIndex required";
}
if (StringUtils.isEmpty(syncRequest.esIndexType)) {
return "ERROR: esIndexType required when ES version before 7";
}
if (StringUtils.isEmpty(syncRequest.esEndpoint)) {
return "ERROR: esEndpoint required, format <http://localhost:9200>";
}
final Iterator<List<FakePerson>> pageSource = pageIterator(syncRequest, this::findPersonPage);
final Semaphore maxQueuedTask = new Semaphore(20);
new Thread(() -> {
long startAt = System.currentTimeMillis();
try {
BackdoorControllerV1.this.db2esRunning = true;
while (pageSource.hasNext()) {
try {
maxQueuedTask.acquire(); // (1) 取 token
final List<FakePerson> page = pageSource.next();
executorService.submit(() -> {
try {
BackdoorControllerV1.this.writePageToES(
Collections.unmodifiableCollection(page),
syncRequest.esIndex, syncRequest.esIndexType, syncRequest.esEndpoint);
} catch (JsonProcessingException e) {
// TODO: 处理异常
e.printStackTrace();
} catch (IOException e) {
// TODO: 处理异常
e.printStackTrace();
} finally {
maxQueuedTask.release(); // (2) 还 token
}
});
} catch (InterruptedException e) {
e.printStackTrace(); // TODO: 处理异常
}
}
} finally {
LOGGER.info("all sub-task was done, time duration is {}", System.currentTimeMillis() - startAt);
BackdoorControllerV1.this.db2esRunning = false; // 重置标志位
}
}).start();
return "Submit OK";
}
限制线程池任务排队数量:
- (1) 取 Token,如果取不到,则阻塞等待,直到有任务执行完成,归还 Token
- (2) 归还 Token。任务执行完毕,或者抛异常结束,都务必要通过
finally
归还 Token