keycloak~扩展事件机制,集成kafka中间件
对于KC的后台或者接口的操作,当用户,组,角色这些实体状态发生改变时,KC会对外发布事件,而这些事件处理程序我们是可以在后台配置的,默认继承了jboss-logging日志事件,而我们可以在事件管理中去配置自己的事件处理程序。
事件处理程序SPI
实现EventListenerProviderFactory
和EventListenerProvider
这两个SPI即可,我们在这里可以订阅由KC发出现的事件,针对我们感兴趣的事件,去添加处理代码。
/**
* 后端管理平台事件
*
* @param adminEvent
* @param b
*/
@Override
public void onEvent(AdminEvent adminEvent, boolean b) {
logger.info(adminEvent.getResourceTypeAsString()))
}
总结常用的消息类型
- 操作类型:operationType
- 后端类型
- CREATE 新建
- UPDATE 编辑
- DELETE 删除
- 后台资源类型:resourceType
- REALM_ROLE 域的角色
- REALM_ROLE_MAPPING 域的角色绑定
- USER 用户
- GROUP 组
- GROUP_MEMBERSHIP 组绑定
- CLIENT_ROLE 客户端角色
- CLIENT_ROLE_MAPPING 客户端角色绑定
- 前端类型
- LOGIN 登录
- LOGIN_ERROR 登录失败
- REGISTER 注册
- REGISTER_ERROR 注册失败
- LOGOUT 登出
- LOGOUT_ERROR 登出失败
- 后端类型
扩展一个KAFKA
- 需要扩展相关组件
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
- 需要配置你的kafka的生产者
kc这边是一个kafka的生产者,当它收到由KC发出来的事件之后,把消息发到kafka,其它服务方可以消费这个kafka消息
/**
* kafka生产者.
*/
public class Producer {
private final static String BOOTSTRAP_SERVER = ConfigFactory.getInstance().getStrPropertyValue("kafka.host");
private final static Logger logger = Logger.getLogger(Producer.class);
private static KafkaProducer<String, String> producer;
private static KafkaProducer<String, String> getProducer() {
if (producer == null) {
//reset thread context
resetThreadContext();
// create the producer
producer = new KafkaProducer<String, String>(getProperties());
}
return producer;
}
public static void publishEvent(String topic, String value) {
// create a producer record
ProducerRecord<String, String> eventRecord =
new ProducerRecord<String, String>(topic, value);
// send data - asynchronous
getProducer().send(eventRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
logger.info(String.format("The offset of the record we just sent is:%s", recordMetadata.offset()));
}
}
});
}
private static void resetThreadContext() {
Thread.currentThread().setContextClassLoader(null);
}
public static Properties getProperties() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
return properties;
}
}
最后,在你的kc事件处理SPI里,调用咱们的KAFKA生产者去生产消息就行了