001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.collect; 018 019 import static com.google.common.base.Preconditions.checkArgument; 020 import static com.google.common.base.Preconditions.checkState; 021 import static com.google.common.collect.Multisets.checkNonnegative; 022 023 import com.google.common.annotations.Beta; 024 import com.google.common.annotations.VisibleForTesting; 025 import com.google.common.collect.Serialization.FieldSetter; 026 import com.google.common.primitives.Ints; 027 028 import java.io.IOException; 029 import java.io.ObjectInputStream; 030 import java.io.ObjectOutputStream; 031 import java.io.Serializable; 032 import java.util.Iterator; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.Set; 036 import java.util.concurrent.ConcurrentHashMap; 037 import java.util.concurrent.ConcurrentMap; 038 import java.util.concurrent.atomic.AtomicInteger; 039 040 import javax.annotation.Nullable; 041 042 /** 043 * A multiset that supports concurrent modifications and that provides atomic 044 * versions of most {@code Multiset} operations (exceptions where noted). Null 045 * elements are not supported. 046 * 047 * @author Cliff L. Biffle 048 * @author mike nonemacher 049 * @since 2.0 (imported from Google Collections Library) 050 */ 051 public final class ConcurrentHashMultiset<E> extends AbstractMultiset<E> implements Serializable { 052 053 /* 054 * The ConcurrentHashMultiset's atomic operations are implemented primarily in terms of 055 * AtomicInteger's atomic operations, with some help from ConcurrentMap's atomic operations on 056 * creation and removal (including automatic removal of zeroes). If the modification of an 057 * AtomicInteger results in zero, we compareAndSet the value to zero; if that succeeds, we remove 058 * the entry from the Map. If another operation sees a zero in the map, it knows that the entry is 059 * about to be removed, so this operation may remove it (often by replacing it with a new 060 * AtomicInteger). 061 */ 062 063 /** The number of occurrences of each element. */ 064 private final transient ConcurrentMap<E, AtomicInteger> countMap; 065 066 // This constant allows the deserialization code to set a final field. This 067 // holder class makes sure it is not initialized unless an instance is 068 // deserialized. 069 private static class FieldSettersHolder { 070 static final FieldSetter<ConcurrentHashMultiset> COUNT_MAP_FIELD_SETTER = 071 Serialization.getFieldSetter(ConcurrentHashMultiset.class, "countMap"); 072 } 073 074 /** 075 * Creates a new, empty {@code ConcurrentHashMultiset} using the default 076 * initial capacity, load factor, and concurrency settings. 077 */ 078 public static <E> ConcurrentHashMultiset<E> create() { 079 // TODO(schmoe): provide a way to use this class with other (possibly arbitrary) 080 // ConcurrentMap implementors. One possibility is to extract most of this class into 081 // an AbstractConcurrentMapMultiset. 082 return new ConcurrentHashMultiset<E>(new ConcurrentHashMap<E, AtomicInteger>()); 083 } 084 085 /** 086 * Creates a new {@code ConcurrentHashMultiset} containing the specified 087 * elements, using the default initial capacity, load factor, and concurrency 088 * settings. 089 * 090 * <p>This implementation is highly efficient when {@code elements} is itself 091 * a {@link Multiset}. 092 * 093 * @param elements the elements that the multiset should contain 094 */ 095 public static <E> ConcurrentHashMultiset<E> create(Iterable<? extends E> elements) { 096 ConcurrentHashMultiset<E> multiset = ConcurrentHashMultiset.create(); 097 Iterables.addAll(multiset, elements); 098 return multiset; 099 } 100 101 /** 102 * Creates a new, empty {@code ConcurrentHashMultiset} using {@code mapMaker} 103 * to construct the internal backing map. 104 * 105 * <p>If this {@link MapMaker} is configured to use entry eviction of any kind, this eviction 106 * applies to all occurrences of a given element as a single unit. However, most updates to the 107 * multiset do not count as map updates at all, since we're usually just mutating the value 108 * stored in the map, so {@link MapMaker#expireAfterAccess} makes sense (evict the entry that 109 * was queried or updated longest ago), but {@link MapMaker#expireAfterWrite} doesn't, because 110 * the eviction time is measured from when we saw the first occurrence of the object. 111 * 112 * <p>The returned multiset is serializable but any serialization caveats 113 * given in {@code MapMaker} apply. 114 * 115 * <p>Finally, soft/weak values can be used but are not very useful: the values are created 116 * internally and not exposed externally, so no one else will have a strong reference to the 117 * values. Weak keys on the other hand can be useful in some scenarios. 118 * 119 * @since 7.0 120 */ 121 @Beta 122 public static <E> ConcurrentHashMultiset<E> create( 123 GenericMapMaker<? super E, ? super Number> mapMaker) { 124 return new ConcurrentHashMultiset<E>(mapMaker.<E, AtomicInteger>makeMap()); 125 } 126 127 /** 128 * Creates an instance using {@code countMap} to store elements and their 129 * counts. 130 * 131 * <p>This instance will assume ownership of {@code countMap}, and other code 132 * should not maintain references to the map or modify it in any way. 133 * 134 * @param countMap backing map for storing the elements in the multiset and 135 * their counts. It must be empty. 136 * @throws IllegalArgumentException if {@code countMap} is not empty 137 */ 138 @VisibleForTesting ConcurrentHashMultiset(ConcurrentMap<E, AtomicInteger> countMap) { 139 checkArgument(countMap.isEmpty()); 140 this.countMap = countMap; 141 } 142 143 // Query Operations 144 145 /** 146 * Returns the number of occurrences of {@code element} in this multiset. 147 * 148 * @param element the element to look for 149 * @return the nonnegative number of occurrences of the element 150 */ 151 @Override public int count(@Nullable Object element) { 152 AtomicInteger existingCounter = safeGet(element); 153 return (existingCounter == null) ? 0 : existingCounter.get(); 154 } 155 156 /** 157 * Depending on the type of the underlying map, map.get may throw NullPointerException or 158 * ClassCastException, if the object is null or of the wrong type. We usually just want to treat 159 * those cases as if the element isn't in the map, by catching the exceptions and returning null. 160 */ 161 private AtomicInteger safeGet(Object element) { 162 try { 163 return countMap.get(element); 164 } catch (NullPointerException e) { 165 return null; 166 } catch (ClassCastException e) { 167 return null; 168 } 169 } 170 171 /** 172 * {@inheritDoc} 173 * 174 * <p>If the data in the multiset is modified by any other threads during this 175 * method, it is undefined which (if any) of these modifications will be 176 * reflected in the result. 177 */ 178 @Override public int size() { 179 long sum = 0L; 180 for (AtomicInteger value : countMap.values()) { 181 sum += value.get(); 182 } 183 return Ints.saturatedCast(sum); 184 } 185 186 /* 187 * Note: the superclass toArray() methods assume that size() gives a correct 188 * answer, which ours does not. 189 */ 190 191 @Override public Object[] toArray() { 192 return snapshot().toArray(); 193 } 194 195 @Override public <T> T[] toArray(T[] array) { 196 return snapshot().toArray(array); 197 } 198 199 /* 200 * We'd love to use 'new ArrayList(this)' or 'list.addAll(this)', but 201 * either of these would recurse back to us again! 202 */ 203 private List<E> snapshot() { 204 List<E> list = Lists.newArrayListWithExpectedSize(size()); 205 for (Multiset.Entry<E> entry : entrySet()) { 206 E element = entry.getElement(); 207 for (int i = entry.getCount(); i > 0; i--) { 208 list.add(element); 209 } 210 } 211 return list; 212 } 213 214 // Modification Operations 215 216 /** 217 * Adds a number of occurrences of the specified element to this multiset. 218 * 219 * @param element the element to add 220 * @param occurrences the number of occurrences to add 221 * @return the previous count of the element before the operation; possibly 222 * zero 223 * @throws IllegalArgumentException if {@code occurrences} is negative, or if 224 * the resulting amount would exceed {@link Integer#MAX_VALUE} 225 */ 226 @Override public int add(E element, int occurrences) { 227 if (occurrences == 0) { 228 return count(element); 229 } 230 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences); 231 232 while (true) { 233 AtomicInteger existingCounter = safeGet(element); 234 if (existingCounter == null) { 235 existingCounter = countMap.putIfAbsent(element, new AtomicInteger(occurrences)); 236 if (existingCounter == null) { 237 return 0; 238 } 239 // existingCounter != null: fall through to operate against the existing AtomicInteger 240 } 241 242 while (true) { 243 int oldValue = existingCounter.get(); 244 if (oldValue != 0) { 245 checkArgument(occurrences <= Integer.MAX_VALUE - oldValue, 246 "Overflow adding %s occurrences to a count of %s", 247 occurrences, oldValue); 248 int newValue = oldValue + occurrences; 249 if (existingCounter.compareAndSet(oldValue, newValue)) { 250 // newValue can't == 0, so no need to check & remove 251 return oldValue; 252 } 253 } else { 254 // In the case of a concurrent remove, we might observe a zero value, which means another 255 // thread is about to remove (element, existingCounter) from the map. Rather than wait, 256 // we can just do that work here. 257 AtomicInteger newCounter = new AtomicInteger(occurrences); 258 if ((countMap.putIfAbsent(element, newCounter) == null) 259 || countMap.replace(element, existingCounter, newCounter)) { 260 return 0; 261 } 262 break; 263 } 264 } 265 266 // If we're still here, there was a race, so just try again. 267 } 268 } 269 270 /** 271 * Removes a number of occurrences of the specified element from this 272 * multiset. If the multiset contains fewer than this number of occurrences to 273 * begin with, all occurrences will be removed. 274 * 275 * @param element the element whose occurrences should be removed 276 * @param occurrences the number of occurrences of the element to remove 277 * @return the count of the element before the operation; possibly zero 278 * @throws IllegalArgumentException if {@code occurrences} is negative 279 */ 280 @Override public int remove(@Nullable Object element, int occurrences) { 281 if (occurrences == 0) { 282 return count(element); 283 } 284 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences); 285 286 AtomicInteger existingCounter = safeGet(element); 287 if (existingCounter == null) { 288 return 0; 289 } 290 while (true) { 291 int oldValue = existingCounter.get(); 292 if (oldValue != 0) { 293 int newValue = Math.max(0, oldValue - occurrences); 294 if (existingCounter.compareAndSet(oldValue, newValue)) { 295 if (newValue == 0) { 296 // Just CASed to 0; remove the entry to clean up the map. If the removal fails, 297 // another thread has already replaced it with a new counter, which is fine. 298 countMap.remove(element, existingCounter); 299 } 300 return oldValue; 301 } 302 } else { 303 return 0; 304 } 305 } 306 } 307 308 /** 309 * Removes exactly the specified number of occurrences of {@code element}, or 310 * makes no change if this is not possible. 311 * 312 * <p>This method, in contrast to {@link #remove(Object, int)}, has no effect 313 * when the element count is smaller than {@code occurrences}. 314 * 315 * @param element the element to remove 316 * @param occurrences the number of occurrences of {@code element} to remove 317 * @return {@code true} if the removal was possible (including if {@code 318 * occurrences} is zero) 319 */ 320 public boolean removeExactly(@Nullable Object element, int occurrences) { 321 if (occurrences == 0) { 322 return true; 323 } 324 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences); 325 326 AtomicInteger existingCounter = safeGet(element); 327 if (existingCounter == null) { 328 return false; 329 } 330 while (true) { 331 int oldValue = existingCounter.get(); 332 if (oldValue < occurrences) { 333 return false; 334 } 335 int newValue = oldValue - occurrences; 336 if (existingCounter.compareAndSet(oldValue, newValue)) { 337 if (newValue == 0) { 338 // Just CASed to 0; remove the entry to clean up the map. If the removal fails, 339 // another thread has already replaced it with a new counter, which is fine. 340 countMap.remove(element, existingCounter); 341 } 342 return true; 343 } 344 } 345 } 346 347 /** 348 * Adds or removes occurrences of {@code element} such that the {@link #count} 349 * of the element becomes {@code count}. 350 * 351 * @return the count of {@code element} in the multiset before this call 352 * @throws IllegalArgumentException if {@code count} is negative 353 */ 354 @Override public int setCount(E element, int count) { 355 checkNonnegative(count, "count"); 356 while (true) { 357 AtomicInteger existingCounter = safeGet(element); 358 if (existingCounter == null) { 359 if (count == 0) { 360 return 0; 361 } else { 362 existingCounter = countMap.putIfAbsent(element, new AtomicInteger(count)); 363 if (existingCounter == null) { 364 return 0; 365 } 366 // existingCounter != null: fall through 367 } 368 } 369 370 while (true) { 371 int oldValue = existingCounter.get(); 372 if (oldValue == 0) { 373 if (count == 0) { 374 return 0; 375 } else { 376 AtomicInteger newCounter = new AtomicInteger(count); 377 if ((countMap.putIfAbsent(element, newCounter) == null) 378 || countMap.replace(element, existingCounter, newCounter)) { 379 return 0; 380 } 381 } 382 break; 383 } else { 384 if (existingCounter.compareAndSet(oldValue, count)) { 385 if (count == 0) { 386 // Just CASed to 0; remove the entry to clean up the map. If the removal fails, 387 // another thread has already replaced it with a new counter, which is fine. 388 countMap.remove(element, existingCounter); 389 } 390 return oldValue; 391 } 392 } 393 } 394 } 395 } 396 397 /** 398 * Sets the number of occurrences of {@code element} to {@code newCount}, but 399 * only if the count is currently {@code expectedOldCount}. If {@code element} does 400 * not appear in the multiset exactly {@code expectedOldCount} times, no changes will 401 * be made. 402 * 403 * @return {@code true} if the change was successful. This usually indicates 404 * that the multiset has been modified, but not always: in the case that 405 * {@code expectedOldCount == newCount}, the method will return {@code true} if 406 * the condition was met. 407 * @throws IllegalArgumentException if {@code expectedOldCount} or {@code newCount} is negative 408 */ 409 @Override public boolean setCount(E element, int expectedOldCount, int newCount) { 410 checkNonnegative(expectedOldCount, "oldCount"); 411 checkNonnegative(newCount, "newCount"); 412 413 AtomicInteger existingCounter = safeGet(element); 414 if (existingCounter == null) { 415 if (expectedOldCount != 0) { 416 return false; 417 } else if (newCount == 0) { 418 return true; 419 } else { 420 // if our write lost the race, it must have lost to a nonzero value, so we can stop 421 return countMap.putIfAbsent(element, new AtomicInteger(newCount)) == null; 422 } 423 } 424 int oldValue = existingCounter.get(); 425 if (oldValue == expectedOldCount) { 426 if (oldValue == 0) { 427 if (newCount == 0) { 428 // Just observed a 0; try to remove the entry to clean up the map 429 countMap.remove(element, existingCounter); 430 return true; 431 } else { 432 AtomicInteger newCounter = new AtomicInteger(newCount); 433 return (countMap.putIfAbsent(element, newCounter) == null) 434 || countMap.replace(element, existingCounter, newCounter); 435 } 436 } else { 437 if (existingCounter.compareAndSet(oldValue, newCount)) { 438 if (newCount == 0) { 439 // Just CASed to 0; remove the entry to clean up the map. If the removal fails, 440 // another thread has already replaced it with a new counter, which is fine. 441 countMap.remove(element, existingCounter); 442 } 443 return true; 444 } 445 } 446 } 447 return false; 448 } 449 450 // Views 451 452 @Override Set<E> createElementSet() { 453 final Set<E> delegate = countMap.keySet(); 454 return new ForwardingSet<E>() { 455 @Override protected Set<E> delegate() { 456 return delegate; 457 } 458 @Override public boolean remove(Object object) { 459 try { 460 return delegate.remove(object); 461 } catch (NullPointerException e) { 462 return false; 463 } catch (ClassCastException e) { 464 return false; 465 } 466 } 467 }; 468 } 469 470 private transient EntrySet entrySet; 471 472 @Override public Set<Multiset.Entry<E>> entrySet() { 473 EntrySet result = entrySet; 474 if (result == null) { 475 entrySet = result = new EntrySet(); 476 } 477 return result; 478 } 479 480 @Override int distinctElements() { 481 return countMap.size(); 482 } 483 484 @Override public boolean isEmpty() { 485 return countMap.isEmpty(); 486 } 487 488 @Override Iterator<Entry<E>> entryIterator() { 489 // AbstractIterator makes this fairly clean, but it doesn't support remove(). To support 490 // remove(), we create an AbstractIterator, and then use ForwardingIterator to delegate to it. 491 final Iterator<Entry<E>> readOnlyIterator = 492 new AbstractIterator<Entry<E>>() { 493 private Iterator<Map.Entry<E, AtomicInteger>> mapEntries = countMap.entrySet().iterator(); 494 495 @Override protected Entry<E> computeNext() { 496 while (true) { 497 if (!mapEntries.hasNext()) { 498 return endOfData(); 499 } 500 Map.Entry<E, AtomicInteger> mapEntry = mapEntries.next(); 501 int count = mapEntry.getValue().get(); 502 if (count != 0) { 503 return Multisets.immutableEntry(mapEntry.getKey(), count); 504 } 505 } 506 } 507 }; 508 509 return new ForwardingIterator<Entry<E>>() { 510 private Entry<E> last; 511 512 @Override protected Iterator<Entry<E>> delegate() { 513 return readOnlyIterator; 514 } 515 516 @Override public Entry<E> next() { 517 last = super.next(); 518 return last; 519 } 520 521 @Override public void remove() { 522 checkState(last != null); 523 ConcurrentHashMultiset.this.setCount(last.getElement(), 0); 524 last = null; 525 } 526 }; 527 } 528 529 @Override public void clear() { 530 countMap.clear(); 531 } 532 533 private class EntrySet extends AbstractMultiset<E>.EntrySet { 534 @Override ConcurrentHashMultiset<E> multiset() { 535 return ConcurrentHashMultiset.this; 536 } 537 538 /* 539 * Note: the superclass toArray() methods assume that size() gives a correct 540 * answer, which ours does not. 541 */ 542 543 @Override public Object[] toArray() { 544 return snapshot().toArray(); 545 } 546 547 @Override public <T> T[] toArray(T[] array) { 548 return snapshot().toArray(array); 549 } 550 551 private List<Multiset.Entry<E>> snapshot() { 552 List<Multiset.Entry<E>> list = Lists.newArrayListWithExpectedSize(size()); 553 // Not Iterables.addAll(list, this), because that'll forward right back here. 554 Iterators.addAll(list, iterator()); 555 return list; 556 } 557 558 @Override public boolean remove(Object object) { 559 if (object instanceof Multiset.Entry) { 560 Multiset.Entry<?> entry = (Multiset.Entry<?>) object; 561 Object element = entry.getElement(); 562 int entryCount = entry.getCount(); 563 if (entryCount != 0) { 564 // Safe as long as we never add a new entry, which we won't. 565 @SuppressWarnings("unchecked") 566 Multiset<Object> multiset = (Multiset) multiset(); 567 return multiset.setCount(element, entryCount, 0); 568 } 569 } 570 return false; 571 } 572 } 573 574 /** 575 * @serialData the ConcurrentMap of elements and their counts. 576 */ 577 private void writeObject(ObjectOutputStream stream) throws IOException { 578 stream.defaultWriteObject(); 579 stream.writeObject(countMap); 580 } 581 582 private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { 583 stream.defaultReadObject(); 584 @SuppressWarnings("unchecked") // reading data stored by writeObject 585 ConcurrentMap<E, Integer> deserializedCountMap = 586 (ConcurrentMap<E, Integer>) stream.readObject(); 587 FieldSettersHolder.COUNT_MAP_FIELD_SETTER.set(this, deserializedCountMap); 588 } 589 590 private static final long serialVersionUID = 1; 591 } 592