背景

实体类定义属性idLong类型,但在调用 spring-data-elasticsearch:3.2.10.RELEASE中的putMapping(Class<T>)方法时却被转换成了keyword类型

源码

查看putMapping方法,可以发现最终调用最下边的重载方法

class ElasticsearchRestTemplate {
  ...
  @Override
  public <T> boolean putMapping(Class<T> clazz) {
    return putMapping(clazz, buildMapping(clazz));
  }
​
  @Override
  public <T> boolean putMapping(Class<T> clazz, Object mapping) {
    return putMapping(getPersistentEntityFor(clazz).getIndexName(), getPersistentEntityFor(clazz).getIndexType(),
                      mapping);
  }
​
  @Override
  public <T> boolean putMapping(String indexName, String type, Class<T> clazz) {
    return putMapping(indexName, type, buildMapping(clazz));
  }
​
  @Override
  public boolean putMapping(String indexName, String type, Object mapping) {
    Assert.notNull(indexName, "No index defined for putMapping()");
    Assert.notNull(type, "No type defined for putMapping()");
    PutMappingRequest request = new PutMappingRequest(indexName).type(type);
    if (mapping instanceof String) {
      request.source(String.valueOf(mapping), XContentType.JSON);
    } else if (mapping instanceof Map) {
      request.source((Map) mapping);
    } else if (mapping instanceof XContentBuilder) {
      request.source((XContentBuilder) mapping);
    }
    try {
      return client.indices().putMapping(request, RequestOptions.DEFAULT).isAcknowledged();
    } catch (IOException e) {
      throw new ElasticsearchException("Failed to put mapping for " + indexName, e);
    }
  }
  ...
}
复制代码

查看buildMapping方法,因为并没有定义外部mappingPath配置文件,所以走最下边的mappingBuilder.buildPropertyMapping(clazz)来进行解析出String类型的json文件

abstract class AbstractElasticsearchTemplate {
  ...
  protected String buildMapping(Class<?> clazz) {
    // load mapping specified in Mapping annotation if present
    if (clazz.isAnnotationPresent(Mapping.class)) {
      String mappingPath = clazz.getAnnotation(Mapping.class).mappingPath();
      if (!StringUtils.isEmpty(mappingPath)) {
        String mappings = ResourceUtil.readFileFromClasspath(mappingPath);
        if (!StringUtils.isEmpty(mappings)) {
          return mappings;
        }
      } else {
        LOGGER.info("mappingPath in @Mapping has to be defined. Building mappings using @Field");
      }
    }
​
    // build mapping from field annotations
    try {
      MappingBuilder mappingBuilder = new MappingBuilder(elasticsearchConverter);
      return mappingBuilder.buildPropertyMapping(clazz);
    } catch (Exception e) {
      throw new ElasticsearchException("Failed to build mapping for " + clazz.getSimpleName(), e);
    }
  }
  ...
}
复制代码

查看buildPropertyMapping方法

class MappingBuilder {
  ...
  String buildPropertyMapping(Class<?> clazz) throws IOException {
    // 提前解析出一些通用属性,比如indexName,indexType等等
		ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
				.getRequiredPersistentEntity(clazz);
		// 构造一个json构造器,以indexType开始
		XContentBuilder builder = jsonBuilder().startObject().startObject(entity.getIndexType());

		// 添加dynamic template
		addDynamicTemplatesMapping(builder, entity);

		// 父子文档判断
		String parentType = entity.getParentType();
		if (hasText(parentType)) {
			builder.startObject(FIELD_PARENT).field(FIELD_TYPE, parentType).endObject();
		}

		// 属性解析开始标志properties
		builder.startObject(FIELD_PROPERTIES);
		// 具体的properties解析,为根对象非nested对象
		mapEntity(builder, entity, true, "", false, FieldType.Auto, null);

		builder.endObject() // FIELD_PROPERTIES
				.endObject() // indexType
				.endObject() // root object
				.close();

		return builder.getOutputStream().toString();
  }
  
