[kotlin] 동시성 제어하기

2024. 3. 10. 23:59Java/Spring

동시성 문제는 여러 개의 스레드가 동시에 실행될 때 발생할 수 있는 문제로, 데이터 일관성 문제, 경쟁 조건 등이 대표적인 예입니다.

  • 데이터 일관성 문제: 여러 스레드가 동일한 데이터를 동시에 수정할 때, 데이터의 일관성이 깨질 수 있습니다. 예를 들어, 한 스레드가 데이터를 수정하는 도중에 다른 스레드가 데이터를 수정하면 데이터가 손상될 수 있습니다.
  • 경쟁 조건: 여러 스레드가 서로 경쟁하면서 자원을 사용할 때, 스레드 간의 충돌로 인해 예상치 못한 결과가 발생할 수 있습니다. 예를 들어, 한 스레드가 자원을 사용하고 있는 도중에 다른 스레드가 자원을 사용하려고 하면 충돌이 발생할 수 있습니다.

이번 포스트에서는 포인트 선물하기를 코틀린에서 동시성 문제를 해결할 때 사용할 수 있는 방법들을 소개하고자 합니다 .

volatile

공유 변수의 값이 변경될 때마다 모든 스레드가 해당 값을 최신 상태로 읽을 수 있도록 하는 키워드입니다. 변수 값 변경 시 모든 스레드에 즉시 알립니다. Kotlin 1.9.0 에서 kotlin.concurrent.Volatile 은 stable로 지원합니다.

class Account(var balance: Int = 0) {
    companion object {
        val totalBalance = AtomicInteger(0)
    }
}

fun sendPoint(sender: Account, receiver: Account, amount: Int) = runBlocking {
    sender.balance -= amount
    receiver.balance += amount
    Account.totalBalance.addAndGet(amount)
    println("[SEND] $amount")
}

fun main() {
    val sender = Account(100)
    val receiver = Account(0)
    val jobs = List(5) {
        launch {
            sendPoint(sender, receiver, 10)
        }
    }
    jobs.forEach { it.join() }
    println("[TOTAL] ${Account.totalBalance}")
}

하지만 @volatile은 atomic read, write는 보장하지만 atomicity of larger actions을 보장하지 않습니다. 즉, 변수 값 읽기/쓰기만 동기화합니다. 여러 스레드가 동시에 변수를 증가시키는 경우, 읽기/쓰기 사이에 다른 스레드의 변경 내용이 반영되지 않을 수 있습니다.

totalBalance 변수를 증가시키는 task이 여러 단계로 이루어져 있다면, 이 작업 전체가 원자적으로 수행되지 못할 수 있습니다. 만약 두 스레드가 동시에 point를 보내는 작업을 수행한다면, 다음과 같은 상황이 발생할 수 있습니다.

  1. 첫 번째 스레드가 totalBalance 변수의 값을 읽습니다.
  2. 두 번째 스레드가 totalBalance 변수의 값을 읽습니다.
  3. 첫 번째 스레드가 읽은 값에 1을 더하고, 더한 값을 totalBalance 변수에 씁니다.
  4. 두 번째 스레드가 읽은 값에 1을 더합니다.
  5. 두 번째 스레드가 더한 값을 totalBalance 변수에 씁니다.
fun sendPoint(sender: Account, receiver: Account, amount: Int) = runBlocking {
    sender.balance -= amount
    receiver.balance += amount

    // totalBalance 수정 (비원자적)
    val prevTotalBalance = Account.totalBalance.get()
    val newTotalBalance = prevTotalBalance + amount
    Account.totalBalance.compareAndSet(prevTotalBalance, newTotalBalance)

    println("[SEND] $amount")
}

@Volatile은 공유자원에 접근하여 write하는 스레드가 1개이고 공유자원을 read하는 스레드가 여러 개일 경우 유용합니다. 하지만 여러 스레드에서 동시에 접근하여 read/write 하는 공유자원은 @Volatile만으로는 안전하지 않습니다. 자원의 가시성을 확보하여 동기화에 유용하지만, 원자성 연산을 보장하지 않아 동시성 이슈를 해결에 충분하지 못합니다.

