Choi G.J bio photo

Choi G.J

IT System Engineer

Kafka Tip

Overview

KAFKA

설치/설정관련

server.properties

  • 카프카 서버 설정 ( 클러스터들 주소, 주키퍼 주소 등 )
  • broker.id 는 클러스터마다 바꿔줘야 함

zookeeper.properties

  • 주키퍼 구동 관련 (포트 등 )

connect-distributed.properties

  • connect 사용시 distribute 모드의 설정 ( 기본 connector 설정 및 cluster groupId 정도, 나머지는 REST API로 connerter마다 설정 )

log4j.properties

  • kafka 로그 관련 connect-log4j.properties

주키퍼 기동

./bin/zkServer.sh start
./bin/zkServer.sh stop

카프카 기동

./bin/kafka-server-start.sh ./config/server.properties
nohup ./bin/kafka-server-start.sh ./config/server.properties &

카프카 커넥트 기동

./bin/kafka-server-stop.sh ./config/server.properties

CLI 명령

토픽 생성

./kafka-topics.sh --zookeeper {zookeeper-servers}:2181/{name} --replication-factor 1 --partitions 1 --topic {topic} --create

토픽에 Publish

./kafka-console-producer.sh --broker-list {kafka-servers}:9092 --topic {topic}

토픽을 consuming

./kafka-console-consumer.sh --bootstrap-server {kafka-servers}:9092 --topic {topic} --from-beginning

토픽정보 조회

./kafka-topics.sh —-bootstrap-server {kafka-servers}:9092 —-list

토픽특성확인

./kafka-topics.sh --zookeeper  {kafka-servers}:2181/{name} --topic {topic} --describe 

토픽설정 변경 (retain)

./kafka-configs.sh --zookeeper {kafka-servers}:2181/{name} --entity-type topics --entity-name {topic} --add-config "retention.ms=3600000" --alter

토픽설정 확인 (retain)

./kafka-configs.sh --zookeeper {kafka-servers}:2181/{name} --entity-type topics --entity-name {topic} --describe

토픽의 replica 재지정 ( rf.json 파일을 만들어서 처리해야 함 )

./kafka-reassign-partitions.sh --zookeeper {kafka-servers}:2181/{name} --reassignment-json-file ./rf.json --execute

rf.json
{
	"version" : 1,
	"partitions" : [
	  { "topic" : "{topic}"
	    ,"partition" : 0
	    ,"replicas" : [1,2]
	  }
	]
}

컨슈머 그룹 리스트 조회

./kafka-consumer-groups.sh --bootstrap-server {kafka-servers}:9092 --all-topics --list

컨슈머 그룹의 정보조회(offset, LAG 등)

./kafka-consumer-groups.sh --bootstrap-server {kafka-servers}:9092 --group {connect-group-name}--describe

컨슈머 그룹의 정보조회(offse 재지정)

./kafka-consumer-groups.sh --bootstrap-server {kafka-servers}:9092 --group {consumer-group-name} ——topic {topic} —-reset-offsets --to-latest --execute

// 특정 토픽의 시간조건(time -1, -2, timestamp)의 offset 정보 조회

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list aveditorspare01.ugedit.nfra.io:9092 --topic platform-api-apache-logs --time -1

KAFKA Connect

kafka connect distribute REST API

connector 목록조회

GET http://{kafka-connect-server}:8083/connectors

connector 등록

POST http://{kafka-connect-server}:8083/connectors

connector 상세내용 조회

GET http://{kafka-connect-server}:8083/connectors/{connector-name}

connector 수정

PUT http://{kafka-connect-server}:8083/connectors/{connector-name}

connector 삭제

DELETE http://{kafka-connect-server}:8083/connectors/{connector-name}

connector 상태조회

GET http://{kafka-connect-server}:8083/connectors/{connector-name}/status

0번째 테스크의 상태조회

GET http://{kafka-connect-server}:8083/connectors/{connector-name}/tasks/0/status

0번째 테스크 재시작

PUT http://{kafka-connect-server}:8083/connectors/{connector-name}/tasks/0/restart

0번째 테스크 정지

PUT http://{kafka-connect-server}:8083/connectors/{connector-name}/tasks/0/pause

Schema Registry

kafka registry 설정

여러 오픈소스가 존재하는 듯하지만 confluent걸 사용 (confluent 풀패키지를 받아야 함… )

압축해제 후

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