  private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity entity, boolean isRootObject,
			String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType,
			@Nullable Field parentFieldAnnotation) throws IOException {

		boolean writeNestedProperties = !isRootObject && (isAnyPropertyAnnotatedWithField(entity) || nestedOrObjectField);
		if (writeNestedProperties) {

			String type = nestedOrObjectField ? fieldType.toString().toLowerCase()
					: FieldType.Object.toString().toLowerCase();
			builder.startObject(nestedObjectFieldName).field(FIELD_TYPE, type);

			if (nestedOrObjectField && FieldType.Nested == fieldType && parentFieldAnnotation != null
					&& parentFieldAnnotation.includeInParent()) {

				builder.field("include_in_parent", parentFieldAnnotation.includeInParent());
			}

			builder.startObject(FIELD_PROPERTIES);
		}
    // 对象字段属性的解析
		if (entity != null) {

			entity.doWithProperties((PropertyHandler<ElasticsearchPersistentProperty>) property -> {
				try {
					if (property.isAnnotationPresent(Transient.class) || isInIgnoreFields(property, parentFieldAnnotation)) {
						return;
					}

					buildPropertyMapping(builder, isRootObject, property);
				} catch (IOException e) {
					logger.warn("error mapping property with name {}", property.getName(), e);
				}
			});
		}

		if (writeNestedProperties) {
			builder.endObject().endObject();
		}
	}
  // 解析每个property的方法
  private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
			ElasticsearchPersistentProperty property) throws IOException {

		if (property.isAnnotationPresent(Mapping.class)) {

			String mappingPath = property.getRequiredAnnotation(Mapping.class).mappingPath();
			if (!StringUtils.isEmpty(mappingPath)) {

				ClassPathResource mappings = new ClassPathResource(mappingPath);
				if (mappings.exists()) {
					builder.rawField(property.getFieldName(), mappings.getInputStream(), XContentType.JSON);
					return;
				}
			}
		}
		// geo标识
		boolean isGeoPointProperty = isGeoPointProperty(property);
    // completion标识
		boolean isCompletionProperty = isCompletionProperty(property);
    // nested object标识
		boolean isNestedOrObjectProperty = isNestedOrObjectProperty(property);
		// 属性上的Field注解
		Field fieldAnnotation = property.findAnnotation(Field.class);
		if (!isGeoPointProperty && !isCompletionProperty && property.isEntity() && hasRelevantAnnotation(property)) {

			if (fieldAnnotation == null) {
				return;
			}

			Iterator<? extends TypeInformation<?>> iterator = property.getPersistentEntityTypes().iterator();
			ElasticsearchPersistentEntity<?> persistentEntity = iterator.hasNext()
					? elasticsearchConverter.getMappingContext().getPersistentEntity(iterator.next())
					: null;

			mapEntity(builder, persistentEntity, false, property.getFieldName(), isNestedOrObjectProperty,
					fieldAnnotation.type(), fieldAnnotation);

			if (isNestedOrObjectProperty) {
				return;
			}
		}

		MultiField multiField = property.findAnnotation(MultiField.class);

		if (isGeoPointProperty) {
			applyGeoPointFieldMapping(builder, property);
			return;
		}

		if (isCompletionProperty) {
			CompletionField completionField = property.findAnnotation(CompletionField.class);
			applyCompletionFieldMapping(builder, property, completionField);
		}
		// 判断是否为id属性
		if (isRootObject && fieldAnnotation != null && property.isIdProperty()) {
			applyDefaultIdFieldMapping(builder, property);
		} else if (multiField != null) {
			addMultiFieldMapping(builder, property, multiField, isNestedOrObjectProperty);
		} else if (fieldAnnotation != null) {
			addSingleFieldMapping(builder, property, fieldAnnotation, isNestedOrObjectProperty);
		}
	}
  ...
}
复制代码

至此可以看到,只要fieldNameiddocument就判定为是id属性,然后将type设置为keyword并且可被索引。疑问到这里解决

class SimpleElasticsearchPersistentProperty {
  ...
  private static final List<String> SUPPORTED_ID_PROPERTY_NAMES = Arrays.asList("id", "document");
  public SimpleElasticsearchPersistentProperty(Property property,
      PersistentEntity<?, ElasticsearchPersistentProperty> owner, SimpleTypeHolder simpleTypeHolder) {
    ...
    this.isId = super.isIdProperty() || SUPPORTED_ID_PROPERTY_NAMES.contains(getFieldName());
    ...
  }
  
  @Override
  public boolean isIdProperty() {
    return isId;
  }
  private void applyDefaultIdFieldMapping(XContentBuilder builder, ElasticsearchPersistentProperty property)
      throws IOException {
​
    builder.startObject(property.getFieldName()).field(FIELD_TYPE, TYPE_VALUE_KEYWORD).field(FIELD_INDEX, true)
        .endObject();
  }
  ...
}
复制代码

话外题

项目中使用的ElasticSearch实体类都是采取@Document指定indexName来操作的,但是索引和表都涉及到分库分表,所以又不能写死,然后就采取的SpEL配合ThreadLocal从上下文里setget,其实Springelasticsearch操作类似于关系型数据库也封装的有一层Repository抽象,名为ElasticsearchRepository,我们可以直接定义实体类操作接口继承就可以完成对单索引的CRUD以及Page等操作,但这样有一个问题,那就是indexName无法动态去调整,所以就放弃了这种,改用更底层的RestHighLevelClient封装的ElasticSearchRestTemplate模版类,这样在面对分库分表时就可以手动去对每个Document进行set不同的indexName,跨索引查询时也可以指定多个,也可以直接指定索引的alias,需要注意的时,在进行更新时,只指定alias是不被允许的,需要手动查出符合条件的Document在进行索引的分组批量更新,即调用ElasticSearchbulk api

在对ElasticSearch和数据库的一致性问题上,我是通过封装不同的方法来确保强一致性和最终一致性

强一致性

类似插入、更新、删除等场景下,都是放在一个事务里,先操作数据库,再操作ElasticSearch,这样可以确保操作ElasticSearch失败时,数据库可以成功回滚。一般只运用于对数据实时性要求敏感的场景,并且数据量不大的情况,但即便这样还是会有至少1s的延迟,这里就涉及到ElasticSearch的刷盘策略问题上了,这里不展开研究

最终一致性

批量的插入、更新这些操作,如果放在一个大事务里,对数据库也是一种压力,所以一般是分批操作数据库,另起一个线程池对事务提交进行监听,将数据库数据同步到ElasticSearch里,在同步成功后反转数据库的同步状态字段。为了确保万无一失,后台会启动一个定时扫描数据库同步字段的线程去定时扫描同步。这种一般适用于大数据量的场景。当然你也可以去监听MySQLbinlog日志来进行同步。