synchronized

한 번에 한 스레드만 접근할 수 있도록 하는 방법입니다. synchronized method or block을 스레드가 참조하는 동안 다른 스레드는 대기하게 됩니다.

1) synchronized method

메서드 전체가 동기화되어야 하거나 간단하고 직관적인 동기화 방법입니다. 하지만 상대적으로 성능 저하 가능성이 있습니다.

class Account(var balance: Int = 0) {
  companion object {
    @Volatile
    var totalBalance = 0

    @Synchronized
    fun sendPoint(sender: Account, receiver: Account, amount: Int) {
      // 송금 전에 송금자가 충분한 잔고를 가지고 있는지 확인
      if (sender.balance >= amount) {
        sender.balance -= amount
        receiver.balance += amount
        totalBalance += amount
        println("[SEND] $amount 포인트")
        receiver.receivePoint(amount)
      } else {
        println("[FAILED] 송금자가 잔고 부족")
      }
    }
  }

  @Synchronized
  fun receivePoint(amount: Int) {
    balance += amount
    println("[RECEIVE] $amount 포인트를 받았습니다.")
  }
}

fun main() {
  val sender = Account(100)
  val receiver = Account(0)
  val jobs = List(5) {
    launch {
      Account.sendPoint(sender, receiver, 10)
    }
  }
  jobs.forEach { it.join() }
  println("[TOTAL] ${Account.totalBalance}")
}

2) synchronized block

class Account(var balance: Int = 0) {
  companion object {
    @Volatile
    var totalBalance = 0

    fun sendPoint(sender: Account, receiver: Account, amount: Int) {
       // 송금 전에 송금자가 충분한 잔고를 가지고 있는지 확인
        if (sender.balance < amount) {
          println("[FAILED] 송금자가 잔고 부족")
        }
        synchronized(this) {
          sender.balance -= amount
          receiver.balance += amount
          totalBalance += amount
          println("[SEND] $amount 포인트")
          receiver.receivePoint(amount)
        }
      }
    }
  }

  fun receivePoint(amount: Int) {
    synchronized(this) {
      balance += amount
      println("[RECEIVE] $amount 포인트를 받았습니다.")
    }
  }
}

fun main() {
  val sender = Account(100)
  val receiver = Account(0)
  val jobs = List(5) {
    launch {
      Account.sendPoint(sender, receiver, 10)
    }
  }
  jobs.forEach { it.join() }
  println("[TOTAL] ${Account.totalBalance}")
}

synchronized는 임계 구역을 보호하는 데 유용합니다. 하지만 위 코드에서는 다음과 같은 문제가 발생할 수 있습니다.

  • synchronized 사용으로 인해 전체 Account 객체에 대한 lock이 걸려 성능 저하
  • totalBalance 업데이트 시에도 전체 객체 lock 필요 -> 불필요한 락으로 인한 동시성 제어 효율 저하

임계 구역 내에서 다른 스레드의 작업 완료를 기다리는 경우, 현재 스레드가 자신을 블록하여 CPU 자원을 낭비합니다. 그리고 다른 스레드가 임계 구역을 떠날 때까지 Unlock을 기다리는 스레드들이 CPU 자원을 소모하며 대기하게 됩니다. 스레드 경쟁이 심할수록 더 많은 스레드들이 블록 및 잠금 해제 대기를 하게 됩니다. 임계 구역 코드가 길다면 낭비 가능성이 더욱 높아지게 됩니다.

synchronized 키워드 방식은 병렬성에 취약하여 대규모 서비스에서 synchronized 키워드를 많이 사용할 경우 심각한 성능 저하가 생길 수 있습니다.

atomic 변수

원자적으로 연산을 수행할 수 있는 thread-safe 자료구조입니다. 여러 스레드가 동시에 접근하더라도 데이터 일관성을 유지할 수 있습니다. atomic 변수는 java.util.concurrent 패키지에 있으며, AtomicInteger, AtomicLong 등이 있습니다.

class Account(var balance: Int = 0) {
    companion object {
        val totalBalance = AtomicInteger(0)
    }
}

