
Apache Kafka

Apache Kafka is a highly scalable, open-source event streaming platform designed for data pipelines, stream-based analysis, and event-driven computation. It enables event data exchange through both publication and subscription mechanisms between sinks and sources, with one or more Kafka brokers handling the communication. Kafka operates on an event-based architecture, where events (significant occurrences or state changes) are stored, and their history is retained in event logs. In computing and data architectures, events often represent actions or changes within an application or environment. For example, in building automation, an event could be a temperature sensor's reading change or a setpoint adjustment by a user or control application. Each event is characterized by a key (providing context, such as a user ID or sensor ID) and a value (the raw message). The key components of Kafka are discussed in detail below and depicted in Figure 1.

Figure 1: Structure of the Kafka architecture

In Apache Kafka, topics and partitions are key concepts that orchestrate the communication between producers and consumers. They ensure scalability, fault tolerance, and parallelism, which makes Kafka highly efficient for handling large volumes of real-time data streams. Let's break down how they work together to facilitate communication.

Topics (Logical Channels for Communication)

A topic in Kafka is a logical channel or category to which producers send messages and from which consumers read messages. Kafka topics are similar to message queues or topics in other messaging systems, but they have some distinct properties:

  • Producers publish messages to specific topics.
  • Consumers subscribe to topics to receive messages.

Kafka stores all the messages of a topic in a distributed log. Unlike traditional queues, Kafka does not delete messages once they are consumed; they remain in the log until the configured retention limit (by time or size) is reached. This allows multiple consumers to read the same messages independently.
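
The difference to a queue can be illustrated with a minimal in-memory sketch. It is a toy model, not Kafka code: the class TopicLog and the consumer names are made up for illustration. Reading never removes records, and each consumer keeps its own offset.

```python
class TopicLog:
    """Toy stand-in for a topic's log: append-only, reads never remove data."""

    def __init__(self):
        self._records = []

    def append(self, value):
        self._records.append(value)
        return len(self._records) - 1  # offset of the new record

    def read_from(self, offset):
        # Return all records at or after the given offset.
        return self._records[offset:]


log = TopicLog()
for reading in ["21.5", "21.7", "22.0"]:
    log.append(reading)

# Each consumer tracks its own position, so both see every record.
offsets = {"dashboard": 0, "archiver": 0}
dashboard_view = log.read_from(offsets["dashboard"])
offsets["dashboard"] += len(dashboard_view)
archiver_view = log.read_from(offsets["archiver"])
```

Because the dashboard consumer only advances its own offset, the archiver still receives all three records afterwards.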

Partitions (Ensuring Scalability and Parallelism)

Each Kafka topic is divided into partitions, and this is where the orchestration between producers and consumers happens at scale.

  • Parallelism: Partitions allow Kafka to scale horizontally. Each partition can be hosted on a different Kafka broker (node), so topics can span multiple brokers. Producers can write to different partitions in parallel, and consumers can read from different partitions simultaneously.

  • Ordering: Messages within a single partition are strictly ordered. However, Kafka does not guarantee order across partitions, only within a single partition. This means if ordering is important, producers must ensure related messages go to the same partition (e.g., using a key-based partitioning strategy).

  • Data Distribution: Kafka partitions enable data distribution and replication. Each partition can have replicas on multiple brokers, ensuring fault tolerance.
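
The ordering guarantee can be pictured with a small sketch, assuming a toy model in which every partition is an independent list with its own offset sequence. The class PartitionedTopic is hypothetical and ignores brokers and replication.

```python
class PartitionedTopic:
    """Toy model: one append-only list per partition."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, value):
        # Offsets are counted per partition, not per topic.
        self.partitions[partition].append(value)
        return len(self.partitions[partition]) - 1


topic = PartitionedTopic(num_partitions=2)
first = topic.append(0, "sensor-a: 21.5")
second = topic.append(1, "sensor-b: 19.0")
third = topic.append(0, "sensor-a: 21.7")
```

The two sensor-a readings keep their relative order inside partition 0, while nothing in the model records whether the sensor-b reading in partition 1 was written before or after them.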

Producers (Sending Messages to Partitions)

When producers send messages to a topic, they determine the target partition based on one of the following strategies:

  • Round-Robin: Messages without a key are spread across the partitions in a round-robin fashion (even distribution).

  • Keyed Partitioning: If a message has a key, Kafka will use a partitioning algorithm (like a hash of the key) to ensure all messages with the same key go to the same partition. This ensures ordering for messages related to the same key.

  • Custom Partitioning: Producers can also use custom partitioning logic if specific partitioning is needed based on business requirements.
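
The first two strategies can be sketched in a few lines. This is an illustration under stated assumptions, not the partitioner of any Kafka client: the function make_partitioner is hypothetical, and CRC32 stands in for the hash function (Kafka's Java client hashes keys with murmur2).

```python
import itertools
import zlib


def make_partitioner(num_partitions):
    """Return a function that picks a partition for an optional key."""
    counter = itertools.count()

    def choose(key=None):
        if key is None:
            # No key: cycle through the partitions in turn.
            return next(counter) % num_partitions
        # Keyed: the same key always hashes to the same partition.
        return zlib.crc32(key.encode("utf-8")) % num_partitions

    return choose


choose_partition = make_partitioner(num_partitions=3)
keyless = [choose_partition() for _ in range(6)]
keyed = [choose_partition("sensor-42") for _ in range(3)]
```

Here keyless cycles through the partitions 0, 1, 2, while all entries of keyed are identical, which is what preserves per-key ordering.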

Consumers (Reading from Partitions)

  • Consumer Groups: Kafka uses the concept of consumer groups to provide parallelism. Multiple consumers in a consumer group read from a topic's partitions. Each partition is read by only one consumer within a group, but the same partition can be read by consumers in different groups.

  • Rebalancing: If a consumer in a group fails, Kafka automatically reassigns the partitions it was reading from to another consumer in the group, ensuring fault tolerance.
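
A minimal sketch of the idea behind group assignment and rebalancing follows. It assumes a simple round-robin spread; the function assign_partitions and the consumer names are hypothetical, and the real assignment is negotiated by Kafka's group coordinator with configurable strategies.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions over consumers so each partition has exactly one owner."""
    if not consumers:
        raise ValueError("a consumer group needs at least one member")
    ordered = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered}
    for index, partition in enumerate(sorted(partitions)):
        assignment[ordered[index % len(ordered)]].append(partition)
    return assignment


partitions = [0, 1, 2, 3]
before = assign_partitions(partitions, ["consumer-a", "consumer-b"])

# consumer-b fails: the remaining member takes over its partitions.
after = assign_partitions(partitions, ["consumer-a"])
```

A second consumer group would run the same assignment independently, which is why the same partition can be read by consumers in different groups.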

How to configure a Kafka publisher

To send data to an aedifion project on the aedifion cloud platform, you need to configure a Kafka data publisher. The essential attributes of a "kafka-publisher" are explained below using an example YAML configuration file.

kafka-publisher:
    client_id: project_handle
    bootstrap_servers: My_bootstrap_servers:9092
    session_timeout_ms: 6000
    auto_offset_reset: earliest
    auto_commit_interval_ms: 5000
    security_protocol: SSL
    sasl_mechanisms: SASL_SCRAM_SHA512
    sasl_username: SSL_Username
    sasl_password: SSL_Password
    topics: Example topic. 
    observations:
        name: mtw.aed.project_handle.data
        partitions: 1
        replication_factor: 3
        format: line-protocol
    metadata:
        name: mtw.aed.project_handle.metadata
        partitions: 1
        replication_factor: 3

  • client_id: The identifier of the publishing client, typically the project_handle.

  • bootstrap_servers: Tells Kafka producers and consumers where to connect in order to communicate with the Kafka cluster. It is a comma-separated list of Kafka broker addresses that clients (consumers and producers) can connect to.

  • session_timeout_ms: The session timeout in Kafka refers to the time a Kafka consumer can be unresponsive (without sending heartbeats) before being considered dead by the Kafka broker or the Kafka consumer group coordinator. When a consumer is marked as dead, its partitions are reassigned to other consumers in the same group.

  • auto_offset_reset: Dictates how a Kafka consumer behaves when it starts consuming from a topic and no valid committed offset is found. With the value earliest, a consumer without a previously committed offset (for example, a new consumer group, or an offset that was deleted) starts consuming from the earliest offset, i.e., the beginning of the log.

  • security_protocol: Determines how the client (consumer or producer) secures its communication with the Kafka brokers. In standard Kafka clients, the value SSL enables SSL/TLS encryption only, while the value SASL_SSL combines SASL (Simple Authentication and Security Layer) for authentication with SSL (Secure Sockets Layer) for encryption. A standard Kafka client that authenticates with a SASL username and password therefore uses SASL_SSL.

  • sasl_mechanisms: Specifies the authentication mechanism used when the Kafka client connects to the broker using SASL (Simple Authentication and Security Layer). When set to PLAIN, SASL/PLAIN authentication is used, a simple mechanism that sends a username and password in plain text (though typically over an encrypted connection). SASL_SCRAM_SHA512, used by aedifion, refers to the Salted Challenge Response Authentication Mechanism (SCRAM) with SHA-512 hashing (named SCRAM-SHA-512 in standard Kafka clients), which verifies user credentials securely while protecting against eavesdropping and replay attacks.

  • sasl_username & sasl_password: The parameters sasl_username and sasl_password are the username and password used to authenticate Kafka clients (producers or consumers) to the Kafka broker when using SASL (Simple Authentication and Security Layer).

  • topics: In Kafka, topics are fundamental to the way data is organized and distributed within the system. A topic is essentially a named channel where messages (events or records) are produced by producers and consumed by consumers. Kafka topics allow multiple producers and consumers to publish and retrieve data efficiently.

  • observations: A configuration block for the Kafka topic that carries the observations. Also see "Payload format". It consists of:

    • a name: composed of mtw.aed., the project_handle, and a suffix (.data in the example),
    • a number of partitions: A partition is a subset of the topic’s data. With only one partition, all messages for this topic are placed into a single partition and processed sequentially,
    • a replication_factor: set to 3 in the example, which means each partition is replicated across 3 Kafka brokers,
    • and finally a format: Line protocol is a textual data format often associated with time-series databases, most commonly InfluxDB. It represents data as a line of text that specifies a measurement, tags (for metadata), fields (the data itself), and a timestamp.
  • metadata: The same kind of configuration block as for observations, but targeting the metadata of the data streams instead of the observations.
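
To show how such attributes relate to a Python client, the following sketch maps a configuration dictionary to keyword arguments accepted by kafka-python's KafkaProducer. It is an illustration only: the helper to_client_kwargs, the translation table MECHANISM_NAMES, the broker addresses, and the credentials are assumptions and not part of the aedifion configuration format. The value SASL_SSL in the sample dictionary is likewise an assumption, chosen because kafka-python needs it for SASL authentication over an encrypted connection.

```python
# Assumed translation from the mechanism name used in the configuration to
# the name kafka-python expects.
MECHANISM_NAMES = {"SASL_SCRAM_SHA512": "SCRAM-SHA-512"}


def to_client_kwargs(config):
    """Build keyword arguments for kafka-python's KafkaProducer (hypothetical helper)."""
    mechanism = config["sasl_mechanisms"]
    return {
        "client_id": config["client_id"],
        # Split the comma-separated broker list into individual addresses.
        "bootstrap_servers": [
            server.strip() for server in config["bootstrap_servers"].split(",")
        ],
        "security_protocol": config["security_protocol"],
        "sasl_mechanism": MECHANISM_NAMES.get(mechanism, mechanism),
        "sasl_plain_username": config["sasl_username"],
        "sasl_plain_password": config["sasl_password"],
    }


# Placeholder values for illustration only.
example = {
    "client_id": "project_handle",
    "bootstrap_servers": "broker-1:9092, broker-2:9092",
    "security_protocol": "SASL_SSL",
    "sasl_mechanisms": "SASL_SCRAM_SHA512",
    "sasl_username": "My_username",
    "sasl_password": "My_password",
}
kwargs = to_client_kwargs(example)
```

The resulting dictionary could then be passed as KafkaProducer(**kwargs), assuming the brokers and credentials are valid.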

Payload format

The payload format (consisting of the key, the timestamp, and the value) is identical to the MQTT API payload format.
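
To make the general shape of a line protocol message concrete, here is a minimal sketch that renders a single point with one field named value and no tags. The function format_line is hypothetical and covers only numeric and boolean values; the authoritative description of what the platform accepts remains the MQTT API payload format.

```python
def format_line(measurement, value, timestamp_ns):
    """Render one point as '<measurement> value=<v> <timestamp>' (simplified)."""
    # Commas and spaces delimit parts of the line, so escape them in the name.
    name = measurement.replace(",", "\\,").replace(" ", "\\ ")
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        rendered = "true" if value else "false"
    elif isinstance(value, int):
        rendered = f"{value}i"  # integers carry an "i" suffix
    elif isinstance(value, float):
        rendered = repr(value)
    else:
        raise TypeError("this sketch only handles bool, int, and float values")
    return f"{name} value={rendered} {timestamp_ns}"


line = format_line("My_ID", 84, 1700000000000000000)
# line == "My_ID value=84i 1700000000000000000"
```

The key becomes the measurement, the value becomes the field, and the timestamp is given in nanoseconds since the Unix epoch.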

Example messaging via Kafka

The following example sends data in the format required by aedifion using KafkaProducer in Python.

# Imports: datetime for timestamps, the InfluxDB line protocol helper,
# and the Python Kafka producer.
from datetime import datetime

from influxdb.line_protocol import make_lines
from kafka import KafkaProducer

# Connection settings (placeholders; use the values provided for your project)
bootstrap_servers = "My_bootstrap_servers:9092"
sasl_plain_username = "My_username"
sasl_plain_password = "My_password"


# Function that converts a datapoint key and a value into
# InfluxDB line protocol
def make_lineprotocol_payload(fqdn, value, timestamp=None) -> str:
    if timestamp is None:
        timestamp = datetime.now().timestamp()
    point = {"measurement": fqdn,
             "time": int(timestamp * 10**9),  # Nanoseconds since the epoch
             "fields": {"value": value}}
    data = {"points": [point]}
    return make_lines(data).strip()


# Topic of the aedifion project. Usually, a data and a metadata topic exist.
topic = "mtw.aed.project_handle.data"

# Sample data
# The first data type sent (integer, float, boolean) determines what is
# expected and stored in the future.
data = {"key": "My_ID", "number": 84}

# Create Kafka producer with SASL/SSL security
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=sasl_plain_username,
    sasl_plain_password=sasl_plain_password,
    value_serializer=lambda v: v.encode("utf-8"),  # Serialize the line protocol
                                                   # payload to bytes
)

# Use make_lineprotocol_payload to convert the sample data
line_protocol_payload = make_lineprotocol_payload(fqdn=data["key"], value=data["number"])

# Send the data (first publication to a new datapoint may take up to one hour;
# afterwards data appears immediately)
producer.send(topic, line_protocol_payload)

# send() is asynchronous; flush() blocks until buffered messages are sent
producer.flush()

The following example receives data using KafkaConsumer in Python.

from kafka import KafkaConsumer

# Uses topic, bootstrap_servers, sasl_plain_username, and sasl_plain_password
# as in the producer example above.
group_id = "My_group_id"  # Consumer group id (placeholder)

# Initialize Kafka consumer (reading permission must be granted by aedifion support)
consumer = KafkaConsumer(
    topic,
    bootstrap_servers=bootstrap_servers,
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username=sasl_plain_username,
    sasl_plain_password=sasl_plain_password,
    auto_offset_reset="earliest",  # Read from the beginning of the topic
    enable_auto_commit=True,  # Commit offsets automatically in the background
    group_id=group_id,
    value_deserializer=lambda x: x.decode("utf-8"),
)

# Poll messages from the topic
for message in consumer:
    print(
        f"Received message: {message.value} from topic: {message.topic}, "
        f"partition: {message.partition}, offset: {message.offset}"
    )
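
A received line protocol string can be split back into its parts. The following is a rough sketch under simplifying assumptions: it handles a single numeric or boolean field without tags, does not unescape the measurement name, and the function parse_line is hypothetical rather than part of any library.

```python
def parse_line(line):
    """Split '<measurement> <field>=<value> <timestamp>' into its parts (simplified)."""
    # Split from the right so that escaped spaces in the measurement stay intact.
    head, timestamp = line.strip().rsplit(" ", 1)
    name, field = head.rsplit(" ", 1)
    key, raw = field.split("=", 1)
    if raw in ("t", "T", "true", "True", "TRUE"):
        value = True
    elif raw in ("f", "F", "false", "False", "FALSE"):
        value = False
    elif raw.endswith("i"):
        value = int(raw[:-1])  # integers carry an "i" suffix
    else:
        value = float(raw)
    return name, key, value, int(timestamp)


parsed = parse_line("My_ID value=84i 1700000000000000000")
# parsed == ("My_ID", "value", 84, 1700000000000000000)
```

Inside the polling loop, parse_line(message.value) would yield the datapoint key, the field name, the value, and the timestamp in nanoseconds.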

References and further readings

[1] Apache Kafka: https://kafka.apache.org/
[2] Confluent-kafka (Python): confluentinc/confluent-kafka-python
[3] Kafka-python: https://pypi.org/project/kafka-python/
[4] Kafka-python (docs): https://kafka-python.readthedocs.io/en/master/
[5] Kafka-python (tutorials): daveklein/kafka-python-tutorials