설정파일 조정 schema-registry.properties

주키퍼를 사용하는 방법이 있는듯하지만, kafka를 직접 이요하는 방법 사용을 위해 아래 부분 설정 ( 9092 카프카 port) kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092

설정방식 조회 ( backward, forward, all )

http://localhost:8081/config

등록된 스키마 조회

http://localhost:8081/subjects
http://localhost:8081/subjects/{스키마이름}
http://localhost:8081/subjects/{스키마이름}/version/1

새로운 스키마로 전송할때마다, 버젼이 쌓임.

스키마파일 (id로 찾음)는 kafka에 _schemas 토픽에 담겨져있음.

설전변환

curl --header "Content-Type: application/json" -X PUT   --data '{"compatibility": "FULL"}'

설정방식 특징

backward(기본) : 추가 default가 없으면 produce시 에러 , default가 null이 아닌값으로 지정되어야 함 (필드 삭제시는 괜찮음)

forward : 삭제시 default 없으면 에러 produce시 에러 (backward와 반대)

full : 필드추가, 필드삭제 모두 default가 고려 안되면 produce 못함

기타 팁

kafka connect 옵션

"errors.tolerance" : "all"      <= 이 옵션을 줘야, task가 포맷이 이상한 데이터가 들어와도 죽지 않는다. ( 기본은 none )

토픽 삭제 관련

  • 토픽삭제시 connect가 topic을 바라보고 있어서, 삭제 후 다시 생기는 문제점이 있음. => 토픽을 삭제해야한다면 kafka-connect를 중지시키고 삭제 후 재 생성 필요

JMX관련 설정

kafka 구동시 아래의 옵션을 추가해서 remote jmx port를 활성화시킴

export KAFKA_JMX_OPTS=’-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.rmi.port=1099’

JMX를 이용해서 KAFKA로그 레벨 변경하기

jconsole등으로 접근하여 kafka > kafka.Log4jController > setLogLevel ( kafka.request.logger ) 을 수정

카프카 Scale Out

  • 카프카 확장은 카프카 브로커서버를 추가하고 ( id=4, id=5 클러스터 번호 추가) 투입하면 바로 사용가능 => but 기존 파티션들은 추가된 4,5번에 투입안되었으므로 reassign을 통해서 파티션 재구성 필요

주키퍼 앙상플 Scale Out

  • 주키퍼가 추가되면, 주키퍼서버에 다른 클러스터 정보가 들어있으므로 기존 클러스터에 정보 추가 후 리붓 필요 ( 마스터를 가장 나중에 하고 하나씩 재기동 )

카프카 뒷단 구성

  • NIFI <= 비쥬얼라이징된 데이터 파이프라인 구성 툴 ( connect sink, source 역할 가능 )
  • 브루클린이라는 connect랑 비슷하지만 다른 kafka 큐로 보내는 구성에 특화된게 있는듯

카프카 모니터링

  • burrow ( <= 링크드인에서 만든 lag 감시)
  • 기본적으로 JMX로 데이터 수집하고, 처리함

카프카 KSQL

  • SQL 문법처럼 Stream테이블을 생성하고, 토픽을 연결해서 변환하는 Stream 파이프라인을 만드는데 사용
  • Confluent 패치키에 포함되어있음.

카프카 스트림

  • Java로 Stream프로그램
  • bulk로 받지 않고, 건건이 처리

kafka 클라이언트

  • conductor 좋은듯 (유료됨)
  • kafka manager 좀 아쉬운 점이 많음.

kafka-connect task 밸런싱 문제

connector 등록시 task를 10개로 해도 3대 노드 중 한군데서만 task파티션이 수행되는 이슈가 있음. 파티션수를 충분히 잡아서 여러 task에서 골고루 connect가 수행되도록 처리해야할 필요가 있음.

kafka consumer 옵션

fetch.min.bytes

  • 한번에 가져올 수 있는 최소 데이터 사이즈 ( 데이터가 누적될 때까지 기다림)

fetch.max.wait.ms

  • fetch.min.bytes 설정보다 적은 경우, 기다리는 최대시간

enable.auto.commit

  • 백그라운드 자동 offset 커밋 auto.commit.interval.ms

auto.offset.reset

  • earliest, latest, none 초기오프셋이 없을때 어떤 오프셋을 가져올지

session.timeout.ms heartbeat.interval.ms max.poll.records max.poll.interval.ms