fun sendPoint(sender: Account, receiver: Account, amount: Int) = runBlocking {
    sender.balance -= amount
    receiver.balance += amount
    Account.totalBalance.addAndGet(amount)
    println("[SEND] $amount points")
}

fun main() {
    val sender = Account(100)
    val receiver = Account(0)
    val jobs = List(5) {
        launch {
            sendPoint(sender, receiver, 10)
        }
    }
    jobs.forEach { it.join() }
    println("[TOTAL] ${Account.totalBalance}")
}

atomic 변수는 다음과 같은 장점을 같습니다.

  • atomic 변수는 compare-and-swap(CAS) 연산을 사용하여 원자적으로 값을 변경합니다. CAS 연산을 통해 다른 스레드의 접근을 막지 않기 때문에 여러 스레드가 동시에 접근하더라도 데이터 일관성을 유지할 수 있습니다.
  • atomic 변수는 일반적인 변수와 달리 메모리에 한 번만 저장됩니다. 이는 여러 스레드가 동시에 접근할 때 메모리 접근 횟수를 줄일 수 있어 메모리 효율성을 높일 수 있습니다.
  • atomic 변수를 사용하면 여러 스레드가 동시에 접근할 때, context switching이 감소합니다. atomic 변수가 원자적으로 연산을 수행하기 때문에 스레드 간의 충돌을 방지하고,lock 사용을 줄임으로써 context switching이 최소화하여 시스템의 성능을 향상 시킬 수 있습니다.
    하지만 복잡한 경우에는 다른 방식을 사용해야 합니다.

thread-safe (concurrent) collections

여러 스레드가 동시에 접근할 수 있는 컬렉션입니다. 스레드 안전성을 보장하며, 동시성 작업을 효율적으로 수행할 수 있습니다.

class Account(private val initialBalance: Int) {
  private val balance = AtomicInteger(initialBalance)

  fun sendPoint(amount: Int, receiver: Account) {
    if (balance.addAndGet(-amount) < 0) {
      println("[FAILED] ${this.name}님의 포인트가 부족합니다.")
      return
    }

    receiver.receivePoint(amount)
    println("[SEND] ${this.name}님이 ${receiver.name}님에게 ${amount}포인트를 선물했습니다.")
  }

  fun receivePoint(amount: Int) {
    balance.addAndGet(amount)
    println("[RECEIVE] ${this.name}님이 ${amount}포인트를 받았습니다.")
  }
}

fun main() = runBlocking {
  // 캐시처럼 사용할 ConcurrentHashMap
  val users = ConcurrentHashMap<String, Account>()

  // 사용자 추가
  users["Karina"] = Account("Karina", 100)
  users["Giselle"] = Account("Giselle", 150)
  users["Ningning"] = Account("Ningning", 50)
  users["Winter"] = Account("Winter", 200)

  // 10개의 무작위 송금 작업 실행
  val jobs = List(10) {
    launch {
      val senderName = users.keys.random()
      val receiverName = users.keys.random()
      val amount = (1..10).random()

      // ConcurrentHashMap에 없는 사용자 확인
      val sender = users[senderName] ?: run {
        println("[FAILED] ${senderName}님은 존재하지 않는 사용자입니다.")
        return@launch
      }

      // 송금자 잔고 확인
      if (sender.balance.addAndGet(-amount) < 0) {
        println("[FAILED] ${senderName}님의 포인트가 부족합니다.")
        return@launch
      }

      // 수신자 정보 확인
      val receiver = users[receiverName] ?: run {
        println("[FAILED] ${receiverName}님은 존재하지 않는 사용자입니다.")
        return@launch
      }

      // 포인트 송금
      receiver.receivePoint(amount)
      println("[SEND] ${senderName}님이 ${receiverName}님에게 ${amount}포인트를 선물했습니다.")
    }
  }

  // 모든 작업 종료 기다림
  jobs.forEach { it.join() }

  // 최종 잔고 출력
  users.forEach { (name, account) -> println("[TOTAL] $name님: ${account.balance.get()}포인트") }
}

