[Elasticsearch 实战][全唐宋诗][上] Java 导入诗集数据

0 项目介绍

最近找到一个诗词数据库,想把它导入 ES 中,做一点分析。主要是为了熟悉 ES 的基本使用。

  1. 输入:包含全唐诗和全宋诗的 json 文件目录。
  2. 输出:诗人作品数量的标签云(top50)

1 准备工作

2 创建索引模板

我打算把每个 JSON 文件里的数据,分别导入到一个单独的索引中。预先创建一个索引模板,生成新索引的时候,就不用重复设置了(根据业务需要,我们可能会针对不同的字段,配置不同的分词器)。

在 Kibana 中发送请求:

PUT _template/poem
{
  "index_patterns": ["poem*"],
  "settings": {
    "number_of_shards": 1
  },
  "mappings": {
    "properties": {
      "author": {
        "type": "keyword"
      },
      "paragraphs": {
        "type": "text"
      },
      "title": {
        "type": "text"
      },
      "id": {
        "type": "text",
        "index": false
      },
      "tags": {
        "type": "keyword"
      }
    }
  }
}

所有以 poem 开头的索引,都将使用这个模板。

3 编写 Java 代码

这里讲解一些重点的代码。完整项目点这里(//todo)。

3.1 导入依赖

build.gradle 导入依赖:

implementation ('org.elasticsearch.client:transport:7.4.2') {
    exclude module: 'org.elasticsearch.plugin:transport-netty4-client:6.4.3'
}
implementation 'org.elasticsearch.plugin:transport-netty4-client:7.4.2'
implementation 'org.elasticsearch:elasticsearch:7.4.2'

3.2 编写 EsService 类

我们将通过 EsService 类来跟 ES 打交道,用到 ElasticSearch Client 的 Java API。主要做以下工作:

  • 创建与 ES 的连接;
  • 使用 bulk API 向 ES 批量插入 JSON 数据;
  • 切断连接。

代码如下:

@Component
@Slf4j
public class EsService {

    @Autowired
    private EsConfig esConfig;

    private TransportClient client;

    @PostConstruct
    public void init() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName(esConfig.getHost()), esConfig.getPort()));
    }

    public void post(List<String> jsonList, String index, String type) {
        BulkRequestBuilder bulkRequest = client.prepareBulk();

        for (String json : jsonList) {
            bulkRequest.add(client.prepareIndex(index, type)
                    .setSource(json, XContentType.JSON)
            );
        }

        BulkResponse bulkResponse = bulkRequest.get();

        if (bulkResponse.hasFailures()) {
            log.error("has failures!");
            throw new RuntimeException("failed");
        } else {
            log.info("complete");
        }
    }

    public void close() {
        client.close();
    }
}

 3.3 主逻辑类 PoemSender

PoemSender 类将包含导入数据到 ES 的整体逻辑,大致如下:

  1. 加载待处理的 json 文件名列表;
  2. 遍历该列表:根据文件名得到索引名,加载文件内容,切分成 JSON 字符串列表,发送到 ES。

代码如下:

@Component
@Slf4j
public class PoemSender {
    private static final String POEM_TYPE = "_doc";

    @Autowired
    private PoemConfig poemConfig;

    @Autowired
    private EsService esService;

    public void loadAndSend() throws IOException {

        List<String> fileNames = poemConfig.getFileNames();
        for (String fileName : fileNames) {
            List<String> poems = loadJsonData(fileName);
            String index = fileName2Index(fileName);
            esService.post(poems, index, POEM_TYPE);
            log.info(String.format("%s posted", index));
        }

        esService.close();
    }

    private String fileName2Index(String fileName) {
        String res = fileName.replaceFirst("poet", "poem");
        res = res.replace(".json", "");
        res = res.replaceAll("\\.", "_");
        return res;
    }

    private List<String> loadJsonData(String fileName) throws IOException {
        GlobalTimer.getInstance().restart();

        String content = FileUtils.readFile(poemConfig.getPoemHome(), fileName);
        content = StringUtils.trimWhitespace(content);
        content = StringUtils.trimLeadingCharacter(content, '[');
        content = StringUtils.trimTrailingCharacter(content, ']');
        String[] poems = content.split(",(?=\\s+\\{)");

        int time = GlobalTimer.getInstance().getTimeInSeconds();
        log.info("load time: " + time);

        return Arrays.asList(poems);
    }
}

 4 运行验证

执行 Java 代码,会把全唐诗和全宋诗的所有内容导入到 ES 中。

在 Kibana 中发送如下请求:

GET poem*/_search
{
  "query": {
    "match_all": {}
  }
}

结果如图: