diff --git a/build.gradle b/build.gradle
index 3ec442d..117560c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -47,6 +47,20 @@ repositories {
password = githubToken
}
}
+ maven {
+ url = 'https://maven.pkg.github.com/first-ticket/common-jpa'
+ credentials {
+ username = githubUser
+ password = githubToken
+ }
+ }
+ maven {
+ url = 'https://maven.pkg.github.com/first-ticket/common-messaging'
+ credentials {
+ username = githubUser
+ password = githubToken
+ }
+ }
}
ext {
@@ -55,30 +69,58 @@ ext {
}
dependencies {
+ // 공통 모듈
+ implementation 'com.first-ticket:common:0.0.4-SNAPSHOT'
+ implementation 'com.first-ticket:common-jpa:0.0.2-SNAPSHOT'
+ implementation 'com.first-ticket:common-messaging:0.0.2-SNAPSHOT'
+
+ // Spring Boot
implementation 'org.springframework.boot:spring-boot-starter-actuator'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
+ implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-web'
+ implementation 'org.springframework.boot:spring-boot-starter-aop'
+
+ // Spring Cloud
implementation 'org.springframework.cloud:spring-cloud-starter-config'
implementation 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client'
- implementation 'org.springframework.retry:spring-retry'
- implementation 'org.springframework.boot:spring-boot-starter-aop'
- implementation 'com.first-ticket:common:0.0.4-SNAPSHOT'
+
+ // kafka
+ implementation 'org.springframework.kafka:spring-kafka'
+
+ // flyway
+ implementation 'org.flywaydb:flyway-core'
+ implementation 'org.flywaydb:flyway-database-postgresql'
+
+ // 모니터링
+ implementation 'io.micrometer:micrometer-registry-prometheus'
+ implementation 'io.micrometer:micrometer-tracing-bridge-brave'
+ runtimeOnly 'io.zipkin.reporter2:zipkin-reporter-brave'
+
+ // JWT
+ implementation 'io.jsonwebtoken:jjwt-api:0.12.6'
+ runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.6'
+ runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.6'
+
+ // Lombok
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
+
+ // PostgreSQL
+ runtimeOnly 'org.postgresql:postgresql'
+
+ // Test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
+ testImplementation 'org.testcontainers:postgresql'
testImplementation 'org.springframework.boot:spring-boot-testcontainers'
testImplementation 'org.testcontainers:junit-jupiter'
+ testImplementation 'org.springframework.kafka:spring-kafka-test'
testImplementation 'org.springframework.restdocs:spring-restdocs-mockmvc'
asciidoctorExt 'org.springframework.restdocs:spring-restdocs-asciidoctor'
testCompileOnly 'org.projectlombok:lombok'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testAnnotationProcessor 'org.projectlombok:lombok'
-
- // JWT
- implementation 'io.jsonwebtoken:jjwt-api:0.12.6'
- runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.6'
- runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.6'
}
dependencyManagement {
diff --git a/src/main/java/com/firstticket/queueservice/config/JpaConfig.java b/src/main/java/com/firstticket/queueservice/config/JpaConfig.java
new file mode 100644
index 0000000..6b50027
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/config/JpaConfig.java
@@ -0,0 +1,17 @@
+package com.firstticket.queueservice.config;
+
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+
+@Configuration
+@EntityScan(basePackages = {
+ "com.firstticket.queueservice",
+ "com.firstticket.common.messaging.inbox"
+})
+@EnableJpaRepositories(basePackages = {
+ "com.firstticket.queueservice",
+ "com.firstticket.common.messaging.inbox"
+})
+public class JpaConfig {
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java b/src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java
new file mode 100644
index 0000000..4756952
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java
@@ -0,0 +1,87 @@
+package com.firstticket.queueservice.programmeta.application;
+
+import com.firstticket.queueservice.programmeta.application.dto.CancelProgramCommand;
+import com.firstticket.queueservice.programmeta.application.dto.CreateProgramMetaCommand;
+import com.firstticket.queueservice.programmeta.application.dto.UpdateProgramTimeCommand;
+import com.firstticket.queueservice.programmeta.domain.ProgramMeta;
+import com.firstticket.queueservice.programmeta.domain.ProgramMetaRepository;
+import com.firstticket.queueservice.programmeta.domain.event.ProgramEvents;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * ProgramMeta 도메인의 Command 처리 서비스.
+ * Kafka Consumer 가 전달한 Command 를 받아 Aggregate 의 상태를 변경하고,
+ * 필요 시 도메인 이벤트를 발행한다.
+ */
+@Slf4j
+@RequiredArgsConstructor
+@Service
+public class ProgramMetaService {
+ private final ProgramMetaRepository programMetaRepository;
+ private final ProgramEvents programEvents;
+
+ /**
+ * program.created 처리.
+ * ProgramMeta 새로 생성하여 저장.
+ */
+ public void handleCreated(CreateProgramMetaCommand command) {
+ log.info("Program created. programId={}, status={}", command.programId(), command.status());
+
+ programMetaRepository.findById(command.programId())
+ .ifPresentOrElse(
+ existing -> log.info("ProgramMeta already exists. skip created. programId={}", command.programId()),
+ () -> {
+ ProgramMeta programMeta = ProgramMeta.of(
+ command.programId(),
+ command.openAt(),
+ command.closeAt(),
+ command.status()
+ );
+ programMetaRepository.save(programMeta);
+ }
+ );
+ }
+
+ /**
+ * program.time.updated 처리.
+ * 기존 ProgramMeta 의 openAt / closeAt 갱신.
+ * Meta 가 존재하지 않으면 경고 로그만 남긴다 (이벤트 순서가 어긋난 경우 대비).
+ */
+ public void handleTimeUpdated(UpdateProgramTimeCommand command) {
+ log.info("Program time updated. programId={}, openAt={}, closeAt={}",
+ command.programId(), command.openAt(), command.closeAt());
+
+ programMetaRepository.findById(command.programId())
+ .ifPresentOrElse(
+ programMeta -> {
+ programMeta.updateTime(command.openAt(), command.closeAt());
+ programMetaRepository.save(programMeta);
+ },
+ () -> log.warn("ProgramMeta 없음 (time updated). programId={}", command.programId())
+ );
+ }
+
+ /**
+ * program.cancelled 처리.
+ * 1. ProgramMeta 의 status 를 CANCELLED 로 갱신.
+ * 2. ProgramCancelledEvent 발행 (queuetoken Aggregate 가 토큰 정리).
+ */
+ public void handleCancelled(CancelProgramCommand command) {
+ log.info("Program cancelled. programId={}", command.programId());
+
+ programMetaRepository.findById(command.programId())
+ .ifPresentOrElse(
+ programMeta -> {
+ programMeta.cancel();
+ programMetaRepository.save(programMeta);
+ },
+ () -> log.warn("ProgramMeta 없음 (cancelled). programId={}", command.programId())
+ );
+
+ // queuetoken Aggregate 에 이벤트 발급
+ programEvents.publishProgramCancelled(command.programId());
+ }
+
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/application/dto/CancelProgramCommand.java b/src/main/java/com/firstticket/queueservice/programmeta/application/dto/CancelProgramCommand.java
new file mode 100644
index 0000000..7d22890
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/application/dto/CancelProgramCommand.java
@@ -0,0 +1,21 @@
+package com.firstticket.queueservice.programmeta.application.dto;
+
+import com.firstticket.queueservice.programmeta.domain.ProgramStatus;
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+
+import java.util.UUID;
+
+/**
+ * program.cancelled 이벤트 처리용 Command.
+ */
+public record CancelProgramCommand(
+ ProgramId programId,
+ ProgramStatus status
+) {
+ public static CancelProgramCommand of(UUID programId, String status) {
+ return new CancelProgramCommand(
+ ProgramId.of(programId),
+ ProgramStatus.parse(status)
+ );
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/application/dto/CreateProgramMetaCommand.java b/src/main/java/com/firstticket/queueservice/programmeta/application/dto/CreateProgramMetaCommand.java
new file mode 100644
index 0000000..4b83d4f
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/application/dto/CreateProgramMetaCommand.java
@@ -0,0 +1,34 @@
+package com.firstticket.queueservice.programmeta.application.dto;
+
+import com.firstticket.queueservice.programmeta.domain.ProgramStatus;
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+/**
+ * program.created 이벤트 처리용 Command.
+ * Kafka Consumer 가 외부 Payload 를 변환하여 application 에 전달한다.
+ *
+ *
openAt / closeAt 은 생성 시점엔 스케줄 미정이라 null 가능.
+ */
+public record CreateProgramMetaCommand(
+ ProgramId programId,
+ LocalDateTime openAt,
+ LocalDateTime closeAt,
+ ProgramStatus status
+) {
+ public static CreateProgramMetaCommand of(
+ UUID programId,
+ LocalDateTime openAt,
+ LocalDateTime closeAt,
+ String status
+ ) {
+ return new CreateProgramMetaCommand(
+ ProgramId.of(programId),
+ openAt,
+ closeAt,
+ ProgramStatus.parse(status)
+ );
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/application/dto/UpdateProgramTimeCommand.java b/src/main/java/com/firstticket/queueservice/programmeta/application/dto/UpdateProgramTimeCommand.java
new file mode 100644
index 0000000..ea8be73
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/application/dto/UpdateProgramTimeCommand.java
@@ -0,0 +1,28 @@
+package com.firstticket.queueservice.programmeta.application.dto;
+
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+/**
+ * program.time.updated 이벤트 처리용 Command.
+ * 스케줄 (openAt / closeAt) 등록 / 변경 시 사용.
+ */
+public record UpdateProgramTimeCommand(
+ ProgramId programId,
+ LocalDateTime openAt,
+ LocalDateTime closeAt
+) {
+ public static UpdateProgramTimeCommand of(
+ UUID programId,
+ LocalDateTime openAt,
+ LocalDateTime closeAt
+ ) {
+ return new UpdateProgramTimeCommand(
+ ProgramId.of(programId),
+ openAt,
+ closeAt
+ );
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java b/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java
new file mode 100644
index 0000000..f43a925
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java
@@ -0,0 +1,93 @@
+package com.firstticket.queueservice.programmeta.domain;
+
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+import java.util.Objects;
+
+/**
+ * Program 의 메타 정보 (Aggregate Root).
+ * program-service 의 이벤트로 갱신되는 캐시 / 읽기 모델.
+ *
+ * 원본은 program-service 가 소유하므로 queue-service 는 이 객체를
+ * 영구 저장하지 않으며, 필요 시 program 토픽의 처음부터 재구독으로 복구한다.
+ */
+@Getter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class ProgramMeta {
+
+ private ProgramId programId;
+ private LocalDateTime openAt;
+ private LocalDateTime closeAt;
+ private ProgramStatus status;
+
+ /**
+ * 새 ProgramMeta 생성. 일반적으로 program.created 이벤트 처리 시 호출.
+ *
+ * @param programId UUID 형태의 program 식별자
+ * @param openAt 예매 오픈 시각 (생성 시점엔 null 가능)
+ * @param closeAt 예매 종료 시각 (생성 시점엔 null 가능)
+ * @param status program-service 가 전달한 상태 문자열 ("DRAFT" / "CANCELLED")
+ */
+ public static ProgramMeta of(ProgramId programId, LocalDateTime openAt,
+ LocalDateTime closeAt, ProgramStatus status) {
+ Objects.requireNonNull(programId, "programId는 null일 수 없습니다.");
+ Objects.requireNonNull(status, "status는 null일 수 없습니다.");
+ validateSchedule(openAt, closeAt);
+ return new ProgramMeta(
+ programId,
+ openAt,
+ closeAt,
+ status
+ );
+ }
+
+ /**
+ * 스케줄 갱신. program.time.updated 이벤트 처리 시 호출.
+ */
+ public void updateTime(LocalDateTime newOpenAt, LocalDateTime newCloseAt) {
+ validateSchedule(newOpenAt, newCloseAt);
+ this.openAt = newOpenAt;
+ this.closeAt = newCloseAt;
+ }
+
+ private static void validateSchedule(LocalDateTime openAt, LocalDateTime closeAt) {
+ if (openAt != null && closeAt != null && openAt.isAfter(closeAt)) {
+ throw new IllegalArgumentException("openAt은 closeAt보다 늦을 수 없습니다");
+ }
+ }
+
+ /**
+ * 프로그램 취소 처리. program.cancelled 이벤트 처리 시 호출.
+ */
+ public void cancel() {
+ this.status = ProgramStatus.CANCELLED;
+ }
+
+ /**
+ * 현재 시각 기준 활성 여부.
+ * CANCELLED 가 아니고, 스케줄이 설정됐고, 현재 시각이 openAt ~ closeAt 사이일 때 true.
+ */
+ public boolean isActiveAt(LocalDateTime now) {
+ if (status == ProgramStatus.CANCELLED) return false;
+ if (openAt == null || closeAt == null) return false;
+ return !openAt.isAfter(now) && !closeAt.isBefore(now);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ProgramMeta that)) return false;
+ return programId.equals(that.programId);
+ }
+
+ @Override
+ public int hashCode() {
+ return programId.hashCode();
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMetaRepository.java b/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMetaRepository.java
new file mode 100644
index 0000000..7e22090
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMetaRepository.java
@@ -0,0 +1,32 @@
+package com.firstticket.queueservice.programmeta.domain;
+
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * ProgramMeta 영속화 인터페이스.
+ * 구현체는 인프라 계층 (infrastructure/persistence) 에 위치한다.
+ */
+public interface ProgramMetaRepository {
+
+ /**
+ * ProgramMeta 저장 (overwrite).
+ */
+ void save(ProgramMeta programMeta);
+
+ Optional findById(ProgramId programId);
+
+ List findAll();
+
+ void deleteById(ProgramId programId);
+
+ /**
+ * 현재 시각 기준 활성 (CANCELLED 아니고 openAt ~ closeAt 사이) 인
+ * 모든 program 의 ID 목록을 반환한다.
+ * AdmissionScheduler 등에서 활성 프로그램 순회 시 사용.
+ */
+ List findActiveProgramIds(LocalDateTime now);
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramStatus.java b/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramStatus.java
new file mode 100644
index 0000000..ea8e7ac
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramStatus.java
@@ -0,0 +1,43 @@
+package com.firstticket.queueservice.programmeta.domain;
+
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Program 의 생명주기 상태.
+ * program-service 의 이벤트로 갱신되며, queue-service 는 이 상태를 캐시한다.
+ *
+ * 활성/비활성 (OPEN / CLOSED) 은 별도 상태로 관리하지 않고
+ * openAt / closeAt 시간으로 동적 판단한다.
+ */
+public enum ProgramStatus {
+ /**
+ * 프로그램이 생성된 기본 상태.
+ * 스케줄 (openAt / closeAt) 등록 / 변경과 무관하게 유지된다.
+ * 시간 조건이 맞으면 토큰 발급 / 입장 허용.
+ */
+ DRAFT,
+
+ /**
+ * 프로그램이 취소된 상태.
+ * 모든 대기 토큰을 정리하고 신규 토큰 발급을 거부한다.
+ */
+ CANCELLED;
+
+ /**
+ * 문자열을 ProgramStatus 로 변환.
+ * 외부 (Kafka 페이로드 등) 에서 받은 문자열의 포맷 편차를 정정한다
+ * (공백 / 대소문자 정규화 후 변환).
+ *
+ * @throws NullPointerException value 가 null 일 때
+ * @throws IllegalArgumentException 지원하지 않는 status 일 때
+ */
+ public static ProgramStatus parse(String value) {
+ Objects.requireNonNull(value, "status는 null일 수 없습니다");
+ try {
+ return ProgramStatus.valueOf(value.trim().toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("지원하지 않는 program status: " + value, e);
+ }
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/domain/event/ProgramEvents.java b/src/main/java/com/firstticket/queueservice/programmeta/domain/event/ProgramEvents.java
new file mode 100644
index 0000000..86ed106
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/domain/event/ProgramEvents.java
@@ -0,0 +1,17 @@
+package com.firstticket.queueservice.programmeta.domain.event;
+
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+
+/**
+ * Program 도메인 이벤트 발행 인터페이스.
+ *
+ * 구현체는 Spring 의 ApplicationEventPublisher 를 위임 사용하며,
+ * 인프라 계층 (infrastructure/event) 에 위치한다.
+ */
+public interface ProgramEvents {
+ /**
+ * Program 취소 이벤트 발행.
+ * queuetoken Aggregate 의 EventListener 가 수신해 토큰을 정리한다.
+ */
+ void publishProgramCancelled(ProgramId programId);
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/domain/vo/ProgramId.java b/src/main/java/com/firstticket/queueservice/programmeta/domain/vo/ProgramId.java
new file mode 100644
index 0000000..11ae84e
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/domain/vo/ProgramId.java
@@ -0,0 +1,26 @@
+package com.firstticket.queueservice.programmeta.domain.vo;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * Program 을 식별하는 Value Object.
+ * 원본은 program-service 가 소유하며, queue-service 는 동일한 UUID 를 참조한다.
+ */
+public record ProgramId(UUID id) {
+ public ProgramId {
+ Objects.requireNonNull(id, "ProgramId는 null일 수 없습니다");
+ }
+
+ public static ProgramId of(UUID id) {
+ return new ProgramId(id);
+ }
+
+ public static ProgramId fromString(String id) {
+ return new ProgramId(UUID.fromString(id));
+ }
+
+ public String asString() {
+ return id.toString();
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/event/ProgramEventsImpl.java b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/event/ProgramEventsImpl.java
new file mode 100644
index 0000000..ba3fb0e
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/event/ProgramEventsImpl.java
@@ -0,0 +1,25 @@
+package com.firstticket.queueservice.programmeta.infrastructure.event;
+
+import com.firstticket.queueservice.programmeta.domain.event.ProgramEvents;
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+import com.firstticket.queueservice.shared.event.ProgramCancelledEvent;
+import lombok.RequiredArgsConstructor;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.stereotype.Component;
+
+/**
+ * ProgramEvents 의 Spring 구현체.
+ * 도메인이 Spring 에 직접 의존하지 않도록 인프라 계층에서 위임만 수행한다.
+ */
+@Component
+@RequiredArgsConstructor
+public class ProgramEventsImpl implements ProgramEvents {
+
+ private final ApplicationEventPublisher applicationEventPublisher;
+ @Override
+ public void publishProgramCancelled(ProgramId programId) {
+ applicationEventPublisher.publishEvent(
+ new ProgramCancelledEvent(programId.id())
+ );
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/ProgramKafkaConsumer.java b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/ProgramKafkaConsumer.java
new file mode 100644
index 0000000..9fd5d65
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/ProgramKafkaConsumer.java
@@ -0,0 +1,100 @@
+package com.firstticket.queueservice.programmeta.infrastructure.messaging;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.firstticket.queueservice.programmeta.application.ProgramMetaService;
+import com.firstticket.queueservice.programmeta.application.dto.CancelProgramCommand;
+import com.firstticket.queueservice.programmeta.application.dto.CreateProgramMetaCommand;
+import com.firstticket.queueservice.programmeta.application.dto.UpdateProgramTimeCommand;
+import com.firstticket.queueservice.programmeta.infrastructure.messaging.payload.ProgramCancelledPayload;
+import com.firstticket.queueservice.programmeta.infrastructure.messaging.payload.ProgramCreatedPayload;
+import com.firstticket.queueservice.programmeta.infrastructure.messaging.payload.ProgramTimeUpdatedPayload;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+/**
+ * program 도메인 이벤트 Kafka Consumer.
+ * Payload 역직렬화 + Command 변환 + Application Service 호출의 책임을 가진다.
+ *
+ * 처리 실패 시 ack 하지 않아 Kafka 가 재전송하도록 한다 (at-least-once).
+ * 도메인 액션이 idempotent 하므로 중복 수신해도 안전하다.
+ */
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class ProgramKafkaConsumer {
+
+ private final ProgramMetaService programMetaService;
+ private final ObjectMapper objectMapper;
+
+ @KafkaListener(topics = "${kafka.topics.program-created}")
+ public void onProgramCreated(ConsumerRecord record, Acknowledgment ack) {
+ log.info("Received program.created. key={}", record.key());
+ try {
+ ProgramCreatedPayload payload = objectMapper.readValue(
+ record.value(), ProgramCreatedPayload.class);
+
+ CreateProgramMetaCommand command = CreateProgramMetaCommand.of(
+ payload.programId(), payload.openAt(), payload.closeAt(), payload.status());
+
+ programMetaService.handleCreated(command);
+ ack.acknowledge();
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ // 메시지 자체가 잘못됨 → 건너뜀 (재시도해도 같은 결과)
+ log.error("잘못된 메시지. 건너뜀. topic={}, key={}, value={}",
+ record.topic(), record.key(), record.value(), e);
+ ack.acknowledge();
+ } catch (Exception e) {
+ // 일시 장애 → 재전송 기다림
+ log.error("program.created 처리 실패. record={}", record, e);
+ }
+ }
+
+ @KafkaListener(topics = "${kafka.topics.program-time-updated}")
+ public void onProgramTimeUpdated(ConsumerRecord record, Acknowledgment ack) {
+ log.info("Received program.time.updated. key={}", record.key());
+ try {
+ ProgramTimeUpdatedPayload payload = objectMapper.readValue(
+ record.value(), ProgramTimeUpdatedPayload.class);
+
+ UpdateProgramTimeCommand command = UpdateProgramTimeCommand.of(
+ payload.programId(), payload.openAt(), payload.closeAt());
+
+ programMetaService.handleTimeUpdated(command);
+ ack.acknowledge();
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ log.error("잘못된 메시지. 건너뜀. topic={}, key={}, value={}",
+ record.topic(), record.key(), record.value(), e);
+ ack.acknowledge();
+
+ } catch (Exception e) {
+ log.error("program.time.updated 처리 실패. record={}", record, e);
+ }
+ }
+
+ @KafkaListener(topics = "${kafka.topics.program-cancelled}")
+ public void onProgramCancelled(ConsumerRecord record, Acknowledgment ack) {
+ log.info("Received program.cancelled. key={}", record.key());
+ try {
+ ProgramCancelledPayload payload = objectMapper.readValue(
+ record.value(), ProgramCancelledPayload.class);
+
+ CancelProgramCommand command = CancelProgramCommand.of(
+ payload.programId(), payload.status());
+
+ programMetaService.handleCancelled(command);
+ ack.acknowledge();
+ } catch (JsonProcessingException | IllegalArgumentException e) {
+ log.error("잘못된 메시지. 건너뜀. topic={}, key={}, value={}",
+ record.topic(), record.key(), record.value(), e);
+ ack.acknowledge();
+
+ } catch (Exception e) {
+ log.error("program.cancelled 처리 실패. record={}", record, e);
+ }
+ }
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCancelledPayload.java b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCancelledPayload.java
new file mode 100644
index 0000000..b365c9d
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCancelledPayload.java
@@ -0,0 +1,13 @@
+package com.firstticket.queueservice.programmeta.infrastructure.messaging.payload;
+
+import java.util.UUID;
+
+/**
+ * program.cancelled 토픽 페이로드.
+ * 프로그램 취소 시 발행. queue-service 가 활성 토큰 모두 정리.
+ */
+public record ProgramCancelledPayload(
+ UUID programId,
+ String status
+) {
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCreatedPayload.java b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCreatedPayload.java
new file mode 100644
index 0000000..51f6db9
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCreatedPayload.java
@@ -0,0 +1,16 @@
+package com.firstticket.queueservice.programmeta.infrastructure.messaging.payload;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+/**
+ * program.created 토픽 페이로드.
+ * 프로그램 생성 시점에는 스케줄 미등록이므로 openAt/closeAt 은 null 가능.
+ */
+public record ProgramCreatedPayload(
+ UUID programId,
+ LocalDateTime openAt,
+ LocalDateTime closeAt,
+ String status
+) {
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramTimeUpdatedPayload.java b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramTimeUpdatedPayload.java
new file mode 100644
index 0000000..855a4fa
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramTimeUpdatedPayload.java
@@ -0,0 +1,15 @@
+package com.firstticket.queueservice.programmeta.infrastructure.messaging.payload;
+
+import java.time.LocalDateTime;
+import java.util.UUID;
+
+/**
+ * program.time.updated 토픽 페이로드.
+ * 스케줄 등록 / 변경 시 발행.
+ */
+public record ProgramTimeUpdatedPayload(
+ UUID programId,
+ LocalDateTime openAt,
+ LocalDateTime closeAt
+) {
+}
diff --git a/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java
new file mode 100644
index 0000000..32f6ae2
--- /dev/null
+++ b/src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java
@@ -0,0 +1,156 @@
+package com.firstticket.queueservice.programmeta.infrastructure.redis;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.firstticket.queueservice.programmeta.domain.ProgramMeta;
+import com.firstticket.queueservice.programmeta.domain.ProgramMetaRepository;
+import com.firstticket.queueservice.programmeta.domain.ProgramStatus;
+import com.firstticket.queueservice.programmeta.domain.vo.ProgramId;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.ScanOptions;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.time.LocalDateTime;
+import java.util.*;
+
+/**
+ * ProgramMetaRepository 의 Redis 구현체.
+ *
+ * 키 패턴: {@code queue:program:meta:{programId}}
+ * 값: ProgramMeta 의 필드를 담은 JSON 문자열.
+ */
+@Slf4j
+@Repository
+@RequiredArgsConstructor
+public class RedisProgramMetaRepository implements ProgramMetaRepository {
+
+ private static final String KEY_PREFIX = "queue:program:meta:";
+ private static final String KEY_PATTERN = KEY_PREFIX + "*";
+
+ // JSON 필드 이름 상수
+ private static final String FIELD_PROGRAM_ID = "programId";
+ private static final String FIELD_OPEN_AT = "openAt";
+ private static final String FIELD_CLOSE_AT = "closeAt";
+ private static final String FIELD_STATUS = "status";
+
+ private final StringRedisTemplate redisTemplate;
+ private final ObjectMapper objectMapper;
+
+ /**
+ * ProgramMeta 저장 (overwrite).
+ *
+ * 이벤트 수신 시마다 호출되어 캐시를 갱신. 같은 programId 의 기존 값은 덮어쓴다.
+ * openAt / closeAt 이 null 이면 빈 문자열로 저장한다 (null 직렬화 회피).
+ */
+ @Override
+ public void save(ProgramMeta programMeta) {
+ try {
+ Map data = Map.of(
+ FIELD_PROGRAM_ID, programMeta.getProgramId().asString(),
+ FIELD_OPEN_AT, programMeta.getOpenAt() == null ? "" : programMeta.getOpenAt().toString(),
+ FIELD_CLOSE_AT, programMeta.getCloseAt() == null ? "" : programMeta.getCloseAt().toString(),
+ FIELD_STATUS, programMeta.getStatus().name()
+ );
+ String json = objectMapper.writeValueAsString(data);
+ redisTemplate.opsForValue().set(buildKey(programMeta.getProgramId()), json);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException("ProgramMeta 직렬화 실패", e);
+ }
+ }
+
+ /**
+ * programId 로 ProgramMeta 단건 조회.
+ */
+ @Override
+ public Optional findById(ProgramId programId) {
+ String json = redisTemplate.opsForValue().get(buildKey(programId));
+ if (json == null) return Optional.empty();
+ return Optional.of(deserialize(json));
+ }
+
+ /**
+ * 모든 ProgramMeta 조회.
+ * SCAN 으로 키 목록을 가져온 후 각 키별 GET.
+ */
+ @Override
+ public List findAll() {
+ ArrayList result = new ArrayList<>();
+ try (Cursor cursor = redisTemplate.scan(
+ ScanOptions.scanOptions().match(KEY_PATTERN).count(100).build()
+ )) {
+ while (cursor.hasNext()) {
+ String json = redisTemplate.opsForValue().get(cursor.next());
+ if (json != null) {
+ result.add(deserialize(json));
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * programId 로 ProgramMeta 삭제.
+ * 이미 없어도 안전 (Redis DEL 의 멱등성).
+ */
+ @Override
+ public void deleteById(ProgramId programId) {
+ redisTemplate.delete(buildKey(programId));
+ }
+
+ /**
+ * 현재 시각 기준 활성 프로그램 ID 목록 조회.
+ *
+ * findAll() 결과를 도메인의 {@link ProgramMeta#isActiveAt} 로 필터링.
+ * 활성 = CANCELLED 아니고 스케줄 설정됐고 현재 시각이 범위 안.
+ *
+ * 현재 모든 ProgramMeta 를 메모리로 가져와 필터링하는 본질이라,
+ * 프로그램 수가 많아지면 비효율. 미래엔 별도 인덱스 키
+ * (예: queue:program:active = Set of programIds) 도입 고려.
+ */
+ @Override
+ public List findActiveProgramIds(LocalDateTime now) {
+ return findAll().stream()
+ .filter(programMeta -> programMeta.isActiveAt(now))
+ .map(ProgramMeta::getProgramId)
+ .toList();
+ }
+
+ // ===== 헬퍼 =====
+
+ private String buildKey(ProgramId programId) {
+ return KEY_PREFIX + programId.asString();
+ }
+
+ /**
+ * JSON 문자열을 ProgramMeta 도메인 객체로 복원.
+ * 빈 문자열은 null 로 변환 (openAt / closeAt 의 미정 상태 표현).
+ */
+ private ProgramMeta deserialize(String json) {
+ try {
+ Map data = objectMapper.readValue(
+ json,
+ new TypeReference