ConcurrentHashMap은 여러 사용자가 동시에 데이터에 접근해야 하는 경우 HashMap보다 ConcurrentHashMap을 사용하면 락을 사용하여 동시 접근을 제어하여 시스템의 안정적인 동작을 기대할 수 있습니다.
concurrent collection은 여러 사용자가 동시에 데이터를 읽고 쓰는 경우에 유용합니다. 그리고 여러 사용자가 동시에 티켓을 예약하는 경우 데이터 경쟁 가능성이 높은 경우에 concurrent collection을 활용합니다. 대표적으로 caching system이 있습니다. 다음은 ConcurrentHashMap을 사용하는 라이브러리입니다. explain Guava for cache, explain Caffeine for cache

semaphore

lock 이 아닌 semaphore 기반으로 여러 스레드가 공유 자원에 접근하는 것을 제어합니다. 한 번에 접근할 수 있는 스레드의 수를 제한함으로써 데이터 일관성 문제를 예방할 수 있습니다.

class Account(var balance: Int = 0)

fun sendPoint(sender: Account, receiver: Account, amount: Int, semaphore: Semaphore) = runBlocking {
  // 세마포어 획득
  semaphore.acquire()
  try {
    // 송금 전에 송금자가 충분한 잔고를 가지고 있는지 확인
    if (sender.balance >= amount) {
      sender.balance -= amount
      receiver.balance += amount
      println("[SEND] $amount 포인트")
    } else {
      println("[FAILED] 송금자가 잔고 부족")
    }
  } finally {
    // 세마포어 해제
    semaphore.release()
  }
}

fun main() = runBlocking {
  val sender = Account(100)
  val receiver = Account(0)
  val semaphore = Semaphore(1) // 이체 시 동시성을 1로 제한
  val jobs = List(5) {
    launch {
      sendPoint(sender, receiver, 10, semaphore)
    }
  }
  jobs.forEach { it.join() }
  println("[TOTAL] sender : ${sender.balance}")
  println("[TOTAL] receiver: ${receiver.balance}")
}

mutex

lock을 이용하여 한 번에 한 스레드만 접근할 수 있도록 합니다. 이를 통해 데이터 일관성 문제를 해결할 수 있습니다. 뮤텍스를 획득하고 해제할 경우 명시적으로 lock 획득/해제 호출이 필요합니다.

class Account(var balance: Int = 0)

fun sendPoint(sender: Account, receiver: Account, amount: Int, mutex: Mutex) = runBlocking {
    with(mutex) {
        // 잠금 획득
        lock()
        try {
            sender.balance -= amount
            receiver.balance += amount
            println("[SEND] $amount points")
        } finally {
            // 잠금 해제
            unlock()
        }
    }
}

fun main() = runBlocking {
    val sender = Account(100)
    val receiver = Account(0)
    val mutex = Mutex()
    val jobs = List(5) {
        launch {
            sendPoint(sender, receiver, 10, mutex)
        }
    }
    jobs.forEach { it.join() }
    println("[TOTAL] sender : ${sender.balance}")
    println("[TOTAL] receiver: ${receiver.balance}")
}

withlock() with ReentrantLock

lock을 이용하여 한 번에 한 스레드만 접근할 수 있도록 합니다. block 내 코드를 실행하기 때문에 자동으로 lock 획득/해제가 됩니다. 이에 따라 블럭 withlock() 외부에 있는 코드는 lock 해제까지 영향을 받게 됩니다. ReentrantLock을 사용해서 다시 재진입이 가능합니다.

fun sendPoint(sender: Account, receiver: Account, amount: Int, lock: ReentrantLock) = runBlocking {
  lock.withLock {
    // 송금 전에 송금자가 충분한 잔고를 가지고 있는지 확인
    if (sender.balance >= amount) {
      sender.balance -= amount
      receiver.balance += amount
      println("[SEND] $amount 포인트")
    } else {
      println("[FAILED] 송금자 잔고 부족")
    }
  }
}

