#!/usr/bin/env vpython3
# Copyright 2020 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Unit tests for lockfile.py"""

import logging
import os
import shutil
import sys
import tempfile
import threading
import unittest

if sys.version_info.major == 2:
  import mock
  import Queue
else:
  from unittest import mock
  import queue as Queue

DEPOT_TOOLS_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, DEPOT_TOOLS_ROOT)

from testing_support import coverage_utils

import lockfile


class LockTest(unittest.TestCase):
  def setUp(self):
    self.cache_dir = tempfile.mkdtemp(prefix='lockfile')
    self.addCleanup(shutil.rmtree, self.cache_dir, ignore_errors=True)

  def testLock(self):
    with lockfile.lock(self.cache_dir):
      # cached dir locked, attempt to lock it again
      with self.assertRaises(lockfile.LockError):
        with lockfile.lock(self.cache_dir):
          pass

    with lockfile.lock(self.cache_dir):
      pass

  @mock.patch('time.sleep')
  def testLockConcurrent(self, sleep_mock):
    '''testLockConcurrent simulates what happens when two separate processes try
    to acquire the same file lock with timeout.'''
    # Queues q_f1 and q_sleep are used to controll execution of individual
    # threads.
    q_f1 = Queue.Queue()
    q_sleep = Queue.Queue()
    results = Queue.Queue()

    def side_effect(arg):
      '''side_effect is called when with l.lock is blocked. In this unit test
      case, it comes from f2.'''
      logging.debug('sleep: started')
      q_sleep.put(True)
      logging.debug('sleep: waiting for q_sleep to be consumed')
      q_sleep.join()
      logging.debug('sleep: waiting for result before exiting')
      results.get(timeout=1)
      logging.debug('sleep: exiting')

    sleep_mock.side_effect = side_effect

    def f1():
      '''f1 enters first in l.lock (controlled via q_f1). It then waits for
      side_effect to put a message in queue q_sleep.'''
      logging.debug('f1 started, locking')

      with lockfile.lock(self.cache_dir, timeout=1):
        logging.debug('f1: locked')
        q_f1.put(True)
        logging.debug('f1: waiting on q_f1 to be consumed')
        q_f1.join()
        logging.debug('f1: done waiting on q_f1, getting q_sleep')
        q_sleep.get(timeout=1)
        results.put(True)

      logging.debug('f1: lock released')
      q_sleep.task_done()
      logging.debug('f1: exiting')

    def f2():
      '''f2 enters second in l.lock (controlled by q_f1).'''
      logging.debug('f2: started, consuming q_f1')
      q_f1.get(timeout=1)  # wait for f1 to execute lock
      q_f1.task_done()
      logging.debug('f2: done waiting for q_f1, locking')

      with lockfile.lock(self.cache_dir, timeout=1):
        logging.debug('f2: locked')
        results.put(True)

    t1 = threading.Thread(target=f1)
    t1.start()
    t2 = threading.Thread(target=f2)
    t2.start()
    t1.join()
    t2.join()

    # One result was consumed by side_effect, we expect only one in the queue.
    self.assertEqual(1, results.qsize())
    sleep_mock.assert_called_once_with(1)


if __name__ == '__main__':
  logging.basicConfig(
      level=logging.DEBUG if '-v' in sys.argv else logging.ERROR)
  sys.exit(
      coverage_utils.covered_main(
          (os.path.join(DEPOT_TOOLS_ROOT, 'git_cache.py')),
          required_percentage=0))