背景
实体类定义属性id
为Long
类型,但在调用 spring-data-elasticsearch:3.2.10.RELEASE
中的putMapping(Class<T>)
方法时却被转换成了keyword
类型
源码
查看putMapping
方法,可以发现最终调用最下边的重载方法
// Excerpt of ElasticsearchRestTemplate showing the putMapping overload chain.
// Lines consisting of "..." mark members omitted from the excerpt.
class ElasticsearchRestTemplate {
...
// Entry point: builds the mapping for clazz via buildMapping(clazz) and
// delegates to putMapping(Class, Object).
@Override
public <T> boolean putMapping(Class<T> clazz) {
return putMapping(clazz, buildMapping(clazz));
}
// Resolves the index name and index type from the persistent entity of clazz
// and delegates to putMapping(String, String, Object).
@Override
public <T> boolean putMapping(Class<T> clazz, Object mapping) {
return putMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType(),
mapping);
}
// Same as putMapping(Class) but with an explicitly supplied index name and type.
@Override
public <T> boolean putMapping(String indexName, String type, Class<T> clazz) {
return putMapping(indexName, type, buildMapping(clazz));
}
// Final overload that all others end up in: sends the PutMappingRequest.
// Returns the "acknowledged" flag of the response; an IOException from the
// client is rethrown wrapped in an ElasticsearchException.
@Override
public boolean putMapping(String indexName, String type, Object mapping) {
Assert.notNull(indexName, "No index defined for putMapping()");
Assert.notNull(type, "No type defined for putMapping()");
PutMappingRequest request = new PutMappingRequest(indexName).type(type);
// The request source is chosen by the runtime type of mapping. There is no
// trailing else, so any other type leaves the request without a source.
if (mapping instanceof String) {
request.source(String.valueOf(mapping), XContentType.JSON);
} else if (mapping instanceof Map) {
request.source((Map) mapping);
} else if (mapping instanceof XContentBuilder) {
request.source((XContentBuilder) mapping);
}
try {
return client.indices().putMapping(request, RequestOptions.DEFAULT).isAcknowledged();
} catch (IOException e) {
throw new ElasticsearchException("Failed to put mapping for " + indexName, e);
}
}
...
}
复制代码
查看buildMapping
方法,因为并没有定义外部mappingPath
配置文件,所以走最下边的mappingBuilder.buildPropertyMapping(clazz)
来构建出String
类型的json
字符串
// Excerpt of AbstractElasticsearchTemplate showing how the mapping JSON for an
// entity class is obtained. Lines consisting of "..." mark omitted members.
abstract class AbstractElasticsearchTemplate {
...
// Returns the mapping for clazz as a String.
// 1. If clazz carries @Mapping with a non-empty mappingPath and the classpath
//    resource has non-empty content, that content is returned as-is.
// 2. Otherwise (no @Mapping, empty mappingPath, or empty file content) the
//    mapping is generated by MappingBuilder.buildPropertyMapping(clazz).
// Any exception raised while generating is rethrown wrapped in an
// ElasticsearchException.
protected String buildMapping(Class<?> clazz) {
// load mapping specified in Mapping annotation if present
if (clazz.isAnnotationPresent(Mapping.class)) {
String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath();
if (!StringUtils.isEmpty(mappingPath)) {
String mappings = ResourceUtil.readFileFromClasspath(mappingPath);
if (!StringUtils.isEmpty(mappings)) {
return mappings;
}
// empty file content: fall through to the annotation-based build below
} else {
LOGGER.info("mappingPath in @Mapping has to be defined. Building mappings using @Field");
}
}
// build mapping from field annotations
try {
MappingBuilder mappingBuilder = new MappingBuilder(elasticsearchConverter);
return mappingBuilder.buildPropertyMapping(clazz);
} catch (Exception e) {
throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e);
}
}
...
}
复制代码
查看buildPropertyMapping
方法
// Excerpt of MappingBuilder: turns an entity class into the mapping JSON by
// walking its persistent properties. Lines consisting of "..." mark omitted members.
class MappingBuilder {
...
// Builds the complete mapping JSON for clazz and returns it as a String.
String buildPropertyMapping(Class<?> clazz) throws IOException {
// Resolve the persistent entity up front; it carries common metadata such as indexName, indexType, etc.
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);
// Create a JSON builder whose outer object starts with the index type
XContentBuilder builder = jsonBuilder().startObject().startObject(entity.getIndexType());
// Add dynamic templates
addDynamicTemplatesMapping(builder, entity);
// Parent/child document check
String parentType = entity.getParentType();
if (hasText(parentType)) {
builder.startObject(FIELD_PARENT).field(FIELD_TYPE, parentType).endObject();
}
// Open the properties section (FIELD_PROPERTIES)
builder.startObject(FIELD_PROPERTIES);
// Map the entity's properties; this call is for the root object, not a nested object
mapEntity(builder, entity, true, "", false, FieldType.Auto, null);
builder.endObject() // FIELD_PROPERTIES
.endObject() // indexType
.endObject() // root object
.close();
return builder.getOutputStream().toString();
}
// Writes the mapping of every property of entity into builder.
// For a non-root entity that either has a property annotated with @Field or is
// a nested/object field, the properties are wrapped in an object named
// nestedObjectFieldName with its own type and properties section.
private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity entity, boolean isRootObject,
String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType,
@Nullable Field parentFieldAnnotation) throws IOException {
boolean writeNestedProperties = !isRootObject && (isAnyPropertyAnnotatedWithField(entity) || nestedOrObjectField);
if (writeNestedProperties) {
// Use the given fieldType for nested/object fields, otherwise fall back to FieldType.Object
String type = nestedOrObjectField ? fieldType.toString().toLowerCase()
: FieldType.Object.toString().toLowerCase();
builder.startObject(nestedObjectFieldName).field(FIELD_TYPE, type);
// include_in_parent is only written for Nested fields whose annotation enables it
if (nestedOrObjectField && FieldType.Nested == fieldType && parentFieldAnnotation != null
&& parentFieldAnnotation.includeInParent()) {
builder.field("include_in_parent", parentFieldAnnotation.includeInParent());
}
builder.startObject(FIELD_PROPERTIES);
}
// Map each property of the entity
if (entity != null) {
entity.doWithProperties((PropertyHandler<ElasticsearchPersistentProperty>) property -> {
try {
// Skip @Transient properties and those excluded via the parent field annotation
if (property.isAnnotationPresent(Transient.class) || isInIgnoreFields(property, parentFieldAnnotation)) {
return;
}
buildPropertyMapping(builder, isRootObject, property);
} catch (IOException e) {
// An IOException for one property is only logged; the remaining properties are still processed
logger.warn("error mapping property with name {}", property.getName(), e);
}
});
}
if (writeNestedProperties) {
// Close the properties section and the wrapping field object opened above
builder.endObject().endObject();
}
}
// Builds the mapping for a single property
private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
ElasticsearchPersistentProperty property) throws IOException {
// A property-level @Mapping whose mappingPath points to an existing classpath
// resource wins: the resource is written as a raw JSON field and nothing else is done.
if (property.isAnnotationPresent(Mapping.class)) {
String mappingPath = property.getRequiredAnnotation(Mapping.class).mappingPath();
if (!StringUtils.isEmpty(mappingPath)) {
ClassPathResource mappings = new ClassPathResource(mappingPath);
if (mappings.exists()) {
builder.rawField(property.getFieldName(), mappings.getInputStream(), XContentType.JSON);
return;
}
}
}
// Whether this is a geo-point property
boolean isGeoPointProperty = isGeoPointProperty(property);
// Whether this is a completion property
boolean isCompletionProperty = isCompletionProperty(property);
// Whether this is a nested/object property
boolean isNestedOrObjectProperty = isNestedOrObjectProperty(property);
// The @Field annotation on the property (null when absent)
Field fieldAnnotation = property.findAnnotation(Field.class);
// Entity-typed property (not geo-point/completion) with a relevant annotation: recurse into it
if (!isGeoPointProperty && !isCompletionProperty && property.isEntity() && hasRelevantAnnotation(property)) {
if (fieldAnnotation == null) {
return;
}
// Use the first persistent entity type of the property, or null when there is none
Iterator<? extends TypeInformation<?>> iterator = property.getPersistentEntityTypes().iterator();
ElasticsearchPersistentEntity<?> persistentEntity = iterator.hasNext()
? elasticsearchConverter.getMappingContext().getPersistentEntity(iterator.next())
: null;
mapEntity(builder, persistentEntity, false, property.getFieldName(), isNestedOrObjectProperty,
fieldAnnotation.type(), fieldAnnotation);
if (isNestedOrObjectProperty) {
return;
}
}
MultiField multiField = property.findAnnotation(MultiField.class);
if (isGeoPointProperty) {
applyGeoPointFieldMapping(builder, property);
return;
}
if (isCompletionProperty) {
CompletionField completionField = property.findAnnotation(CompletionField.class);
applyCompletionFieldMapping(builder, property, completionField);
// no return here: execution continues with the id / multi-field / field handling below
}
// Check whether this is the id property. For a root-object property that has
// @Field and is an id property, this branch takes precedence over @MultiField
// and @Field handling; the @Field annotation is not passed to
// applyDefaultIdFieldMapping.
if (isRootObject && fieldAnnotation != null && property.isIdProperty()) {
applyDefaultIdFieldMapping(builder, property);
} else if (multiField != null) {
addMultiFieldMapping(builder, property, multiField, isNestedOrObjectProperty);
} else if (fieldAnnotation != null) {
addSingleFieldMapping(builder, property, fieldAnnotation, isNestedOrObjectProperty);
}
}
...
}
复制代码
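至此可以看到,只要fieldName
为id
或document
(或者super.isIdProperty()返回true),该属性就会被判定为id
属性;如果它同时是根对象上带有@Field注解的属性,就会走applyDefaultIdFieldMapping而不是addSingleFieldMapping,直接将type
设置为keyword
并且可被索引。疑问到这里解决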
// Excerpt of SimpleElasticsearchPersistentProperty showing how a property is
// flagged as the id property. Lines consisting of "..." mark omitted code.
class SimpleElasticsearchPersistentProperty {
...
// Field names that are treated as id properties by name alone
private static final List<String> SUPPORTED_ID_PROPERTY_NAMES = Arrays.asList("id", "document");
public SimpleElasticsearchPersistentProperty(Property property,
PersistentEntity<?, ElasticsearchPersistentProperty> owner, SimpleTypeHolder simpleTypeHolder) {
...
// A property is an id property if the superclass says so OR its field name
// is one of SUPPORTED_ID_PROPERTY_NAMES.
this.isId = super.isIdProperty() || SUPPORTED_ID_PROPERTY_NAMES.contains(getFieldName());
...
}
// Returns the flag computed once in the constructor
@Override
public boolean isIdProperty() {
return isId;
}
// Writes the default id mapping: an object named after the field with
// FIELD_TYPE = TYPE_VALUE_KEYWORD (presumably "keyword"; the constant's value is
// not shown here) and FIELD_INDEX = true.
// NOTE(review): this private method is called unqualified from
// MappingBuilder.buildPropertyMapping, so it appears to belong to MappingBuilder
// rather than to this class - confirm against the library source.
private void applyDefaultIdFieldMapping(XContentBuilder builder, ElasticsearchPersistentProperty property)
throws IOException {
builder.startObject(property.getFieldName()).field(FIELD_TYPE, TYPE_VALUE_KEYWORD).field(FIELD_INDEX, true)
.endObject();
}
...
}
复制代码
话外题
项目中使用的ElasticSearch
实体类都是采取@Document
指定indexName
来操作的,但是索引和表都涉及到分库分表,所以又不能写死,然后就采取的SpEL
配合ThreadLocal
从上下文里set
后get
,其实Spring
对elasticsearch
的操作也像对关系型数据库那样封装了一层Repository
抽象,名为ElasticsearchRepository
,我们可以直接定义实体类操作接口继承就可以完成对单索引的CRUD以及Page等操作,但这样有一个问题,那就是indexName
无法动态去调整,所以就放弃了这种,改用更底层的RestHighLevelClient
封装的ElasticsearchRestTemplate
模版类,这样在面对分库分表时就可以手动去对每个Document
进行set
不同的indexName
,跨索引查询时也可以指定多个,也可以直接指定索引的alias
,需要注意的是,在进行更新时,只指定alias
是不被允许的,需要手动查出符合条件的Document
,再按索引分组进行批量更新,即调用ElasticSearch
的bulk api
在对ElasticSearch
和数据库的一致性问题上,我是通过封装不同的方法来确保强一致性和最终一致性
强一致性
类似插入、更新、删除等场景下,都是放在一个事务里,先操作数据库,再操作ElasticSearch
,这样可以确保操作ElasticSearch
失败时,数据库可以成功回滚。一般只运用于对数据实时性要求敏感的场景,并且数据量不大的情况,但即便这样还是会有至少1s
的延迟,这里就涉及到ElasticSearch
的refresh(刷新)策略了,这里不展开研究
最终一致性
批量的插入、更新这些操作,如果放在一个大事务里,对数据库也是一种压力,所以一般是分批操作数据库,另起一个线程池对事务提交进行监听,将数据库数据同步到ElasticSearch
里,在同步成功后反转数据库的同步状态字段。为了确保万无一失,后台会启动一个定时扫描数据库同步字段的线程去定时扫描同步。这种一般适用于大数据量的场景。当然你也可以去监听MySQL
的binlog
日志来进行同步。