fun main() = runBlocking {
  val sender = Account(100)
  val receiver = Account(0)
  val lock = ReentrantLock()
  val jobs = List(5) {
    launch {
      sendPoint(sender, receiver, 10, lock)
    }
  }
  jobs.forEach { it.join() }
  println("[TOTAL] sender : ${sender.balance}")
  println("[TOTAL] receiver: ${receiver.balance}")
}

actors

Actor는 독립적인 실행 단위로, 자신의 상태와 메시지를 처리하는 로직을 가지고 있습니다. Actor 간의 통신은 메시지를 통해 이루어지며, 메시지는 Actor의 mailbox에 저장됩니다.

스레드와는 달리, Actor는 상태를 가지지 않으며, 실행 중에 다른 Actor나 함수를 호출할 수 있습니다. 이로 인해 Actor 방식은 스레드 간의 경쟁 조건을 방지하고, 자원 관리를 간편하게 할 수 있습니다.

sealed class Message {
    data class SendPoint(val amount: Int) : Message()
}

class Account(var balance: Int = 0)

class AccountActor(val account: Account) : CoroutineScope {
    private val channel = Channel<Message>()
    override val coroutineContext = SupervisorJob() + Dispatchers.Default + channel

    fun send(message: Message) {
        launch {
            channel.send(message)
        }
    }

    fun receive(): Message = channel.receive()
}

fun main() = runBlocking {
    val sender = Account(100)
    val receiver = Account(0)
    val senderActor = AccountActor(sender)
    val receiverActor = AccountActor(receiver)

    val jobs = List(5) {
        launch {
            senderActor.send(Message.SendPoint(10))
            receiverActor.send(Message.SendPoint(-10))
        }
    }

    jobs.forEach { it.join() }

    println("[TOTAL] sender : ${sender.balance}")
    println("[TOTAL] receiver: ${receiver.balance}")
}

Message 클래스를 sealed 클래스로 정의하고, SendPoint 클래스를 그 하위 클래스로 정의합니다. sealed class를 사용하면 메시지의 타입이 명확하게 정의되기 때문에, 다른 스레드나 코루틴에서 잘못된 메시지를 전송하는 것을 방지할 수 있습니다. 이를 통해 안전한 동시성을 보장할 수 있습니다. 따라서 Message 클래스의 하위 클래스인 SendPoint, ReceivePoint 등을 정의하여 각각의 메시지 타입을 명확하게 구분할 수 있습니다.

AccountActor 클래스를 정의하여 Account 객체와 통신할 수 있는 채널을 생성합니다. sendPoint() 메소드는 채널을 통해 amount 값을 전송하고, receivePoint() 메소드는 채널로부터 값을 수신합니다. main 함수에서는 AccountActor 객체를 생성하고, sendPoint() 메소드를 호출하여 sender와 receiver의 balance를 수정합니다. 모든 작업이 완료된 후에는 balance 값을 출력합니다.

결론

다음 상황에서 동기화 방식을 추천합니다.

  1. 일반적으로 synchronized나 ReentrantLock을 추천합니다.
  2. 특정 변수값 변경 시 모든 스레드에 동기화가 필요한 간단한 경우에는 volatile, atomic 변수을 사용하는 것을 추천합니다. 예를 들면 10번 출석체크를 했는지 횟수를 카운팅하는 시스템에서 count 변수 증가를 할 때 적절합니다.
  3. 여러 스레드가 동시에 컬렉션에 접근해야 하는 경우 concurrent 컬렉션을 이용하여 동기화를 추천합니다. 예를 들면 온라인 쇼핑몰에서 여러 고객이 동시에 제품 목록을 조회하고 장바구니에 담는 시스템에서 productList 컬렉션 관리할 때 적절합니다.
  4. 특정 메서드 실행 중 다른 스레드의 접근을 차단해야 하는 경우에는 synchronized method 동기화를 추천합니다. 예시로는 파일 시스템에서 파일을 읽고 쓰는 메서드 동기화가 있습니다.
  5. 특정 코드 블록만 동기화해야 하는 경우에는 synchronized block 동기화를 추천합니다. 예시로 여러 스레드가 동시에 데이터베이스에 접근해야 하는 경우, 접근 코드만 동기화가 있습니다.