Kafka

[KAFKA] 카프카 브로커 설치

mobile 2022. 2. 15. 15:11
반응형
설치 OS : CentOS Linux release 7.9.2009 (Core)
KAFKA Version : 2.8.1 

 

(1) 자바 설치

카프카 브로커를 실행하기 위해 JDK가 필요하다.

카프카 브로커는 스칼라와 자바로 작성되어 JVM 환경 위에서 실행되기 때문이다.

// openJDK 설치
$ sudo yum install -y java-1.8.0-openjdk-devel.x86_64
 
// 자바 버전 확인
$ java -version

 

(2) 카프카 다운로드

카프카 브로커를 실행하기 위해서 카프카 바이너리 패키지를 다운로드한다.

카프카 공식 홈페이지 다운로드 페이지에서 다운로드할 수 있다. (https://kafka.apache.org/downloads)

// 카프카 다운로드
 
// 압축해제
$ tar xvf kafka_2.13-2.8.1.tgz

 

 

(3) 카프카 브로커 실행 옵션 지정

config 폴더에 있는 server.properties 파일에 카프카 브로커 운영에 필요한 옵션들을 지정할 수 있다. 

목적이 테스트라면 advertised.listeners 라인 정도만 주석 해제후 원하는 IP로 설정하면 충분하다.

// vim config/server.properties

############################# Server Basics #############################

## 실행하는 카프카 브로커의 번호를 적는다.
## 클러스터를 구축할 때 브로커들을 구분하기 위해 유일한 번호를 설정해야 한다.
broker.id=0

############################# Socket Server Settings #############################

## 카프카 브로커가 통신을 위해 열어둘 인터페이스 IP, Port, 프로토콜을 설정할 수 있다.
## 설정이 없으면 모든 IP와 Port에서 접속 할 수 있다.
#listeners=PLAINTEXT://:9092

## 카프카 클라이언트 또는 카프카 커맨드 라인 툴에서 접속할 때 사용하는 IP와 Port 정보 설정.
#advertised.listeners=PLAINTEXT://your.host.name:9092

## SASL_SSL, SASL_PLAIN 보안 설정 시 프로토콜 매핑을 위한 설정
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

## 네트워크를 통한 처리를 할때 사용할 네트워크 스레드 개수 설정
num.network.threads=3

## 카프카 브로커 내부에서 사용할 스레드 개수를 지정
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

## 데이터를 저정할 디렉토리 위치
log.dirs=/tmp/kafka-logs

## 파티션 개수를 명시하지 않고 토픽을 생성할 때 기본 설정되는 파티션 개수다.
## 파티션 개수가 많아지면 병렬 처리 데이터양이 늘어난다.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

## 카프카 브로커가 저장한 파일이 삭제되기까지 걸리는 시간을 설정
## 가장 작은 단위를 기준으로 하므로 log.retention.ms=값 을 설정하여 운영하는 것을 추천
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

## 카프카 브로커가 저장할 파일의 최대 크기를 지정한다. 
## 데이터 양이 많아 설정된 크기를 채우게 되면 새로운 파일이 생성된다.
log.segment.bytes=1073741824

## 카프카 브로커가 저장한 파일을 삭제하기 위해 체크하는 간격을 지정
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

## 카프카 브로커와 연동할 주키퍼의 IP Port을 설정
zookeeper.connect=localhost:2181

## 주키퍼 세션타임아웃 시간을 지정
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

 

(4) 주키퍼 실행

카프카 바이너리가 포함된 폴더에는 브로커와 같이 실행할 주키퍼가 준비되어 있다. 

분산 코디네이션 서비스를 제공하는 주키퍼는 카프카의 설정 리더 정보, 컨트롤러 정보를 담고 있어 카프카를 실행하는 데에 필요한 필수 애플리케이션이다.

 

// 주키퍼 실행
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

// 주키퍼 실행 확인
$ jps -vm

 

 

(5) 카프카 브로커 실행 및 로그 확인

-daemon 옵션과 함께 카프카 브로커를 백그라운드 모드로 실행할 수 있다. kafka-server-start.sh 명령어를 통해 카프카 브로터를 실행한 뒤 jps 명령어를 통해 주키퍼와 브로커 프로세스의 동작 여부를 알 수 있다.

 

// 카프카 브로커 실행
$ bin/kafka-server-start.sh -daemon config/server.properties

// 실행 확인
$ jps -m

 

 

설정 완료!

반응형