高级编程9,ConcurrentBag的实现原理

4. ConcurrentBag 如何实现迭代器模式

看完上面的代码后,我很好奇ConcurrentBag<T>是如何实现IEnumerator来实现迭代访问的,因为ConcurrentBag<T>是通过分散在不同线程中的ThreadLocalList来存储数据的,那么在实现迭代器模式时,过程会比较复杂。

后面再查看了源码之后,发现ConcurrentBag<T>为了实现迭代器模式,将分在不同线程中的数据全都存到一个List<T>集合中,然后返回了该副本的迭代器。所以每次访问迭代器,它都会新建一个List<T>的副本,这样虽然浪费了一定的存储空间,但是逻辑上更加简单了。

/// <summary>
/// 本地帮助器方法释放所有本地列表锁
/// </summary>
private void ReleaseAllLocks()
{
    // 该方法用于在执行线程同步以后 释放掉所有本地锁
    // 通过遍历每个线程中存储的 ThreadLocalList对象 释放所占用的锁
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {

        if (currentList.m_lockTaken)
        {
            currentList.m_lockTaken = false;
            Monitor.Exit(currentList);
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 从冻结状态解冻包的本地帮助器方法
/// </summary>
/// <param name="lockTaken">The lock taken result from the Freeze method</param>
private void UnfreezeBag(bool lockTaken)
{
    // 首先释放掉 每个线程中 本地变量的锁
    // 然后释放全局锁
    ReleaseAllLocks();
    m_needSync = false;
    if (lockTaken)
    {
        Monitor.Exit(GlobalListsLock);
    }
}

/// <summary>
/// 本地帮助器函数等待所有未同步的操作
/// </summary>
private void WaitAllOperations()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    ThreadLocalList currentList = m_headList;
    // 自旋等待 等待其它操作完成
    while (currentList != null)
    {
        if (currentList.m_currentOp != (int)ListOperation.None)
        {
            SpinWait spinner = new SpinWait();
            // 有其它线程进行操作时,会将cuurentOp 设置成 正在操作的枚举
            while (currentList.m_currentOp != (int)ListOperation.None)
            {
                spinner.SpinOnce();
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// 本地帮助器方法获取所有本地列表锁
/// </summary>
private void AcquireAllLocks()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    bool lockTaken = false;
    ThreadLocalList currentList = m_headList;

    // 遍历每个线程的ThreadLocalList 然后获取对应ThreadLocalList的锁
    while (currentList != null)
    {
        // 尝试/最后 bllock 以避免在获取锁和设置所采取的标志之间的线程港口
        try
        {
            Monitor.Enter(currentList, ref lockTaken);
        }
        finally
        {
            if (lockTaken)
            {
                currentList.m_lockTaken = true;
                lockTaken = false;
            }
        }
        currentList = currentList.m_nextList;
    }
}

/// <summary>
/// Local helper method to freeze all bag operations, it
/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added
/// to the dictionary
/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations
/// 3- Wait for all un-synchronized operations to be done
/// </summary>
/// <param name="lockTaken">Retrieve the lock taken result for the global lock, to be passed to Unfreeze method</param>
private void FreezeBag(ref bool lockTaken)
{
    Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

    // 全局锁定可安全地防止多线程调用计数和损坏 m_needSync
    Monitor.Enter(GlobalListsLock, ref lockTaken);

    // 这将强制同步任何将来的添加/执行操作
    m_needSync = true;

    // 获取所有列表的锁
    AcquireAllLocks();

    // 等待所有操作完成
    WaitAllOperations();
}

/// <summary>
/// 本地帮助器函数返回列表中的包项, 这主要由 CopyTo 和 ToArray 使用。
/// 这不是线程安全, 应该被称为冻结/解冻袋块
/// 本方法是私有的 只有使用 Freeze/UnFreeze之后才是安全的 
/// </summary>
/// <returns>List the contains the bag items</returns>
private List<T> ToList()
{
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));
    // 创建一个新的List
    List<T> list = new List<T>();
    ThreadLocalList currentList = m_headList;
    // 遍历每个线程中的ThreadLocalList 将里面的Node的数据 添加到list中
    while (currentList != null)
    {
        Node currentNode = currentList.m_head;
        while (currentNode != null)
        {
            list.Add(currentNode.m_value);
            currentNode = currentNode.m_next;
        }
        currentList = currentList.m_nextList;
    }

    return list;
}

/// <summary>
/// Returns an enumerator that iterates through the <see
/// cref="ConcurrentBag{T}"/>.
/// </summary>
/// <returns>An enumerator for the contents of the <see
/// cref="ConcurrentBag{T}"/>.</returns>
/// <remarks>
/// The enumeration represents a moment-in-time snapshot of the contents
/// of the bag.  It does not reflect any updates to the collection after 
/// <see cref="GetEnumerator"/> was called.  The enumerator is safe to use
/// concurrently with reads from and writes to the bag.
/// </remarks>
public IEnumerator<T> GetEnumerator()
{
    // Short path if the bag is empty
    if (m_headList == null)
        return new List<T>().GetEnumerator(); // empty list

    bool lockTaken = false;
    try
    {
        // 首先冻结整个 ConcurrentBag集合
        FreezeBag(ref lockTaken);
        // 然后ToList 再拿到 List的 IEnumerator
        return ToList().GetEnumerator();
    }
    finally
    {
        UnfreezeBag(lockTaken);
    }
}

由上面的代码可知道,为了获取迭代器对象,总共进行了三步主要的操作。

  1. 使用FreezeBag()方法,冻结整个ConcurrentBag<T>集合。因为需要生成集合的List<T>副本,生成副本期间不能有其它线程更改损坏数据。
  2. ConcurrrentBag<T>生成List<T>副本。因为ConcurrentBag<T>存储数据的方式比较特殊,直接实现迭代器模式困难,考虑到线程安全和逻辑,最佳的办法是生成一个副本。
  3. 完成以上操作以后,就可以使用UnfreezeBag()方法解冻整个集合。

那么FreezeBag()方法是如何来冻结整个集合的呢?也是分为三步走。

  1. 首先获取全局锁,通过Monitor.Enter(GlobalListsLock, ref lockTaken);这样一条语句,这样其它线程就不能冻结集合。
  2. 然后获取所有线程中ThreadLocalList的锁,通过`AcquireAllLocks()方法来遍历获取。这样其它线程就不能对它进行操作损坏数据。
  3. 等待已经进入了操作流程线程结束,通过WaitAllOperations()方法来实现,该方法会遍历每一个ThreadLocalList对象的m_currentOp属性,确保全部处于None操作。

完成以上流程后,那么就是真正的冻结了整个ConcurrentBag<T>集合,要解冻的话也类似。在此不再赘述。

HashMap

图解HashMap(一)/)

图解HashMap(二)/)

HashMap实际上是一个“链表散列”的数据结构,即数组和链表的结合体

图片 1

image.png

简单地说,HashMap 在底层将 key-value 当成一个整体进行处理,这个整体就是一个 Entry 对象。HashMap 底层采用一个 Entry[] 数组来保存所有的 key-value 对,当需要存储一个 Entry 对象时,会根据 hash 算法来决定其在数组中的存储位置,在根据 equals 方法决定其在该数组位置上的链表中的存储位置;当需要取出一个Entry 时,也会根据 hash 算法找到其在数组中的存储位置,再根据 equals 方法从该位置上的链表中取出该Entry。

扩容:当 HashMap 中的元素个数超过数组大小 loadFactor时,就会进行数组扩容,loadFactor的默认值为 0.75,这是一个折中的取值。也就是说,默认情况下,数组大小为 16,那么当 HashMap 中元素个数超过 160.75=12 的时候,就把数组的大小扩展为 2*16=32,即扩大一倍,然后重新计算每个元素在数组中的位置,而这是一个非常消耗性能的操作,所以如果我们已经预知 HashMap 中元素的个数,那么预设元素的个数能够有效的提高 HashMap 的性能。

Fail-Fast 机制:HashMap 不是线程安全的,因此如果在使用迭代器的过程中有其他线程修改了 map,那么将抛出 ConcurrentModificationException,这就是所谓 fail-fast 策略。
例如:当某一个线程 A 通过 iterator去遍历某集合的过程中,若该集合的内容被其他线程所改变了;那么线程 A 访问集合时,就会抛出ConcurrentModificationException 异常,产生 fail-fast 事件。
实现原理:判断 modCount 跟 expectedModCount 是否相等,如果不相等就表示已经有其他线程修改了 Map。
解决方案:建议使用“java.util.concurrent 包下的类”去取代“java.util 包下的类”

遍历方式:
第一种:

   Map map = new HashMap();
  Iterator iter = map.entrySet().iterator();
  while (iter.hasNext()) {
  Map.Entry entry = (Map.Entry) iter.next();
  Object key = entry.getKey();
  Object val = entry.getValue();
    }

效率高,以后一定要使用此种方式
第二种:

Map map = new HashMap();
  Iterator iter = map.keySet().iterator();
  while (iter.hasNext()) {
  Object key = iter.next();
  Object val = map.get(key);
  }

效率低,以后尽量少使用!

只读集合

创建集合后,它们是只读的。

3.队列

代表了一个先进先出的对象集合。当您需要对各项进行先进先出的访问时,则使用队列。当您在列表中添加一项,称为入队,当您从列表中移除一项时,称为出队

添加队列元素时加上lock,因为多线程可以同时访问,所以对队列进行锁定访问。

图片 2

 

图片 3图片 4

using System;
using System.Collections;

namespace CollectionsApplication
{
   class Program
   {
      static void Main(string[] args)
      {
         Queue q = new Queue();

         q.Enqueue('A');
         q.Enqueue('M');
         q.Enqueue('G');
         q.Enqueue('W');

         Console.WriteLine("Current queue: ");
         foreach (char c in q)
            Console.Write(c + " ");
         Console.WriteLine();
         q.Enqueue('V');
         q.Enqueue('H');
         Console.WriteLine("Current queue: ");         
         foreach (char c in q)
            Console.Write(c + " ");
         Console.WriteLine();
         Console.WriteLine("Removing some values ");
         char ch = (char)q.Dequeue();
         Console.WriteLine("The removed value: {0}", ch);
         ch = (char)q.Dequeue();
         Console.WriteLine("The removed value: {0}", ch);
         Console.ReadKey();
      }
   }
}

View Code

当上面的代码被编译和执行时,它会产生下列结果:

Current queue: 
A M G W 
Current queue: 
A M G W V H 
Removing values
The removed value: A
The removed value: M

图片 5图片 6

class Program
    {
        static void Main()
        {
            var dm = new DocumentManager();

            ProcessDocuments.Start(dm);

            // Create documents and add them to the DocumentManager
            for (int i = 0; i < 1000; i++)
            {
                Document doc = new Document("Doc " + i.ToString(), "content");
                dm.AddDocument(doc);
                Console.WriteLine("Added document {0}", doc.Title);
                Thread.Sleep(new Random().Next(20));
            }

        }
    }

Program

图片 7图片 8

public class ProcessDocuments
  {
    public static void Start(DocumentManager dm)
    {
      Task.Factory.StartNew(new ProcessDocuments(dm).Run);
    }

    protected ProcessDocuments(DocumentManager dm)
    {
      if (dm == null)
        throw new ArgumentNullException("dm");
      documentManager = dm;
    }

    private DocumentManager documentManager;

    protected void Run()
    {
      while (true)
      {
        if (documentManager.IsDocumentAvailable)
        {
          Document doc = documentManager.GetDocument();
          Console.WriteLine("Processing document {0}", doc.Title);
        }
        Thread.Sleep(new Random().Next(20));
      }
    }
  }

ProcessDocuments

图片 9图片 10

public class DocumentManager
  {
    private readonly Queue<Document> documentQueue = new Queue<Document>();

    public void AddDocument(Document doc)
    {
      lock (this)
      {
        documentQueue.Enqueue(doc);
      }
    }

    public Document GetDocument()
    {
      Document doc = null;
      lock (this)
      {
        doc = documentQueue.Dequeue();
      }
      return doc;
    }

    public bool IsDocumentAvailable
    {
      get
      {
        return documentQueue.Count > 0;
      }
    }
  }

DocumentManager

图片 11图片 12

public class Document
  {
    public string Title { get; private set; }
    public string Content { get; private set; }

    public Document(string title, string content)
    {
      this.Title = title;
      this.Content = content;
    }
  }

Document

4.栈

代表了一个后进先出的对象集合。当您需要对各项进行后进先出的访问时,则使用堆栈。当您在列表中添加一项,称为推入元素,当您从列表中移除一项时,称为弹出元素。

图片 13

图片 14图片 15

class Program
    {
        static void Main()
        {
            var alphabet = new Stack<char>();
            alphabet.Push('A');
            alphabet.Push('B');
            alphabet.Push('C');

            Console.Write("First iteration: ");
            foreach (char item in alphabet)
            {
                Console.Write(item);
            }
            Console.WriteLine();

            Console.Write("Second iteration: ");
            while (alphabet.Count > 0)
            {
                Console.Write(alphabet.Pop());
            }
            Console.WriteLine();


        }
    }

Program

First iteration: CBA
Second iteration: CBA

5.链表

LinkedList<T>是一个双向链表,其元素指向它前面和后面的元素,这样通过移动下一个元素就可以正向遍历整个链表。通过移动到前一个元素可以反向遍历这个链表

链表的优点是,如果将元素插入列表的中间位置,使用链表会很快,在插入一个元素时,只需要修改上一个元素的Next引用和下一个元素的Previous引用,使他们引用所插入的元素。

图片 16图片 17

public class Document
  {
    public string Title { get; private set; }
    public string Content { get; private set; }
    public byte Priority { get; private set; }

    public Document(string title, string content, byte priority)
    {
      this.Title = title;
      this.Content = content;
      this.Priority = priority;
    }
  }

Document

图片 18图片 19

public class PriorityDocumentManager
  {
    private readonly LinkedList<Document> documentList;

    // priorities 0.9
    private readonly List<LinkedListNode<Document>> priorityNodes;

    public PriorityDocumentManager()
    {
      documentList = new LinkedList<Document>();

      priorityNodes = new List<LinkedListNode<Document>>(10);
      for (int i = 0; i < 10; i++)
      {
        priorityNodes.Add(new LinkedListNode<Document>(null));
      }
    }

    public void AddDocument(Document d)
    {
      Contract.Requires<ArgumentNullException>(d != null, "argument d must not be null");
      //  if (d == null) throw new ArgumentNullException("d");

      AddDocumentToPriorityNode(d, d.Priority);
    }

    private void AddDocumentToPriorityNode(Document doc, int priority)
    {
      Contract.Requires<ArgumentException>(priority >= 0 && priority < 10, "priority value must be between 0 and 9");
      //if (priority > 9 || priority < 0)
      //    throw new ArgumentException("Priority must be between 0 and 9");

      if (priorityNodes[priority].Value == null)
      {
        --priority;
        if (priority >= 0)
        {
          // check for the next lower priority
          AddDocumentToPriorityNode(doc, priority);
        }
        else // now no priority node exists with the same priority or lower
        // add the new document to the end
        {
          documentList.AddLast(doc);
          priorityNodes[doc.Priority] = documentList.Last;
        }
        return;
      }
      else // a priority node exists
      {
        LinkedListNode<Document> prioNode = priorityNodes[priority];
        if (priority == doc.Priority)
        // priority node with the same priority exists
        {
          documentList.AddAfter(prioNode, doc);

          // set the priority node to the last document with the same priority
          priorityNodes[doc.Priority] = prioNode.Next;
        }
        else // only priority node with a lower priority exists
        {
          // get the first node of the lower priority
          LinkedListNode<Document> firstPrioNode = prioNode;

          while (firstPrioNode.Previous != null &&
             firstPrioNode.Previous.Value.Priority == prioNode.Value.Priority)
          {
            firstPrioNode = prioNode.Previous;
            prioNode = firstPrioNode;
          }

          documentList.AddBefore(firstPrioNode, doc);

          // set the priority node to the new value
          priorityNodes[doc.Priority] = firstPrioNode.Previous;
        }
      }
    }

    public void DisplayAllNodes()
    {
      foreach (Document doc in documentList)
      {
        Console.WriteLine("priority: {0}, title {1}", doc.Priority, doc.Title);
      }
    }

    // returns the document with the highest priority
    // (that's first in the linked list)
    public Document GetDocument()
    {
      Document doc = documentList.First.Value;
      documentList.RemoveFirst();
      return doc;
    }

  }

PriorityDocumentManager

图片 20图片 21

 class Program
  {
    static void Main()
    {
        PriorityDocumentManager pdm = new PriorityDocumentManager();
        pdm.AddDocument(new Document("one", "Sample", 8));
        pdm.AddDocument(new Document("two", "Sample", 3));
        pdm.AddDocument(new Document("three", "Sample", 4));
        pdm.AddDocument(new Document("four", "Sample", 8));
        pdm.AddDocument(new Document("five", "Sample", 1));
        pdm.AddDocument(new Document("six", "Sample", 9));
        pdm.AddDocument(new Document("seven", "Sample", 1));
        pdm.AddDocument(new Document("eight", "Sample", 1));

        pdm.DisplayAllNodes();

    }
  }

Program

 

6.有序列表

SortedList基于键对集合进行排序.

class Program
  {
    static void Main()
    {
      var books = new SortedList<string, string>();
      books.Add("sty", "");
      books.Add("abc", "");
      books.Add("123", "");
      foreach (var item in books.Keys)
      {
          Console.WriteLine(item);
      }

    }
  }

123
abc
sty

 

7.字典

字典:用于在名称/值对中存储信息,字典的名称即键不能重复.

图片 22

HashTable和Dictionary

1.HashTable大数据量插入数据时需要花费比Dictionary大的多的时间。

2.for方式遍历HashTable和Dictionary速度最快。

3.在foreach方式遍历时Dictionary遍历速度更快。

4.HashTable在取值时需要进行类型转换,Dictionary不用做类型转换。

在单线程的时候使用Dictionary更好一些,多线程的时候使用HashTable更好。

 

有序字典SortedList和SortedDictionary

SortedDictionary 泛型类是检索运算复杂度为 O(log n) 的二叉搜索树,其中 n 是字典中的元素数。就这一点而言,它与 SortedList 泛型类相似。这两个类具有相似的对象模型,并且都具有 O(log n) 的检索运算复杂度。这两个类的区别在于内存的使用以及插入和移除元素的速度:

  • SortedList 使用的内存比 SortedDictionary 少。

  • SortedDictionary 可对未排序的数据执行更快的插入和移除操作:它的时间复杂度为 O(log n),而SortedList 为 O(n)。

  • 如果使用排序数据一次性填充列表,则 SortedList 比 SortedDictionary 快。

8.集

包含不重复元素的集合,叫“集”。.NET包含2个集。HashSet<T>和SortedSet<T>,它们继承ISet;SortedSet是一个有序集.

ISet提供了Add方法,如果HashSet中存在这个元素,再次使用Add方法不会抛出异常,返回bool值是否添加

var companyTeams = new HashSet<string>() { "Ferrari", "McLaren", "Mercedes" };
var traditionalTeams = new HashSet<string>() { "Ferrari", "McLaren" };
var privateTeams = new HashSet<string>() { "Red Bull", "Lotus", "Toro Rosso", "Force India", "Sauber" };

if (privateTeams.Add("Williams"))
    Console.WriteLine("Williams added");
if (!companyTeams.Add("McLaren"))
    Console.WriteLine("McLaren was already in this set");

IsSubsetOf方法判断了traditionalTeams集合是否companyTeams的子集
IsSupersetOf方法判断了companyTeams集合是否traditionalTeams的超集(包含它拥有的所有元素,并且多余它的元素)

var companyTeams = new HashSet<string>() { "Ferrari", "McLaren", "Mercedes" };
var traditionalTeams = new HashSet<string>() { "Ferrari", "McLaren" };
var privateTeams = new HashSet<string>() { "Red Bull", "Lotus", "Toro Rosso", "Force India", "Sauber" };

if (traditionalTeams.IsSubsetOf(companyTeams))
{
  Console.WriteLine("traditionalTeams is subset of companyTeams");
}

if (companyTeams.IsSupersetOf(traditionalTeams))
{
   Console.WriteLine("companyTeams is a superset of traditionalTeams");
}

SortedSet的UnionWith方法可以修改这个集合,并且包含传入的集合

var allTeams = new SortedSet<string>(companyTeams);
allTeams.UnionWith(privateTeams);
allTeams.UnionWith(traditionalTeams);

 

9.可视察的集合

如果需要记录集合何时添加和删除元素的信息,可以使用ObservableCollection<T>,这个本身是为WPF定制的。

ObservableCollection<T>类用于创建自定义集合,在内部使用List<T>类,重写虚方法RemoveItem和SetItem()方法触发CollectionChanged事件。

图片 23图片 24

class Program
  {
    static void Main()
    {
      var data = new ObservableCollection<string>();
      data.CollectionChanged += Data_CollectionChanged;
      data.Add("One");
      data.Add("Two");
      data.Insert(1, "Three");
      data.Remove("One");

    }

    static void Data_CollectionChanged(object sender, System.Collections.Specialized.NotifyCollectionChangedEventArgs e)
    {
      Console.WriteLine("action: {0}", e.Action.ToString());

      if (e.OldItems != null)
      {
        Console.WriteLine("starting index for old item(s): {0}", e.OldStartingIndex);
        Console.WriteLine("old item(s):");
        foreach (var item in e.OldItems)
        {
          Console.WriteLine(item);
        }
      }
      if (e.NewItems != null)
      {
        Console.WriteLine("starting index for new item(s): {0}", e.NewStartingIndex);
        Console.WriteLine("new item(s): ");
        foreach (var item in e.NewItems)
        {
          Console.WriteLine(item);
        }
      }


      Console.WriteLine();

    }
  }

View Code

Data_CollectionChanged方法接收了NotifyCollectionChangedEventArgs,包含了集合的变化信息,Action属性给出了是否添加或删除一项的信息,对于删除的项,会设置OldItems属性,列出删除的项

对于添加的项,会设置NewItems属性,列出添加的项。

action: Add
starting index for new item(s): 0
new item(s):
One

action: Add
starting index for new item(s): 1
new item(s):
Two

action: Add
starting index for new item(s): 1
new item(s):
Three

action: Remove
starting index for old item(s): 0
old item(s):
One

 

10.位数组

2. 用于数据存储的TrehadLocalList类

接下来我们来看一下ThreadLocalList类的构造,该类就是实际存储了数据的位置。实际上它是使用双向链表这种结构进行数据存储。

[Serializable]
// 构造了双向链表的节点
internal class Node
{
    public Node(T value)
    {
        m_value = value;
    }
    public readonly T m_value;
    public Node m_next;
    public Node m_prev;
}

/// <summary>
/// 集合操作类型
/// </summary>
internal enum ListOperation
{
    None,
    Add,
    Take
};

/// <summary>
/// 线程锁定的类
/// </summary>
internal class ThreadLocalList
{
    // 双向链表的头结点 如果为null那么表示链表为空
    internal volatile Node m_head;

    // 双向链表的尾节点
    private volatile Node m_tail;

    // 定义当前对List进行操作的种类 
    // 与前面的 ListOperation 相对应
    internal volatile int m_currentOp;

    // 这个列表元素的计数
    private int m_count;

    // The stealing count
    // 这个不是特别理解 好像是在本地列表中 删除某个Node 以后的计数
    internal int m_stealCount;

    // 下一个列表 可能会在其它线程中
    internal volatile ThreadLocalList m_nextList;

    // 设定锁定是否已进行
    internal bool m_lockTaken;

    // The owner thread for this list
    internal Thread m_ownerThread;

    // 列表的版本,只有当列表从空变为非空统计是底层
    internal volatile int m_version;

    /// <summary>
    /// ThreadLocalList 构造器
    /// </summary>
    /// <param name="ownerThread">拥有这个集合的线程</param>
    internal ThreadLocalList(Thread ownerThread)
    {
        m_ownerThread = ownerThread;
    }
    /// <summary>
    /// 添加一个新的item到链表首部
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="updateCount">是否更新计数.</param>
    internal void Add(T item, bool updateCount)
    {
        checked
        {
            m_count++;
        }
        Node node = new Node(item);
        if (m_head == null)
        {
            Debug.Assert(m_tail == null);
            m_head = node;
            m_tail = node;
            m_version++; // 因为进行初始化了,所以将空状态改为非空状态
        }
        else
        {
            // 使用头插法 将新的元素插入链表
            node.m_next = m_head;
            m_head.m_prev = node;
            m_head = node;
        }
        if (updateCount) // 更新计数以避免此添加同步时溢出
        {
            m_count = m_count - m_stealCount;
            m_stealCount = 0;
        }
    }

    /// <summary>
    /// 从列表的头部删除一个item
    /// </summary>
    /// <param name="result">The removed item</param>
    internal void Remove(out T result)
    {
        // 双向链表删除头结点数据的流程
        Debug.Assert(m_head != null);
        Node head = m_head;
        m_head = m_head.m_next;
        if (m_head != null)
        {
            m_head.m_prev = null;
        }
        else
        {
            m_tail = null;
        }
        m_count--;
        result = head.m_value;

    }

    /// <summary>
    /// 返回列表头部的元素
    /// </summary>
    /// <param name="result">the peeked item</param>
    /// <returns>True if succeeded, false otherwise</returns>
    internal bool Peek(out T result)
    {
        Node head = m_head;
        if (head != null)
        {
            result = head.m_value;
            return true;
        }
        result = default(T);
        return false;
    }

    /// <summary>
    /// 从列表的尾部获取一个item
    /// </summary>
    /// <param name="result">the removed item</param>
    /// <param name="remove">remove or peek flag</param>
    internal void Steal(out T result, bool remove)
    {
        Node tail = m_tail;
        Debug.Assert(tail != null);
        if (remove) // Take operation
        {
            m_tail = m_tail.m_prev;
            if (m_tail != null)
            {
                m_tail.m_next = null;
            }
            else
            {
                m_head = null;
            }
            // Increment the steal count
            m_stealCount++;
        }
        result = tail.m_value;
    }


    /// <summary>
    /// 获取总计列表计数, 它不是线程安全的, 如果同时调用它, 则可能提供不正确的计数
    /// </summary>
    internal int Count
    {
        get
        {
            return m_count - m_stealCount;
        }
    }
}

从上面的代码中我们可以更加验证之前的观点,就是ConcurentBag<T>在一个线程中存储数据时,使用的是双向链表ThreadLocalList实现了一组对链表增删改查的方法。

集合类的实现原理

BitArray类的方法和属性

下表列出了一些BitArray类的常用属性:

属性 描述
Count 获取包含在BitArray元素的数量
IsReadOnly 获取一个值,指示BitArray是否是只读
Item 获取或设置在所述BitArray的特定位置的比特的值
Length 获取或设置在BitArray元素的数量

下表列出了一些BitArray类的常用方法:

 

S.N 方法名称及用途
1 public BitArray And( BitArray value ); 
执行对指定BitArray的相应元素在当前BitArray元素的按位与运算
2 public bool Get( int index ); 
获取在所述BitArray的特定位置的比特的值
3 public BitArray Not();
反转当前BitArray所有的位值,使设置为true的元素被更改为false,并设置为false元素更改为true
4 public BitArray Or( BitArray value ); 
在执行对指定BitArray的相应元素在当前BitArray的元素的按位或操作
5 public void Set( int index, bool value ); 
设置在所述BitArray为指定值的特定位置的比特值
6 public void SetAll( bool value ); 
设置在BitArray所有位设置为指定值
7 public BitArray Xor( BitArray value ); 
执行关于对在指定BitArray的相应元素中的当前BitArray的元素按位异或运算

当需要存储位,但不知道事先比特数就使用它。您可以通过使用一个整数索引,它从零开始访问BitArray集合中的项。

图片 25图片 26

using System;
using System.Collections;

namespace CollectionsApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            //creating two  bit arrays of size 8
            BitArray ba1 = new BitArray(8);
            BitArray ba2 = new BitArray(8);
            byte[] a = { 60 };
            byte[] b = { 13 };

            //storing the values 60, and 13 into the bit arrays
            ba1 = new BitArray(a);
            ba2 = new BitArray(b);

            //content of ba1
            Console.WriteLine("Bit array ba1: 60");
            for (int i = 0; i < ba1.Count; i++)
            {
                Console.Write("{0, -6} ", ba1[i]);
            }
            Console.WriteLine();

            //content of ba2
            Console.WriteLine("Bit array ba2: 13");
            for (int i = 0; i < ba2.Count; i++)
            {
                Console.Write("{0, -6} ", ba2[i]);
            }
            Console.WriteLine();


            BitArray ba3 = new BitArray(8);
            ba3 = ba1.And(ba2);

            //content of ba3
            Console.WriteLine("Bit array ba3 after AND operation: 12");
            for (int i = 0; i < ba3.Count; i++)
            {
                Console.Write("{0, -6} ", ba3[i]);
            }
            Console.WriteLine();

            ba3 = ba1.Or(ba2);
            //content of ba3
            Console.WriteLine("Bit array ba3 after OR operation: 61");
            for (int i = 0; i < ba3.Count; i++)
            {
                Console.Write("{0, -6} ", ba3[i]);
            }
            Console.WriteLine();

            Console.ReadKey();
        }
    }
}

View Code

让我们编译和运行上面的程序,这将产生以下结果:

Bit array ba1: 60 
False False True True True True False False 
Bit array ba2: 13
True False True True False False False False 
Bit array ba3 after AND operation: 12
False False True True False False False False 
Bit array ba3 after OR operation: 61
True False True True False False False False 

3. ConcurrentBag实现新增元素

接下来我们看一看ConcurentBag<T>是如何新增元素的。

/// <summary>
/// 尝试获取无主列表,无主列表是指线程已经被暂停或者终止,但是集合中的部分数据还存储在那里
/// 这是避免内存泄漏的方法
/// </summary>
/// <returns></returns>
private ThreadLocalList GetUnownedList()
{
    //此时必须持有全局锁
    Contract.Assert(Monitor.IsEntered(GlobalListsLock));

    // 从头线程列表开始枚举 找到那些已经被关闭的线程
    // 将它所在的列表对象 返回
    ThreadLocalList currentList = m_headList;
    while (currentList != null)
    {
        if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)
        {
            currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe
            return currentList;
        }
        currentList = currentList.m_nextList;
    }
    return null;
}
/// <summary>
/// 本地帮助方法,通过线程对象检索线程线程本地列表
/// </summary>
/// <param name="forceCreate">如果列表不存在,那么创建新列表</param>
/// <returns>The local list object</returns>
private ThreadLocalList GetThreadList(bool forceCreate)
{
    ThreadLocalList list = m_locals.Value;

    if (list != null)
    {
        return list;
    }
    else if (forceCreate)
    {
        // 获取用于更新操作的 m_tailList 锁
        lock (GlobalListsLock)
        {
            // 如果头列表等于空,那么说明集合中还没有元素
            // 直接创建一个新的
            if (m_headList == null)
            {
                list = new ThreadLocalList(Thread.CurrentThread);
                m_headList = list;
                m_tailList = list;
            }
            else
            {
               // ConcurrentBag内的数据是以双向链表的形式分散存储在各个线程的本地区域中
                // 通过下面这个方法 可以找到那些存储有数据 但是已经被停止的线程
                // 然后将已停止线程的数据 移交到当前线程管理
                list = GetUnownedList();
                // 如果没有 那么就新建一个列表 然后更新尾指针的位置
                if (list == null)
                {
                    list = new ThreadLocalList(Thread.CurrentThread);
                    m_tailList.m_nextList = list;
                    m_tailList = list;
                }
            }
            m_locals.Value = list;
        }
    }
    else
    {
        return null;
    }
    Debug.Assert(list != null);
    return list;
}
/// <summary>
/// Adds an object to the <see cref="ConcurrentBag{T}"/>.
/// </summary>
/// <param name="item">The object to be added to the
/// <see cref="ConcurrentBag{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.</param>
public void Add(T item)
{
    // 获取该线程的本地列表, 如果此线程不存在, 则创建一个新列表 (第一次调用 add)
    ThreadLocalList list = GetThreadList(true);
    // 实际的数据添加操作 在AddInternal中执行
    AddInternal(list, item);
}

/// <summary>
/// </summary>
/// <param name="list"></param>
/// <param name="item"></param>
private void AddInternal(ThreadLocalList list, T item)
{
    bool lockTaken = false;
    try
    {
        #pragma warning disable 0420
        Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);
        #pragma warning restore 0420
        // 同步案例:
        // 如果列表计数小于两个, 因为是双向链表的关系 为了避免与任何窃取线程发生冲突 必须获取锁
        // 如果设置了 m_needSync, 这意味着有一个线程需要冻结包 也必须获取锁
        if (list.Count < 2 || m_needSync)
        {
            // 将其重置为None 以避免与窃取线程的死锁
            list.m_currentOp = (int)ListOperation.None;
            // 锁定当前对象
            Monitor.Enter(list, ref lockTaken);
        }
        // 调用 ThreadLocalList.Add方法 将数据添加到双向链表中
        // 如果已经锁定 那么说明线程安全  可以更新Count 计数
        list.Add(item, lockTaken);
    }
    finally
    {
        list.m_currentOp = (int)ListOperation.None;
        if (lockTaken)
        {
            Monitor.Exit(list);
        }
    }
}

从上面代码中,我们可以很清楚的知道Add()方法是如何运行的,其中的关键就是GetThreadList()方法,通过该方法可以获取当前线程的数据存储列表对象,假如不存在数据存储列表,它会自动创建或者通过GetUnownedList()方法来寻找那些被停止但是还存储有数据列表的线程,然后将数据列表返回给当前线程中,防止了内存泄漏。

在数据添加的过程中,实现了细颗粒度的lock同步锁,所以性能会很高。删除和其它操作与新增类似,本文不再赘述。

HashSet

对于 HashSet 中保存的对象,请注意正确重写其 equals 和 hashCode 方法,以保证放入的对象的唯一性。这两个方法是比较重要的,希望大家在以后的开发过程中需要注意一下。

图片 27

image.png

集合

 


 

1.集合接口和类型

接口

说明

IEnumerable<T>

如果foreach语句用于集合,就需要IEnumerable接口.这个借口定义了方法GetEnumerator(),他返回一个实现了IEnumerator接口的枚举

ICollection<T>

ICollection<T>接口有泛型集合类实现.使用这个借口可以获得集合中的元素个数(Count属性),把集合复制到数组中(CopyTo()方法),还可以从集合中添加和删除元素(Add(),Remove(),Clear())

List<T>

IList<T>接口用于可通过位置访问其中的元素列表,这个接口定义了一个 索引器,可以在集合的指定位置插入或删除 mount些项(Insert()和Remove()方法).IList<T>接口派生自ICollection<T>接口

ISet<T>

ISet<T>接口是.NET4中新增的.实现这个接口的集允许合并不同的集.获得两个集的交集,检查两个集合是否重叠.ISet<T>接口派生自ICollection<T>接口

IDictionary<TKey,TValue>

IDictionary<TKey,TValue>接口由包含键和值的泛型集合类 实现.使用这个接口可以访问所有的键和值,使用键类型的索引器可以访问某些项,还可以添加或删除某些项

ILookup<TKey,TValue>

ILookup<TKey,TValue>接口类似于IDictionary<TKey,TValue>接口,实现该接口的集合有键和值,且可以通过一个键包含多个值

IComparer<T>

接口ICommparer<T>由比较器实现,通过Comparer()方法给集合中的元素排序

IEqualityComparer<T>

接口IEqualityComparer<T>由一个比较器实现,该比较器可用于字典中的键.使用这个接口,可以对对象进行相等性比较.在.NET中,这个接口也由数组和元组实现

IProducerConsumerColllection<T>

IProducerConsumerCollection<T>接口是.NET4中新增的,它支持新的线程安全的集合类

IReadOnlyList<T>、

IReadOnlyDictionary<T>、

IReadOnlyCollection<T>

初始化后不能修改的集合,只能检索对象,不能添加和删除.

 

2.列表

先看看一个实例:

图片 28图片 29

[Serializable]
  public class Racer : IComparable<Racer>, IFormattable
  {
    public int Id { get; private set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string Country { get; set; }
    public int Wins { get; set; }

    public Racer(int id, string firstName, string lastName, string country)
      : this(id, firstName, lastName, country, wins: 0)
    {
    }
    public Racer(int id, string firstName, string lastName, string country, int wins)
    {
      this.Id = id;
      this.FirstName = firstName;
      this.LastName = lastName;
      this.Country = country;
      this.Wins = wins;
    }

    public override string ToString()
    {
      return String.Format("{0} {1}", FirstName, LastName);
    }

    public string ToString(string format, IFormatProvider formatProvider)
    {
      if (format == null) format = "N";
      switch (format.ToUpper())
      {
        case null:
        case "N": // name
          return ToString();
        case "F": // first name
          return FirstName;
        case "L": // last name
          return LastName;
        case "W": // Wins
          return String.Format("{0}, Wins: {1}", ToString(), Wins);
        case "C": // Country
          return String.Format("{0}, Country: {1}", ToString(), Country);
        case "A": // All
          return String.Format("{0}, {1} Wins: {2}", ToString(), Country, Wins);
        default:
          throw new FormatException(String.Format(formatProvider,
                "Format {0} is not supported", format));
      }
    }

    public string ToString(string format)
    {
      return ToString(format, null);
    }

    public int CompareTo(Racer other)
    {
      if (other == null) return -1;
      int compare = string.Compare(this.LastName, other.LastName);
      if (compare == 0)
        return string.Compare(this.FirstName, other.FirstName);
      return compare;
    }
  }

View Code


LinkedHashSet

  • LinkedHashSet 是 Set 的一个具体实现,其维护着一个运行于所有条目的双重链接列表。此链接列表定义了迭代顺序,该迭代顺序可为插入顺序或是访问顺序。
  • LinkedHashSet 继承与 HashSet,并且其内部是通过 LinkedHashMap 来实现的。有点类似于我们之前说的LinkedHashMap 其内部是基于 Hashmap 实现一样,不过还是有一点点区别的(具体的区别大家可以自己去思考一下)。
  • 如果我们需要迭代的顺序为插入顺序或者访问顺序,那么 LinkedHashSet 是需要你首先考虑的。
创建列表

使用默认的构造函数创建一个空列表,元素添加到列表后,列表容量会扩大到可接纳4个元素。
如果添加了第5个元素,列表大小会重新设置为8个元素。每次都会将列表的容量重新设置为原来的2倍.

var intList=new List<int>();

 

如果列表的容量变了,整个集合就要重新分配到新的内存块中,我们可以在初始化时设置它的容量:

List<int> intList=new List<int>(10);

如果列表的个数超过10个,可以设置容量Capacity:

intList.Capacity = 20;

如果列表的元素已经添加完了,列表会存在多余的容量空间。可以使用TrimExcess方法去除不要的容量:

intList.TrimExcess();

a.集合初始值设定项

使用初始化构造器初始化设定项

int[] arr = { 1, 2, 3 };
var intList = new List<int>(arr) ;

括号中初始化

var intList = new List<int>() { 4, 5 };

 

b.添加元素

intList.Add(5);

添加数组

intList.AddRange(new int[] { 3, 5 });

c.插入元素

intList.Insert(3, 4);

d.访问元素

使用索引获取:

var value = intList[3];

循环遍历:

foreach (var item in intList)
{
     var res = item;
}

forEach方法:

class List<T> : IList<T>
{
    private T[] items;
    public void forEach(Action<T> action)
    {
        if (action == null) throw new ArgumentNullException("action");
        foreach (var item in items)
        {
            action(item);
        }
    }
}

然后我们可以这样调用:

intList.ForEach(m => Console.WriteLine(m));

e.删除元素

按索引删除,比较快

intList.RemoveAt(3);

按元素值删除

intList.Remove(4);

f.搜索

在集合中搜索元素。可以查找索引和元素。

FindIndex通过匹配元素值,获得索引:

intList.FindIndex(m => m==4);

FindIndex方法参数Predicate<T>传入匹配的表达式,返回匹配的元素索引值,Predicate<T>委托表示定义一组条件并确定指定对象是否符合这些条件的方法

intList.Find(m => m == 4);
intList.FindAll(m => m > 2);

Find返回了匹配条件的元素值,FindAll返回了匹配条件的所有元素

g.排序

列表使用Sort方法进行元素排序

intList.Sort();

intList.Sort((m, n) => m);

Sort(Comparison<T> comparison)方法参数中的委托Comparison含有2个参数,方法将这2个元素进行比较,然后返回绝对值,如果返回-1的,元素需要排前面,返回1的元素需要排后面.

Sort(IComparer<T> comparer)方法参数中是一个比较接口,接口实现Comparer方法

图片 30图片 31

public enum CompareType
  {
    FirstName,
    LastName,
    Country,
    Wins
  }

  public class RacerComparer : IComparer<Racer>
  {
    private CompareType compareType;
    public RacerComparer(CompareType compareType)
    {
      this.compareType = compareType;
    }

    public int Compare(Racer x, Racer y)
    {
      if (x == null && y == null) return 0;
      if (x == null) return -1;
      if (y == null) return 1;

      int result;
      switch (compareType)
      {
        case CompareType.FirstName:
          return string.Compare(x.FirstName, y.FirstName);
        case CompareType.LastName:
          return string.Compare(x.LastName, y.LastName);
        case CompareType.Country:
          result = string.Compare(x.Country, y.Country);
          if (result == 0)
            return string.Compare(x.LastName, y.LastName);
          else
            return result;
        case CompareType.Wins:
          return x.Wins.CompareTo(y.Wins);
        default:
          throw new ArgumentException("Invalid Compare Type");
      }
    }
  }

View Code

 

h.类型转换

 Converter委托

public delegate TOutput Converter<in TInput, out TOutput>(TInput input);

ConvertAll可以将一种类型的集合转换为另一种类型的集合。

intList.ConvertAll(m => m.ToString());

一、前言

笔者最近在做一个项目,项目中为了提升吞吐量,使用了消息队列,中间实现了生产消费模式,在生产消费者模式中需要有一个集合,来存储生产者所生产的物品,笔者使用了最常见的List<T>集合类型。

由于生产者线程有很多个,消费者线程也有很多个,所以不可避免的就产生了线程同步的问题。开始笔者是使用lock关键字,进行线程同步,但是性能并不是特别理想,然后有网友说可以使用SynchronizedList<T>来代替使用List<T>达到线程安全的目的。于是笔者就替换成了SynchronizedList<T>,但是发现性能依旧糟糕,于是查看了SynchronizedList<T>的源代码,发现它就是简单的在List<T>提供的API的基础上加了lock,所以性能基本与笔者实现方式相差无几。

最后笔者找到了解决的方案,使用ConcurrentBag<T>类来实现,性能有很大的改观,于是笔者查看了ConcurrentBag<T>的源代码,实现非常精妙,特此在这记录一下。

Hashtable

HashMap和HashTab的异同

1.HashTable 基于 Dictionary 类,而 HashMap 是基于 AbstractMap。Dictionary 是任何可将键映射到相应值的类的抽象父类,而 AbstractMap 是基于 Map 接口的实现,它以最大限度地减少实现此接口所需的工作。

2.HashMap 的 key 和 value 都允许为 null,而 Hashtable 的 key 和 value 都不允许为 null。HashMap 遇到 key 为 null 的时候,调用 putForNullKey 方法进行处理,而对 value 没有处理;Hashtable遇到 null,直接返回 NullPointerException。

3.Hashtable 方法是同步,而HashMap则不是。我们可以看一下源码,Hashtable 中的几乎所有的 public 的方法都是 synchronized 的,而有些方法也是在内部通过 synchronized 代码块来实现。所以有人一般都建议如果是涉及到多线程同步时采用 HashTable,没有涉及就采用 HashMap,但是在 Collections 类中存在一个静态方法:synchronizedMap(),该方法创建了一个线程安全的 Map 对象,并把它作为一个封装的对象来返回。

线程安全的集合类

ConcurrentQueue 线程安全版本的Queue 
ConcurrentStack线程安全版本的Stack 
ConcurrentBag线程安全的对象集合 
ConcurrentDictionary线程安全的Dictionary 
BlockingCollection

 

笔者水平有限,如果错误欢迎各位批评指正!

附上ConcurrentBag<T>源码地址:戳一戳

LinkedHashMap

LinkedHashMap是Hash表和链表的实现,并且依靠着双向链表保证了迭代顺序是插入的顺序
其实 LinkedHashMap 几乎和 HashMap 一样:从技术上来说,不同的是它定义了一个 Entry<K,V> header,这个 header 不是放在 Table 里,它是额外独立出来的。LinkedHashMap 通过继承 hashMap 中的 Entry<K,V>,并添加两个属性 Entry<K,V> before,after,和 header 结合起来组成一个双向链表,来实现按插入顺序或访问顺序排序。
HashMap,LinkedHashMap,TreeMap的区别

BitVector32

提供了一个简单结构,该结构以32位内存存储布尔和小数值

对于内部使用的布尔值和小整数,BitVector32 比 BitArray 更有效。 BitArray 可以按需要无限地扩大,但它有内存和性能方面的系统开销,这是类实例所要求的。 相比之下,BitVector32 只使用 32 位。

BitVector32 结构可以设置成包含小整数的若干节或包含布尔值的若干位标志,但不能同时包含两者。BitVector32.Section 是 BitVector32 中的窗口,且由最小数量的连续位构成,连续位可以包含 CreateSection 中指定的最大值。 例如,带有最大值 1 的节只由一个位构成,而带有最大值 5 的节由三个位构成。 可以创建带有最大值 1 的 BitVector32.Section 作为布尔值,从而使您能够在同一 BitVector32 中存储整数和布尔值。

BitVector32 既可以设置为节,也可以设置为位标志,分别有成员可以应用于这两种情形。 例如,BitVector32.Item 属性是作为节设置的 BitVector32 的索引器,而 BitVector32.Item 属性是作为位标志设置的BitVector32 的索引器。 CreateMask 创建一系列屏蔽,这些屏蔽可用于访问作为位标志设置的 BitVector32 中的单个位。

在作为节设置的 BitVector32 上使用屏蔽可能会导致意外的结果。

图片 32图片 33

using System;
using System.Collections.Specialized;


public class SamplesBitVector32  {

   public static void Main()  {

      // Creates and initializes a BitVector32 with all bit flags set to FALSE.
      BitVector32 myBV = new BitVector32( 0 );

      // Creates masks to isolate each of the first five bit flags.
      int myBit1 = BitVector32.CreateMask();
      int myBit2 = BitVector32.CreateMask( myBit1 );
      int myBit3 = BitVector32.CreateMask( myBit2 );
      int myBit4 = BitVector32.CreateMask( myBit3 );
      int myBit5 = BitVector32.CreateMask( myBit4 );

      // Sets the alternating bits to TRUE.
      Console.WriteLine( "Setting alternating bits to TRUE:" );
      Console.WriteLine( "   Initial:         {0}", myBV.ToString() );
      myBV[myBit1] = true;
      Console.WriteLine( "   myBit1 = TRUE:   {0}", myBV.ToString() );
      myBV[myBit3] = true;
      Console.WriteLine( "   myBit3 = TRUE:   {0}", myBV.ToString() );
      myBV[myBit5] = true;
      Console.WriteLine( "   myBit5 = TRUE:   {0}", myBV.ToString() );

   }

}

/*
This code produces the following output.

Setting alternating bits to TRUE:
   Initial:         BitVector32{00000000000000000000000000000000}
   myBit1 = TRUE:   BitVector32{00000000000000000000000000000001}
   myBit3 = TRUE:   BitVector32{00000000000000000000000000000101}
   myBit5 = TRUE:   BitVector32{00000000000000000000000000010101}


*/
BitVector用作节集合

using System;
using System.Collections.Specialized;


public class SamplesBitVector32  {

   public static void Main()  {

      // Creates and initializes a BitVector32.
      BitVector32 myBV = new BitVector32( 0 );

      // Creates four sections in the BitVector32 with maximum values 6, 3, 1, and 15.
      // mySect3, which uses exactly one bit, can also be used as a bit flag.
      BitVector32.Section mySect1 = BitVector32.CreateSection( 6 );
      BitVector32.Section mySect2 = BitVector32.CreateSection( 3, mySect1 );
      BitVector32.Section mySect3 = BitVector32.CreateSection( 1, mySect2 );
      BitVector32.Section mySect4 = BitVector32.CreateSection( 15, mySect3 );

      // Displays the values of the sections.
      Console.WriteLine( "Initial values:" );
      Console.WriteLine( "tmySect1: {0}", myBV[mySect1] );
      Console.WriteLine( "tmySect2: {0}", myBV[mySect2] );
      Console.WriteLine( "tmySect3: {0}", myBV[mySect3] );
      Console.WriteLine( "tmySect4: {0}", myBV[mySect4] );

      // Sets each section to a new value and displays the value of the BitVector32 at each step.
      Console.WriteLine( "Changing the values of each section:" );
      Console.WriteLine( "tInitial:    t{0}", myBV.ToString() );
      myBV[mySect1] = 5;
      Console.WriteLine( "tmySect1 = 5:t{0}", myBV.ToString() );
      myBV[mySect2] = 3;
      Console.WriteLine( "tmySect2 = 3:t{0}", myBV.ToString() );
      myBV[mySect3] = 1;
      Console.WriteLine( "tmySect3 = 1:t{0}", myBV.ToString() );
      myBV[mySect4] = 9;
      Console.WriteLine( "tmySect4 = 9:t{0}", myBV.ToString() );

      // Displays the values of the sections.
      Console.WriteLine( "New values:" );
      Console.WriteLine( "tmySect1: {0}", myBV[mySect1] );
      Console.WriteLine( "tmySect2: {0}", myBV[mySect2] );
      Console.WriteLine( "tmySect3: {0}", myBV[mySect3] );
      Console.WriteLine( "tmySect4: {0}", myBV[mySect4] );

   }

}

View Code

图片 34图片 35

/*
This code produces the following output.

Initial values:
        mySect1: 0
        mySect2: 0
        mySect3: 0
        mySect4: 0
Changing the values of each section:
        Initial:        BitVector32{00000000000000000000000000000000}
        mySect1 = 5:    BitVector32{00000000000000000000000000000101}
        mySect2 = 3:    BitVector32{00000000000000000000000000011101}
        mySect3 = 1:    BitVector32{00000000000000000000000000111101}
        mySect4 = 9:    BitVector32{00000000000000000000001001111101}
New values:
        mySect1: 5
        mySect2: 3
        mySect3: 1
        mySect4: 9

*/

View Code

 

11.不变的集合

Net提供的不可变集合

ImmutableStack<int> a1 = ImmutableStack<int>.Empty;
ImmutableStack<int> a2 = a1.Push(10);
ImmutableStack<int> a3 = a2.Push(20);
ImmutableStack<int> a4 = a3.Push(30);
ImmutableStack<int> iv3 = a4.Pop(); 

使用Net不可变列表集合有一点要注意的是,当我们Push值时要重新赋值给原变量才正确,因为push后会生成一个新对象,原a1只是旧值:

ImmutableStack<int> a1 = ImmutableStack<int>.Empty;
a1.Push(10); //不正确,a1仍是空值值,push会生成新的栈。
a1 = a1.Push(10); //需要将新栈重新赋值给a1

NET提供的常用数据结构

1.ImmutableStack
2.ImmutableQueue
3.ImmutableList
4.ImmutableHashSet
5.ImmutableSortedSet
6.ImmutableDictionary<K, V>
7.ImmutableSortedDictionary<K, V>

不可变优点

1.集合共享安全,从不被改变
2.访问集合时,不需要锁集合(线程安全)
3.修改集合不担心旧集合被改变
4.书写更简洁,函数式风格。 var list = ImmutableList.Empty.Add(10).Add(20).Add(30);
5.保证数据完整性,安全性

不可变对象缺点

不可变本身的优点即是缺点,当每次对象/集合操作都会返回个新值。而旧值依旧会保留一段时间,这会使内存有极大开销,也会给GC造成回收负担,性能也比可变集合差的多。

图片 36

 

12.并发集合

线程安全的集合可防止多个线程以相互冲突的方式访问集合

.NET 的System.Collections.Concurrent提供了几个安全的类和功能:

说明
BlockingCollection<T>
ConcurrentBag<T>
ConcurrentDictionary<TKey, TValue>
ConcurrentQueue<T>
ConcurrentStack<T>
OrderablePartitioner<TSource>
Partitioner
Partitioner<TSource>

1)创建管道

将这些并发集合类用于管道,一个任务向一个集合类写入一些内容,同时另一个任务从该集合中读取内容

示例中多个任务形成一个管道.
第一个管道,
第1阶段的任务读取文件名,添加到队列,这个任务运行同时,
第2阶段的任务已经开始从队列中读取文件名并加载它们的程序,结果被写入另一个队列。
第3阶段同时启动,读取并处理第2个队列的内容,结果被写入一个字典。

第3阶段完成,并且内容已被最终处理,字典得到完整结果时,下一阶段才开始。
第4阶段从字典中读取内容,转换数据,然后写入队列中
第5阶段在项中添加颜色信息,然后把它们添加到另一个队列中,最后一个阶段显示信息。
第4到第6阶段也可以并发运行.

图片 37图片 38

class Program
  {
    static void Main(string[] args)
    {
      StartPipeline();
      Console.ReadLine();
    }

    private static async void StartPipeline()
    {
      var fileNames = new BlockingCollection<string>();
      var lines = new BlockingCollection<string>();
      var words = new ConcurrentDictionary<string, int>();
      var items = new BlockingCollection<Info>();
      var coloredItems = new BlockingCollection<Info>();

      Task t1 = PipelineStages.ReadFilenamesAsync(@"../../..", fileNames);
      ConsoleHelper.WriteLine("started stage 1");
      Task t2 = PipelineStages.LoadContentAsync(fileNames, lines);
      ConsoleHelper.WriteLine("started stage 2");
      Task t3 = PipelineStages.ProcessContentAsync(lines, words);
      await Task.WhenAll(t1, t2, t3);
      ConsoleHelper.WriteLine("stages 1, 2, 3 completed");

      Task t4 = PipelineStages.TransferContentAsync(words, items);
      Task t5 = PipelineStages.AddColorAsync(items, coloredItems);
      Task t6 = PipelineStages.ShowContentAsync(coloredItems);
      ConsoleHelper.WriteLine("stages 4, 5, 6 started");

      await Task.WhenAll(t4, t5, t6);

      ConsoleHelper.WriteLine("all stages finished");
    }
  }

Program

图片 39图片 40

public class ConsoleHelper
  {
    private static object syncOutput = new object();

    public static void WriteLine(string message)
    {
      lock (syncOutput)
      {
        Console.WriteLine(message);
      }
    }

    public static void WriteLine(string message, string color)
    {
      lock (syncOutput)
      {
        Console.ForegroundColor = (ConsoleColor)Enum.Parse(typeof(ConsoleColor), color);
        Console.WriteLine(message);
        Console.ResetColor();
      }
    }
  }

ConsoleHelper

图片 41图片 42

public static class PipelineStages
  {
    public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
    {
      return Task.Run(() =>
        {
          foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
          {
            output.Add(filename);
            ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
          }
          output.CompleteAdding();
        });
    }

    public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
    {
      foreach (var filename in input.GetConsumingEnumerable())
      {
        using (FileStream stream = File.OpenRead(filename))
        {
          var reader = new StreamReader(stream);
          string line = null;
          while ((line = await reader.ReadLineAsync()) != null)
          {
            output.Add(line);
            ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
          }
        }
      }
      output.CompleteAdding();
    }

    public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
    {
      return Task.Run(() =>
        {
          foreach (var line in input.GetConsumingEnumerable())
          {
            string[] words = line.Split(' ', ';', 't', '{', '}', '(', ')', ':', ',', '"');
            foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
            {
              output.AddOrIncrementValue(word);
              ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
            }
          }
        });
    }

    public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
    {
      return Task.Run(() =>
        {
          foreach (var word in input.Keys)
          {
            int value;
            if (input.TryGetValue(word, out value))
            {
              var info = new Info { Word = word, Count = value };
              output.Add(info);
              ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
            }
          }
          output.CompleteAdding();
        });
    }

    public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
    {
      return Task.Run(() =>
        {
          foreach (var item in input.GetConsumingEnumerable())
          {
            if (item.Count > 40)
            {
              item.Color = "Red";
            }
            else if (item.Count > 20)
            {
              item.Color = "Yellow";
            }
            else
            {
              item.Color = "Green";
            }
            output.Add(item);
            ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
          }
          output.CompleteAdding();
        });
    }

    public static Task ShowContentAsync(BlockingCollection<Info> input)
    {
      return Task.Run(() =>
        {
          foreach (var item in input.GetConsumingEnumerable())
          {
            ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
          }
        });
    }
  }

PipelineStages

 

2)使用BlockingCollection

第1阶段的ReadFilenamesAsync方法,实现了迭代目录文件名。在完成文件名添加后调用output.CompleteAdding();用以通知所有读取器不再等待集合中任何额外的项.如果没有调用的话,循环中读取器会添加等待更多的项.

图片 43图片 44

public static Task ReadFilenamesAsync(string path, BlockingCollection<string> output)
    {
      return Task.Run(() =>
        {
          foreach (string filename in Directory.EnumerateFiles(path, "*.cs", SearchOption.AllDirectories))
          {
            output.Add(filename);
            ConsoleHelper.WriteLine(string.Format("stage 1: added {0}", filename));
          }
          output.CompleteAdding();
        });
    }

ReadFilenamesAsync

下一阶段读取文件并将器内容添加到另一个集合中,由LoadContentAsync方法完成,该方法使用了输入集合传递的文件名,打开文件,把文件中的所有行添加到输出的集合中。在循环中用输入阻塞集合调用GetConsumingEnumerable()方法,以迭代各项,不使用也是可以的,但是值会迭代当前状态的集合。不会迭代以后添加的项。

如果在填充集合的同时,使用读取器读取集合,则需要使用GetConsumingEnumerable()方法获取阻塞集合的枚举器,而不是直接迭代集合

图片 45图片 46

public static async Task LoadContentAsync(BlockingCollection<string> input, BlockingCollection<string> output)
    {
      foreach (var filename in input.GetConsumingEnumerable())
      {
        using (FileStream stream = File.OpenRead(filename))
        {
          var reader = new StreamReader(stream);
          string line = null;
          while ((line = await reader.ReadLineAsync()) != null)
          {
            output.Add(line);
            ConsoleHelper.WriteLine(string.Format("stage 2: added {0}", line));
          }
        }
      }
      output.CompleteAdding();
    }

LoadContentAsync

 

3)使用ConcurrentDictionary

图片 47

图片 48图片 49

public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
    {
      return Task.Run(() =>
        {
          foreach (var line in input.GetConsumingEnumerable())
          {
            string[] words = line.Split(' ', ';', 't', '{', '}', '(', ')', ':', ',', '"');
            foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
            {
              output.AddOrIncrementValue(word);
              ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
            }
          }
        });
    }

ProcessContentAsync

图片 50

图片 51图片 52

public static class ConcurrentDictionaryExtension
  {
    public static void AddOrIncrementValue(this ConcurrentDictionary<string, int> dict, string key)
    {
      bool success = false;
      while (!success)
      {
        int value;
        if (dict.TryGetValue(key, out value))
        {
          if (dict.TryUpdate(key, value + 1, value))
          {
            success = true;
          }
        }
        else
        {
          if (dict.TryAdd(key, 1))
          {
            success = true;
          }
        }
      }
    }
  }

ConcurrentDictionaryExtension

图片 53

在完成第3个阶段后,第4到6阶段也可以并行运行,TransferContentAsync从字典中获取数据,进行类型转换,输出到BlockingCollection<string>中

图片 54图片 55

 public static Task ProcessContentAsync(BlockingCollection<string> input, ConcurrentDictionary<string, int> output)
    {
      return Task.Run(() =>
        {
          foreach (var line in input.GetConsumingEnumerable())
          {
            string[] words = line.Split(' ', ';', 't', '{', '}', '(', ')', ':', ',', '"');
            foreach (var word in words.Where(w => !string.IsNullOrEmpty(w)))
            {
              output.AddOrIncrementValue(word);
              ConsoleHelper.WriteLine(string.Format("stage 3: added {0}", word));
            }
          }
        });
    }

    public static Task TransferContentAsync(ConcurrentDictionary<string, int> input, BlockingCollection<Info> output)
    {
      return Task.Run(() =>
        {
          foreach (var word in input.Keys)
          {
            int value;
            if (input.TryGetValue(word, out value))
            {
              var info = new Info { Word = word, Count = value };
              output.Add(info);
              ConsoleHelper.WriteLine(string.Format("stage 4: added {0}", info));
            }
          }
          output.CompleteAdding();
        });
    }

    public static Task AddColorAsync(BlockingCollection<Info> input, BlockingCollection<Info> output)
    {
      return Task.Run(() =>
        {
          foreach (var item in input.GetConsumingEnumerable())
          {
            if (item.Count > 40)
            {
              item.Color = "Red";
            }
            else if (item.Count > 20)
            {
              item.Color = "Yellow";
            }
            else
            {
              item.Color = "Green";
            }
            output.Add(item);
            ConsoleHelper.WriteLine(string.Format("stage 5: added color {1} to {0}", item, item.Color));
          }
          output.CompleteAdding();
        });
    }

    public static Task ShowContentAsync(BlockingCollection<Info> input)
    {
      return Task.Run(() =>
        {
          foreach (var item in input.GetConsumingEnumerable())
          {
            ConsoleHelper.WriteLine(string.Format("stage 6: {0}", item), item.Color);
          }
        });
    }

View Code

 

 

13.性能

集合的方法常常有性能提示,给出大写O记录操作时间。

图片 56

O(1)表示无论集合中有多少数据项,这个操作需要的时间都不变。
O(n)表示对于集合执行一个操作需要的事件在最坏情况时是N.
O(log n)表示操作需要的时间随集合中元素的增加而增加

图片 57

  • 一、前言
  • 二、ConcurrentBag类
  • 三、 ConcurrentBag线程安全实现原理
    • 1. ConcurrentBag的私有字段
    • 2. 用于数据存储的TrehadLocalList类
    • 3. ConcurrentBag实现新增元素
    • 4. ConcurrentBag 如何实现迭代器模式
  • 四、总结
  • 笔者水平有限,如果错误欢迎各位批评指正!

LinkedList

LinkedList 是基于链表结构实现,所以在类中包含了 first 和 last 两个指针(Node)。Node 中包含了上一个节点和下一个节点的引用,这样就构成了双向的链表。每个 Node 只能知道自己的前一个节点和后一个节点,但对于链表来说,这已经足够了。

非泛型类集合

泛型集合类是在.NET2.0的时候出来的,也就是说在1.0的时候是没有这么方便的东西的。现在基本上我们已经不使用这些集合类了,除非在做一些和老代码保持兼容的工作的时候。来看看1.0时代的.NET程序员们都有哪些集合类可以用。

ArraryList后来被List<T>替代。

HashTable 后来被Dictionary<TKey,TValue>替代。 
Queue 后来被Queue<T>替代。 
SortedList 后来被SortedList<T>替代。 
Stack 后来被Stack<T>替代。

目录

ConcurrentHashMap

ConcurrentHashMap 的成员变量中,包含了一个 Segment 的数组(final Segment<K,V>[] segments;),而 Segment 是 ConcurrentHashMap 的内部类,然后在 Segment 这个类中,包含了一个 HashEntry 的数组(transient volatile HashEntry<K,V>[] table;)。而 HashEntry 也是 ConcurrentHashMap 的内部类。HashEntry 中,包含了 key 和 value 以及 next 指针(类似于 HashMap 中 Entry),所以 HashEntry 可以构成一个链表。

所以通俗的讲,ConcurrentHashMap 数据结构为一个 Segment 数组,Segment 的数据结构为 HashEntry 的数组,而 HashEntry 存的是我们的键值对,可以构成链表。

ConcurrentHashMap 的高并发性主要来自于三个方面:

  • 用分离锁实现多个线程间的更深层次的共享访问。
  • 用 HashEntery 对象的不变性来降低执行读操作的线程在遍历链表期间对加锁的需求。
  • 通过对同一个 Volatile 变量的写 / 读访问,协调不同线程间读 / 写操作的内存可见性。

使用分离锁,减小了请求 同一个锁的频率。

通过 HashEntery 对象的不变性及对同一个 Volatile 变量的读 / 写来协调内存可见性,使得 读操作大多数时候不需要加锁就能成功获取到需要的值。由于散列映射表在实际应用中大多数操作都是成功的 读操作,所以 2 和 3 既可以减少请求同一个锁的频率,也可以有效减少持有锁的时间。通过减小请求同一个锁的频率和尽量减少持有锁的时间 ,使得 ConcurrentHashMap 的并发性相对于 HashTable 和用同步包装器包装的 HashMap有了质的提高。


LRUCache原理

核心算法
map:存放数据的集合    new LinkedHashMap<K, V>(0, 0.75f, true);
size:当前LruCahce的内存占用大小
maxSize:Lrucache的最大容量
putCount:put的次数
createCount:create的次数
evictionCount:回收的次数
hitCount:命中的次数
missCount:丢失的次数

Lru是最近最少使用算法的简称,意思呢就是查询出最近的时间使用次数最少的那个对象。设置为true的时候,如果对一个元素进行了操作(put、get),就会把那个元素放到集合的最后,设置为false的时候,无论怎么操作,集合元素的顺序都是按照插入的顺序来进行存储的。
算法核心:当内容容量达到最大值的时候,只需要移除这个集合的前面的元素直到集合的容量足够存储数据的时候就可以了。

二、ConcurrentBag类

ConcurrentBag<T>实现了IProducerConsumerCollection<T>接口,该接口主要用于生产者消费者模式下,可见该类基本就是为生产消费者模式定制的。然后还实现了常规的IReadOnlyCollection<T>类,实现了该类就需要实现IEnumerable<T>、IEnumerable、 ICollection类。

ConcurrentBag<T>对外提供的方法没有List<T>那么多,但是同样有Enumerable实现的扩展方法。类本身提供的方法如下所示。

名称 说明
Add 将对象添加到 ConcurrentBag 中。
CopyTo 从指定数组索引开始,将 ConcurrentBag 元素复制到现有的一维 Array 中。
Equals(Object) 确定指定的 Object 是否等于当前的 Object。 (继承自 Object。)
Finalize 允许对象在“垃圾回收”回收之前尝试释放资源并执行其他清理操作。 (继承自 Object。)
GetEnumerator 返回循环访问 ConcurrentBag 的枚举器。
GetHashCode 用作特定类型的哈希函数。 (继承自 Object。)
GetType 获取当前实例的 Type。 (继承自 Object。)
MemberwiseClone 创建当前 Object 的浅表副本。 (继承自 Object。)
ToArray 将 ConcurrentBag 元素复制到新数组。
ToString 返回表示当前对象的字符串。 (继承自 Object。)
TryPeek 尝试从 ConcurrentBag 返回一个对象但不移除该对象。
TryTake 尝试从 ConcurrentBag 中移除并返回对象。

ArrayList

底层采用Object[]数组来实现
ArrayList 可以理解为动态数组,用 MSDN 中的说法,就是 Array 的复杂版本。与 Java 中的数组相比,它的容量能动态增长。ArrayList 是 List 接口的可变数组的实现。实现了所有可选列表操作,并允许包括 null 在内的所有元素。除了实现 List 接口外,此类还提供一些方法来操作内部用来存储列表的数组的大小。(此类大致上等同于 Vector 类,除了此类是不同步的。)

四、总结

下面给出一张图,描述了ConcurrentBag<T>是如何存储数据的。通过每个线程中的ThreadLocal来实现线程本地存储,每个线程中都有这样的结构,互不干扰。然后每个线程中的m_headList总是指向ConcurrentBag<T>的第一个列表,m_tailList指向最后一个列表。列表与列表之间通过m_locals 下的 m_nextList相连,构成一个单链表。

数据存储在每个线程的m_locals中,通过Node类构成一个双向链表。
PS: 要注意m_tailListm_headList并不是存储在ThreadLocal中,而是所有的线程共享一份。

图片 58

以上就是有关ConcurrentBag<T>类的实现,笔者的一些记录和解析。

SparseArray

SparseArray 的使用及实现原理
优势:

  • 避免了基本数据类型的装箱操作
  • 不需要额外的结构体,单个元素的存储成本更低
  • 数据量小的情况下,随机访问的效率更高

有优点就一定有缺点

  • 插入操作需要复制数组,增删效率降低
  • 数据量巨大时,复制数组成本巨大,gc()成本也巨大
  • 数据量巨大时,查询效率也会明显下降

三、 ConcurrentBag线程安全实现原理

1. ConcurrentBag的私有字段

ConcurrentBag线程安全实现主要是通过它的数据存储的结构和细颗粒度的锁。

   public class ConcurrentBag<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
    {
        // ThreadLocalList对象包含每个线程的数据
        ThreadLocal<ThreadLocalList> m_locals;

        // 这个头指针和尾指针指向中的第一个和最后一个本地列表,这些本地列表分散在不同线程中
        // 允许在线程局部对象上枚举
        volatile ThreadLocalList m_headList, m_tailList;

        // 这个标志是告知操作线程必须同步操作
        // 在GlobalListsLock 锁中 设置
        bool m_needSync;

}

首选我们来看它声明的私有字段,其中需要注意的是集合的数据是存放在ThreadLocal线程本地存储中的。也就是说访问它的每个线程会维护一个自己的集合数据列表,一个集合中的数据可能会存放在不同线程的本地存储空间中,所以如果线程访问自己本地存储的对象,那么是没有问题的,这就是实现线程安全的第一层,使用线程本地存储数据

然后可以看到ThreadLocalList m_headList, m_tailList;这个是存放着本地列表对象的头指针和尾指针,通过这两个指针,我们就可以通过遍历的方式来访问所有本地列表。它使用volatile修饰,不允许线程进行本地缓存,每个线程的读写都是直接操作在共享内存上,这就保证了变量始终具有一致性。任何线程在任何时间进行读写操作均是最新值。对于volatile修饰符,感谢我是攻城狮指出描述错误。

最后又定义了一个标志,这个标志告知操作线程必须进行同步操作,这是实现了一个细颗粒度的锁,因为只有在几个条件满足的情况下才需要进行线程同步。

本文由澳门威斯尼人平台登录发布于计算机编程,转载请注明出处:高级编程9,ConcurrentBag的实现原理

相关阅读