Topo-gw and Topo-engine
Part1: topo-gw
topo-gw POD contain 3 containers
init container: service-init, it used to check etcd cluster & kafka cluster & zookeeper cluster's status
topo-gw.jar & jre, it's the real application
cim, it's handle configuration online change, notification forwarder and logging forwerder
main logic
topo-gw is a k8s client, and it listen to the k8s PODs changes EVENT,
when it find the EVENT's POD annotations contains NF_ID,NF_TYPE and NF_SERVICE_ID and NF_SERVICE_TYPE
it will prepare the PodDetails and send it to Kafka, topoengine will handle it
the main logic in PodWatcher.java
---
if (isValid(annotations)) {
String xgvelaId = annotations.get(Constants.XGVELA_ID).asText();
if (!xgvelaId.equals(selfXGVelaId)) {
LOG.debug("Pod xgvelaId does not match: " + xgvelaId);
return;
}
String nfName = annotations.get(Constants.NF_ID).asText();
String nfType = annotations.get(Constants.NF_TYPE).asText();
String nfServiceName = annotations.get(Constants.NF_SERVICE_ID).asText();
String nfServiceType = annotations.get(Constants.NF_SERVICE_TYPE).asText();
if (!Leader.leaderFlag.get()) {
//For topo-gw dont return let it push to kafka even if its non-leader
if (!nfServiceName.equalsIgnoreCase("topo-gw")) {
return;
}
}
pushToKafka(
new PodDetails(action, podName, namespace, nfName, nfType, nfServiceName,
nfServiceType),nfName);
----
Logging logic
topo-gw use Log4j to handle the log.debug & log.error....
log4j's config file decide the logging will send to NatsAppender, the NAT Server——cim,and then send to KAFKA
log4j2.xml
---
<Configuration packages="org.xgvela.logging">
<Appenders>
<Console name="STDOUT" target="SYSTEM_OUT">
<PatternLayout
pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}][%-5level][%corrId]:[%-5t] %logger{2}- %msg%n" />
</Console>
<NatsAppender name="NatsAppender">
<PatternLayout
pattern="[%d{yyyy-MM-dd HH:mm:ss.SSS}][%-5level][%corrId]:[%-5t] %logger{2}- %msg" />
</NatsAppender>
</Appenders>
<Loggers>
<Logger name="org.xgvela" level="debug" additivity="false">
<AppenderRef ref="NatsAppender" />
<AppenderRef ref="STDOUT" />
</Logger>
<Root level="error">
<AppenderRef ref="NatsAppender" />
<AppenderRef ref="STDOUT" />
</Root>
</Loggers>
</Configuration>
---
NatsAppender.java
---
private void publishToNats(String record) {
Connection natsConnection = NatsUtil.getConnection();
byte[] buff = buildFlatBuff(record);
natsConnection.publish("LOG", buff);
}
---
Configuration online change is not enabled in topo-gw application
Notification is not enabled in topo-gw application
Part2 Topoengine
Topoengine POD contain 3 containers
init container: service-init, it used to check etcd cluster & kafka cluster & zookeeper cluster's status
topoengine.jar & jre, it's the real application
cim, it's handle configuration online change, nofication forwarder and logging forwerder
main logic
Topoengine listien KAFKA EVENT's pre-defined TOPIC, and then update itself ManagedElemet, the Topo logic Tree
EventListener.java
---
@KafkaListener(topics = Constants.KAFKA_TOPIC, containerFactory = "kafkaListenerContainerFactory")
private void listen(@Payload PodDetails podDetails, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(X_CORRELATION_ID) String messageCorrelationId,@Header(KafkaHeaders.OFFSET) String offset) {
try {
LOG.debug("Partition: " + partition + ", KafkaMsg Offset: " + offset + ", CorrelationId: " + messageCorrelationId + ", " + podDetails.toString());
ConstructTree.kafkaListenerLatch.await();
manager.updateManagedElement(podDetails);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
---
TopoManager.java
---
switch (podDetails.getAction()) {
case ADDED:
addToTree(podDetails, nfId, nfDn, nfServiceId, nfServiceDn, nfServiceInstanceId, nfServiceInstanceDn);
break;
case MODIFIED:
updateInTree(nfServiceInstanceId, nfServiceId, nfId, podDetails);
break;
case DELETED:
deleteFromTree(nfServiceInstanceId, nfServiceId, nfId, podDetails);
break;
case ERROR:
break;
default:
break;
}
---
Logging logic is same as Topo-gw's.
Configuration online change
cim watch the etcd's change, and then handle real application's /updateConfig, and real applicaton modify the config immediatly.
real application will callback cim, and cim will change the etcd's such configuration store's status.
UpdateConfigController.java
---
@PostMapping(path = "/updateConfig")
@ResponseStatus(HttpStatus.OK)
public @ResponseBody JsonNode updateConfig(@RequestBody JsonNode requestBody) throws JsonProcessingException {
LOG.info("===> Received request for config-update: " + requestBody.toPrettyString());
UpdateRequest updateRequest = Config.mapper.treeToValue(requestBody, UpdateRequest.class);
boolean status = Config.applyPatch(Config.mapper.readTree(updateRequest.patch));
ObjectNode cimResponse = JsonNodeFactory.instance.objectNode();
cimResponse.put("change-set-key", updateRequest.changeSet);
cimResponse.put("revision", updateRequest.revision);
// default: success
cimResponse.put("status", "success");
cimResponse.put("remarks", "successfully applied the config update");
// failure case
if (!status) {
LOG.debug("Config update failed");
cimResponse.put("status", "failure");
cimResponse.put("remarks", "failed to apply the config update");
}
// publish event to CIM over NATS
Config.publishResponse(cimResponse, status);
LOG.info("<=== Configuration upto date");
return cimResponse;
}
---
Notification
When the topo tree has changed, topoengine will send notification by notifer, it use NotificationUtil.sendEvent,
which send event to Nat Server——cim, and then send to KAFKA
NotificationUtil.java
---
Event.startEvent(builder);
Event.addEventName(builder, name);
Event.addEventTime(builder, eventTime);
Event.addContainerId(builder, container);
if (mngdObj != 0)
Event.addManagedObject(builder, mngdObj);
if (addInfo != 0)
Event.addAdditionalInfo(builder, addInfo);
if (thrInfo != 0)
Event.addThresholdInfo(builder, thrInfo);
if (stChDef != 0)
Event.addStateChangeDefinition(builder, stChDef);
if (monAttr != 0)
Event.addMonitoredAttributes(builder, monAttr);
Event.addSourceId(builder, srcId);
Event.addSourceName(builder, srcName);
int event = Event.endEvent(builder);
builder.finish(event);
byte[] buffer = builder.sizedByteArray();
builder.clear();
LOG.info("Publishing event: " + eventName + " to NATS for source: " + sourceName);
publishEvent(buffer);
---
private static void publishEvent(byte[] message) {
Connection natsConnection = NatsUtil.getConnection();
natsConnection.publish("EVENT", message);
}
---