1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class ProducerFuture implements FailureCallback, SuccessCallback<SendResult<String, Object>> { private static final Logger logger = LoggerFactory.getLogger(ProducerFuture.class);
@Resource private KafkaTemplate<String, Object> kafkaTemplate; private String uniqueId;
public void async(String topicLcs, String value) { uniqueId = MDC.get("UNIQUE_ID"); logger.info("send {} data:{}", topicLcs, value); ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicLcs, value); listenableFuture.addCallback(this, this); }
@Override public void onFailure(Throwable ex) { MDC.put(UNIQUE_ID, uniqueId); logger.error("sendFailure:", ex); MDC.remove(UNIQUE_ID); }
@Override public void onSuccess(SendResult<String, Object> result) { MDC.put(UNIQUE_ID, uniqueId); logger.info("sendSuccess {} ", result.getRecordMetadata().topic() + result.getRecordMetadata().offset()); MDC.remove(UNIQUE_ID); } }
|